阿里云数加平台开荒实践,利用MaxCompute和数加的数据集成实现增量ETL并进行TopN统计。
阿里云数加平台开荒实践
写在前面
文章写于20180317,关于阿里云数加的问题属于此前版本,之后迭代视情况更新此文。
需求分析
输入
MySQL存在数据如下:
USER_ID(人员id) | ADDRESS_ID(所属社区) | TALK_AT(谈话日期) | TALKER_NAME(谈话人) | CREATED_AT(创建日期) |
---|---|---|---|---|
9401 | 520001001006056 | 2015-06-08 00:00:00 | 王应 | 2016-05-05 17:49:29 |
6920 | 520001001001003 | 2015-12-10 00:00:00 | 唐莹 | 2016-04-23 16:31:57 |
3420 | 520001001027154 | 2015-09-12 00:00:00 | 张鹏,卢锋 | 2016-04-05 15:39:52 |
5854 | 520001001002013 | 2015-09-05 00:00:00 | 普桂兰 | 2016-04-20 12:48:05 |
6991 | 520001010006048 | 2016-02-25 00:00:00 | 张丽 | 2016-04-14 14:20:19 |
110080 | 520001003005042 | 2016-09-07 00:00:00 | 李荣昌 | 2017-07-03 16:31:06 |
建表语句:
|
|
其中TALKER_NAME比较特殊,存在使用中文,
分隔的字符串。
MySQL的数据每日都在递增,根据CREATED_AT来标明数据创建日期。
输出
在mongodb 中写入如下数据:
详细数据
其特点在于TALKER_NAME会通过逗号
,
被拆分成多行,并且在mongo的形式为{"data":object}
形式。(这里先提一下,数加平台的ETL不支持mongo的object类型)123456789{"data": {"USER_ID": "xx","ADDRESS_ID": "xx","TALK_AT": "xx","TALKER_NAME": "xx","CREATED_AT": "xx"}}统计数据
统计数据要求,计算聊天最多的TALKER_NAME的前10个,即TopN算法。其特点在于需要统计聊天的详细数据,即和每个USER_ID的聊天次数。如下:
12345678{"talker_name": "xxx", // 谈话人姓名"talk_count": 123, // 谈话总数"info": {"$userid": $userid_talk_count, // key 是 USER_ID 的值,value 是 这个 USER_ID 的谈话次数"$userid": $userid_talk_count}}
输出限制
- 自动化任务
- 每天增量提取并计算
踩坑
- 数加不提供mongo-hadoop类似的工具,所以需要自己实现
- MaxCompute,即MR,在数加MR里的每个Mapper创建mongo connection,然后写入。
- 分布式计算创建connection,mongo面临巨大压力
- 数加MR存在Java沙箱限制,不允许在MR中访问网络。
- 终上所述,此做法不可取。
- MaxCompute Studio(MS)的Console和DataWorks(DW)的IDE不一致
- MS上传文件只能上传至根目录,但是DW却可以创建文件夹;
- MS可以上传很大的文件和任意的文件名,但DW只能上传10M的文件,并且文件不能包含横杠-
- MS上传的资源在DW不可见;DW上传的文件根目录对MS可见。
- DW不能引用那些不可见的文件。
- 这里非常坑,两边数据完全不共享!!!
- 数据集成对于mongodb不支持object类型
Max Compute开发
环境初始化
odps有3种任务类型](https://help.aliyun.com/document_detail/50129.html?spm=a2c4g.11186623.2.4.7SUpbF)
分别为:OPEN_MR 任务、ODPS_MR 任务和ODPS_SQL 任务。
ODPS_SQL 经分析split函数并不能简单完成需求,所以使用ODPS_MR(官方推荐) 。
以ODPS_MR 为例:
数据开发->新建任务
安装MaxCompute Studio,然后new Project的时候,选择MaxCompute Studio;再new Module,选择MaxCompute Java。
然后通过MaxCompute Studio连接阿里云数加平台方便开发。
通过odps_config.ini初始化Account和配置信息。
|
|
MR任务运行
数加平台的MR,在页面不支持上传超过10M的程序,同时对名字也限制了横杠-,要知道Maven的jar大部分都有横杠的,需要替换为下划线。
询问工程师,建议通过客户端进行资源管理。但是注意:目前客户端和DataWorks的数据是不共享的!!!
客户端安装
资源操作命令
当然,如果安装了MaxCompute Studio是不需要安装客户端的,直接通过odps_config.ini登陆后,直接打开Console操作客户端。
工作流任务开发
创建工作流任务,人物开发-》新建-》新建任务,选择工作流任务即可。
MySQL数据集成至odps
目的
目的是将TALK_RECORDS的数据复制至odps表,因为数加MR只支持odps的数据源。
创建数据源
创建MySQL数据源,这个数据源便是数据来源,即
TALK_RECORDS
表。创建odps数据源,表结构和MySQL一致。
12345678CREATE TABLE odps_talk_records (user_id STRING COMMENT '人员id',address_id STRING COMMENT '所属社区',talk_at DATETIME COMMENT '谈话日期',talker_name STRING COMMENT '谈话人',created_at DATETIME COMMENT '创建日期')LIFECYCLE 100000;
数据集成
之后可以通过数据集成,配置数据同步(可以是向导或者脚本模式)
需要注意的是,采用truncate模式,即每次同步都会清空原数据。并且增量集成。
脚本如下:
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152{"configuration": {"reader": {"plugin": "mysql","parameter": {"datasource": "paleo_etl_test","column": ["user_id","address_id","talk_at","talker_name","created_at"],"where": "CREATED_AT>=DATE_SUB(curdate(),INTERVAL 1 DAY) and CREATED_AT<curdate()","connection": [{"datasource": "paleo_etl_test","table": ["TALK_RECORDS"]}],"splitPk": "","connectionTable": "TALK_RECORDS"}},"writer": {"plugin": "odps","parameter": {"partition": "","truncate": true,"datasource": "paleo_odps_talk_records","column": ["user_id","address_id","talk_at","talker_name","created_at"],"table": "odps_talk_records"}},"setting": {"speed": {"concurrent": "1","mbps": "1"}}},"type": "job","version": "1.0"}
分行写入odps
目的
将第一步集成到odps的数据,根据TALKER_NAME的,
进行拆分,拆成多行数据,并写入新的odps(之后会基于这个表进行统计操作)
创建ODPS_MR任务
Max Compute的开发环境和工程都有描述,这里的目的是分行,那么只需要一个Map Only的MR就行了,关键代码如下:
|
|
通过以上代码,便可以将第一步的odps表数据,分行输出到新的odps表里面去了,并且使用的是追加模式。
创建一个ODPS_MR任务,并填入如下运行脚本:
|
|
需要注意的是:
splitrows.jar为数加MR打包的fat jar。并上传到DataWorks的资源管理处。
统计TopN
目的
目的就是需求,计算出Top10的结果,将结果写入odps,之后会同步到mongodb。
创建ODPS_SQL
注意是使用insert overwrite
,每次都应覆写数据。此sql运行的结果便是根据talker_name
,talk_count
,user_id
分组,计算出user_talk_count
。
|
|
分行结果数据导出mongo
目的
将结果由odps落地到常用的存储介质,便于其他程序读取使用。
创建数据集成
|
|
TopN统计结果导出mongo
目的
将结果由odps落地到常用的存储介质,便于其他程序读取使用。
最后一步会将此数据进行格式化,使其符合需求。
创建数据集成
|
|
格式化TopN数据
目的
使数据符合需求。
Java程序
2个同步出来的odps表,都会进行相应处理,mongo里面也对应有3个集合:临时数据集合,格式化后的数据集合以及历史数据集合。
TopN涉及到一定的逻辑计算。
反正一切看代码吧!
关键代码如下:
|
|
创建Shell脚本
通过创建数加的Shell任务去调用上面的Java程序,Java程序通过rest暴露服务。
任务流程图
完成上面的步骤后,会得到一个工作流任务,然后通过配置调度任务,每日凌晨调度一次,即可统计昨天的数据了。
最终结果如下: