1. 介绍

canal [kə’næl],基于数据库增量日志解析,提供增量数据订阅&消费。

1.1 MYSQL主备复制实现

  1. master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);
  2. slave将master的binary log events拷贝到它的中继日志(relay log);
  3. slave重做中继日志中的事件,将改变反映它自己的数据。

img

1.2 Canal工作原理

  1. canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
  2. mysql master收到dump请求,开始推送binary log给slave(也就是canal)
  3. canal解析binary log对象(原始为byte流)

img

1.3 Canal架构

  • server代表一个canal运行实例,对应于一个jvm
  • instance对应于一个数据队列 (1个server对应1..n个instance)

img

instance模块:

  • eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
  • eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
  • eventStore (数据存储)
  • metaManager (增量订阅&消费信息管理器)

1.4 EventParser

image-20250429213306061

  1. Connection获取上一次解析成功的位置 (如果第一次启动,则获取初始指定的位置或者是当前数据库的binlog位点)
  2. Connection建立链接,发送BINLOG_DUMP指令
    // 0. write command number
    // 1. write 4 bytes bin-log position to start at
    // 2. write 2 bytes bin-log flags
    // 3. write 4 bytes server id of the slave
    // 4. write bin-log file name
  3. Mysql开始推送Binaly Log
  4. 接收到的Binaly Log的通过Binlog parser进行协议解析,补充一些特定信息
    // 补充字段名字,字段类型,主键信息,unsigned类型处理
  5. 传递给EventSink模块进行数据存储,是一个阻塞操作,直到存储成功
  6. 存储成功后,定时记录Binaly Log位置

1.5 EventSink

image-20250429213342520

  • 数据过滤:支持通配符的过滤模式,表名,字段内容等
  • 数据路由/分发:解决1:n (1个parser对应多个store的模式)
  • 数据归并:解决n:1 (多个parser对应1个store)
  • 数据加工:在进入store之前进行额外的处理,比如join

1.6 EventStore

  1. 目前仅实现了Memory内存模式,后续计划增加本地file存储,mixed混合模式

  2. 借鉴了Disruptor的RingBuffer的实现思路

RingBuffer设计:

image-20250429213408885

定义了3个cursor

  • Put : Sink模块进行数据存储的最后一次写入位置
  • Get : 数据订阅获取的最后一次提取位置
  • Ack : 数据消费成功的最后一次消费位置

借鉴Disruptor的RingBuffer的实现,将RingBuffer拉直来看:
image-20250429213444004

实现说明:

  • Put/Get/Ack cursor用于递增,采用long型存储
  • buffer的get操作,通过取余或者与操作。(与操作: cusor & (size - 1) , size需要为2的指数,效率比较高)

1.7 Instance

image-20250429213536203

instance代表了一个实际运行的数据队列,包括了EventPaser,EventSink,EventStore等组件。

抽象了CanalInstanceGenerator,主要是考虑配置的管理方式:

  • manager方式: 和你自己的内部web console/manager系统进行对接。(目前主要是公司内部使用)
  • spring方式:基于spring xml + properties进行定义,构建spring配置.

1.8 Server

image-20250429214302258

server代表了一个canal的运行实例,为了方便组件化使用,特意抽象了Embeded(嵌入式) / Netty(网络访问)的两种实现

  • Embeded : 对latency和可用性都有比较高的要求,自己又能hold住分布式的相关技术(比如failover)
  • Netty : 基于netty封装了一层网络协议,由canal server保证其可用性,采用的pull模型,当然latency会稍微打点折扣,不过这个也视情况而定。(阿里系的notify和metaq,典型的push/pull模型,目前也逐步的在向pull模型靠拢,push在数据量大的时候会有一些问题)

1.9 增量订阅、消费

get/ack/rollback协议介绍

1.10 HA机制

依赖了zookeeper 的两个特性:watcherEPHEMERAL 节点

img

