1 全量同步工具

img

同步组件工具地址:

https://github.com/makemyownlife/shardingsphere-jdbc-demo/tree/main/swift-data-transfer/data-sync

如图中,同步组件会将老订单库的三张表的数据通过 shardingsphere-proxy 数据桥梁均匀分布到新的 4 个 订单 MySQL 库里。

img

2 实战演示

img

首先,我们编辑配置文件:

img

配置文件包含三个元素:

  1. 同步表:哪些表需要同步
  2. 源数据源:旧 MySQL 订单库
  3. 目标数据源:shardingsphere-proxy 虚拟库 (背后是 4 个 MySQL 新分库)

然后启同步服务 Main 函数 :

img

启动成功之后,我们可以看到表的执行结果:

img

3 组件原理

img

1、获取表的元数据

因此最终我们需要将表的数据 insert 到目标数据库,所以,我们必须知道表包含了哪些列,以及列对应的类型

1
LinkedHashMap<String, Integer> columnTypes = Utils.getColumnTypes(sourceDataSource, tableName);

在 Utils 工具类里,定义 getColumnTypes 方法:

img

2、执行 SQL 语句(游标模式)

img

首先我们获取源数据源的 JDBC 连接,然后通过预编译执行查询语句 , 预编译定义为游标模式。

为了满足游标模式,我们在 JDBC url 连接需要如下配置:

img

3、得到源数据库查询结果集

img

开始执行查询 SQL 后, 得到结果集,并遍历结果集,让结果集中的数据插入到 目标数据库表。

这里面分为三个步骤:

  1. 将数据转换成 RowData 对象
  2. 组装 Insert SQL
  3. 写入到另一个数据源

4、将数据转换成 RowData 对象

img

5、组装 Insert SQL

img

6、写入到目标数据源表

img

这一段是非常典型的 JDBC 执行 UPDATE 写法,最核心的如何动态赋值

以下是动态赋值部分代码截图:

img

4 思维升级

写到这里,我们展示了如何简单设计一个全量同步的工具。

但还是非常粗糙,还需要从如下几点进行优化:

  1. 任务模型:可否并行支持多个任务并行处理
  2. checkpoint 机制: 当任务执行时,假如突发宕机或者线程执行异常,需要保存执行点,以便下次调度重新执行
  3. 更灵活的 ETL 机制

本站由 卡卡龙 使用 Stellar 1.29.1主题创建

本站访问量 次. 本文阅读量 次.