Mongo在DataWorks中的使用
一、query语句,制定数据同步规则。
同步2021-07-05一天的数据(query中的两个边界值为毫秒级时间戳,分别对应北京时间2021-07-05 00:00:00和2021-07-06 00:00:00;如需包含恰好为零点整的记录,可将$gt改为$gte):
"{\'createTime\':{\'$gt\':NumberLong(\'1625414400000\'),\'$lt\':NumberLong(\'1625500800000\')}}"
二、
您可以通过该配置项(query)来限制返回的MongoDB数据范围,仅支持时间类型。例如您可以配置 "query":"{\'operationTime\':{\'$gte\':ISODate(\'${last_day}T00:00:00.424+0800\')}}",限制返回operationTime大于等于${last_day}零点的数据。此处${last_day}为DataWorks调度参数,其中last_day的格式为yyyy-mm-dd。您可以根据需要使用其它MongoDB支持的条件操作符($gt、$lt、$gte和$lte等)、逻辑操作符(and和or等)以及函数(max、min、sum、avg和ISODate等)。
三、
同步json脚本
1.官方案例
#官方案例
{
    "type":"job",
    "version":"2.0",//版本号。
    "steps":[
        {
            "category": "reader",
            "name": "Reader",
            "parameter": {
                "datasource": "datasourceName", //数据源名称。
                "collectionName": "tag_data", //集合名称。
                "query": "", // 数据查询过滤。
                "column": [
                    {
                        "name": "unique_id", //字段名称。
                        "type": "string" //字段类型。
                    },
                    { "name": "sid", "type": "string" },
                    { "name": "user_id", "type": "string" },
                    { "name": "auction_id", "type": "string" },
                    { "name": "content_type", "type": "string" },
                    { "name": "pool_type", "type": "string" },
                    { "name": "frontcat_id", "type": "array", "splitter": "" },
                    { "name": "categoryid", "type": "array", "splitter": "" },
                    { "name": "gmt_create", "type": "string" },
                    { "name": "taglist", "type": "array", "splitter": " " },
                    { "name": "property", "type": "string" },
                    { "name": "scorea", "type": "int" },
                    { "name": "scoreb", "type": "int" },
                    { "name": "scorec", "type": "int" },
                    { "name": "a.b", "type": "document.int" },
                    { "name": "a.b.c", "type": "document.array", "splitter": " " }
                ]
            },
            "stepType": "mongodb"
        },
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"//错误记录数。
        },
        "speed":{
            "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent":1, //作业并发数。
            "mbps":"12"//限流
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}
2.同步MongoDB表,按时间增量同步,并写入分区表
同步MongoDB表,按时间增量同步,并写入分区表 同步脚本如下: { "type": "job", "version": "2.0", "steps": [ { "stepType": "mongodb", "parameter": { "datasource": "mongo_member", "envType": 0, "cursorTimeoutInMs": "600000", "query": "{\'reportTimeStr\':{\'$gte\':ISODate(\'${last_day}T00:00:00.424+0800\')}}", "column": [ { "name": "_id", "type": "string" }, { "name": "_class", "type": "string" }, { "name": "did", "type": "string" }, { "name": "stage", "type": "string" }, { "name": "platform", "type": "string" }, { "name": "channel", "type": "string" }, { "name": "appVersion", "type": "string" }, { "name": "reportWay", "type": "string" }, { "name": "reportTime", "type": "long" }, { "name": "reportTimeStr", "type": "string" }, { "name": "reportDate", "type": "string" } ], "tableComment": "This kind of datasource dosen\'t support get table comment. This is a comment produced by di.", "batchSize": "1000", "collectionName": "device_install_app_info" }, "name": "Reader", "category": "reader" }, { "stepType": "odps", "parameter": { "partition": "dt=${bizdate}", "truncate": true, "datasource": "odps_first", "envType": 0, "column": [ "id", "class", "did", "stage", "platform", "chanel", "appversion", "reportway", "reporttime", "reporttimestr", "reportdate" ], "emptyAsNull": false, "table": "ods_lx_mg_member_device_install_app_info" }, "name": "Writer", "category": "writer" } ], "setting": { "errorLimit": { "record": "" }, "speed": { "throttle": false, "concurrent": 2 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }
3.脚本2:按createTime(毫秒级时间戳)过滤数据,并写入非分区表
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "mongodb", "parameter": { "datasource": "mongo_member", "envType": 0, "cursorTimeoutInMs": "600000", "query": "{\'createTime\':{\'$gt\':NumberLong(\'1625414400000\')}}", "column": [ { "name": "_id", "type": "string" }, { "name": "userId", "type": "string" }, { "name": "summaryDate", "type": "string" }, { "name": "year", "type": "string" }, { "name": "month", "type": "string" }, { "name": "day", "type": "string" }, { "name": "stage", "type": "string" }, { "name": "platform", "type": "string" }, { "name": "channel", "type": "string" }, { "name": "isPaidUser", "type": "string" }, { "name": "isOldUser", "type": "string" }, { "name": "createTime", "type": "string" }, { "name": "_class", "type": "string" } ], "tableComment": "This kind of datasource dosen\'t support get table comment. This is a comment produced by di.", "batchSize": "1000", "collectionName": "user_request_summary" }, "name": "Reader", "category": "reader" }, { "stepType": "odps", "parameter": { "partition": "", "truncate": true, "datasource": "odps_first", "envType": 0, "column": [ "id", "userid", "summarydate", "year", "month", "day", "stage", "platform", "channel", "ispaiduser", "isolduser", "createtime", "class" ], "emptyAsNull": false, "table": "ods_lx_mg_member_user_request_summary" }, "name": "Writer", "category": "writer" } ], "setting": { "executeMode": null, "errorLimit": { "record": "" }, "speed": { "concurrent": 2, "throttle": false } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }
版权声明:本文为lxzcloud原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。