阿里云数加平台开荒实践


阿里云数加平台开荒实践,利用MaxCompute和数加的数据集成实现增量ETL并进行TopN统计。

阿里云数加平台开荒实践

写在前面

文章写于20180317,关于阿里云数加的问题属于此前版本,之后迭代视情况更新此文。

需求分析

需求来源

输入

MySQL存在数据如下:

USER_ID(人员id) ADDRESS_ID(所属社区) TALK_AT(谈话日期) TALKER_NAME(谈话人) CREATED_AT(创建日期)
9401 520001001006056 2015-06-08 00:00:00 王应 2016-05-05 17:49:29
6920 520001001001003 2015-12-10 00:00:00 唐莹 2016-04-23 16:31:57
3420 520001001027154 2015-09-12 00:00:00 张鹏,卢锋 2016-04-05 15:39:52
5854 520001001002013 2015-09-05 00:00:00 普桂兰 2016-04-20 12:48:05
6991 520001010006048 2016-02-25 00:00:00 张丽 2016-04-14 14:20:19
110080 520001003005042 2016-09-07 00:00:00 李荣昌 2017-07-03 16:31:06

建表语句:

1
2
3
4
5
6
7
CREATE TABLE `TALK_RECORDS` (
`USER_ID` varchar(32) DEFAULT NULL,
`ADDRESS_ID` varchar(32) DEFAULT NULL,
`TALK_AT` datetime DEFAULT NULL,
`TALKER_NAME` varchar(32) DEFAULT NULL,
`CREATED_AT` datetime DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8

其中TALKER_NAME比较特殊,存在使用中文分隔的字符串。

MySQL的数据每日都在递增,根据CREATED_AT来标明数据创建日期。

输出

在mongodb 中写入如下数据:

  • 详细数据

    其特点在于TALKER_NAME会通过逗号被拆分成多行,并且在mongo的形式为{"data":object}形式。(这里先提一下,数加平台的ETL不支持mongo的object类型)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    {
    "data": {
    "USER_ID": "xx",
    "ADDRESS_ID": "xx",
    "TALK_AT": "xx",
    "TALKER_NAME": "xx",
    "CREATED_AT": "xx"
    }
    }
  • 统计数据

    统计数据要求,计算聊天最多的TALKER_NAME的前10个,即TopN算法。其特点在于需要统计聊天的详细数据,即和每个USER_ID的聊天次数。如下:

    1
    2
    3
    4
    5
    6
    7
    8
    {
    "talker_name": "xxx", // 谈话人姓名
    "talk_count": 123, // 谈话总数
    "info": {
    "$userid": $userid_talk_count, // key 是 USER_ID 的值,value 是 这个 USER_ID 的谈话次数
    "$userid": $userid_talk_count
    }
    }

输出限制

  • 自动化任务
  • 每天增量提取并计算

踩坑

  • 数加不提供mongo-hadoop类似的工具,所以需要自己实现
  • MaxCompute,即MR,在数加MR里的每个Mapper创建mongo connection,然后写入。
    • 分布式计算创建connection,mongo面临巨大压力
    • 数加MR存在Java沙箱限制,不允许在MR中访问网络。
    • 终上所述,此做法不可取。
  • MaxCompute Studio(MS)的Console和DataWorks(DW)的IDE不一致
    • MS上传文件只能上传至根目录,但是DW却可以创建文件夹;
    • MS可以上传很大的文件和任意的文件名,但DW只能上传10M的文件,并且文件不能包含横杠-
    • MS上传的资源在DW不可见;DW上传的文件根目录对MS可见。
    • DW不能引用那些不可见的文件。
    • 这里非常坑,两边数据完全不共享!!!
  • 数据集成对于mongodb不支持object类型

Max Compute开发

环境初始化

odps有3种任务类型](https://help.aliyun.com/document_detail/50129.html?spm=a2c4g.11186623.2.4.7SUpbF)
分别为:OPEN_MR 任务、ODPS_MR 任务和ODPS_SQL 任务。
ODPS_SQL 经分析split函数并不能简单完成需求,所以使用ODPS_MR(官方推荐) 。

以ODPS_MR 为例:
数据开发->新建任务
安装MaxCompute Studio,然后new Project的时候,选择MaxCompute Studio;再new Module,选择MaxCompute Java。
然后通过MaxCompute Studio连接阿里云数加平台方便开发。
通过odps_config.ini初始化Account和配置信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
project_name=
access_id=<accessid>
access_key=<accesskey>
end_point=http://service.odps.aliyun.com/api
log_view_host=http://logview.odps.aliyun.com
https_check=true
# confirm threshold for query input size(unit: GB)
data_size_confirm=100.0
# this url is for odpscmd update
update_url=http://repo.aliyun.com/odpscmd
# download sql results by instance tunnel
use_instance_tunnel=true
# the max records when download sql results by instance tunnel
instance_tunnel_max_record=10000
# IMPORTANT:
# If leaving tunnel_endpoint untouched, console will try to automatically get one from odps service, which might charge networking fees in some cases.
# Please refer to https://help.aliyun.com/document_detail/34951.html
# tunnel_endpoint=

MR任务运行

数加平台的MR,在页面不支持上传超过10M的程序,同时对名字也限制了横杠-,要知道Maven的jar大部分都有横杠的,需要替换为下划线。
询问工程师,建议通过客户端进行资源管理。但是注意:目前客户端和DataWorks的数据是不共享的!!!
客户端安装
资源操作命令
当然,如果安装了MaxCompute Studio是不需要安装客户端的,直接通过odps_config.ini登陆后,直接打开Console操作客户端。

工作流任务开发

创建工作流任务,人物开发-》新建-》新建任务,选择工作流任务即可。

MySQL数据集成至odps

目的

目的是将TALK_RECORDS的数据复制至odps表,因为数加MR只支持odps的数据源。

创建数据源

数据集成->数据源->新增数据源

  • 创建MySQL数据源,这个数据源便是数据来源,即TALK_RECORDS表。

  • 创建odps数据源,表结构和MySQL一致。

    1
    2
    3
    4
    5
    6
    7
    8
    CREATE TABLE odps_talk_records (
    user_id STRING COMMENT '人员id',
    address_id STRING COMMENT '所属社区',
    talk_at DATETIME COMMENT '谈话日期',
    talker_name STRING COMMENT '谈话人',
    created_at DATETIME COMMENT '创建日期'
    )
    LIFECYCLE 100000;

数据集成

  • 之后可以通过数据集成,配置数据同步(可以是向导或者脚本模式)

    需要注意的是,采用truncate模式,即每次同步都会清空原数据。并且增量集成。

    脚本如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    {
    "configuration": {
    "reader": {
    "plugin": "mysql",
    "parameter": {
    "datasource": "paleo_etl_test",
    "column": [
    "user_id",
    "address_id",
    "talk_at",
    "talker_name",
    "created_at"
    ],
    "where": "CREATED_AT>=DATE_SUB(curdate(),INTERVAL 1 DAY) and CREATED_AT<curdate()",
    "connection": [
    {
    "datasource": "paleo_etl_test",
    "table": [
    "TALK_RECORDS"
    ]
    }
    ],
    "splitPk": "",
    "connectionTable": "TALK_RECORDS"
    }
    },
    "writer": {
    "plugin": "odps",
    "parameter": {
    "partition": "",
    "truncate": true,
    "datasource": "paleo_odps_talk_records",
    "column": [
    "user_id",
    "address_id",
    "talk_at",
    "talker_name",
    "created_at"
    ],
    "table": "odps_talk_records"
    }
    },
    "setting": {
    "speed": {
    "concurrent": "1",
    "mbps": "1"
    }
    }
    },
    "type": "job",
    "version": "1.0"
    }

分行写入odps

目的

将第一步集成到odps的数据,根据TALKER_NAME的进行拆分,拆成多行数据,并写入新的odps(之后会基于这个表进行统计操作)

创建ODPS_MR任务

Max Compute的开发环境和工程都有描述,这里的目的是分行,那么只需要一个Map Only的MR就行了,关键代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// Mapper
public class SplitRowsMapper extends MapperBase {
@Override
public void setup(TaskContext context) throws IOException {
}
@Override
public void map(long recordNum, Record record, TaskContext context) throws IOException {
String talker_name = record.getString("talker_name");
if(talker_name.contains(",")){
String[] talkerNames = talker_name.split(",");
for (String talkerName : talkerNames) {
Record tmpRecord = record.clone();
tmpRecord.setString("talker_name",talkerName);
context.write(tmpRecord);
}
}else{
context.write(record);
}
}
@Override
public void cleanup(TaskContext context) throws IOException {
}
}
// 主函数
public class SplitRowsMain {
public static void main(String[] args) throws OdpsException {
AppConfig appConfig = AppConfigInit.getConf();
JobConf job = new JobConf();
InputUtils.addTable(TableInfo.builder().tableName(appConfig.getTableConf().getIn()).build(), job);
OutputUtils.addTable(TableInfo.builder().tableName(appConfig.getTableConf().getOut()).build(), job);
job.setMapperClass(SplitRowsMapper.class);
job.setNumReduceTasks(0); //Map Only Task
//非覆盖模式
job.setOutputOverwrite(false);
RunningJob rj = JobClient.runJob(job);
rj.waitForCompletion();
}
}

通过以上代码,便可以将第一步的odps表数据,分行输出到新的odps表里面去了,并且使用的是追加模式。

创建一个ODPS_MR任务,并填入如下运行脚本:

1
2
--@resource_reference{"splitrows.jar"}
jar -libjars splitrows.jar -classpath splitrows.jar com.maxplus1.main.SplitRowsMain;

需要注意的是:

splitrows.jar为数加MR打包的fat jar。并上传到DataWorks的资源管理处。

统计TopN

目的

目的就是需求,计算出Top10的结果,将结果写入odps,之后会同步到mongodb。

创建ODPS_SQL

注意是使用insert overwrite,每次都应覆写数据。此sql运行的结果便是根据talker_name,talk_count,user_id分组,计算出user_talk_count

1
2
3
4
5
6
insert overwrite table odps_topn
select t1.talker_name,t1.talk_count,t2.user_id,t2.user_talk_count,getdate() as create_at from (select talker_name,count(1) as talk_count from odps_talk_records_out group by talker_name order by talk_count desc limit 10) t1
inner join
(select talker_name,user_id,count(1) as user_talk_count from odps_talk_records_out group by talker_name,user_id) t2
on
t1.talker_name = t2.talker_name ;

分行结果数据导出mongo

目的

将结果由odps落地到常用的存储介质,便于其他程序读取使用。

创建数据集成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
{
"configuration": {
"reader": {
"plugin": "odps",
"parameter": {
"partition": "",
"datasource": "paleo_odps_talk_records_out",
"column": [
"USER_ID",
"ADDRESS_ID",
"TALK_AT",
"TALKER_NAME",
"CREATED_AT"
],
"table": "odps_talk_records_out"
}
},
"writer": {
"plugin": "mongodb",
"parameter": {
"datasource": "paleo_mgdb",
"column": [
{
"name": "USER_ID",
"type": "string"
},
{
"name": "ADDRESS_ID",
"type": "string"
},
{
"name": "TALK_AT",
"type": "date"
},
{
"name": "TALKER_NAME",
"type": "string"
},
{
"name": "CREATED_AT",
"type": "date"
}
],
"writeMode": {
"isReplace": "false"
},
"collectionName": "drugRows"
}
},
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"concurrent": "1",
"mbps": "1"
}
}
},
"type": "job",
"version": "1.0"
}

