1. 介绍
canal [kə’næl],基于数据库增量日志解析,提供增量数据订阅&消费。
1.1 MYSQL主备复制实现
- master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);
- slave将master的binary log events拷贝到它的中继日志(relay log);
- slave重做中继日志中的事件,将改变反映它自己的数据。
1.2 Canal工作原理
- canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
- mysql master收到dump请求,开始推送binary log给slave(也就是canal)
- canal解析binary log对象(原始为byte流)
1.3 Canal架构
- server代表一个canal运行实例,对应于一个jvm
- instance对应于一个数据队列 (1个server对应1..n个instance)
instance模块:
- eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
- eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
- eventStore (数据存储)
- metaManager (增量订阅&消费信息管理器)
1.4 EventParser
- Connection获取上一次解析成功的位置 (如果第一次启动,则获取初始指定的位置或者是当前数据库的binlog位点)
- Connection建立链接,发送BINLOG_DUMP指令
// 0. write command number
// 1. write 4 bytes bin-log position to start at
// 2. write 2 bytes bin-log flags
// 3. write 4 bytes server id of the slave
// 4. write bin-log file name - Mysql开始推送Binaly Log
- 接收到的Binaly Log的通过Binlog parser进行协议解析,补充一些特定信息
// 补充字段名字,字段类型,主键信息,unsigned类型处理 - 传递给EventSink模块进行数据存储,是一个阻塞操作,直到存储成功
- 存储成功后,定时记录Binaly Log位置
1.5 EventSink
- 数据过滤:支持通配符的过滤模式,表名,字段内容等
- 数据路由/分发:解决1:n (1个parser对应多个store的模式)
- 数据归并:解决n:1 (多个parser对应1个store)
- 数据加工:在进入store之前进行额外的处理,比如join
1.6 EventStore
目前仅实现了Memory内存模式,后续计划增加本地file存储,mixed混合模式
借鉴了Disruptor的RingBuffer的实现思路
RingBuffer设计:

定义了3个cursor
- Put : Sink模块进行数据存储的最后一次写入位置
- Get : 数据订阅获取的最后一次提取位置
- Ack : 数据消费成功的最后一次消费位置
借鉴Disruptor的RingBuffer的实现,将RingBuffer拉直来看:
实现说明:
- Put/Get/Ack cursor用于递增,采用long型存储
- buffer的get操作,通过取余或者与操作。(与操作: cusor & (size - 1) , size需要为2的指数,效率比较高)
1.7 Instance
instance代表了一个实际运行的数据队列,包括了EventPaser,EventSink,EventStore等组件。
抽象了CanalInstanceGenerator,主要是考虑配置的管理方式:
- manager方式: 和你自己的内部web console/manager系统进行对接。(目前主要是公司内部使用)
- spring方式:基于spring xml + properties进行定义,构建spring配置.
1.8 Server
server代表了一个canal的运行实例,为了方便组件化使用,特意抽象了Embeded(嵌入式) / Netty(网络访问)的两种实现
- Embeded : 对latency和可用性都有比较高的要求,自己又能hold住分布式的相关技术(比如failover)
- Netty : 基于netty封装了一层网络协议,由canal server保证其可用性,采用的pull模型,当然latency会稍微打点折扣,不过这个也视情况而定。(阿里系的notify和metaq,典型的push/pull模型,目前也逐步的在向pull模型靠拢,push在数据量大的时候会有一些问题)
1.9 增量订阅、消费
get/ack/rollback协议介绍
1.10 HA机制
依赖了zookeeper 的两个特性:watcher、EPHEMERAL 节点
canal server 启动流程如下:
1、canal server 要启动某个 canal instance 时都先向 zookeeper 进行一次尝试启动判断 (创建 EPHEMERAL 节点,谁创建成功就允许谁启动);
2、创建 zookeeper 节点成功后,对应的 canal server 就启动对应的 canal instance,没有创建成功的 canal instance 就会处于 standby 状态;
3、一旦 zookeeper 发现 canal server A 创建的节点消失后,立即通知其他的 canal server 再次进行步骤1的操作,重新选出一个 canal server 启动 instance ;
2. 安装使用
2.1 mysql配置
- 配置mysql的binlog日志
- 使用mysqlbinlog命令查询binlog日志
- 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant
- 创建数据库商品表 t_product
1 | // 1. 配置mysql的binlog日志 |
2.2 tcp模式
配置canal.properties
1
2
3
4
5
6
7
8
9
10#集群模式 zk地址
canal.zkServers = localhost:2181
#本质是MQ模式和tcp模式 tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = tcp
#instance 列表
canal.destinations = product-syn
#conf root dir
canal.conf.dir = ../conf
#全局的spring配置方式的组件文件 生产环境,集群化部署
canal.instance.global.spring.xml = classpath:spring/default-instance.xml配置instan.properties
在 conf 目录下创建实例目录 product-syn , 在 product-syn 目录创建配置文件 :instance.properties。
1
2
3
4
5
6
7
8
9
10
11
12# 按需修改成自己的数据库信息
#################################################
...
canal.instance.master.address=127.0.0.1:3306
# username/password,数据库的用户名和密码
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...
# table regex
canal.instance.filter.regex=canal.t_product启动 canal server :
1
sh bin/startup.sh
查看 server 日志:
1
2
3
4
5
6
7
8
9
10
11[root@localhost ~]# tail -f /opt/module/canal_1.1.8/logs/canal/canal.log
2025-04-28 22:58:53.072 [canal-instance-scan-0] INFO com.alibaba.otter.canal.deployer.CanalController - auto notify start example successful.
2025-04-28 23:24:03.562 [Thread-7] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## stop the canal server
2025-04-28 23:24:05.312 [Thread-7] INFO com.alibaba.otter.canal.deployer.CanalController - ## stop the canal server[192.168.122.1(192.168.122.1):11111]
2025-04-28 23:24:05.416 [Thread-7] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## canal server is down.
2025-04-28 23:38:20.714 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2025-04-28 23:38:20.718 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2025-04-28 23:38:20.725 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2025-04-28 23:38:20.801 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.100.111(192.168.100.111):11111]
2025-04-28 23:38:21.716 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
2025-04-28 23:38:21.832 [canal-instance-scan-0] INFO com.alibaba.otter.canal.deployer.CanalController - auto notify start example successful.查看 instance 的日志:
1
2
3
4
5
6
7
8
9
10
11[root@localhost product-syn]# tail -f /opt/module/canal_1.1.8/logs/product-syn/product-syn.log
2025-04-28 23:24:05.300 [Thread-7] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - stop successful....
2025-04-28 23:38:21.271 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-product-syn
2025-04-28 23:38:21.667 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^canal.t_product$
2025-04-28 23:38:21.667 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter :
2025-04-28 23:38:21.672 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2025-04-28 23:38:21.744 [destination = product-syn , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2025-04-28 23:38:21.814 [destination = product-syn , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just last position
{"identity":{"slaveId":-1,"sourceAddress":{"address":"localhost","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000001","position":2169481,"serverId":1,"timestamp":1745853598000}}
2025-04-28 23:38:21.967 [destination = product-syn , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000001,position=2169481,serverId=1,gtid=,timestamp=1745853598000] cost : 217ms , the next step is binlog dump
2025-04-28 23:38:21.980 [destination = product-syn , address = /127.0.0.1:3306 , EventParser] WARN c.a.otter.canal.parse.inbound.mysql.MysqlConnection - load MySQL @@version_comment : MySQL Community Server (GPL)使用prettyZoo查看zk:
采用 Canal 源码中的集群客户端的测试例子 ClusterCanalClientTest 启动:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38public class ClusterCanalClientTest extends AbstractCanalClientTest {
public ClusterCanalClientTest(String destination){
super(destination);
}
public static void main(String args[]) {
String destination = "product-syn";
// 基于固定canal server的地址,建立链接,其中一台server发生crash,可以支持failover
// CanalConnector connector = CanalConnectors.newClusterConnector(
// Arrays.asList(new InetSocketAddress(
// AddressUtils.getHostIp(),
// 11111)),
// "stability_test", "", "");
// 基于zookeeper动态获取canal server的地址,建立链接,其中一台server发生crash,可以支持failover
CanalConnector connector = CanalConnectors.newClusterConnector("192.168.100.111:2181",
destination,
"canal",
"canal");
final ClusterCanalClientTest clientTest = new ClusterCanalClientTest(destination);
clientTest.setConnector(connector);
clientTest.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
logger.info("## stop the canal client");
clientTest.stop();
} catch (Throwable e) {
logger.warn("##something goes wrong when stopping canal:", e);
} finally {
logger.info("## canal client is down.");
}
}));
}
}修改canal.t_product表后,可以在控制台看到:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18****************************************************
* Batch Id: [2] ,count : [3] , memsize : [202] , Time : 2025-04-29 22:15:37
* Start : [mysql-bin.000001:2169577:1745936141000(2025-04-29 22:15:41)]
* End : [mysql-bin.000001:2169812:1745936141000(2025-04-29 22:15:41)]
****************************************************
================> binlog[mysql-bin.000001:2169577] , executeTime : 1745936141000(2025-04-29 22:15:41) , gtid : () , delay : -3072ms
BEGIN ----> Thread id: 1123
----------------> binlog[mysql-bin.000001:2169709] , name[canal,t_product] , eventType : UPDATE , executeTime : 1745936141000(2025-04-29 22:15:41) , gtid : () , delay : -3070 ms
id : 3 type=bigint(20)
name : 1290999 type=varchar(255) update=true
price : 1.0 type=decimal(10,2)
status : 1 type=tinyint(4)
create_time : 2025-04-27 12:00:00 type=datetime
update_time : 2025-04-27 12:00:00 type=datetime
----------------
END ----> transaction id: 333327
================> binlog[mysql-bin.000001:2169812] , executeTime : 1745936141000(2025-04-29 22:15:41) , gtid : () , delay : -3066ms
2.3 mq模式
略
3. 源码解析
common模块:主要是提供了一些公共的工具类和接口。
client模块:canal的客户端。核心接口为CanalConnector
example模块:提供client模块使用案例。
protocol模块:client和server模块之间的通信协议
deployer:部署模块。通过该模块提供的CanalLauncher来启动canal server
server模块:canal服务器端。核心接口为CanalServer
instance模块:一个server有多个instance。每个instance都会模拟成一个mysql实例的slave。instance模块有四个核心组成部分:parser模块、sink模块、store模块,meta模块。核心接口为CanalInstance
parser模块:数据源接入,模拟slave协议和master进行交互,协议解析。parser模块依赖于dbsync、driver模块。
driver模块和dbsync模块:从这两个模块的artifactId(canal.parse.driver、canal.parse.dbsync),就可以看出来,这两个模块实际上是parser模块的组件。事实上parser 是通过driver模块与mysql建立连接,从而获取到binlog。由于原始的binlog都是二进制流,需要解析成对应的binlog事件,这些binlog事件对象都定义在dbsync模块中,dbsync 模块来自于淘宝的tddl。
sink模块:parser和store链接器,进行数据过滤,加工,分发的工作。核心接口为CanalEventSink
store模块:数据存储。核心接口为CanalEventStore
meta模块:增量订阅&消费信息管理器,核心接口为CanalMetaManager,主要用于记录canal消费到的mysql binlog的位置,