DataX 是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库( MySQL、Oracle 等)、HDFS、Hive、ODPS、HBase、FTP 等各种异构数据源之间稳定高效的数据同步功能。

1. DataX原理

为了解决异构数据源同步问题,DataX 将复杂的网状的同步链路变成了星型数据链路,DataX 作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到 DataX,便能跟已有的数据源做到无缝数据同步。

img

DataX 本身作为离线数据同步框架,采用 Framework + plugin 架构构建。将数据源读取和写入抽象成为 Reader/Writer 插件,纳入到整个同步框架中。

img

  1. Reader:Reader 为数据采集模块,负责采集数据源的数据,将数据发送给 Framework。
  2. Writer: Writer 为数据写入模块,负责不断向 Framework 取数据,并将数据写入到目的端。
  3. Framework:Framework 用于连接 reader 和 writer ,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

DataX 3.0 开源版本支持单机多线程模式完成同步作业运行,下图按一个 DataX 作业生命周期的时序图,从整体架构设计非常简要说明DataX各个模块相互关系。

img

  1. DataX 完成单个数据同步的作业,我们称之为 Job,DataX 接受到一个 Job 之后,将启动一个进程来完成整个作业同步过程。DataX Job 模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子 Task)、TaskGroup 管理等功能。
  2. DataX Job 启动后,会根据不同的源端切分策略,将 Job 切分成多个小的 Task(子任务),以便于并发执行。Task 便是 DataX 作业的最小单元,每一个 Task 都会负责一部分数据的同步工作。
  3. 切分多个 Task 之后,DataX Job 会调用 Scheduler 模块,根据配置的并发数据量,将拆分成的 Task 重新组合,组装成 TaskGroup(任务组)。每一个 TaskGroup 负责以一定的并发运行完毕分配好的所有 Task,默认单个任务组的并发数量为 5。
  4. 每一个 Task 都由 TaskGroup 负责启动,Task 启动后,会固定启动 Reader -> Channel -> Writer 的线程来完成任务同步工作。
  5. DataX 作业运行起来之后,Job 监控并等待多个 TaskGroup 模块任务完成,等待所有 TaskGroup 任务完成后 Job 成功退出。否则,异常退出,进程退出值非 0。

举例来说,用户提交了一个 DataX 作业,并且配置了 20 个并发,目的是将一个 100 张分表的 mysql 数据同步到 odps 里面。

DataX的调度决策思路是:

  1. DataX Job 根据分库分表切分成了 100 个 Task。
  2. 根据 20 个并发,DataX 计算共需要分配 4 个 Task Group。
  3. 4 个 Task Group 平分切分好的 100 个 Task,每一个 Task Group 负责以 5 个并发共计运行 25 个 Task。

2. 安装DataX软件并验证

1
2
3
4
5
wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz

tar zxf datax.tar.gz -C /data/public/

rm -rf /data/public/datax/plugin/*/._* # 需要删除隐藏文件 (重要)

验证:

1
2
3
cd /data/public/datax/bin

python datax.py ../job/job.json # 用来验证是否安装成功

输出:
img

3. 配置示例:从stream读取数据并打印到控制台

1、创建作业的配置文件( json 格式)

可以通过命令查看配置模板: python datax.py -r {YOUR_READER} -w {YOUR_WRITER}

执行如下命令:

1
2
3
cd /data/public/datax ## 进入 datax 安装目录

python datax.py -r streamreader -w streamwriter

输出:img根据模板编写 json 文件 :
img

2、执行任务

1
2
3
$ cd /data/public/datax #  进入 datax 目录

$ python datax.py ./stream2stream.json

输出:

img

4. Mysql旧订单库同步到4个Mysql分库

1、生成 MySQL 到 MySQL 同步的模板

因为这里是 MySQL 同步到 MySQL (shardingsphere proxy 实现了 MySQL 通讯协议), 所以读是 mysqlreader , 写是 mysqlwriter 。

1
2
3
cd /data/public/datax

python datax.py -r mysqlreader -w mysqlwriter

img

2、编写 MySQL 到 MySQL 同步的模板

在 终端执行

1
vim ordertransfer.json  

内容如下:
img

3、执行任务

1
2
3
cd /data/public/datax 

python datax.py ordertransfer.json

img

接下来查看 shardingsphere proxy 的四个订单分库 :
img

原来老订单库测试订单数据已迁移到目标分片中,测试成功。


本站由 卡卡龙 使用 Stellar 1.29.1主题创建

本站访问量 次. 本文阅读量 次.