TopN统计结果导出mongo

目的

将结果由odps落地到常用的存储介质,便于其他程序读取使用。

最后一步会将此数据进行格式化,使其符合需求。

创建数据集成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
{
"configuration": {
"reader": {
"plugin": "odps",
"parameter": {
"partition": "",
"datasource": "odps_first",
"column": [
"talker_name",
"talker_count",
"user_id",
"user_talker_count"
],
"table": "odps_topn"
}
},
"writer": {
"plugin": "mongodb",
"parameter": {
"datasource": "paleo_mgdb",
"column": [
{
"name": "talker_name",
"type": "string"
},
{
"name": "talker_count",
"type": "int"
},
{
"name": "user_id",
"type": "string"
},
{
"name": "user_talker_count",
"type": "int"
}
],
"writeMode": {
"isReplace": "false"
},
"collectionName": "drugTopN"
}
},
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"concurrent": "1",
"mbps": "1"
}
}
},
"type": "job",
"version": "1.0"
}

格式化TopN数据

目的

使数据符合需求。

Java程序

2个同步出来的odps表,都会进行相应处理,mongo里面也对应有3个集合:临时数据集合,格式化后的数据集合以及历史数据集合。

TopN涉及到一定的逻辑计算。

反正一切看代码吧!

关键代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
@Service
public class ETLService implements IETLService {
@Autowired
private MongoTemplate mongoTemplate;
@Autowired
private MongoOperations mongoOperations;
@Value("${mongo.collection.splitRows}")
private String splitRows;
@Value("${mongo.collection.rowsHistory}")
private String rowsHistory;
@Value("${mongo.collection.rowsFormate}")
private String rowsFormate;
@Value("${mongo.collection.topN}")
private String topN;
@Value("${mongo.collection.topNHistory}")
private String topNHistory;
@Value("${mongo.collection.topNFormate}")
private String topNFormate;
@Value("${mongo.batch.size}")
private Integer batchSize;
@Override
public void formate2mongo() {
//读数据
DBCollection splitRowsCollection = mongoTemplate.getCollection(splitRows);
DBCollection rowsFormateCollection = mongoTemplate.getCollection(rowsFormate);
DBCollection rowsHistoryCollection = mongoTemplate.getCollection(rowsHistory);
List<DBObject> formateList = new ArrayList<>(batchSize);
List<DBObject> historyList = new ArrayList<>(batchSize);
splitRowsCollection.find().forEach(row->{
//格式化
DBObject dbObject = new BasicDBObject();
Map map = new HashMap();
row.keySet().forEach(key->{
map.put(key,row.get(key));
});
dbObject.put("data",map);
formateList.add(dbObject);
//写数据
if(formateList.size()>=batchSize){
rowsFormateCollection.insert(formateList);
formateList.clear();
}
//备份历史数据
historyList.add(row);
if(historyList.size()>=batchSize){
rowsHistoryCollection.insert(historyList);
historyList.clear();
}
});
//最后一轮循环的数据
if(formateList.size()>0){
rowsFormateCollection.insert(formateList);
formateList.clear();
}
if(historyList.size()>0){
rowsHistoryCollection.insert(historyList);
historyList.clear();
}
//删除当前数据
splitRowsCollection.drop();
//对于TopN重复以上流程
//读数据
DBCollection topNCollection = mongoTemplate.getCollection(topN);
DBCollection topNHistoryCollection = mongoTemplate.getCollection(topNHistory);
// DBCollection topNFormateCollection = mongoTemplate.getCollection(topNFormate);
// List<TopNBean> topNBeans = mongoOperations.find(null, TopNBean.class, topN);
Map<TopNKey,List<ChatInfo>> map = new HashMap<>();
topNCollection.find().forEach(row->{
//格式化
addTopNBean(map,row);
//备份历史数据
historyList.add(row);
if(historyList.size()>=batchSize){
topNHistoryCollection.insert(historyList);
historyList.clear();
}
});
//最后一轮循环的数据
if(historyList.size()>0){
topNHistoryCollection.insert(historyList);
historyList.clear();
}
//写数据
List<TopNBean> topNBeans = new ArrayList<>();
map.forEach((k,v)->{
TopNBean topNBean = new TopNBean();
topNBean.setTalkerName(k.getTalkerName());
topNBean.setTalkerCount(k.getTalkerCount());
topNBean.setInfo(v);
topNBeans.add(topNBean);
});
mongoOperations.insert(topNBeans,topNFormate);
//删除当前数据
topNCollection.drop();
}
private void addTopNBean(Map<TopNKey,List<ChatInfo>> map ,DBObject row){
TopNKey topNKey = new TopNKey();
topNKey.setTalkerName((String) row.get("talker_name"));
topNKey.setTalkerCount((Integer) row.get("talker_count"));
List<ChatInfo> list = null;
if(map.containsKey(topNKey)){
List<ChatInfo> chatInfos = map.get(topNKey);
ChatInfo chatInfo = new ChatInfo();
chatInfo.setUserId((String) row.get("user_id"));
chatInfo.setUserTalkerCount((Integer) row.get("user_talker_count"));
chatInfos.add(chatInfo);
}else {
list = new ArrayList<>();
ChatInfo chatInfo = new ChatInfo();
chatInfo.setUserId((String) row.get("user_id"));
chatInfo.setUserTalkerCount((Integer) row.get("user_talker_count"));
list.add(chatInfo);
map.put(topNKey,list);
}
}
}
@Data
public class TopNKey {
private String talkerName;
private Integer talkerCount;
}
@Data
@Document
public class TopNBean {
@Field("talker_name")
private String talkerName;
@Field("talker_count")
private Integer talkerCount;
@Field("info")
private List<ChatInfo> info;
}
@Data
public class TopNKey {
private String talkerName;
private Integer talkerCount;
}

创建Shell脚本

通过创建数加的Shell任务去调用上面的Java程序,Java程序通过rest暴露服务。

1
curl http://xxxxxxx/etl/formate2mongo

任务流程图

完成上面的步骤后,会得到一个工作流任务,然后通过配置调度任务,每日凌晨调度一次,即可统计昨天的数据了。
最终结果如下:
数加任务图