canal server 启动流程如下:

1、canal server 要启动某个 canal instance 时都先向 zookeeper 进行一次尝试启动判断 (创建 EPHEMERAL 节点,谁创建成功就允许谁启动);

2、创建 zookeeper 节点成功后,对应的 canal server 就启动对应的 canal instance,没有创建成功的 canal instance 就会处于 standby 状态;

3、一旦 zookeeper 发现 canal server A 创建的节点消失后,立即通知其他的 canal server 再次进行步骤1的操作,重新选出一个 canal server 启动 instance ;

2. 安装使用

2.1 mysql配置

  1. 配置mysql的binlog日志
  2. 使用mysqlbinlog命令查询binlog日志
  3. 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant
  4. 创建数据库商品表 t_product
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
// 1. 配置mysql的binlog日志

[root@localhost etc]# pwd
/etc
[root@localhost etc]#
[root@localhost etc]#
[root@localhost etc]# cat my.cnf
[mysqld]
bind-address=0.0.0.0
port=3306
user=mysql
basedir=/usr/local/mysql
datadir=/data/mysql
socket=/tmp/mysql.sock
log-error=/data/mysql/mysql.err
pid-file=/data/mysql/mysql.pid
#character config
character_set_server=utf8mb4
symbolic-links=0
explicit_defaults_for_timestamp=true

log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

// 2. 使用mysqlbinlog命令查询binlog日志

COMMIT/*!*/;
# at 2169182
#250428 23:19:58 server id 1 end_log_pos 2169247 CRC32 0x1628e32e Anonymous_GTID last_committed=5875 sequence_number=5876 rbr_only=yes
/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 2169247
#250428 23:19:58 server id 1 end_log_pos 2169315 CRC32 0xa6e5ade3 Query thread_id=1056 exec_time=0 error_code=0
SET TIMESTAMP=1745853598/*!*/;
BEGIN
/*!*/;
# at 2169315
#250428 23:19:58 server id 1 end_log_pos 2169379 CRC32 0x7177adc3 Table_map: `canal`.`t_product` mapped to number 119
# at 2169379
#250428 23:19:58 server id 1 end_log_pos 2169481 CRC32 0x66fa212c Update_rows: table id 119 flags: STMT_END_F

BINLOG '
npwPaBMBAAAAQAAAACMaIQAAAHcAAAAAAAEABWNhbmFsAAl0X3Byb2R1Y3QABggP9gESEgb8AwoC
AAAAw613cQ==
npwPaB8BAAAAZgAAAIkaIQAAAHcAAAAAAAEAAgAG///AAgAAAAAAAAAIADExMTExMTExgABOHwAB
mbZ2sACZtnawAMACAAAAAAAAAAQAOTA5MIAATh8AAZm2drAAmbZ2sAAsIfpm
'/*!*/;
# at 2169481
#250428 23:19:58 server id 1 end_log_pos 2169512 CRC32 0x6a648f77 Xid = 264437
COMMIT/*!*/;
SET @@SESSION.GTID_NEXT= 'AUTOMATIC' /* added by mysqlbinlog */ /*!*/;
DELIMITER ;
# End of log file
/*!50003 SET COMPLETION_TYPE=@OLD_COMPLETION_TYPE*/;
/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=0*/;
[root@localhost mysql]#
[root@localhost mysql]#
[root@localhost mysql]#
[root@localhost mysql]#
[root@localhost mysql]#
[root@localhost mysql]#
[root@localhost mysql]# cd /data/mysql/
[root@localhost mysql]#
[root@localhost mysql]#
[root@localhost mysql]# mysqlbinlog mysql-bin.000001

// 3. 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

// 4. 创建数据库商品表 t_product

