1. DataX Introduction

DataX

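DataX is an offline data synchronization tool/platform widely used within Alibaba Group. It provides efficient data synchronization between heterogeneous data sources, including MySQL, Oracle, SqlServer, PostgreSQL, HDFS, Hive, ADS, HBase, TableStore (OTS), MaxCompute (ODPS), and DRDS.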

Features

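As a data synchronization framework, DataX abstracts synchronization between different data sources into Reader plugins, which read data from the source, and Writer plugins, which write data to the target. In theory, the framework can support synchronization for any type of data source. The plugin system also works as an ecosystem: each newly added data source can immediately exchange data with all existing ones.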
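The Reader/Writer split can be pictured with a small toy model. The sketch below is not DataX's actual plugin API (DataX plugins are written in Java); the names `ListReader`, `ListWriter`, `Record`, and `sync` are hypothetical and only illustrate why any reader can be paired with any writer once both sides share a record format.

```python
from typing import Iterable, Iterator, List, Protocol

Record = List[str]  # hypothetical row format shared by all plugins


class Reader(Protocol):
    def read(self) -> Iterator[Record]: ...


class Writer(Protocol):
    def write(self, records: Iterable[Record]) -> int: ...


class ListReader:
    """Toy source: yields rows held in memory."""

    def __init__(self, rows: List[Record]) -> None:
        self.rows = rows

    def read(self) -> Iterator[Record]:
        yield from self.rows


class ListWriter:
    """Toy target: collects rows into a list."""

    def __init__(self) -> None:
        self.sink: List[Record] = []

    def write(self, records: Iterable[Record]) -> int:
        before = len(self.sink)
        self.sink.extend(records)
        return len(self.sink) - before


def sync(reader: Reader, writer: Writer) -> int:
    """The 'framework' only knows the two interfaces, not the data sources."""
    return writer.write(reader.read())


writer = ListWriter()
copied = sync(ListReader([["row1", "lxw"], ["row2", "lxw2"]]), writer)
print(copied, writer.sink)
```

Adding a new source in this model means writing one more class with a `read` method; every existing writer can then consume it unchanged.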

Installation

Download DataX (DataX download link)

After extracting the archive, DataX is ready to use. Run a job with the following script:

python27 datax.py ..\job\test.json
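When several jobs have to be launched from another program, the same command can be assembled in Python. This is a minimal sketch: the helper names `datax_command` and `run_job` are hypothetical, and it assumes the process is started from the directory that contains datax.py and that `python27` is on the PATH, as in the command above.

```python
import subprocess
from typing import List


def datax_command(job_file: str, python: str = "python27",
                  script: str = "datax.py") -> List[str]:
    """Build the argument list for launching one DataX job."""
    return [python, script, job_file]


def run_job(job_file: str) -> int:
    """Launch the job and return the exit code of the datax.py process."""
    return subprocess.run(datax_command(job_file)).returncode


print(datax_command("../job/test.json"))
```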

2. DataX Data Synchronization

2.1 From MySQL to MySQL

Table creation statement

DROP TABLE IF EXISTS `tb_dmp_requser`;
CREATE TABLE `tb_dmp_requser` (
  `reqid` varchar(50) NOT NULL COMMENT 'Campaign ID',
  `exetype` varchar(50) DEFAULT NULL COMMENT 'Execution type',
  `allnum` varchar(11) DEFAULT NULL COMMENT 'Total number of target users',
  `exenum` varchar(11) DEFAULT NULL COMMENT 'Number of target users executed',
  `resv` varchar(50) DEFAULT NULL,
  `createtime` datetime DEFAULT NULL
);

Copy the tb_dmp_requser table from the dmp database to the tb_dmp_requser table in dota2_databank.

job_mysql_to_mysql.json is as follows:

{
    "job": {
        "content": [{
            "reader": {
                "name": "mysqlreader",
                "parameter": {
                    "column": [
                        "allnum", "reqid"
                    ],
                    "connection": [{
                        "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/dmp"],
                        "table": ["tb_dmp_requser"]
                    }],
                    "password": "123456",
                    "username": "root"
                }
            },
            "writer": {
                "name": "mysqlwriter",
                "parameter": {
                    "column": [
                        "allnum", "reqid"
                    ],
                    "preSql": [
                        "delete from tb_dmp_requser"
                    ],
                    "connection": [{
                        "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/dota2_databank",
                        "table": ["tb_dmp_requser"]
                    }],
                    "password": "123456",
                    "username": "root",
                    "writeMode": "replace"
                }
            }
        }],
        "setting": {
            "speed": {
                "channel": "2"
            }
        }
    }
}
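Writing such a file by hand for many tables is repetitive, so it can help to generate it. The sketch below builds the same structure as job_mysql_to_mysql.json; the function name `mysql_to_mysql_job` and its parameters are hypothetical. It keeps the layout used above, where the reader's jdbcUrl is a list and the writer's jdbcUrl is a single string.

```python
import json
from typing import Dict, List


def mysql_to_mysql_job(src_url: str, dst_url: str, table: str,
                       columns: List[str], username: str, password: str,
                       channel: int = 2) -> Dict:
    """Build a mysqlreader -> mysqlwriter job for one table."""
    reader = {
        "name": "mysqlreader",
        "parameter": {
            "column": columns,
            # reader side: jdbcUrl is a list of URLs
            "connection": [{"jdbcUrl": [src_url], "table": [table]}],
            "username": username,
            "password": password,
        },
    }
    writer = {
        "name": "mysqlwriter",
        "parameter": {
            "column": columns,
            "preSql": [f"delete from {table}"],  # empty the target first
            # writer side: jdbcUrl is a single string
            "connection": [{"jdbcUrl": dst_url, "table": [table]}],
            "username": username,
            "password": password,
            "writeMode": "replace",
        },
    }
    return {"job": {"content": [{"reader": reader, "writer": writer}],
                    "setting": {"speed": {"channel": channel}}}}


job = mysql_to_mysql_job(
    "jdbc:mysql://127.0.0.1:3306/dmp",
    "jdbc:mysql://127.0.0.1:3306/dota2_databank",
    "tb_dmp_requser", ["allnum", "reqid"], "root", "123456")
print(json.dumps(job, indent=4))
```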

2.2 From Oracle to Oracle

Copy the test table owned by the scott user to the test table owned by the test user.

Table creation statement

drop table TEST;

CREATE TABLE TEST (
ID NUMBER(32) NULL,
NAME VARCHAR2(255 BYTE) NULL
)
LOGGING
NOCOMPRESS
NOCACHE;

job_oracle_oracle.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "oraclereader",
                    "parameter": {
                        "column": ["id","name"],
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:oracle:thin:@localhost:1521:ORCL"],
                                "table": ["test"]
                            }
                        ],
                        "password": "tiger",
                        "username": "scott",
                        "where":"rownum < 1000"
                    }
                },
                "writer": {
                    "name": "oraclewriter",
                    "parameter": {
                        "column": ["id","name"],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:oracle:thin:@localhost:1521:ORCL",
                                "table": ["test"]
                            }
                        ],
                        "password": "test",
                        "username": "test"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 6
            }
        }
    }
}
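In both jobs so far the reader and the writer list the same number of columns. Assuming columns are matched by position, a mismatch is worth catching before the job is launched. The check below is a sketch; `check_column_counts` is a hypothetical helper, not part of DataX.

```python
from typing import Dict


def check_column_counts(job: Dict) -> None:
    """Raise ValueError if a reader/writer pair lists different column counts."""
    for i, item in enumerate(job["job"]["content"]):
        n_read = len(item["reader"]["parameter"]["column"])
        n_write = len(item["writer"]["parameter"]["column"])
        if n_read != n_write:
            raise ValueError(
                f"content[{i}]: reader has {n_read} columns, writer has {n_write}")


job = {"job": {"content": [{
    "reader": {"name": "oraclereader", "parameter": {"column": ["id", "name"]}},
    "writer": {"name": "oraclewriter", "parameter": {"column": ["id", "name"]}},
}]}}
check_column_counts(job)  # passes silently: two columns on each side
```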

2.3 From HBase to Local Files

Copy the HBase table "LXW" to the local path ../job/datax_hbase.

Create the table and add two rows:

hbase(main):046:0> create 'LXW','CF'
0 row(s) in 1.2120 seconds

=> Hbase::Table - LXW
hbase(main):047:0> put 'LXW','row1','CF:NAME','lxw'
0 row(s) in 0.0120 seconds

hbase(main):048:0> put 'LXW','row1','CF:AGE','18'
0 row(s) in 0.0080 seconds

hbase(main):049:0> put 'LXW','row1','CF:ADDRESS','BeijingYiZhuang'
0 row(s) in 0.0070 seconds

hbase(main):050:0> put 'LXW','row2','CF:ADDRESS','BeijingYiZhuang2'
0 row(s) in 0.0060 seconds

hbase(main):051:0> put 'LXW','row2','CF:AGE','18'
0 row(s) in 0.0050 seconds

hbase(main):052:0> put 'LXW','row2','CF:NAME','lxw2'
0 row(s) in 0.0040 seconds

hbase(main):053:0> exit

job_hbase_to_local.json

For the HBase high-availability cluster configuration, see https://www.cnblogs.com/Java-Starter/p/10756647.html

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "hbase11xreader", 
                    "parameter": {
                        "hbaseConfig": {
                            "hbase.zookeeper.quorum": "CentOS7Five:2181,CentOS7Six:2181,CentOS7Seven:2181"
                        },
                        "table": "LXW",
                        "encoding": "utf-8",
                        "mode": "normal",
                        "column": [
                            {
                                "name": "rowkey",
                                "type": "string"
                            },
                            {
                                "name": "CF:NAME",
                                "type": "string"
                            },
                            {
                                "name": "CF:AGE",
                                "type": "string"
                            },
                            {
                                "name": "CF:ADDRESS",
                                "type": "string"
                            }
                        ],
                        "range": {
                            "endRowkey": "",
                            "isBinaryRowkey": false,
                            "startRowkey": ""
                        }
                    }
                }, 
                "writer": {
                    "name": "txtfilewriter", 
                    "parameter": {
                        "dateFormat": "yyyy-MM-dd", 
                        "fieldDelimiter": "\t", 
                        "fileName": "LXW", 
                        "path": "../job/datax_hbase", 
                        "writeMode": "truncate"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": 5
            }
        }
    }
}

A file named LXW__e647d969_d2c6_47ad_9534_15c90d696099 is generated under ../job/datax_hbase.

Its content is as follows:

row1    lxw    18    BeijingYiZhuang
row2    lxw2    18    BeijingYiZhuang2
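Since the writer is configured with a tab as fieldDelimiter, the exported file can be read back with Python's csv module. This is a sketch: `parse_export` and `FIELDS` are hypothetical names, and the sample text stands in for the generated file.

```python
import csv
import io
from typing import Dict, List

# Same order as the column list in the hbase11xreader configuration
FIELDS = ["rowkey", "CF:NAME", "CF:AGE", "CF:ADDRESS"]


def parse_export(text: str, delimiter: str = "\t") -> List[Dict[str, str]]:
    """Turn delimiter-separated lines into one dict per HBase row."""
    reader = csv.reader(io.StringIO(text), delimiter=delimiter)
    return [dict(zip(FIELDS, row)) for row in reader if row]


sample = "row1\tlxw\t18\tBeijingYiZhuang\nrow2\tlxw2\t18\tBeijingYiZhuang2\n"
rows = parse_export(sample)
print(rows[1]["CF:NAME"])
```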

2.4 From Local Files to HBase

Import a local file into the HBase table LXW.

Source data, source.txt:

row3,jjj1,150,BeijingYiZhuang3
row4,jjj2,150,BeijingYiZhuang4

job_local_to_hbase.json

{
  "job": {
    "setting": {
      "speed": {
        "channel": 5
      }
    },
    "content": [
      {
        "reader": {
          "name": "txtfilereader",
          "parameter": {
            "path": "../job/datax_hbase/source.txt",
            "encoding": "UTF-8",
            "column": [
              {
                "index": 0,
                "type": "String"
              },
              {
                "index": 1,
                "type": "string"
              },
              {
                "index": 2,
                "type": "string"
              },
              {
                "index": 3,
                "type": "string"
              }
            ],
            "fieldDelimiter": ","
          }
        },
        "writer": {
          "name": "hbase11xwriter",
          "parameter": {
            "hbaseConfig": {
              "hbase.zookeeper.quorum": "CentOS7Five:2181,CentOS7Six:2181,CentOS7Seven:2181"
            },
            "table": "LXW",
            "mode": "normal",
            "rowkeyColumn": [
                {
                  "index":0,
                  "type":"string"
                }
            ],
            "column": [
                {
                  "index": 1,
                  "name": "CF:NAME",
                  "type": "string"
                },
                {
                  "index": 2,
                  "name": "CF:AGE",
                  "type": "string"
                },
                {
                  "index": 3,
                  "name": "CF:ADDRESS",
                  "type": "string"
                }
            ],
            "versionColumn":{
              "index": -1,
              "value":"123456789"
            },
            "encoding": "utf-8"
          }
        }
      }
    ]
  }
}

After the import, the newly added data is visible:

hbase(main):241:0* get 'LXW','row3'
COLUMN                    CELL                                                                  
 CF:ADDRESS               timestamp=123456789, value=BeijingYiZhuang3                           
 CF:AGE                   timestamp=123456789, value=150                                        
 CF:NAME                  timestamp=123456789, value=jjj1 
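The index values in rowkeyColumn and column refer to field positions in each line of source.txt. The toy function below mimics that mapping to make it concrete. It is only an illustration of what the configuration expresses, not the hbase11xwriter implementation, and `to_put` is a hypothetical name.

```python
from typing import Dict, List, Tuple


def to_put(line: str, rowkey_index: int, columns: List[Dict],
           delimiter: str = ",") -> Tuple[str, Dict[str, str]]:
    """Split one source line and map its fields to a rowkey and named cells."""
    fields = line.split(delimiter)
    rowkey = fields[rowkey_index]
    cells = {col["name"]: fields[col["index"]] for col in columns}
    return rowkey, cells


columns = [
    {"index": 1, "name": "CF:NAME", "type": "string"},
    {"index": 2, "name": "CF:AGE", "type": "string"},
    {"index": 3, "name": "CF:ADDRESS", "type": "string"},
]
rowkey, cells = to_put("row3,jjj1,150,BeijingYiZhuang3", 0, columns)
print(rowkey, cells)
```

Field 0 becomes the rowkey and fields 1 to 3 become the cells CF:NAME, CF:AGE, and CF:ADDRESS, which matches the `get` output for row3.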

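2.5 From Local Files to HDFS/Hive

Exporting from HDFS to local files does not support a high-availability (HA) setup, so that direction is not tested here.

For the Hive high-availability configuration, see https://www.cnblogs.com/Java-Starter/p/10756528.html

Import a local data file into HDFS/Hive. The table must be created in Hive before the import.

Because of path issues, this can only be run on Linux.

Source data, source.txt: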

3,1,150,33
4,2,150,44

Table creation statement

create table datax_test(
  col1 varchar(10),
  col2 varchar(10),
  col3 varchar(10),
  col4 varchar(10)
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS ORC;

Use orc as the fileType. The text type requires compression and may produce garbled output.

job_local_to_hdfs.json

{
    "setting": {},
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "txtfilereader",
                    "parameter": {
                        "path": ["../job/datax_hbase/source.txt"],
                        "encoding": "UTF-8",
                        "column": [
                            {
                                "index": 0,
                                "type": "String"
                            },
                            {
                                "index": 1,
                                "type": "String"
                            },
                            {
                                "index": 2,
                                "type": "String"
                            },
                            {
                                "index": 3,
                                "type": "String"
                            }
                        ],
                        "fieldDelimiter": ","
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://ns1/",
                        "hadoopConfig": {
                            "dfs.nameservices": "ns1",
                            "dfs.ha.namenodes.ns1": "nn1,nn2",
                            "dfs.namenode.rpc-address.ns1.nn1": "CentOS7One:9000",
                            "dfs.namenode.rpc-address.ns1.nn2": "CentOS7Two:9000",
                            "dfs.client.failover.proxy.provider.ns1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
                        },
                        "fileType": "orc",
                        "path": "/user/hive/warehouse/datax_test",
                        "fileName": "datax_test",
                        "column": [
                            {
                                "name": "col1",
                                "type": "VARCHAR"
                            },
                            {
                                "name": "col2",
                                "type": "VARCHAR"
                            },
                            {
                                "name": "col3",
                                "type": "VARCHAR"
                            },
                            {
                                "name": "col4",
                                "type": "VARCHAR"
                            }
                        ],
                        "writeMode": "append",
                        "fieldDelimiter": ",",
                        "compress":"NONE"
                    }
                }
            }
        ]
    }
}

After the import finishes, check Hive:

Time taken: 0.105 seconds
hive> 
    > 
    > 
    > select *from datax_test;
OK
3       1       150     33
4       2       150     44
Time taken: 0.085 seconds, Fetched: 2 row(s)
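The hadoopConfig entries in job_local_to_hdfs.json follow a fixed naming pattern: the nameservice name and the NameNode IDs are embedded in the keys. The sketch below derives the same entries from those inputs; `ha_hadoop_config` is a hypothetical helper, and the host names are the ones used in this article.

```python
from typing import Dict


def ha_hadoop_config(nameservice: str, namenodes: Dict[str, str]) -> Dict[str, str]:
    """Build HDFS HA client settings from a nameservice and its NameNode addresses."""
    cfg = {
        "dfs.nameservices": nameservice,
        f"dfs.ha.namenodes.{nameservice}": ",".join(namenodes),
        f"dfs.client.failover.proxy.provider.{nameservice}":
            "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
    }
    # One RPC address entry per NameNode ID
    for node_id, address in namenodes.items():
        cfg[f"dfs.namenode.rpc-address.{nameservice}.{node_id}"] = address
    return cfg


config = ha_hadoop_config("ns1", {"nn1": "CentOS7One:9000", "nn2": "CentOS7Two:9000"})
for key, value in config.items():
    print(key, "=", value)
```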

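2.6 From txt to Oracle

txt, dat, csv, and similar formats all work. The dat file used here is 16 GB and contains 180 million records.

Table creation statement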

CREATE TABLE T_CJYX_HOMECOUNT (
"ACYC_ID" VARCHAR2(4000 BYTE) NULL ,
"ADDRESS_ID" VARCHAR2(4000 BYTE) NULL ,
"ADDRESS_NAME" VARCHAR2(4000 BYTE) NULL ,
"ADDRESS_LEVEL" VARCHAR2(4000 BYTE) NULL ,
"CHECK_TARGET_NUM" VARCHAR2(4000 BYTE) NULL ,
"CHECK_VALUE" VARCHAR2(4000 BYTE) NULL ,
"TARGET_PHONE" VARCHAR2(4000 BYTE) NULL ,
"NOTARGET_PHONE" VARCHAR2(4000 BYTE) NULL ,
"PARENT_ID" VARCHAR2(4000 BYTE) NULL ,
"BCYC_ID" VARCHAR2(4000 BYTE) NULL 
)

job_txt_to_oracle.json is as follows:

{
    "setting": {},
    "job": {
        "setting": {
            "speed": {
                "channel": 11
            }
        },
        "content": [
            {
                "reader": {
                    "name": "txtfilereader",
                    "parameter": {
                        "path": ["E:/opt/srcbigdata2/di_00121_20190427.dat"],
                        "encoding": "UTF-8",
                        "nullFormat": "",
                        "column": [
                            {
                                "index": 0,
                                "type": "string"
                            },
                            {
                                "index": 1,
                                "type": "string"
                            },
                            {
                                "index": 2,
                                "type": "string"
                            },
                            {
                                "index": 3,
                                "type": "string"
                            },
                            {
                                "index": 4,
                                "type": "string"
                            },
                            {
                                "index": 5,
                                "type": "string"
                            },
                            {
                                "index": 6,
                                "type": "string"
                            },
                            {
                                "index": 7,
                                "type": "string"
                            },
                            {
                                "index": 8,
                                "type": "string"
                            },
                            {
                                "index": 9,
                                "type": "string"
                            }
                        ],
                        "fieldDelimiter": "$"
                    }
                },
                "writer": {
                    "name": "oraclewriter",
                    "parameter": {
                        "column": ["acyc_id","address_id","address_name","address_level","check_target_num","check_value","target_phone","notarget_phone","parent_id","bcyc_id"],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:oracle:thin:@localhost:1521:ORCL",
                                "table": ["T_CJYX_HOMECOUNT"]
                            }
                        ],
                        "password": "test",
                        "username": "test"
                    }
                }
            }
        ]
    }
}
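The reader's column list is the same entry repeated with an increasing index, so for wide files it can be generated instead of typed. A minimal sketch, where `string_columns` is a hypothetical helper and the writer column names are the ones from the job above:

```python
import json
from typing import Dict, List


def string_columns(count: int) -> List[Dict[str, object]]:
    """Return txtfilereader column entries for fields 0..count-1, all typed as string."""
    return [{"index": i, "type": "string"} for i in range(count)]


writer_columns = ["acyc_id", "address_id", "address_name", "address_level",
                  "check_target_num", "check_value", "target_phone",
                  "notarget_phone", "parent_id", "bcyc_id"]

# One reader entry per writer column keeps both sides the same length
reader_columns = string_columns(len(writer_columns))
print(json.dumps(reader_columns, indent=4))
```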

Script

python27 datax.py ../job/job_txt_to_oracle.json

 

This is much faster than Oracle's own sqlldr: DataX imported the 180 million records in only 117 minutes, whereas sqlldr needed 41 hours.
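The reported figures can be turned into throughput numbers with a little arithmetic. The calculation below uses only the values stated above (180 million records, 117 minutes, 41 hours); the variable names are illustrative.

```python
records = 180_000_000
datax_minutes = 117
sqlldr_hours = 41

datax_rate = records / (datax_minutes * 60)      # records per second
sqlldr_rate = records / (sqlldr_hours * 3600)    # records per second
speedup = (sqlldr_hours * 60) / datax_minutes    # ratio of the two run times

print(f"DataX:   {datax_rate:,.0f} records/s")
print(f"sqlldr:  {sqlldr_rate:,.0f} records/s")
print(f"speedup: {speedup:.1f}x")
```

On these numbers DataX handled roughly 25,600 records per second and finished about 21 times faster than sqlldr.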

2.7 From txt to txt

job_txt_to_txt.json is as follows:

{
    "setting": {},
    "job": {
        "setting": {
            "speed": {
                "channel": 2
            }
        },
        "content": [
            {
                "reader": {
                    "name": "txtfilereader",
                    "parameter": {
                        "path": ["../job/data_txt/a.txt"],
                        "encoding": "UTF-8",
                        "column": [
                            {
                                "index": 0,
                                "type": "string"
                            },
                            {
                                "index": 1,
                                "type": "string"
                            }
                        ],
                        "fieldDelimiter": "$"
                    }
                },
                "writer": {
                    "name": "txtfilewriter",
                    "parameter": {
                        "path": "../job/data_txt/",
                        "fileName": "luohw",
                        "writeMode": "truncate",
                        "dateFormat": "yyyy-MM-dd"
                    }
                }
            }
        ]
    }
}
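Hand-edited job files can contain small JSON mistakes, such as a trailing comma after the last element of an array, which a strict parser rejects. Checking the file with Python's json module before launching DataX gives a precise line and column. This is a sketch: `check_job_text` is a hypothetical helper, and the only structural rule it checks is the job.content layout used throughout this article.

```python
import json
from typing import Optional


def check_job_text(text: str) -> Optional[str]:
    """Return None if the text is a usable job definition, else an error message."""
    try:
        job = json.loads(text)
    except json.JSONDecodeError as exc:
        return f"invalid JSON at line {exc.lineno}, column {exc.colno}: {exc.msg}"
    if not isinstance(job, dict) or "content" not in job.get("job", {}):
        return "missing job.content"
    return None


good = '{"job": {"content": [], "setting": {"speed": {"channel": 2}}}}'
bad = '{"job": {"content": [{"index": 0},]}}'  # trailing comma in the array

print(check_job_text(good))
print(check_job_text(bad))
```

To check a file on disk, read it first, for example `check_job_text(open("../job/job_txt_to_txt.json", encoding="utf-8").read())`.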

After the import finishes, the following file is generated:

 

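Copyright notice: This is an original article by Java-Starter, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.
Article link: https://www.cnblogs.com/Java-Starter/p/10869324.html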