CREATE TABLE `t_product` (
`id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,
`name` VARCHAR ( 255 ) COLLATE utf8mb4_bin NOT NULL,
`price` DECIMAL ( 10, 2 ) NOT NULL,
`status` TINYINT ( 4 ) NOT NULL,
`create_time` datetime NOT NULL,
`update_time` datetime NOT NULL,
PRIMARY KEY ( `id` )
) ENGINE = INNODB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_bin

2.2 tcp模式

  • 配置canal.properties

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    #集群模式 zk地址
    canal.zkServers = localhost:2181
    #本质是MQ模式和tcp模式 tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
    canal.serverMode = tcp
    #instance 列表
    canal.destinations = product-syn
    #conf root dir
    canal.conf.dir = ../conf
    #全局的spring配置方式的组件文件 生产环境,集群化部署
    canal.instance.global.spring.xml = classpath:spring/default-instance.xml
  • 配置instan.properties

    在 conf 目录下创建实例目录 product-syn , 在 product-syn 目录创建配置文件 :instance.properties。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    #  按需修改成自己的数据库信息
    #################################################
    ...
    canal.instance.master.address=127.0.0.1:3306
    # username/password,数据库的用户名和密码
    ...
    canal.instance.dbUsername = canal
    canal.instance.dbPassword = canal
    ...

    # table regex
    canal.instance.filter.regex=canal.t_product

    启动 canal server :

    1
    sh bin/startup.sh

    查看 server 日志:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    [root@localhost ~]# tail -f /opt/module/canal_1.1.8/logs/canal/canal.log 
    2025-04-28 22:58:53.072 [canal-instance-scan-0] INFO com.alibaba.otter.canal.deployer.CanalController - auto notify start example successful.
    2025-04-28 23:24:03.562 [Thread-7] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## stop the canal server
    2025-04-28 23:24:05.312 [Thread-7] INFO com.alibaba.otter.canal.deployer.CanalController - ## stop the canal server[192.168.122.1(192.168.122.1):11111]
    2025-04-28 23:24:05.416 [Thread-7] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## canal server is down.
    2025-04-28 23:38:20.714 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
    2025-04-28 23:38:20.718 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
    2025-04-28 23:38:20.725 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
    2025-04-28 23:38:20.801 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.100.111(192.168.100.111):11111]
    2025-04-28 23:38:21.716 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
    2025-04-28 23:38:21.832 [canal-instance-scan-0] INFO com.alibaba.otter.canal.deployer.CanalController - auto notify start example successful.

    查看 instance 的日志:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    [root@localhost product-syn]# tail -f /opt/module/canal_1.1.8/logs/product-syn/product-syn.log 
    2025-04-28 23:24:05.300 [Thread-7] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - stop successful....
    2025-04-28 23:38:21.271 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-product-syn
    2025-04-28 23:38:21.667 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^canal.t_product$
    2025-04-28 23:38:21.667 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter :
    2025-04-28 23:38:21.672 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
    2025-04-28 23:38:21.744 [destination = product-syn , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
    2025-04-28 23:38:21.814 [destination = product-syn , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just last position
    {"identity":{"slaveId":-1,"sourceAddress":{"address":"localhost","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000001","position":2169481,"serverId":1,"timestamp":1745853598000}}
    2025-04-28 23:38:21.967 [destination = product-syn , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000001,position=2169481,serverId=1,gtid=,timestamp=1745853598000] cost : 217ms , the next step is binlog dump
    2025-04-28 23:38:21.980 [destination = product-syn , address = /127.0.0.1:3306 , EventParser] WARN c.a.otter.canal.parse.inbound.mysql.MysqlConnection - load MySQL @@version_comment : MySQL Community Server (GPL)

    使用prettyZoo查看zk:

    image-20250429221225857

    采用 Canal 源码中的集群客户端的测试例子 ClusterCanalClientTest 启动:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    public class ClusterCanalClientTest extends AbstractCanalClientTest {

    public ClusterCanalClientTest(String destination){
    super(destination);
    }

    public static void main(String args[]) {
    String destination = "product-syn";

    // 基于固定canal server的地址,建立链接,其中一台server发生crash,可以支持failover
    // CanalConnector connector = CanalConnectors.newClusterConnector(
    // Arrays.asList(new InetSocketAddress(
    // AddressUtils.getHostIp(),
    // 11111)),
    // "stability_test", "", "");

    // 基于zookeeper动态获取canal server的地址,建立链接,其中一台server发生crash,可以支持failover
    CanalConnector connector = CanalConnectors.newClusterConnector("192.168.100.111:2181",
    destination,
    "canal",
    "canal");

    final ClusterCanalClientTest clientTest = new ClusterCanalClientTest(destination);
    clientTest.setConnector(connector);
    clientTest.start();

    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    try {
    logger.info("## stop the canal client");
    clientTest.stop();
    } catch (Throwable e) {
    logger.warn("##something goes wrong when stopping canal:", e);
    } finally {
    logger.info("## canal client is down.");
    }
    }));
    }
    }

    修改canal.t_product表后,可以在控制台看到:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    ****************************************************
    * Batch Id: [2] ,count : [3] , memsize : [202] , Time : 2025-04-29 22:15:37
    * Start : [mysql-bin.000001:2169577:1745936141000(2025-04-29 22:15:41)]
    * End : [mysql-bin.000001:2169812:1745936141000(2025-04-29 22:15:41)]
    ****************************************************

    ================> binlog[mysql-bin.000001:2169577] , executeTime : 1745936141000(2025-04-29 22:15:41) , gtid : () , delay : -3072ms
    BEGIN ----> Thread id: 1123
    ----------------> binlog[mysql-bin.000001:2169709] , name[canal,t_product] , eventType : UPDATE , executeTime : 1745936141000(2025-04-29 22:15:41) , gtid : () , delay : -3070 ms
    id : 3 type=bigint(20)
    name : 1290999 type=varchar(255) update=true
    price : 1.0 type=decimal(10,2)
    status : 1 type=tinyint(4)
    create_time : 2025-04-27 12:00:00 type=datetime
    update_time : 2025-04-27 12:00:00 type=datetime
    ----------------
    END ----> transaction id: 333327
    ================> binlog[mysql-bin.000001:2169812] , executeTime : 1745936141000(2025-04-29 22:15:41) , gtid : () , delay : -3066ms

2.3 mq模式

3. 源码解析

common模块:主要是提供了一些公共的工具类和接口。

client模块:canal的客户端。核心接口为CanalConnector

example模块:提供client模块使用案例。

protocol模块:client和server模块之间的通信协议

deployer:部署模块。通过该模块提供的CanalLauncher来启动canal server

server模块:canal服务器端。核心接口为CanalServer

instance模块:一个server有多个instance。每个instance都会模拟成一个mysql实例的slave。instance模块有四个核心组成部分:parser模块、sink模块、store模块,meta模块。核心接口为CanalInstance

parser模块:数据源接入,模拟slave协议和master进行交互,协议解析。parser模块依赖于dbsync、driver模块。

driver模块和dbsync模块:从这两个模块的artifactId(canal.parse.driver、canal.parse.dbsync),就可以看出来,这两个模块实际上是parser模块的组件。事实上parser 是通过driver模块与mysql建立连接,从而获取到binlog。由于原始的binlog都是二进制流,需要解析成对应的binlog事件,这些binlog事件对象都定义在dbsync模块中,dbsync 模块来自于淘宝的tddl。

sink模块:parser和store链接器,进行数据过滤,加工,分发的工作。核心接口为CanalEventSink

store模块:数据存储。核心接口为CanalEventStore

meta模块:增量订阅&消费信息管理器,核心接口为CanalMetaManager,主要用于记录canal消费到的mysql binlog的位置,

4.

5.


本站由 卡卡龙 使用 Stellar 1.29.1主题创建

本站访问量 次. 本文阅读量 次.