1. 核心类图

image-20250526223016817

image-20250526223143460

2. Generator

2.1 ManagerCanalInstanceGenerator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
 * {@link CanalInstanceGenerator} that builds an instance from the configuration
 * returned by a {@link CanalConfigClient}.
 */
public class ManagerCanalInstanceGenerator implements CanalInstanceGenerator {

    private CanalConfigClient canalConfigClient;

    /**
     * Queries the config client for the canal definition and then for the filter
     * of {@code destination}, and wraps both in a new
     * {@link CanalInstanceWithManager}.
     */
    public CanalInstance generate(String destination) {
        // Arguments are evaluated left to right: findCanal first, findFilter second.
        return new CanalInstanceWithManager(canalConfigClient.findCanal(destination),
            canalConfigClient.findFilter(destination));
    }

    // ---------------- injected collaborators ----------------

    public void setCanalConfigClient(CanalConfigClient canalConfigClient) {
        this.canalConfigClient = canalConfigClient;
    }

}

2.2 PlainCanalInstanceGenerator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/**
 * {@link CanalInstanceGenerator} that fetches the instance properties through a
 * {@link PlainCanalConfigClient}, overlays the local canal configuration on top
 * of them and then resolves the instance bean from a Spring XML context.
 */
public class PlainCanalInstanceGenerator implements CanalInstanceGenerator {

    private static final Logger    logger      = LoggerFactory.getLogger(PlainCanalInstanceGenerator.class);
    private String                 springXml;
    private PlainCanalConfigClient canalConfigClient;
    private String                 defaultName = "instance";
    private BeanFactory            beanFactory;
    private Properties             canalConfig;

    public PlainCanalInstanceGenerator(Properties canalConfig){
        this.canalConfig = canalConfig;
    }

    /**
     * Builds the instance for {@code destination}.
     *
     * <p>The whole operation runs under the {@code CanalEventParser.class} monitor.
     * Any failure is logged and rethrown wrapped in a {@link CanalException}; the
     * destination system property and the thread-local properties are always
     * reset afterwards.
     */
    public CanalInstance generate(String destination) {
        synchronized (CanalEventParser.class) {
            try {
                PlainCanal remote = canalConfigClient.findInstance(destination, null);
                if (remote == null) {
                    throw new CanalException("instance : " + destination + " config is not found");
                }

                // Local entries are written last, so they win over remote ones on key clashes.
                Properties merged = remote.getProperties();
                merged.putAll(canalConfig);

                // Publish the merged properties to the placeholder configurer.
                com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer.propertiesLocal.set(merged);
                // Record which destination is being loaded; the original note says the
                // Spring file lookup reads this variable.
                System.setProperty("canal.instance.destination", destination);

                BeanFactory factory = getBeanFactory(springXml);
                this.beanFactory = factory;

                // Prefer a bean named after the destination, otherwise use the default name.
                String beanName = factory.containsBean(destination) ? destination : defaultName;
                return (CanalInstance) factory.getBean(beanName);
            } catch (Throwable e) {
                logger.error("generator instance failed.", e);
                throw new CanalException(e);
            } finally {
                System.setProperty("canal.instance.destination", "");
                com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer.propertiesLocal.remove();
            }
        }
    }

    /**
     * Creates a new classpath XML application context for {@code springXml},
     * prefixing {@code classpath:} when the location does not already start with it.
     */
    private BeanFactory getBeanFactory(String springXml) {
        String location = StringUtils.startsWithIgnoreCase(springXml, "classpath:") ? springXml
            : "classpath:" + springXml;
        return new ClassPathXmlApplicationContext(location);
    }

    // ---------------- setters ----------------

    public void setCanalConfigClient(PlainCanalConfigClient canalConfigClient) {
        this.canalConfigClient = canalConfigClient;
    }

    public void setSpringXml(String springXml) {
        this.springXml = springXml;
    }

}

2.3 SpringCanalInstanceGenerator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
 * {@link CanalInstanceGenerator} that resolves the instance bean from a Spring
 * XML context loaded from the classpath.
 */
public class SpringCanalInstanceGenerator implements CanalInstanceGenerator {

    private static final Logger logger      = LoggerFactory.getLogger(SpringCanalInstanceGenerator.class);
    private String              springXml;
    private String              defaultName = "instance";
    private BeanFactory         beanFactory;

    /**
     * Builds the instance for {@code destination}.
     *
     * <p>Runs under the {@code CanalEventParser.class} monitor. Failures are logged
     * and rethrown wrapped in a {@link CanalException}; the destination system
     * property is always reset to the empty string afterwards.
     */
    public CanalInstance generate(String destination) {
        synchronized (CanalEventParser.class) {
            try {
                // Record which destination is being loaded; the original note says the
                // Spring file lookup reads this variable.
                System.setProperty("canal.instance.destination", destination);

                BeanFactory factory = getBeanFactory(springXml);
                this.beanFactory = factory;

                // Prefer a bean named after the destination, otherwise use the default name.
                String beanName = factory.containsBean(destination) ? destination : defaultName;
                return (CanalInstance) factory.getBean(beanName);
            } catch (Throwable e) {
                logger.error("generator instance failed.", e);
                throw new CanalException(e);
            } finally {
                System.setProperty("canal.instance.destination", "");
            }
        }
    }

    /**
     * Creates a new classpath XML application context for {@code springXml},
     * prefixing {@code classpath:} when the location does not already start with it.
     */
    private BeanFactory getBeanFactory(String springXml) {
        String location = StringUtils.startsWithIgnoreCase(springXml, "classpath:") ? springXml
            : "classpath:" + springXml;
        return new ClassPathXmlApplicationContext(location);
    }

    public void setSpringXml(String springXml) {
        this.springXml = springXml;
    }
}

3. Manager

3.1 CanalInstanceWithManager

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
public class CanalInstanceWithManager extends AbstractCanalInstance {

private static final Logger logger = LoggerFactory.getLogger(CanalInstanceWithManager.class);
protected String filter; // 过滤表达式
protected CanalParameter parameters; // 对应参数

public CanalInstanceWithManager(Canal canal, String filter){
this.parameters = canal.getCanalParameter();
this.canalId = canal.getId();
this.destination = canal.getName();
this.filter = filter;

logger.info("init CanalInstance for {}-{} with parameters:{}", canalId, destination, parameters);
// 初始化报警机制
initAlarmHandler();
// 初始化metaManager
initMetaManager();
// 初始化eventStore
initEventStore();
// 初始化eventSink
initEventSink();
// 初始化eventParser;
initEventParser();

// 基础工具,需要提前start,会有先订阅再根据filter条件启动parse的需求
if (!alarmHandler.isStart()) {
alarmHandler.start();
}

if (!metaManager.isStart()) {
metaManager.start();
}
logger.info("init successful....");
}

public void start() {
// 初始化metaManager
logger.info("start CannalInstance for {}-{} with parameters:{}", canalId, destination, parameters);
super.start();
}

@SuppressWarnings("resource")
protected void initAlarmHandler() {
logger.info("init alarmHandler begin...");
String alarmHandlerClass = parameters.getAlarmHandlerClass();
String alarmHandlerPluginDir = parameters.getAlarmHandlerPluginDir();
if (alarmHandlerClass == null || alarmHandlerPluginDir == null) {
alarmHandler = new LogAlarmHandler();
} else {
try {
File externalLibDir = new File(alarmHandlerPluginDir);
File[] jarFiles = externalLibDir.listFiles((dir, name) -> name.endsWith(".jar"));
if (jarFiles == null || jarFiles.length == 0) {
throw new IllegalStateException(String.format("alarmHandlerPluginDir [%s] can't find any name endswith \".jar\" file.",
alarmHandlerPluginDir));
}
URL[] urls = new URL[jarFiles.length];
for (int i = 0; i < jarFiles.length; i++) {
urls[i] = jarFiles[i].toURI().toURL();
}
ClassLoader currentClassLoader = new URLClassLoader(urls,
CanalInstanceWithManager.class.getClassLoader());
Class<CanalAlarmHandler> _alarmClass = (Class<CanalAlarmHandler>) currentClassLoader.loadClass(alarmHandlerClass);
alarmHandler = _alarmClass.newInstance();
logger.info("init [{}] alarm handler success.", alarmHandlerClass);
} catch (Throwable e) {
String errorMsg = String.format("init alarmHandlerPluginDir [%s] alarm handler [%s] error: %s",
alarmHandlerPluginDir,
alarmHandlerClass,
ExceptionUtils.getFullStackTrace(e));
logger.error(errorMsg);
throw new CanalException(errorMsg, e);
}
}
logger.info("init alarmHandler end! \n\t load CanalAlarmHandler:{} ", alarmHandler.getClass().getName());
}

protected void initMetaManager() {
logger.info("init metaManager begin...");
MetaMode mode = parameters.getMetaMode();
if (mode.isMemory()) {
metaManager = new MemoryMetaManager();
} else if (mode.isZookeeper()) {
metaManager = new ZooKeeperMetaManager();
((ZooKeeperMetaManager) metaManager).setZkClientx(getZkclientx());
} else if (mode.isMixed()) {
// metaManager = new MixedMetaManager();
metaManager = new PeriodMixedMetaManager();// 换用优化过的mixed, at
// 2012-09-11
// 设置内嵌的zk metaManager
ZooKeeperMetaManager zooKeeperMetaManager = new ZooKeeperMetaManager();
zooKeeperMetaManager.setZkClientx(getZkclientx());
((PeriodMixedMetaManager) metaManager).setZooKeeperMetaManager(zooKeeperMetaManager);
} else if (mode.isLocalFile()) {
FileMixedMetaManager fileMixedMetaManager = new FileMixedMetaManager();
fileMixedMetaManager.setDataDir(parameters.getDataDir());
fileMixedMetaManager.setPeriod(parameters.getMetaFileFlushPeriod());
metaManager = fileMixedMetaManager;
} else {
throw new CanalException("unsupport MetaMode for " + mode);
}

logger.info("init metaManager end! \n\t load CanalMetaManager:{} ", metaManager.getClass().getName());
}

protected void initEventStore() {
logger.info("init eventStore begin...");
StorageMode mode = parameters.getStorageMode();
if (mode.isMemory()) {
MemoryEventStoreWithBuffer memoryEventStore = new MemoryEventStoreWithBuffer();
memoryEventStore.setBufferSize(parameters.getMemoryStorageBufferSize());
memoryEventStore.setBufferMemUnit(parameters.getMemoryStorageBufferMemUnit());
memoryEventStore.setBatchMode(BatchMode.valueOf(parameters.getStorageBatchMode().name()));
memoryEventStore.setDdlIsolation(parameters.getDdlIsolation());
memoryEventStore.setRaw(parameters.getMemoryStorageRawEntry());
eventStore = memoryEventStore;
} else if (mode.isFile()) {
// 后续版本支持
throw new CanalException("unsupport MetaMode for " + mode);
} else if (mode.isMixed()) {
// 后续版本支持
throw new CanalException("unsupport MetaMode for " + mode);
} else {
throw new CanalException("unsupport MetaMode for " + mode);
}

if (eventStore instanceof AbstractCanalStoreScavenge) {
StorageScavengeMode scavengeMode = parameters.getStorageScavengeMode();
AbstractCanalStoreScavenge eventScavengeStore = (AbstractCanalStoreScavenge) eventStore;
eventScavengeStore.setDestination(destination);
eventScavengeStore.setCanalMetaManager(metaManager);
eventScavengeStore.setOnAck(scavengeMode.isOnAck());
eventScavengeStore.setOnFull(scavengeMode.isOnFull());
eventScavengeStore.setOnSchedule(scavengeMode.isOnSchedule());
if (scavengeMode.isOnSchedule()) {
eventScavengeStore.setScavengeSchedule(parameters.getScavengeSchdule());
}
}
logger.info("init eventStore end! \n\t load CanalEventStore:{}", eventStore.getClass().getName());
}

protected void initEventSink() {
logger.info("init eventSink begin...");

int groupSize = getGroupSize();
if (groupSize <= 1) {
eventSink = new EntryEventSink();
} else {
eventSink = new GroupEventSink(groupSize);
}

if (eventSink instanceof EntryEventSink) {
((EntryEventSink) eventSink).setFilterTransactionEntry(false);
((EntryEventSink) eventSink).setEventStore(getEventStore());
}
// if (StringUtils.isNotEmpty(filter)) {
// AviaterRegexFilter aviaterFilter = new AviaterRegexFilter(filter);
// ((AbstractCanalEventSink) eventSink).setFilter(aviaterFilter);
// }
logger.info("init eventSink end! \n\t load CanalEventSink:{}", eventSink.getClass().getName());
}

protected void initEventParser() {
logger.info("init eventParser begin...");
SourcingType type = parameters.getSourcingType();

List<List<DataSourcing>> groupDbAddresses = parameters.getGroupDbAddresses();
if (!CollectionUtils.isEmpty(groupDbAddresses)) {
int size = groupDbAddresses.get(0).size();// 取第一个分组的数量,主备分组的数量必须一致
List<CanalEventParser> eventParsers = new ArrayList<>();
for (int i = 0; i < size; i++) {
List<InetSocketAddress> dbAddress = new ArrayList<>();
SourcingType lastType = null;
for (List<DataSourcing> groupDbAddress : groupDbAddresses) {
if (lastType != null && !lastType.equals(groupDbAddress.get(i).getType())) {
throw new CanalException(String.format("master/slave Sourcing type is unmatch. %s vs %s",
lastType,
groupDbAddress.get(i).getType()));
}

lastType = groupDbAddress.get(i).getType();
dbAddress.add(groupDbAddress.get(i).getDbAddress());
}

// 初始化其中的一个分组parser
eventParsers.add(doInitEventParser(lastType, dbAddress));
}

if (eventParsers.size() > 1) { // 如果存在分组,构造分组的parser
GroupEventParser groupEventParser = new GroupEventParser();
groupEventParser.setEventParsers(eventParsers);
this.eventParser = groupEventParser;
} else {
this.eventParser = eventParsers.get(0);
}
} else {
// 创建一个空数据库地址的parser,可能使用了tddl指定地址,启动的时候才会从tddl获取地址
this.eventParser = doInitEventParser(type, new ArrayList<>());
}

logger.info("init eventParser end! \n\t load CanalEventParser:{}", eventParser.getClass().getName());
}

private CanalEventParser doInitEventParser(SourcingType type, List<InetSocketAddress> dbAddresses) {
CanalEventParser eventParser;
if (type.isMysql()) {
MysqlEventParser mysqlEventParser = null;
if (StringUtils.isNotEmpty(parameters.getRdsAccesskey())
&& StringUtils.isNotEmpty(parameters.getRdsSecretkey())
&& StringUtils.isNotEmpty(parameters.getRdsInstanceId())) {

mysqlEventParser = new RdsBinlogEventParserProxy();
((RdsBinlogEventParserProxy) mysqlEventParser).setAccesskey(parameters.getRdsAccesskey());
((RdsBinlogEventParserProxy) mysqlEventParser).setSecretkey(parameters.getRdsSecretkey());
((RdsBinlogEventParserProxy) mysqlEventParser).setInstanceId(parameters.getRdsInstanceId());
} else {
mysqlEventParser = new MysqlEventParser();
}
mysqlEventParser.setDestination(destination);
// 编码参数
mysqlEventParser.setConnectionCharset(parameters.getConnectionCharset());
// 网络相关参数1
mysqlEventParser.setDefaultConnectionTimeoutInSeconds(parameters.getDefaultConnectionTimeoutInSeconds());
mysqlEventParser.setSendBufferSize(parameters.getSendBufferSize());
mysqlEventParser.setReceiveBufferSize(parameters.getReceiveBufferSize());
// 心跳检查参数
mysqlEventParser.setDetectingEnable(parameters.getDetectingEnable());
mysqlEventParser.setDetectingSQL(parameters.getDetectingSQL());
mysqlEventParser.setDetectingIntervalInSeconds(parameters.getDetectingIntervalInSeconds());
// 数据库信息参数
mysqlEventParser.setSlaveId(parameters.getSlaveId());
if (!CollectionUtils.isEmpty(dbAddresses)) {
mysqlEventParser.setMasterInfo(new AuthenticationInfo(dbAddresses.get(0),
parameters.getDbUsername(),
parameters.getDbPassword(),
parameters.getDefaultDatabaseName(),
parameters.getSslInfo()));

if (dbAddresses.size() > 1) {
mysqlEventParser.setStandbyInfo(new AuthenticationInfo(dbAddresses.get(1),
parameters.getDbUsername(),
parameters.getDbPassword(),
parameters.getDefaultDatabaseName(),
parameters.getSslInfo()));
}
}

if (!CollectionUtils.isEmpty(parameters.getPositions())) {
EntryPosition masterPosition = JsonUtils.unmarshalFromString(parameters.getPositions().get(0),
EntryPosition.class);
// binlog位置参数
mysqlEventParser.setMasterPosition(masterPosition);

if (parameters.getPositions().size() > 1) {
EntryPosition standbyPosition = JsonUtils.unmarshalFromString(parameters.getPositions().get(1),
EntryPosition.class);
mysqlEventParser.setStandbyPosition(standbyPosition);
}
}
mysqlEventParser.setFallbackIntervalInSeconds(parameters.getFallbackIntervalInSeconds());
mysqlEventParser.setProfilingEnabled(false);
mysqlEventParser.setFilterTableError(parameters.getFilterTableError());
mysqlEventParser.setParallel(parameters.getParallel());
mysqlEventParser.setIsGTIDMode(BooleanUtils.toBoolean(parameters.getGtidEnable()));
mysqlEventParser.setMultiStreamEnable(parameters.getMultiStreamEnable());
// tsdb
if (parameters.getTsdbSnapshotInterval() != null) {
mysqlEventParser.setTsdbSnapshotInterval(parameters.getTsdbSnapshotInterval());
}
if (parameters.getTsdbSnapshotExpire() != null) {
mysqlEventParser.setTsdbSnapshotExpire(parameters.getTsdbSnapshotExpire());
}
boolean tsdbEnable = BooleanUtils.toBoolean(parameters.getTsdbEnable());
// manager启动模式默认使用mysql tsdb机制
final String tsdbSpringXml = "classpath:spring/tsdb/mysql-tsdb.xml";
if (tsdbEnable) {
mysqlEventParser.setTableMetaTSDBFactory(new DefaultTableMetaTSDBFactory() {

@Override
public void destory(String destination) {
TableMetaTSDBBuilder.destory(destination);
}

@Override
public TableMetaTSDB build(String destination, String springXml) {
synchronized (CanalInstanceWithManager.class) {
try {
System.setProperty("canal.instance.tsdb.url", parameters.getTsdbJdbcUrl());
System.setProperty("canal.instance.tsdb.dbUsername", parameters.getTsdbJdbcUserName());
System.setProperty("canal.instance.tsdb.dbPassword", parameters.getTsdbJdbcPassword());
System.setProperty("canal.instance.destination", destination);

return TableMetaTSDBBuilder.build(destination, tsdbSpringXml);
} finally {
// reset
System.setProperty("canal.instance.destination", "");
System.setProperty("canal.instance.tsdb.url", "");
System.setProperty("canal.instance.tsdb.dbUsername", "");
System.setProperty("canal.instance.tsdb.dbPassword", "");
}
}
}
});
mysqlEventParser.setTsdbJdbcUrl(parameters.getTsdbJdbcUrl());
mysqlEventParser.setTsdbJdbcUserName(parameters.getTsdbJdbcUserName());
mysqlEventParser.setTsdbJdbcPassword(parameters.getTsdbJdbcPassword());
mysqlEventParser.setTsdbSpringXml(tsdbSpringXml);
mysqlEventParser.setEnableTsdb(tsdbEnable);
}
eventParser = mysqlEventParser;
} else if (type.isLocalBinlog()) {
LocalBinlogEventParser localBinlogEventParser = new LocalBinlogEventParser();
localBinlogEventParser.setDestination(destination);
localBinlogEventParser.setBufferSize(parameters.getReceiveBufferSize());
localBinlogEventParser.setConnectionCharset(parameters.getConnectionCharset());
localBinlogEventParser.setDirectory(parameters.getLocalBinlogDirectory());
localBinlogEventParser.setProfilingEnabled(false);
localBinlogEventParser.setDetectingEnable(parameters.getDetectingEnable());
localBinlogEventParser.setDetectingIntervalInSeconds(parameters.getDetectingIntervalInSeconds());
localBinlogEventParser.setFilterTableError(parameters.getFilterTableError());
localBinlogEventParser.setParallel(parameters.getParallel());
// 数据库信息,反查表结构时需要
if (!CollectionUtils.isEmpty(dbAddresses)) {
localBinlogEventParser.setMasterInfo(new AuthenticationInfo(dbAddresses.get(0),
parameters.getDbUsername(),
parameters.getDbPassword(),
parameters.getDefaultDatabaseName(),
parameters.getSslInfo()));
}

eventParser = localBinlogEventParser;
} else if (type.isOracle()) {
throw new CanalException("unsupport SourcingType for " + type);
} else {
throw new CanalException("unsupport SourcingType for " + type);
}

// add transaction support at 2012-12-06
if (eventParser instanceof AbstractEventParser) {
AbstractEventParser abstractEventParser = (AbstractEventParser) eventParser;
abstractEventParser.setTransactionSize(parameters.getTransactionSize());
abstractEventParser.setLogPositionManager(initLogPositionManager());
abstractEventParser.setAlarmHandler(getAlarmHandler());
abstractEventParser.setEventSink(getEventSink());

if (StringUtils.isNotEmpty(filter)) {
AviaterRegexFilter aviaterFilter = new AviaterRegexFilter(filter);
abstractEventParser.setEventFilter(aviaterFilter);
}

// 设置黑名单
if (StringUtils.isNotEmpty(parameters.getBlackFilter())) {
AviaterRegexFilter aviaterFilter = new AviaterRegexFilter(parameters.getBlackFilter());
abstractEventParser.setEventBlackFilter(aviaterFilter);
}
}
if (eventParser instanceof MysqlEventParser) {
MysqlEventParser mysqlEventParser = (MysqlEventParser) eventParser;

// 初始化haController,绑定与eventParser的关系,haController会控制eventParser
CanalHAController haController = initHaController();
mysqlEventParser.setHaController(haController);
}
return eventParser;
}

protected CanalHAController initHaController() {
logger.info("init haController begin...");
HAMode haMode = parameters.getHaMode();
CanalHAController haController = null;
if (haMode.isHeartBeat()) {
haController = new HeartBeatHAController();
((HeartBeatHAController) haController).setDetectingRetryTimes(parameters.getDetectingRetryTimes());
((HeartBeatHAController) haController).setSwitchEnable(parameters.getHeartbeatHaEnable());
} else {
throw new CanalException("unsupport HAMode for " + haMode);
}
logger.info("init haController end! \n\t load CanalHAController:{}", haController.getClass().getName());

return haController;
}

protected CanalLogPositionManager initLogPositionManager() {
logger.info("init logPositionPersistManager begin...");
IndexMode indexMode = parameters.getIndexMode();
CanalLogPositionManager logPositionManager;
if (indexMode.isMemory()) {
logPositionManager = new MemoryLogPositionManager();
} else if (indexMode.isZookeeper()) {
logPositionManager = new ZooKeeperLogPositionManager(getZkclientx());
} else if (indexMode.isMixed()) {
MemoryLogPositionManager memoryLogPositionManager = new MemoryLogPositionManager();
ZooKeeperLogPositionManager zooKeeperLogPositionManager = new ZooKeeperLogPositionManager(getZkclientx());
logPositionManager = new PeriodMixedLogPositionManager(memoryLogPositionManager,
zooKeeperLogPositionManager,
1000L);
} else if (indexMode.isMeta()) {
logPositionManager = new MetaLogPositionManager(metaManager);
} else if (indexMode.isMemoryMetaFailback()) {
MemoryLogPositionManager primary = new MemoryLogPositionManager();
MetaLogPositionManager secondary = new MetaLogPositionManager(metaManager);

logPositionManager = new FailbackLogPositionManager(primary, secondary);
} else {
throw new CanalException("unsupport indexMode for " + indexMode);
}

logger.info("init logPositionManager end! \n\t load CanalLogPositionManager:{}", logPositionManager.getClass()
.getName());

return logPositionManager;
}

protected void startEventParserInternal(CanalEventParser eventParser, boolean isGroup) {
if (eventParser instanceof AbstractEventParser) {
AbstractEventParser abstractEventParser = (AbstractEventParser) eventParser;
abstractEventParser.setAlarmHandler(getAlarmHandler());
}

super.startEventParserInternal(eventParser, isGroup);
}

private int getGroupSize() {
List<List<DataSourcing>> groupDbAddresses = parameters.getGroupDbAddresses();
if (!CollectionUtils.isEmpty(groupDbAddresses)) {
return groupDbAddresses.get(0).size();
} else {
// 可能是基于tddl的启动
return 1;
}
}

private synchronized ZkClientx getZkclientx() {
// 做一下排序,保证相同的机器只使用同一个链接
List<String> zkClusters = new ArrayList<>(parameters.getZkClusters());
Collections.sort(zkClusters);

return ZkClientx.getZkClient(StringUtils.join(zkClusters, ";"));
}

public void setAlarmHandler(CanalAlarmHandler alarmHandler) {
this.alarmHandler = alarmHandler;
}

}

3.2 CanalInstanceWithSpring

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
 * {@link AbstractCanalInstance} whose components are supplied from outside
 * through the setters below.
 */
public class CanalInstanceWithSpring extends AbstractCanalInstance {

    private static final Logger logger = LoggerFactory.getLogger(CanalInstanceWithSpring.class);

    /** Logs the start of this instance and delegates to the superclass. */
    public void start() {
        // The first placeholder is filled with the literal 1, not with an instance id.
        logger.info("start CannalInstance for {}-{} ", 1, destination);
        super.start();
    }

    // ---------------- injected components ----------------

    public void setDestination(String destination) {
        this.destination = destination;
    }

    public void setEventParser(CanalEventParser eventParser) {
        this.eventParser = eventParser;
    }

    public void setEventSink(CanalEventSink<List<CanalEntry.Entry>> eventSink) {
        this.eventSink = eventSink;
    }

    public void setEventStore(CanalEventStore<Event> eventStore) {
        this.eventStore = eventStore;
    }

    public void setMetaManager(CanalMetaManager metaManager) {
        this.metaManager = metaManager;
    }

    public void setAlarmHandler(CanalAlarmHandler alarmHandler) {
        this.alarmHandler = alarmHandler;
    }

    public void setMqConfig(CanalMQConfig mqConfig) {
        this.mqConfig = mqConfig;
    }

}

4. CanalConfigClient

4.1 CanalConfigClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// Placeholder configuration client: both lookups below throw
// UnsupportedOperationException until an implementation is provided.
public class CanalConfigClient {

/**
* Looks up the Canal information for the given destination.
*
* @throws UnsupportedOperationException always, in this placeholder
*/
public Canal findCanal(String destination) {
// TODO implement according to your own business requirements
throw new UnsupportedOperationException();
}

/**
* Looks up the filter information for the given destination.
*
* @throws UnsupportedOperationException always, in this placeholder
*/
public String findFilter(String destination) {
// TODO implement according to your own business requirements
throw new UnsupportedOperationException();
}

}

4.2 PlainCanalConfigClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
public class PlainCanalConfigClient extends AbstractCanalLifeCycle implements CanalLifeCycle {

private final static Integer REQUEST_TIMEOUT = 5000;
private String configURL;
private String user;
private String passwd;
private HttpHelper httpHelper;
private String localIp;
private int adminPort;
private boolean autoRegister;
private String autoCluster;
private String name;

public PlainCanalConfigClient(String configURL, String user, String passwd, String localIp, int adminPort,
boolean autoRegister, String autoCluster, String name){
this(configURL, user, passwd, localIp, adminPort);
this.autoCluster = autoCluster;
this.autoRegister = autoRegister;
this.name = name;
}

public PlainCanalConfigClient(String configURL, String user, String passwd, String localIp, int adminPort){
this.configURL = configURL;
if (!StringUtils.startsWithIgnoreCase(configURL, "http")) {
this.configURL = "http://" + configURL;
} else {
this.configURL = configURL;
}
this.user = user;
this.passwd = passwd;
this.httpHelper = new HttpHelper();
if (StringUtils.isEmpty(localIp)) {
this.localIp = "127.0.0.1";// 本地测试用
} else {
this.localIp = localIp;
}
this.adminPort = adminPort;
}

/**
* 加载canal.properties文件
*
* @return 远程配置的properties
*/
public PlainCanal findServer(String md5) {
if (StringUtils.isEmpty(md5)) {
md5 = "";
}
String url = configURL + "/api/v1/config/server_polling?ip=" + localIp + "&port=" + adminPort + "&md5=" + md5
+ "&register=" + (autoRegister ? 1 : 0) + "&cluster=" + StringUtils.stripToEmpty(autoCluster) + "&name=" + StringUtils.stripToEmpty(name);
return queryConfig(url);
}

/**
* 加载远程的instance.properties
*/
public PlainCanal findInstance(String destination, String md5) {
if (StringUtils.isEmpty(md5)) {
md5 = "";
}
String url = configURL + "/api/v1/config/instance_polling/" + UrlEscapers.urlPathSegmentEscaper().escape(destination) + "?md5=" + md5;
return queryConfig(url);
}

/**
* 返回需要运行的instance列表
*/
public String findInstances(String md5) {
if (StringUtils.isEmpty(md5)) {
md5 = "";
}
String url = configURL + "/api/v1/config/instances_polling?md5=" + md5 + "&ip=" + localIp + "&port="
+ adminPort;
ResponseModel<CanalConfig> config = doQuery(url);
if (config.data != null) {
return config.data.content;
} else {
return null;
}
}

private PlainCanal queryConfig(String url) {
try {
ResponseModel<CanalConfig> config = doQuery(url);
return processData(config.data);
} catch (Throwable e) {
throw new CanalException("load manager config failed.", e);
}
}

private ResponseModel<CanalConfig> doQuery(String url) {
Map<String, String> heads = new HashMap<>();
heads.put("user", user);
heads.put("passwd", passwd);
String response = httpHelper.get(url, heads, REQUEST_TIMEOUT);
ResponseModel<CanalConfig> resp = JSON.parseObject(response,
new TypeReference<ResponseModel<CanalConfig>>() {
});

if (!HttpHelper.REST_STATE_OK.equals(resp.code)) {
throw new CanalException("requestGet for canal config error: " + resp.message);
}

return resp;
}

private PlainCanal processData(CanalConfig config) throws IOException, NoSuchAlgorithmException {
Properties properties = new Properties();
String md5 = null;
String status = null;
if (config != null && StringUtils.isNotEmpty(config.content)) {
md5 = SecurityUtil.md5String(config.content);
status = config.status;
properties.load(new ByteArrayInputStream(config.content.getBytes(StandardCharsets.UTF_8)));
} else {
// null代表没有新配置变更
return null;
}

return new PlainCanal(properties, status, md5);
}

private static class ResponseModel<T> {

public Integer code;
public String message;
public T data;
}

private static class CanalConfig {

public String content;
public String status;

}
}

image-20250601121810989

1. CanalEventParser

1
2
3
4
5
6
7
8
9
/**
 * Data replication controller. The interface declares no members of its own
 * beyond what it inherits from {@link CanalLifeCycle}.
 *
 * @author jianghang 2012-6-21 04:03:25 PM
 * @version 1.0.0
 */
public interface CanalEventParser<EVENT> extends CanalLifeCycle {

}

2. AbstractEventParser

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
/**
* 抽象的EventParser, 最大化共用mysql/oracle版本的实现
*
* @author jianghang 2013-1-20 下午08:10:25
* @version 1.0.0
*/
public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle implements CanalEventParser<EVENT> {

protected final Logger logger = LoggerFactory.getLogger(this.getClass());

protected CanalLogPositionManager logPositionManager = null;
protected CanalEventSink<List<CanalEntry.Entry>> eventSink = null;
protected CanalEventFilter eventFilter = null;
protected CanalEventFilter eventBlackFilter = null;

// 字段过滤
protected String fieldFilter;
protected Map<String, List<String>> fieldFilterMap;
protected String fieldBlackFilter;
protected Map<String, List<String>> fieldBlackFilterMap;

private CanalAlarmHandler alarmHandler = null;

// 统计参数
protected AtomicBoolean profilingEnabled = new AtomicBoolean(false); // profile开关参数
protected AtomicLong receivedEventCount = new AtomicLong();
protected AtomicLong parsedEventCount = new AtomicLong();
protected AtomicLong consumedEventCount = new AtomicLong();
protected long parsingInterval = -1;
protected long processingInterval = -1;

// 认证信息
protected volatile AuthenticationInfo runningInfo;
protected String destination;

// binLogParser
protected BinlogParser binlogParser = null;

protected Thread parseThread = null;

protected Thread.UncaughtExceptionHandler handler = (t, e) -> logger.error("parse events has an error",
e);

protected EventTransactionBuffer transactionBuffer;
protected int transactionSize = 1024;
protected AtomicBoolean needTransactionPosition = new AtomicBoolean(false);
protected long lastEntryTime = 0L;
protected volatile boolean detectingEnable = true; // 是否开启心跳检查
protected Integer detectingIntervalInSeconds = 3; // 检测频率
protected volatile Timer timer;
protected TimerTask heartBeatTimerTask;
protected Throwable exception = null;

protected boolean isGTIDMode = false; // 是否是GTID模式
protected boolean parallel = true; // 是否开启并行解析模式
protected Integer parallelThreadSize = Runtime.getRuntime()
.availableProcessors() * 60 / 100; // 60%的能力跑解析,剩余部分处理网络
protected int parallelBufferSize = 256; // 必须为2的幂
protected MultiStageCoprocessor multiStageCoprocessor;
protected ParserExceptionHandler parserExceptionHandler;
protected long serverId;

/**
 * Creates the binlog parser. {@link #start()} calls this once and then starts the returned parser.
 */
protected abstract BinlogParser buildParser();

/**
 * Creates a connection to the replication source. The parse thread calls this at the top of every
 * dump/retry iteration.
 */
protected abstract ErosaConnection buildErosaConnection();

/**
 * Creates the multi-stage coprocessor. The parse thread only calls this when {@code parallel} is enabled.
 */
protected abstract MultiStageCoprocessor buildMultiStageCoprocessor();

/**
 * Resolves the position the dump should start from. The parse thread treats a {@code null} result as a
 * failure of the current attempt and raises a {@code PositionNotFoundException}.
 *
 * @param connection the connection the parse thread has just connected
 * @return the start position, or {@code null} when none can be determined
 * @throws IOException declared for implementations that talk to the source while searching
 */
protected abstract EntryPosition findStartPosition(ErosaConnection connection) throws IOException;

/**
 * Hook invoked by the parse thread after the heartbeat has been started and before {@code connect()} is
 * called on the connection. The default implementation does nothing.
 *
 * @param connection the freshly built, not yet connected connection
 */
protected void preDump(ErosaConnection connection) {
}

/**
 * Hook invoked by the parse thread with the resolved start position before dumping begins. Returning
 * {@code false} makes the parse thread abort the attempt with a {@code CanalParseException}.
 * The default implementation always returns {@code true}.
 *
 * @param position the start position found for this attempt
 * @return {@code true} to continue with the dump
 */
protected boolean processTableMeta(EntryPosition position) {
return true;
}

/**
 * Hook invoked from the parse thread's {@code finally} block, right before the connection is disconnected.
 * The default implementation does nothing.
 *
 * @param connection the connection used for the attempt; may be {@code null} if none has been built yet
 */
protected void afterDump(ErosaConnection connection) {
}

/**
 * Forwards an alarm to the configured alarm handler; does nothing when no handler has been set.
 *
 * @param destination destination the alarm belongs to
 * @param msg alarm text
 */
public void sendAlarm(String destination, String msg) {
    if (this.alarmHandler == null) {
        return;
    }
    this.alarmHandler.sendAlarm(destination, msg);
}

/**
 * Creates the parser and its transaction buffer. The buffer's flush callback hands each flushed batch to the
 * sink; while the parser is still running, a rejected batch raises a {@code CanalParseException}, and an
 * accepted batch has the position of its last transaction end (if any) persisted.
 */
public AbstractEventParser(){
    this.transactionBuffer = new EventTransactionBuffer(flushedEntries -> {
        boolean sinkAccepted = consumeTheEventAndProfilingIfNecessary(flushedEntries);
        if (running) {
            if (!sinkAccepted) {
                throw new CanalParseException("consume failed!");
            }

            // A batch does not necessarily contain a transaction end, so there may be nothing to persist.
            LogPosition lastTransactionPosition = buildLastTransactionPosition(flushedEntries);
            if (lastTransactionPosition != null) {
                logPositionManager.persistLogPosition(destination, lastTransactionPosition);
            }
        }
    });
}

/**
 * Starts the parser: starts the transaction buffer and the binlog parser, then launches the parse thread.
 * <p>
 * While {@code running} is true the parse thread loops over: build a connection, start the heartbeat,
 * connect, resolve the start position, reconnect and dump (through the multi-stage coprocessor when
 * {@code parallel} is set, otherwise through an inline sink function). After every attempt it disconnects,
 * interrupts the sink, resets the buffer and the binlog parser, stops the coprocessor and, if still running,
 * sleeps 10-20 seconds before retrying.
 */
public void start() {
super.start();
MDC.put("destination", destination);
// Configure the transaction buffer
// (the buffering queue in front of the sink)
transactionBuffer.setBufferSize(transactionSize);// set the buffer size
transactionBuffer.start();
// Build the binlog parser
binlogParser = buildParser();// create the BinlogParser
binlogParser.start();
// Start the worker thread
parseThread = new Thread(new Runnable() {

public void run() {
MDC.put("destination", String.valueOf(destination));
ErosaConnection erosaConnection = null;
boolean isMariaDB = false;
while (running) {
try {
// Begin replication
// 1. Build the Erosa connection
erosaConnection = buildErosaConnection();

// 2. Start the heartbeat timer
startHeartBeat(erosaConnection);

// 3. Preparation work before the dump
preDump(erosaConnection);

erosaConnection.connect();// connect

long queryServerId = erosaConnection.queryServerId();
if (queryServerId != 0) {
serverId = queryServerId;
}

if (erosaConnection instanceof MysqlConnection) {
isMariaDB = ((MysqlConnection) erosaConnection).isMariaDB();
}
// 4. Resolve the position to start from
long start = System.currentTimeMillis();
logger.warn("---> begin to find start position, it will be long time for reset or first position");
EntryPosition position = findStartPosition(erosaConnection);
final EntryPosition startPosition = position;
if (startPosition == null) {
throw new PositionNotFoundException("can't find start position for " + destination);
}

if (!processTableMeta(startPosition)) {
throw new CanalParseException("can't find init table meta for " + destination
+ " with position : " + startPosition);
}
long end = System.currentTimeMillis();
logger.warn("---> find start position successfully, {}", startPosition.toString() + " cost : "
+ (end - start)
+ "ms , the next step is binlog dump");
// Reconnect: searching for the position may have left state on the connection, so drop and rebuild it
erosaConnection.reconnect();

final SinkFunction sinkHandler = new SinkFunction<EVENT>() {

private LogPosition lastPosition;

public boolean sink(EVENT event) {
try {
CanalEntry.Entry entry = parseAndProfilingIfNecessary(event, false);

if (!running) {
return false;
}

if (entry != null) {
exception = null; // normal data is flowing again, clear the recorded exception
transactionBuffer.add(entry);
// Remember the position of this entry
this.lastPosition = buildLastPosition(entry);
// Remember when data was last seen
lastEntryTime = System.currentTimeMillis();
}
return running;
} catch (TableIdNotFoundException e) {
throw e;
} catch (Throwable e) {
if (e.getCause() instanceof TableIdNotFoundException) {
throw (TableIdNotFoundException) e.getCause();
}
// Log the position at which the failure happened
processSinkError(e,
this.lastPosition,
startPosition.getJournalName(),
startPosition.getPosition());
throw new CanalParseException(e); // rethrow so the outer loop handles it uniformly
}
}

};

// 5. Start dumping data
if (parallel) {
// build stage processor
multiStageCoprocessor = buildMultiStageCoprocessor();
if (isGTIDMode() && StringUtils.isNotEmpty(startPosition.getGtid())) {
// The instance runs in GTID mode and a GTID is known: use the GTID-based dump of ErosaConnection
GTIDSet gtidSet = parseGtidSet(startPosition.getGtid(), isMariaDB);
((MysqlMultiStageCoprocessor) multiStageCoprocessor).setGtidSet(gtidSet);
multiStageCoprocessor.start();
erosaConnection.dump(gtidSet, multiStageCoprocessor);
} else {
multiStageCoprocessor.start();
if (StringUtils.isEmpty(startPosition.getJournalName())
&& startPosition.getTimestamp() != null) {
erosaConnection.dump(startPosition.getTimestamp(), multiStageCoprocessor);
} else {
erosaConnection.dump(startPosition.getJournalName(),
startPosition.getPosition(),
multiStageCoprocessor);
}
}
} else {
if (isGTIDMode() && StringUtils.isNotEmpty(startPosition.getGtid())) {
// The instance runs in GTID mode and a GTID is known: use the GTID-based dump of ErosaConnection
erosaConnection.dump(parseGtidSet(startPosition.getGtid(), isMariaDB), sinkHandler);
} else {
if (StringUtils.isEmpty(startPosition.getJournalName())
&& startPosition.getTimestamp() != null) {
erosaConnection.dump(startPosition.getTimestamp(), sinkHandler);
} else {
erosaConnection.dump(startPosition.getJournalName(),
startPosition.getPosition(),
sinkHandler);
}
}
}
} catch (TableIdNotFoundException e) {
exception = e;
// Special handling for TableIdNotFoundException: one possible cause is that the start position
// lies inside a transaction, so the table-map event was never parsed
needTransactionPosition.compareAndSet(false, true);
logger.error(String.format("dump address %s has an error, retrying. caused by ",
runningInfo.getAddress().toString()), e);
} catch (Throwable e) {
processDumpError(e);
exception = e;
if (!running) {
if (!(e instanceof java.nio.channels.ClosedByInterruptException || e.getCause() instanceof java.nio.channels.ClosedByInterruptException)) {
throw new CanalParseException(String.format("dump address %s has an error, retrying. ",
runningInfo.getAddress().toString()), e);
}
} else {
logger.error(String.format("dump address %s has an error, retrying. caused by ",
runningInfo.getAddress().toString()), e);
sendAlarm(destination, ExceptionUtils.getFullStackTrace(e));
}
if (parserExceptionHandler != null) {
parserExceptionHandler.handle(e);
}
} finally {
// Thread.interrupted() tests AND clears this thread's interrupt flag (it does not set it).
// NOTE(review): presumably cleared so the cleanup below is not aborted by a pending interrupt - confirm
Thread.interrupted();
// Close the connection
afterDump(erosaConnection);
try {
if (erosaConnection != null) {
erosaConnection.disconnect();
}
} catch (IOException e1) {
if (!running) {
throw new CanalParseException(String.format("disconnect address %s has an error, retrying. ",
runningInfo.getAddress().toString()),
e1);
} else {
logger.error("disconnect address {} has an error, retrying., caused by ",
runningInfo.getAddress().toString(),
e1);
}
}
}
// The dump attempt is over: interrupt sink consumption and release state
eventSink.interrupt();
transactionBuffer.reset();// reset the buffer so data is recorded from scratch
binlogParser.reset();// reset the parser
if (multiStageCoprocessor != null && multiStageCoprocessor.isStart()) {
// stop() may fail (e.g. RejectedExecutionException); only log it
try {
multiStageCoprocessor.stop();
} catch (Throwable t) {
logger.debug("multi processor rejected:", t);
}
}

if (running) {
// Sleep for a while (10s plus up to 10s of jitter) before retrying
try {
Thread.sleep(10000 + RandomUtils.nextInt(10000));
} catch (InterruptedException e) {
}
}
}
MDC.remove("destination");
}
});

parseThread.setUncaughtExceptionHandler(handler);
parseThread.setName(String.format("destination = %s , address = %s , EventParser",
destination,
runningInfo == null ? null : runningInfo.getAddress()));
parseThread.start();
}

/**
 * Stops the parser: stops the heartbeat, interrupts the parse thread and the sink, stops the multi-stage
 * coprocessor if it is started, waits for the parse thread to exit and finally stops the binlog parser and
 * the transaction buffer if they are still started.
 */
public void stop() {
super.stop();

stopHeartBeat(); // stop the heartbeat first
parseThread.interrupt(); // try to interrupt the parse thread
eventSink.interrupt();

if (multiStageCoprocessor != null && multiStageCoprocessor.isStart()) {
try {
multiStageCoprocessor.stop();
} catch (Throwable t) {
logger.debug("multi processor rejected:", t);
}
}

try {
parseThread.join();// wait for the parse thread to finish
} catch (InterruptedException e) {
// ignore
// NOTE(review): the caller's interrupt is swallowed here and the flag is not restored
}

if (binlogParser.isStart()) {
binlogParser.stop();
}
if (transactionBuffer.isStart()) {
transactionBuffer.stop();
}
}

/**
 * Hands a batch of entries to the event sink and, when profiling is enabled, records how long the sink call
 * took in {@code processingInterval}. The consumed counter is incremented on every call and wraps back to 0
 * if it overflows.
 *
 * @param entrys entries to pass to the sink
 * @return whatever the sink returned
 * @throws CanalSinkException propagated from the sink
 * @throws InterruptedException propagated from the sink
 */
protected boolean consumeTheEventAndProfilingIfNecessary(List<CanalEntry.Entry> entrys) throws CanalSinkException,
                                                                                        InterruptedException {
    final boolean profiling = getProfilingEnabled();
    final long beganAt = profiling ? System.currentTimeMillis() : -1;

    final boolean sinkResult = eventSink.sink(entrys,
        (runningInfo == null) ? null : runningInfo.getAddress(),
        destination);

    if (profiling) {
        this.processingInterval = System.currentTimeMillis() - beganAt;
    }

    long consumedSoFar = consumedEventCount.incrementAndGet();
    if (consumedSoFar < 0) {
        consumedEventCount.set(0);
    }

    return sinkResult;
}

/**
 * Parses one raw event with the binlog parser and, when profiling is enabled, records how long the parse
 * took in {@code parsingInterval}. The parsed counter is incremented on every call and wraps back to 0 if it
 * overflows.
 *
 * @param bod raw event to parse
 * @param isSeek passed through unchanged to the binlog parser
 * @return the entry produced by the binlog parser (may be {@code null})
 * @throws Exception whatever the binlog parser throws
 */
protected CanalEntry.Entry parseAndProfilingIfNecessary(EVENT bod, boolean isSeek) throws Exception {
    final boolean profiling = getProfilingEnabled();
    final long beganAt = profiling ? System.currentTimeMillis() : -1;

    final CanalEntry.Entry parsedEntry = binlogParser.parse(bod, isSeek);

    if (profiling) {
        this.parsingInterval = System.currentTimeMillis() - beganAt;
    }

    long parsedSoFar = parsedEventCount.incrementAndGet();
    if (parsedSoFar < 0) {
        parsedEventCount.set(0);
    }
    return parsedEntry;
}

public Boolean getProfilingEnabled() {
return profilingEnabled.get();
}

/**
 * Finds the last TRANSACTIONEND entry in a batch and builds the log position for it, so that the persisted
 * position falls on a transaction boundary whenever possible.
 *
 * @param entries the batch that was just consumed
 * @return the position of the last transaction end in the batch, or {@code null} if the batch has none
 */
protected LogPosition buildLastTransactionPosition(List<CanalEntry.Entry> entries) {
    // Scan backwards down to and including index 0: a batch may consist of a single TRANSACTIONEND entry,
    // which the previous "i > 0" bound silently skipped.
    for (int i = entries.size() - 1; i >= 0; i--) {
        CanalEntry.Entry entry = entries.get(i);
        if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
            return buildLastPosition(entry);
        }
    }

    return null;
}

/**
 * Builds a {@code LogPosition} from an entry's header: log file name, offset, execute time, server id and
 * gtid are copied into the entry position, and the identity is built from the running address with a slave
 * id of -1.
 *
 * @param entry entry whose header describes the position
 * @return the assembled log position
 */
protected LogPosition buildLastPosition(CanalEntry.Entry entry) {
    Header entryHeader = entry.getHeader();

    EntryPosition entryPosition = new EntryPosition();
    entryPosition.setJournalName(entryHeader.getLogfileName());
    entryPosition.setPosition(entryHeader.getLogfileOffset());
    entryPosition.setTimestamp(entryHeader.getExecuteTime());
    // server id was added on 2016-06-28
    entryPosition.setServerId(entryHeader.getServerId());
    entryPosition.setGtid(entryHeader.getGtid());

    LogPosition result = new LogPosition();
    result.setPostion(entryPosition);
    result.setIdentity(new LogIdentity(runningInfo.getAddress(), -1L));
    return result;
}

/**
 * Logs a sink failure at WARN level together with the best known position: the last successfully handled
 * position when there is one, otherwise the binlog file and offset the dump started from.
 *
 * @param e the failure
 * @param lastPosition position of the last handled entry, or {@code null} if none was handled yet
 * @param startBinlogFile binlog file the dump started from
 * @param startPosition offset the dump started from
 */
protected void processSinkError(Throwable e, LogPosition lastPosition, String startBinlogFile, Long startPosition) {
    final String message;
    if (lastPosition == null) {
        message = String.format("ERROR ## parse this event has an error , last position : [%s,%s]",
            startBinlogFile,
            startPosition);
    } else {
        message = String.format("ERROR ## parse this event has an error , last position : [%s]",
            lastPosition.getPostion());
    }
    logger.warn(message, e);
}

/**
 * Hook invoked by the parse thread first thing when a dump attempt fails with anything other than a
 * {@code TableIdNotFoundException}. The default implementation does nothing.
 *
 * @param e the failure that ended the dump attempt
 */
protected void processDumpError(Throwable e) {
// do nothing
}

/**
 * Starts the heartbeat: resets {@code lastEntryTime}, lazily creates the daemon {@link Timer} and, if no
 * heartbeat task is scheduled yet, builds one and schedules it every {@code detectingIntervalInSeconds}
 * seconds.
 *
 * @param connection connection handed to {@link #buildHeartBeatTimeTask(ErosaConnection)}
 */
protected void startHeartBeat(ErosaConnection connection) {
    lastEntryTime = 0L; // reset
    if (timer == null) {// lazy initialisation
        // Format the address itself rather than address.toString(): "%s" tolerates a null address, which
        // matches how the parse thread name is built in start().
        String name = String.format("destination = %s , address = %s , HeartBeatTimeTask",
            destination,
            runningInfo == null ? null : runningInfo.getAddress());
        // The lock is the AbstractEventParser class object, i.e. shared by all parser instances and subclasses.
        synchronized (AbstractEventParser.class) {
            if (timer == null) {
                timer = new Timer(name, true);
            }
        }
    }

    if (heartBeatTimerTask == null) {// fixed issue #56: avoid creating the heartbeat task more than once
        heartBeatTimerTask = buildHeartBeatTimeTask(connection);
        Integer interval = detectingIntervalInSeconds;
        timer.schedule(heartBeatTimerTask, interval * 1000L, interval * 1000L);
        logger.info("start heart beat.... ");
    }
}

/**
 * Builds the periodic heartbeat task. On each run, unless an exception is recorded and no entry has been
 * seen yet, it checks how long ago the last entry arrived; once that reaches
 * {@code detectingIntervalInSeconds} it pushes a synthetic HEARTBEAT entry through the sink. Any failure
 * inside the task is logged and swallowed.
 *
 * @param connection not used by this default task
 * @return the task to schedule on the heartbeat timer
 */
protected TimerTask buildHeartBeatTimeTask(ErosaConnection connection) {
    return new TimerTask() {

        public void run() {
            try {
                // Skip while an error is pending and no normal entry has been seen since the (re)start.
                if (exception != null && lastEntryTime <= 0) {
                    return;
                }

                long currentMillis = System.currentTimeMillis();
                long idleSeconds = (currentMillis - lastEntryTime) / 1000;
                if (idleSeconds < detectingIntervalInSeconds) {
                    return;
                }

                Header heartbeatHeader = Header.newBuilder().setExecuteTime(currentMillis).build();
                Entry heartbeat = Entry.newBuilder()
                    .setHeader(heartbeatHeader)
                    .setEntryType(EntryType.HEARTBEAT)
                    .build();
                // Submitted to the sink only; per the original author's note the sink drops it before the store.
                consumeTheEventAndProfilingIfNecessary(Arrays.asList(heartbeat));
            } catch (Throwable e) {
                logger.warn("heartBeat run failed ", e);
            }
        }

    };
}

/**
 * Stops the heartbeat: resets {@code lastEntryTime}, cancels and discards the timer if one exists, and
 * forgets the heartbeat task so that a later {@code startHeartBeat} builds and schedules a new one.
 */
protected void stopHeartBeat() {
lastEntryTime = 0L; // reset
if (timer != null) {
timer.cancel();
timer = null;
}
heartBeatTimerTask = null;
}

/**
 * Parses a field-filter rule string into a map.
 * <p>
 * The string is a comma-separated list of {@code key:field1/field2/...} items. Blank items and items that
 * do not split into exactly two parts on {@code ':'} are ignored. Key and field list are trimmed and
 * upper-cased; the field list is then split on {@code '/'}.
 *
 * @param config rule string, may be blank or {@code null}
 * @return a mutable map from upper-cased key to its upper-cased field names; empty when nothing parses
 */
private Map<String, List<String>> parseFieldFilterMap(String config) {
    Map<String, List<String>> rules = new HashMap<>();
    if (StringUtils.isBlank(config)) {
        return rules;
    }

    for (String item : config.split(",")) {
        if (StringUtils.isBlank(item)) {
            continue;
        }

        String[] parts = item.split(":");
        if (parts.length == 2) {
            String tableKey = parts[0].trim().toUpperCase();
            String[] fieldNames = parts[1].trim().toUpperCase().split("/");
            rules.put(tableKey, Arrays.asList(fieldNames));
        }
    }

    return rules;
}

public void setEventFilter(CanalEventFilter eventFilter) {
this.eventFilter = eventFilter;
}

public void setEventBlackFilter(CanalEventFilter eventBlackFilter) {
this.eventBlackFilter = eventBlackFilter;
}

public Long getParsedEventCount() {
return parsedEventCount.get();
}

public Long getConsumedEventCount() {
return consumedEventCount.get();
}

/**
 * Turns profiling on or off.
 * <p>
 * The existing {@link AtomicBoolean} is updated in place. Replacing the instance would publish the new value
 * through a plain (non-volatile) field, so other threads could keep reading the old object.
 *
 * @param profilingEnabled {@code true} to record parse and sink timings
 */
public void setProfilingEnabled(boolean profilingEnabled) {
    this.profilingEnabled.set(profilingEnabled);
}

public long getParsingInterval() {
return parsingInterval;
}

public long getProcessingInterval() {
return processingInterval;
}

public void setEventSink(CanalEventSink<List<CanalEntry.Entry>> eventSink) {
this.eventSink = eventSink;
}

public void setDestination(String destination) {
this.destination = destination;
}

public void setBinlogParser(BinlogParser binlogParser) {
this.binlogParser = binlogParser;
}

public BinlogParser getBinlogParser() {
return binlogParser;
}

public void setAlarmHandler(CanalAlarmHandler alarmHandler) {
this.alarmHandler = alarmHandler;
}

public CanalAlarmHandler getAlarmHandler() {
return this.alarmHandler;
}

public void setLogPositionManager(CanalLogPositionManager logPositionManager) {
this.logPositionManager = logPositionManager;
}

public void setTransactionSize(int transactionSize) {
this.transactionSize = transactionSize;
}

public CanalLogPositionManager getLogPositionManager() {
return logPositionManager;
}

public void setDetectingEnable(boolean detectingEnable) {
this.detectingEnable = detectingEnable;
}

public void setDetectingIntervalInSeconds(Integer detectingIntervalInSeconds) {
this.detectingIntervalInSeconds = detectingIntervalInSeconds;
}

public Throwable getException() {
return exception;
}

public boolean isGTIDMode() {
return isGTIDMode;
}

public void setIsGTIDMode(boolean isGTIDMode) {
this.isGTIDMode = isGTIDMode;
}

public boolean isParallel() {
return parallel;
}

public void setParallel(boolean parallel) {
this.parallel = parallel;
}

public int getParallelThreadSize() {
return parallelThreadSize;
}

public void setParallelThreadSize(Integer parallelThreadSize) {
if (parallelThreadSize != null) {
this.parallelThreadSize = parallelThreadSize;
}
}

public Integer getParallelBufferSize() {
return parallelBufferSize;
}

public void setParallelBufferSize(int parallelBufferSize) {
this.parallelBufferSize = parallelBufferSize;
}

public ParserExceptionHandler getParserExceptionHandler() {
return parserExceptionHandler;
}

public void setParserExceptionHandler(ParserExceptionHandler parserExceptionHandler) {
this.parserExceptionHandler = parserExceptionHandler;
}

public long getServerId() {
return serverId;
}

public void setServerId(long serverId) {
this.serverId = serverId;
}

public String getFieldFilter() {
return fieldFilter;
}

/**
 * Sets the field-filter rule string and rebuilds the parsed filter map from it.
 * <p>
 * The stored string is trimmed. A {@code null} argument is accepted (it previously threw a
 * {@code NullPointerException}): the stored string becomes {@code null} and the map becomes empty, which is
 * how {@link #setFieldBlackFilter(String)} already treats {@code null}.
 *
 * @param fieldFilter rule string, may be {@code null}
 */
public void setFieldFilter(String fieldFilter) {
    this.fieldFilter = (fieldFilter == null) ? null : fieldFilter.trim();
    this.fieldFilterMap = parseFieldFilterMap(this.fieldFilter);
}

public String getFieldBlackFilter() {
return fieldBlackFilter;
}

public void setFieldBlackFilter(String fieldBlackFilter) {
this.fieldBlackFilter = fieldBlackFilter;
this.fieldBlackFilterMap = parseFieldFilterMap(fieldBlackFilter);
}

/**
* 获取表字段过滤规则
* @return
* key: schema.tableName
* value: 字段列表
*/
public Map<String, List<String>> getFieldFilterMap() {
return fieldFilterMap;
}

/**
* 获取表字段过滤规则黑名单
* @return
* key: schema.tableName
* value: 字段列表
*/
public Map<String, List<String>> getFieldBlackFilterMap() {
return fieldBlackFilterMap;
}
}

2.1 BinlogParser

1
2
3
4
5
6
7
8
9
10
11
/**
 * Interface for parsing binlog events into {@link CanalEntry.Entry} objects.
 *
 * @param <T> type of the raw event handed to the parser
 * @author: yuanzu Date: 12-9-20 Time: 8:46 PM
 */
public interface BinlogParser<T> extends CanalLifeCycle {

/**
 * Parses one raw event.
 *
 * @param event the raw event
 * @param isSeek NOTE(review): not defined here; presumably marks events read while seeking a position
 * rather than during a normal dump - confirm against implementations
 * @return the parsed entry
 * @throws CanalParseException if the event cannot be parsed
 */
CanalEntry.Entry parse(T event, boolean isSeek) throws CanalParseException;

/**
 * Resets the parser's internal state.
 */
void reset();
}

2.1.1 AbstractBinlogParser

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public abstract class AbstractBinlogParser<T> extends AbstractCanalLifeCycle implements BinlogParser<T> {

public void reset() {
}

public Entry parse(T event, TableMeta tableMeta) throws CanalParseException {
return null;
}

public Entry parse(T event) throws CanalParseException {
return null;
}

public void stop() {
reset();
super.stop();
}

}

2.1.2 LogEventConvert

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
/**
* 基于{@linkplain LogEvent}转化为Entry对象的处理
*
* @author jianghang 2013-1-17 下午02:41:14
* @version 1.0.0
*/
public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogParser<LogEvent> {

public static final String XA_XID = "XA_XID";
public static final String XA_TYPE = "XA_TYPE";
public static final String XA_START = "XA START";
public static final String XA_END = "XA END";
public static final String XA_COMMIT = "XA COMMIT";
public static final String XA_ROLLBACK = "XA ROLLBACK";
public static final String ISO_8859_1 = "ISO-8859-1";
public static final String UTF_8 = "UTF-8";
public static final int TINYINT_MAX_VALUE = 256;
public static final int SMALLINT_MAX_VALUE = 65536;
public static final int MEDIUMINT_MAX_VALUE = 16777216;
public static final long INTEGER_MAX_VALUE = 4294967296L;
public static final BigInteger BIGINT_MAX_VALUE = new BigInteger("18446744073709551616");
public static final int version = 1;
public static final String BEGIN = "BEGIN";
public static final String COMMIT = "COMMIT";
public static final Logger logger = LoggerFactory.getLogger(LogEventConvert.class);

private volatile AviaterRegexFilter nameFilter; // 运行时引用可能会有变化,比如规则发生变化时
private volatile AviaterRegexFilter nameBlackFilter;
private Map<String, List<String>> fieldFilterMap = new HashMap<>();
private Map<String, List<String>> fieldBlackFilterMap = new HashMap<>();

private TableMetaCache tableMetaCache;
private Charset charset = Charset.defaultCharset();
private boolean filterQueryDcl = false;
private boolean filterQueryDml = false;
private boolean filterQueryDdl = false;
// 是否跳过table相关的解析异常,比如表不存在或者列数量不匹配,issue 92
private boolean filterTableError = false;
// 新增rows过滤,用于仅订阅除rows以外的数据
private boolean filterRows = false;
private boolean useDruidDdlFilter = true;

public LogEventConvert(){

}

/**
 * Converts one binlog event into an {@link Entry} by dispatching on the event type from its header.
 * <p>
 * {@code null} and {@link UnknownLogEvent} inputs yield {@code null}. Table-map events only update internal
 * state through {@code parseTableMapEvent} and yield {@code null}, as do event types without a case here.
 *
 * @param logEvent the event to convert, may be {@code null}
 * @param isSeek forwarded to query-event parsing only
 * @return the converted entry, or {@code null} when the event produces none
 */
@Override
public Entry parse(LogEvent logEvent, boolean isSeek) throws CanalParseException {
    if (logEvent == null || logEvent instanceof UnknownLogEvent) {
        return null;
    }

    Entry converted = null;
    switch (logEvent.getHeader().getType()) {
        case LogEvent.QUERY_EVENT:
            converted = parseQueryEvent((QueryLogEvent) logEvent, isSeek);
            break;
        case LogEvent.XID_EVENT:
            converted = parseXidEvent((XidLogEvent) logEvent);
            break;
        case LogEvent.TABLE_MAP_EVENT:
            // side effect only; no entry is produced
            parseTableMapEvent((TableMapLogEvent) logEvent);
            break;
        case LogEvent.WRITE_ROWS_EVENT_V1:
        case LogEvent.WRITE_ROWS_EVENT:
            converted = parseRowsEvent((WriteRowsLogEvent) logEvent);
            break;
        case LogEvent.UPDATE_ROWS_EVENT_V1:
        case LogEvent.PARTIAL_UPDATE_ROWS_EVENT:
        case LogEvent.UPDATE_ROWS_EVENT:
            converted = parseRowsEvent((UpdateRowsLogEvent) logEvent);
            break;
        case LogEvent.DELETE_ROWS_EVENT_V1:
        case LogEvent.DELETE_ROWS_EVENT:
            converted = parseRowsEvent((DeleteRowsLogEvent) logEvent);
            break;
        case LogEvent.ROWS_QUERY_LOG_EVENT:
            converted = parseRowsQueryEvent((RowsQueryLogEvent) logEvent);
            break;
        case LogEvent.ANNOTATE_ROWS_EVENT:
            converted = parseAnnotateRowsEvent((AnnotateRowsEvent) logEvent);
            break;
        case LogEvent.USER_VAR_EVENT:
            converted = parseUserVarLogEvent((UserVarLogEvent) logEvent);
            break;
        case LogEvent.INTVAR_EVENT:
            converted = parseIntrvarLogEvent((IntvarLogEvent) logEvent);
            break;
        case LogEvent.RAND_EVENT:
            converted = parseRandLogEvent((RandLogEvent) logEvent);
            break;
        case LogEvent.GTID_LOG_EVENT:
            converted = parseGTIDLogEvent((GtidLogEvent) logEvent);
            break;
        case LogEvent.HEARTBEAT_LOG_EVENT:
            converted = parseHeartbeatLogEvent((HeartbeatLogEvent) logEvent);
            break;
        case LogEvent.HEARTBEAT_LOG_EVENT_V2:
            converted = parseHeartbeatV2LogEvent((HeartbeatV2LogEvent) logEvent);
            break;
        case LogEvent.GTID_EVENT:
        case LogEvent.GTID_LIST_EVENT:
            converted = parseMariaGTIDLogEvent(logEvent);
            break;
        default:
            break;
    }

    return converted;
}

/**
 * Clears the cached table metadata, if a table meta cache has been configured.
 */
public void reset() {
    if (tableMetaCache == null) {
        return;
    }
    tableMetaCache.clearTableMeta();
}

/**
 * Turns a heartbeat log event into a HEARTBEAT entry whose header carries only the MHEARTBEAT event type;
 * nothing is read from the event itself.
 */
private Entry parseHeartbeatLogEvent(HeartbeatLogEvent logEvent) {
    Header heartbeatHeader = Header.newBuilder().setEventType(EventType.MHEARTBEAT).build();
    return Entry.newBuilder()
        .setHeader(heartbeatHeader)
        .setEntryType(EntryType.HEARTBEAT)
        .build();
}

/**
 * Turns a v2 heartbeat log event into a HEARTBEAT entry whose header carries only the MHEARTBEAT event type;
 * nothing is read from the event itself.
 */
private Entry parseHeartbeatV2LogEvent(HeartbeatV2LogEvent logEvent) {
    Header heartbeatHeader = Header.newBuilder().setEventType(EventType.MHEARTBEAT).build();
    return Entry.newBuilder()
        .setHeader(heartbeatHeader)
        .setEntryType(EntryType.HEARTBEAT)
        .build();
}

/**
 * Converts a MySQL GTID log event into a GTIDLOG entry.
 * <p>
 * The entry's store value is a single {@code Pair} keyed {@code "gtid"} holding the event's GTID string.
 * When the event reports a last-committed value (anything other than -1), {@code lastCommitted} and
 * {@code sequenceNumber} are attached as header props.
 * <p>
 * Previously all three values were written into the same {@code Pair} builder, so each write overwrote the
 * one before and only {@code sequenceNumber} survived; the GTID itself was lost.
 *
 * @param logEvent the GTID event
 * @return the GTIDLOG entry
 */
private Entry parseGTIDLogEvent(GtidLogEvent logEvent) {
    LogHeader logHeader = logEvent.getHeader();
    Pair.Builder builder = Pair.newBuilder();
    builder.setKey("gtid");
    builder.setValue(logEvent.getGtidStr());

    Header header = createHeader(logHeader, "", "", EventType.GTID);
    if (logEvent.getLastCommitted() != -1) {
        // A Pair holds exactly one key/value, so the extra values travel as header props instead of
        // clobbering the gtid pair.
        header = header.toBuilder()
            .addProps(createSpecialPair("lastCommitted", String.valueOf(logEvent.getLastCommitted())))
            .addProps(createSpecialPair("sequenceNumber", String.valueOf(logEvent.getSequenceNumber())))
            .build();
    }

    return createEntry(header, EntryType.GTIDLOG, builder.build().toByteString());
}

/**
 * Converts a MariaDB GTID or GTID-list event into a GTIDLOG entry whose store value is a {@code Pair} keyed
 * {@code "gtid"}. The pair's value is taken from the event's GTID string; for any other event class the
 * value is left unset.
 *
 * @param logEvent the MariaDB GTID-related event
 * @return the GTIDLOG entry
 */
private Entry parseMariaGTIDLogEvent(LogEvent logEvent) {
    LogHeader eventHeader = logEvent.getHeader();
    Pair.Builder gtidPair = Pair.newBuilder().setKey("gtid");

    if (logEvent instanceof MariaGtidLogEvent) {
        MariaGtidLogEvent gtidEvent = (MariaGtidLogEvent) logEvent;
        gtidPair.setValue(gtidEvent.getGtidStr());
    } else if (logEvent instanceof MariaGtidListLogEvent) {
        MariaGtidListLogEvent gtidListEvent = (MariaGtidListLogEvent) logEvent;
        gtidPair.setValue(gtidListEvent.getGtidStr());
    }

    Header entryHeader = createHeader(eventHeader, "", "", EventType.GTID);
    return createEntry(entryHeader, EntryType.GTIDLOG, gtidPair.build().toByteString());
}

/**
 * Converts a query log event into an entry. The query text is checked in this order:
 * XA START (transaction begin), XA END (transaction end), XA COMMIT / XA ROLLBACK (row data carrying the XA
 * props), a statement ending in BEGIN (transaction begin), a statement ending in COMMIT (transaction end),
 * and finally any other statement, which is run through the DDL parser and the filters.
 *
 * @param event the query event
 * @param isSeek when {@code true}, the table meta cache is not updated for non-DML statements
 * @return the entry, or {@code null} when the statement is filtered out or DDL output is disabled
 */
private Entry parseQueryEvent(QueryLogEvent event, boolean isSeek) {
String queryString = event.getQuery();
if (StringUtils.startsWithIgnoreCase(queryString, XA_START)) {
// xa start use TransactionBegin
TransactionBegin.Builder beginBuilder = TransactionBegin.newBuilder();
beginBuilder.setThreadId(event.getSessionId());
beginBuilder.addProps(createSpecialPair(XA_TYPE, XA_START));
beginBuilder.addProps(createSpecialPair(XA_XID, getXaXid(queryString, XA_START)));
TransactionBegin transactionBegin = beginBuilder.build();
Header header = createHeader(event.getHeader(), "", "", null);
return createEntry(header, EntryType.TRANSACTIONBEGIN, transactionBegin.toByteString());
} else if (StringUtils.startsWithIgnoreCase(queryString, XA_END)) {
// xa end use TransactionEnd
TransactionEnd.Builder endBuilder = TransactionEnd.newBuilder();
endBuilder.setTransactionId(String.valueOf(0L));
endBuilder.addProps(createSpecialPair(XA_TYPE, XA_END));
endBuilder.addProps(createSpecialPair(XA_XID, getXaXid(queryString, XA_END)));
TransactionEnd transactionEnd = endBuilder.build();
Header header = createHeader(event.getHeader(), "", "", null);
return createEntry(header, EntryType.TRANSACTIONEND, transactionEnd.toByteString());
} else if (StringUtils.startsWithIgnoreCase(queryString, XA_COMMIT)) {
// xa commit
Header header = createHeader(event.getHeader(), "", "", EventType.XACOMMIT);
RowChange.Builder rowChangeBuider = RowChange.newBuilder();
rowChangeBuider.setSql(queryString);
rowChangeBuider.addProps(createSpecialPair(XA_TYPE, XA_COMMIT));
rowChangeBuider.addProps(createSpecialPair(XA_XID, getXaXid(queryString, XA_COMMIT)));
rowChangeBuider.setEventType(EventType.XACOMMIT);
return createEntry(header, EntryType.ROWDATA, rowChangeBuider.build().toByteString());
} else if (StringUtils.startsWithIgnoreCase(queryString, XA_ROLLBACK)) {
// xa rollback
Header header = createHeader(event.getHeader(), "", "", EventType.XAROLLBACK);
RowChange.Builder rowChangeBuider = RowChange.newBuilder();
rowChangeBuider.setSql(queryString);
rowChangeBuider.addProps(createSpecialPair(XA_TYPE, XA_ROLLBACK));
rowChangeBuider.addProps(createSpecialPair(XA_XID, getXaXid(queryString, XA_ROLLBACK)));
rowChangeBuider.setEventType(EventType.XAROLLBACK);
return createEntry(header, EntryType.ROWDATA, rowChangeBuider.build().toByteString());
} else if (StringUtils.endsWithIgnoreCase(queryString, BEGIN)) {
TransactionBegin transactionBegin = createTransactionBegin(event.getSessionId());
Header header = createHeader(event.getHeader(), "", "", null);
return createEntry(header, EntryType.TRANSACTIONBEGIN, transactionBegin.toByteString());
} else if (StringUtils.endsWithIgnoreCase(queryString, COMMIT)) {
TransactionEnd transactionEnd = createTransactionEnd(0L); // MyISAM may not produce an xid event
Header header = createHeader(event.getHeader(), "", "", null);
return createEntry(header, EntryType.TRANSACTIONEND, transactionEnd.toByteString());
} else {
boolean notFilter = false;
EventType type = EventType.QUERY;
String tableName = null;
String schemaName = null;
if (useDruidDdlFilter) {
List<DdlResult> results = DruidDdlParser.parse(queryString, event.getDbName());
for (DdlResult result : results) {
if (!processFilter(queryString, result)) {
// It is enough for a single result to pass the filter
notFilter = true;
}
}
if (results.size() > 0) {
// For a multi-statement DDL only the first result can be reported
type = results.get(0).getType();
schemaName = results.get(0).getSchemaName();
tableName = results.get(0).getTableName();
}
} else {
DdlResult result = SimpleDdlParser.parse(queryString, event.getDbName());
if (!processFilter(queryString, result)) {
notFilter = true;
}

type = result.getType();
schemaName = result.getSchemaName();
tableName = result.getTableName();
}

if (!notFilter) {
// Filtered out: nothing more to do
return null;
}

boolean isDml = (type == EventType.INSERT || type == EventType.UPDATE || type == EventType.DELETE);

// The table meta must be updated even when filterQueryDdl=true
if (!isSeek && !isDml) {
// Use the newer table-structure metadata management
// NOTE(review): tableMetaCache is dereferenced without a null check here, unlike reset() and processFilter()
EntryPosition position = createPosition(event.getHeader());
tableMetaCache.apply(position, event.getDbName(), queryString, null);
}

if (filterQueryDdl) {
// All DDL is filtered, so no event is generated
return null;
}

Header header = createHeader(event.getHeader(), schemaName, tableName, type);
RowChange.Builder rowChangeBuilder = RowChange.newBuilder();
rowChangeBuilder.setIsDdl(!isDml);
rowChangeBuilder.setSql(queryString);
if (StringUtils.isNotEmpty(event.getDbName())) {// may be empty
rowChangeBuilder.setDdlSchemaName(event.getDbName());
}
rowChangeBuilder.setEventType(type);
return createEntry(header, EntryType.ROWDATA, rowChangeBuilder.build().toByteString());
}
}

/**
 * Extracts the text that follows an XA command keyword, i.e. the xid part of
 * the statement.
 *
 * @param queryString the full XA statement
 * @param type the XA keyword (for example the XA_COMMIT or XA_ROLLBACK constant)
 * @return everything after the keyword (leading whitespace is preserved), or
 *         an empty string when the keyword does not occur
 */
private String getXaXid(String queryString, String type) {
    // Callers recognise the keyword with startsWithIgnoreCase, so the prefix
    // must be stripped case-insensitively as well; a case-sensitive search
    // would return "" for a lower-case statement.
    if (queryString != null && type != null && queryString.regionMatches(true, 0, type, 0, type.length())) {
        return queryString.substring(type.length());
    }
    // Keyword is not a prefix: keep the original lookup semantics.
    return StringUtils.substringAfter(queryString, type);
}

/**
 * Decides whether a parsed query statement must be dropped.
 * <p>
 * As a side effect, ALTER / ERASE / RENAME results invalidate the matching
 * entries of the table meta cache (when a cache is configured).
 *
 * @param queryString the original SQL text, only used in the error message
 * @param result the parse result describing the statement type and table
 * @return {@code true} when the statement is filtered out, {@code false} when
 *         it should be kept
 * @throws CanalParseException when a DDL statement has no resolvable table
 *         name and DDL filtering is not enabled
 */
private boolean processFilter(String queryString, DdlResult result) {
String schemaName = result.getSchemaName();
String tableName = result.getTableName();
// fixed issue https://github.com/alibaba/canal/issues/58
// refresh the table meta cache
if (tableMetaCache != null
&& (result.getType() == EventType.ALTER || result.getType() == EventType.ERASE || result.getType() == EventType.RENAME)) {
// Still reported to consumers as QUERY type for compatibility; the table
// name is not parsed here, so filtering cannot be supported at this point.
// The loop walks the result and every chained rename result.
for (DdlResult renameResult = result; renameResult != null; renameResult = renameResult.getRenameTableResult()) {
String schemaName0 = renameResult.getSchemaName();
String tableName0 = renameResult.getTableName();
if (StringUtils.isNotEmpty(tableName0)) {
// the table was parsed correctly: clear by full name
tableMetaCache.clearTableMeta(schemaName0, tableName0);
} else {
// the table could not be parsed: clear everything under the schema
tableMetaCache.clearTableMetaWithSchemaName(schemaName0);
}
}
}

// fixed issue https://github.com/alibaba/canal/issues/58
if (result.getType() == EventType.ALTER || result.getType() == EventType.ERASE
|| result.getType() == EventType.CREATE || result.getType() == EventType.TRUNCATE
|| result.getType() == EventType.RENAME || result.getType() == EventType.CINDEX
|| result.getType() == EventType.DINDEX) { // DDL statement types

if (!filterQueryDdl && (StringUtils.isEmpty(tableName)
|| (result.getType() == EventType.RENAME && StringUtils.isEmpty(result.getOriTableName())))) {
// The table name could not be parsed. Surface the statement to make bug
// fixing easier; for now an exception is thrown, which aborts parsing.
throw new CanalParseException("SimpleDdlParser process query failed. pls submit issue with this queryString: "
+ queryString + " , and DdlResult: " + result.toString());
// return null;
} else {
// check name filter
String name = schemaName + "." + tableName;
if (nameFilter != null && !nameFilter.filter(name)) {
if (result.getType() == EventType.RENAME) {
// rename: the statement is kept as long as either the source or the
// target name passes, so filter only when the original name fails too
if (nameFilter != null
&& !nameFilter.filter(result.getOriSchemaName() + "." + result.getOriTableName())) {
return true;
}
} else {
// every other type: filter the statement out
return true;
}
}

if (nameBlackFilter != null && nameBlackFilter.filter(name)) {
if (result.getType() == EventType.RENAME) {
// rename: filter only when the original name is black-listed as well
if (nameBlackFilter != null
&& nameBlackFilter.filter(result.getOriSchemaName() + "." + result.getOriTableName())) {
return true;
}
} else {
// every other type: filter the statement out
return true;
}
}
}
} else if (result.getType() == EventType.INSERT || result.getType() == EventType.UPDATE
|| result.getType() == EventType.DELETE) {
// Still reported to consumers as QUERY type for compatibility; the table
// name is not parsed here, so name filtering cannot be supported.
if (filterQueryDml) {
return true;
}
} else if (filterQueryDcl) {
// anything that is neither one of the DDL types above nor INSERT/UPDATE/DELETE
return true;
}

return false;
}

/**
 * Turns a rows-query event into a QUERY entry carrying the original DML text.
 * <p>
 * Supported since MySQL 5.6; the server must run with
 * binlog-rows-query-log-events=1 so that the original statement is logged.
 *
 * @param event the rows-query event
 * @return the query entry, or {@code null} when DML queries are filtered
 * @throws CanalParseException when the statement bytes cannot be re-decoded
 */
private Entry parseRowsQueryEvent(RowsQueryLogEvent event) {
    if (!filterQueryDml) {
        try {
            // re-decode the statement with the configured charset
            byte[] rawSql = event.getRowsQuery().getBytes(ISO_8859_1);
            String sql = new String(rawSql, charset);

            // the table name is only resolved when the druid parser is enabled
            String firstTable = null;
            if (useDruidDdlFilter) {
                List<DdlResult> parsed = DruidDdlParser.parse(sql, null);
                if (!parsed.isEmpty()) {
                    firstTable = parsed.get(0).getTableName();
                }
            }

            return buildQueryEntry(sql, event.getHeader(), firstTable);
        } catch (UnsupportedEncodingException e) {
            throw new CanalParseException(e);
        }
    }
    return null;
}

/**
 * Turns an annotate-rows event into a QUERY entry carrying the original DML
 * text.
 * <p>
 * Supported by MariaDB; the server must run with
 * binlog_annotate_row_events=true so that the original statement is logged.
 *
 * @param event the annotate-rows event
 * @return the query entry, or {@code null} when DML queries are filtered
 * @throws CanalParseException when the statement bytes cannot be re-decoded
 */
private Entry parseAnnotateRowsEvent(AnnotateRowsEvent event) {
    if (!filterQueryDml) {
        try {
            // re-decode the statement with the configured charset
            byte[] rawSql = event.getRowsQuery().getBytes(ISO_8859_1);
            String sql = new String(rawSql, charset);
            return buildQueryEntry(sql, event.getHeader());
        } catch (UnsupportedEncodingException e) {
            throw new CanalParseException(e);
        }
    }
    return null;
}

/**
 * Wraps the query text of a user-variable event into a QUERY entry.
 *
 * @param event the user-variable event
 * @return the query entry, or {@code null} when DML queries are filtered
 */
private Entry parseUserVarLogEvent(UserVarLogEvent event) {
    return filterQueryDml ? null : buildQueryEntry(event.getQuery(), event.getHeader());
}

/**
 * Wraps the query text of an intvar event into a QUERY entry.
 *
 * @param event the intvar event
 * @return the query entry, or {@code null} when DML queries are filtered
 */
private Entry parseIntrvarLogEvent(IntvarLogEvent event) {
    return filterQueryDml ? null : buildQueryEntry(event.getQuery(), event.getHeader());
}

/**
 * Wraps the query text of a rand event into a QUERY entry.
 *
 * @param event the rand event
 * @return the query entry, or {@code null} when DML queries are filtered
 */
private Entry parseRandLogEvent(RandLogEvent event) {
    return filterQueryDml ? null : buildQueryEntry(event.getQuery(), event.getHeader());
}

/**
 * Converts an XID event into a TRANSACTIONEND entry whose transaction id is
 * the event's xid.
 *
 * @param event the XID event
 * @return the transaction-end entry
 */
private Entry parseXidEvent(XidLogEvent event) {
    ByteString payload = createTransactionEnd(event.getXid()).toByteString();
    // schema and table are left empty and no event type is set on the header
    Header endHeader = createHeader(event.getHeader(), "", "", null);
    return createEntry(endHeader, EntryType.TRANSACTIONEND, payload);
}

/**
 * Resolves the {@link TableMeta} of the table a rows event refers to.
 * <p>
 * Heartbeat tables (the RDS {@code mysql.ha_health_check} table and the
 * AliSQL {@code test.heartbeat} table) get a mocked table meta because the
 * real tables are usually not readable.
 *
 * @param event the rows event
 * @return the table meta, or {@code null} when the table is rejected by the
 *         name filters, when no table meta can be resolved and
 *         {@code filterTableError} is enabled, or when no table meta cache is
 *         configured for a non-heartbeat table
 * @throws TableIdNotFoundException when the event carries no table-map record
 * @throws CanalParseException when the table cannot be found and
 *         {@code filterTableError} is disabled
 */
public TableMeta parseRowsEventForTableMeta(RowsLogEvent event) {
    TableMapLogEvent table = event.getTable();
    if (table == null) {
        // no table-map record exists for this tableId
        throw new TableIdNotFoundException("not found tableId:" + event.getTableId());
    }

    boolean isHeartBeat = isAliSQLHeartBeat(table.getDbName(), table.getTableName());
    // tableMetaCache is treated as optional further down, so guard it here as
    // well instead of failing with a NullPointerException
    boolean isRDSHeartBeat = tableMetaCache != null && tableMetaCache.isOnRDS()
                             && isRDSHeartBeat(table.getDbName(), table.getTableName());

    String fullname = table.getDbName() + "." + table.getTableName();
    // check name filter
    if (nameFilter != null && !nameFilter.filter(fullname)) {
        return null;
    }
    if (nameBlackFilter != null && nameBlackFilter.filter(fullname)) {
        return null;
    }

    TableMeta tableMeta = null;
    if (isRDSHeartBeat) {
        // RDS mode: mysql.ha_health_check heartbeat data.
        // The heartbeat table is usually not readable, so mock a table meta.
        FieldMeta idMeta = new FieldMeta("id", "bigint(20)", true, false, "0");
        FieldMeta typeMeta = new FieldMeta("type", "char(1)", false, true, "0");
        tableMeta = new TableMeta(table.getDbName(), table.getTableName(), Arrays.asList(idMeta, typeMeta));
    } else if (isHeartBeat) {
        // AliSQL mode: test.heartbeat heartbeat data.
        // The heartbeat table is usually not readable, so mock a table meta.
        FieldMeta idMeta = new FieldMeta("id", "smallint(6)", false, true, null);
        FieldMeta typeMeta = new FieldMeta("ts", "int(11)", true, false, null);
        tableMeta = new TableMeta(table.getDbName(), table.getTableName(), Arrays.asList(idMeta, typeMeta));
    }

    if (tableMetaCache != null && tableMeta == null) {
        // not a mocked heartbeat table: look the table meta up through the cache
        EntryPosition position = createPosition(event.getHeader());
        tableMeta = getTableMeta(table.getDbName(), table.getTableName(), true, position);
        if (tableMeta == null && !filterTableError) {
            throw new CanalParseException("not found [" + fullname + "] in db , pls check!");
        }
    }

    return tableMeta;
}

/**
 * Parses a rows event without an externally supplied table meta; the table
 * meta is resolved by the two-argument overload.
 *
 * @param event the rows event
 * @return whatever the two-argument overload returns for a {@code null} table meta
 */
public Entry parseRowsEvent(RowsLogEvent event) {
    final TableMeta noExternalMeta = null;
    return parseRowsEvent(event, noExternalMeta);
}

/**
 * Re-decodes the database and table name of a table-map event with the
 * configured charset and writes both back onto the event.
 *
 * @param event the table-map event, modified in place
 * @throws CanalParseException when the names cannot be re-decoded
 */
public void parseTableMapEvent(TableMapLogEvent event) {
    try {
        byte[] rawDbName = event.getDbName().getBytes(ISO_8859_1);
        event.setDbname(new String(rawDbName, charset));

        byte[] rawTableName = event.getTableName().getBytes(ISO_8859_1);
        event.setTblname(new String(rawTableName, charset));
    } catch (UnsupportedEncodingException e) {
        throw new CanalParseException(e);
    }
}

/**
 * Converts a rows event (insert / update / delete) into a ROWDATA entry.
 *
 * @param event the rows event
 * @param tableMeta the table meta to use, or {@code null} to resolve it via
 *        {@code parseRowsEventForTableMeta}
 * @return the entry, or {@code null} when row events are filtered, when no
 *         table meta is available, or when a row reported a table error
 * @throws CanalParseException when the event type is unsupported or any
 *         exception occurs while parsing
 */
public Entry parseRowsEvent(RowsLogEvent event, TableMeta tableMeta) {
if (filterRows) {
return null;
}
try {
if (tableMeta == null) { // no table meta supplied by the caller
tableMeta = parseRowsEventForTableMeta(event);
}

if (tableMeta == null) {
// no table structure available: ignore the event
return null;
}

// map the binlog event type onto the entry event type
EventType eventType = null;
int type = event.getHeader().getType();
if (LogEvent.WRITE_ROWS_EVENT_V1 == type || LogEvent.WRITE_ROWS_EVENT == type) {
eventType = EventType.INSERT;
} else if (LogEvent.UPDATE_ROWS_EVENT_V1 == type || LogEvent.UPDATE_ROWS_EVENT == type
|| LogEvent.PARTIAL_UPDATE_ROWS_EVENT == type) {
eventType = EventType.UPDATE;
} else if (LogEvent.DELETE_ROWS_EVENT_V1 == type || LogEvent.DELETE_ROWS_EVENT == type) {
eventType = EventType.DELETE;
} else {
throw new CanalParseException("unsupport event type :" + event.getHeader().getType());
}

RowChange.Builder rowChangeBuider = RowChange.newBuilder();
rowChangeBuider.setTableId(event.getTableId());
rowChangeBuider.setIsDdl(false);

rowChangeBuider.setEventType(eventType);
RowsLogBuffer buffer = event.getRowsBuf(charset);
BitSet columns = event.getColumns();
BitSet changeColumns = event.getChangeColumns();

boolean tableError = false;
int rowsCount = 0;
while (buffer.nextOneRow(columns, false)) {
// handle one row record
RowData.Builder rowDataBuilder = RowData.newBuilder();
if (EventType.INSERT == eventType) {
// insert: the row is parsed with isAfter=true, i.e. stored as after-columns
tableError |= parseOneRow(rowDataBuilder, event, buffer, columns, true, tableMeta);
} else if (EventType.DELETE == eventType) {
// delete: the row is parsed with isAfter=false, i.e. stored as before-columns
tableError |= parseOneRow(rowDataBuilder, event, buffer, columns, false, tableMeta);
} else {
// update: both the before and the after image have to be handled
tableError |= parseOneRow(rowDataBuilder, event, buffer, columns, false, tableMeta);
if (!buffer.nextOneRow(changeColumns, true)) {
// no after image follows: keep the before image and stop
// (this row is not counted in rowsCount)
rowChangeBuider.addRowDatas(rowDataBuilder.build());
break;
}

tableError |= parseOneRow(rowDataBuilder, event, buffer, changeColumns, true, tableMeta);
}

rowsCount++;
rowChangeBuider.addRowDatas(rowDataBuilder.build());
}
TableMapLogEvent table = event.getTable();
Header header = createHeader(event.getHeader(),
table.getDbName(),
table.getTableName(),
eventType,
rowsCount);

RowChange rowChange = rowChangeBuider.build();
if (tableError) {
// a row did not match the table structure: log the data and drop the entry
Entry entry = createEntry(header, EntryType.ROWDATA, ByteString.EMPTY);
logger.warn("table parser error : {}storeValue: {}", entry.toString(), rowChange.toString());
return null;
} else {
Entry entry = createEntry(header, EntryType.ROWDATA, rowChange.toByteString());
return entry;
}
} catch (Exception e) {
// NOTE(review): this also wraps exceptions raised above, including the
// TableIdNotFoundException thrown by parseRowsEventForTableMeta - confirm
// that callers do not rely on catching that type directly.
throw new CanalParseException("parse row data failed.", e);
}
}

/**
 * Builds the position of an event from its log header.
 *
 * @param logHeader the header of the event
 * @return a position made of the log file name, the start offset of the
 *         event, the event time multiplied by 1000 and the server id
 */
private EntryPosition createPosition(LogHeader logHeader) {
    // getLogPos() minus the event length gives the start position of the event
    long startPos = logHeader.getLogPos() - logHeader.getEventLen();
    // the header time is recorded with second precision
    long timestamp = logHeader.getWhen() * 1000L;
    return new EntryPosition(logHeader.getLogFileName(), startPos, timestamp, logHeader.getServerId());
}

/**
 * Parses the current row of the buffer into columns and appends them to the
 * row data builder, either as before-columns or as after-columns.
 *
 * @param rowDataBuilder the builder receiving the parsed columns
 * @param event the rows event the row belongs to
 * @param buffer the row buffer, already positioned on the row to read
 * @param cols bitmap of the columns that are present in the row image
 * @param isAfter {@code true} to add the columns as after-columns,
 *        {@code false} to add them as before-columns
 * @param tableMeta the table meta used to name and type the columns; may be
 *        reloaded when the binlog carries more columns than the table meta
 * @return {@code true} when the table structure could not be matched against
 *         the binlog columns (only reachable when {@code filterTableError} is
 *         enabled, otherwise an exception is thrown)
 * @throws UnsupportedEncodingException when a binary value cannot be decoded
 * @throws CanalParseException on table structure mismatches when
 *         {@code filterTableError} is disabled
 */
private boolean parseOneRow(RowData.Builder rowDataBuilder, RowsLogEvent event, RowsLogBuffer buffer, BitSet cols,
boolean isAfter, TableMeta tableMeta) throws UnsupportedEncodingException {
int columnCnt = event.getTable().getColumnCnt();
ColumnInfo[] columnInfo = event.getTable().getColumnInfo();
// MySQL 8.0 with set @@global.binlog_row_metadata='FULL' can record part of
// the column metadata in the binlog
boolean existOptionalMetaData = event.getTable().isExistOptionalMetaData();
boolean tableError = false;
// check the table field count; only the "column added" case can be handled
boolean existRDSNoPrimaryKey = false;
// field filter conditions for this table
List<String> fieldList = null;
List<String> blackFieldList = null;

if (tableMeta != null) {
fieldList = fieldFilterMap.get(tableMeta.getFullName().toUpperCase());
blackFieldList = fieldBlackFilterMap.get(tableMeta.getFullName().toUpperCase());
}

if (tableMeta != null && columnInfo.length > tableMeta.getFields().size()) {
if (tableMetaCache.isOnRDS() || tableMetaCache.isOnPolarX()) {
// special handling for the RDS scenario: a table without primary key whose
// binlog carries exactly one extra trailing LONGLONG column
List<FieldMeta> primaryKeys = tableMeta.getPrimaryFields();
if (primaryKeys == null || primaryKeys.isEmpty()) {
if (columnInfo.length == tableMeta.getFields().size() + 1
&& columnInfo[columnInfo.length - 1].type == LogEvent.MYSQL_TYPE_LONGLONG) {
existRDSNoPrimaryKey = true;
}
}
}

EntryPosition position = createPosition(event.getHeader());
if (!existRDSNoPrimaryKey) {
// Steps of an online DDL that adds a column:
// 1. create a temporary table and import all data of the table under DDL
// 2. create I/U/D triggers on the old table that incrementally copy data
//    into the temporary table
// 3. block application requests and rename the temporary table to the old
//    table's name, which completes the "add column" operation
// Try one reload: either the DDL was not parsed correctly or something like
// an online DDL was used. An online DDL has no alter statement on the table
// name, so the cache was never cleared for it.
tableMeta = getTableMeta(event.getTable().getDbName(), event.getTable().getTableName(), false, position);// force a reload (useCache=false)
if (tableMeta == null) {
tableError = true;
if (!filterTableError) {
throw new CanalParseException("not found [" + event.getTable().getDbName() + "."
+ event.getTable().getTableName() + "] in db , pls check!");
}
}

// check once more after the reload
if (tableMeta != null && columnInfo.length > tableMeta.getFields().size()) {
tableError = true;
if (!filterTableError) {
throw new CanalParseException("column size is not match for table:" + tableMeta.getFullName()
+ "," + columnInfo.length + " vs " + tableMeta.getFields().size());
}
}
// } else {
// logger.warn("[" + event.getTable().getDbName() + "." +
// event.getTable().getTableName()
// + "] is no primary key , skip alibaba_rds_row_id column");
}
}

for (int i = 0; i < columnCnt; i++) {
ColumnInfo info = columnInfo[i];
// Since MySQL 5.6 the noblob/minimal row images do not necessarily record
// every column, so the bitmap has to be checked.
if (!cols.get(i)) {
continue;
}

if (existRDSNoPrimaryKey && i == columnCnt - 1 && info.type == LogEvent.MYSQL_TYPE_LONGLONG) {
// The trailing extra column is not resolved through the table meta; it is
// emitted as a key column under a fixed synthetic name instead.
String rdsRowIdColumnName = "__#alibaba_rds_row_id#__";
if (tableMetaCache.isOnPolarX()) {
rdsRowIdColumnName = "_drds_implicit_id_";
}
buffer.nextValue(rdsRowIdColumnName, i, info.type, info.meta, false);
Column.Builder columnBuilder = Column.newBuilder();
columnBuilder.setName(rdsRowIdColumnName);
columnBuilder.setIsKey(true);
columnBuilder.setMysqlType("bigint");
columnBuilder.setIndex(i);
columnBuilder.setIsNull(false);
Serializable value = buffer.getValue();
columnBuilder.setValue(value.toString());
columnBuilder.setSqlType(Types.BIGINT);
columnBuilder.setUpdated(false);

if (needField(fieldList, blackFieldList, columnBuilder.getName())) {
if (isAfter) {
rowDataBuilder.addAfterColumns(columnBuilder.build());
} else {
rowDataBuilder.addBeforeColumns(columnBuilder.build());
}
}
continue;
}

FieldMeta fieldMeta = null;
if (tableMeta != null && !tableError) {
// pick the field meta at the same position as the binlog column
fieldMeta = tableMeta.getFields().get(i);
}

if (fieldMeta != null && existOptionalMetaData && tableMetaCache.isOnTSDB()) {
// check column info: name, unsigned flag and nullable flag must agree
boolean check = StringUtils.equalsIgnoreCase(fieldMeta.getColumnName(), info.name);
check &= (fieldMeta.isUnsigned() == info.unsigned);
check &= (fieldMeta.isNullable() == info.nullable);

if (!check) {
throw new CanalParseException("MySQL8.0 unmatch column metadata & pls submit issue , table : "
+ tableMeta.getFullName() + ", db fieldMeta : "
+ fieldMeta.toString() + " , binlog fieldMeta : " + info.toString()
+ " , on : " + event.getHeader().getLogFileName() + ":"
+ (event.getHeader().getLogPos() - event.getHeader().getEventLen()));
}
}

Column.Builder columnBuilder = Column.newBuilder();
if (fieldMeta != null) {
columnBuilder.setName(fieldMeta.getColumnName());
columnBuilder.setIsKey(fieldMeta.isKey());
// also expose the mysql column type, issue 73
columnBuilder.setMysqlType(fieldMeta.getColumnType());
} else if (existOptionalMetaData) {
// no field meta: fall back to the metadata recorded in the binlog
columnBuilder.setName(info.name);
columnBuilder.setIsKey(info.pk);
// the MySQL 8.0 binlog metadata carries no mysql type
// columnBuilder.setMysqlType(fieldMeta.getColumnType());
}
columnBuilder.setIndex(i);
columnBuilder.setIsNull(false);

// fixed issue
// https://github.com/alibaba/canal/issues/66 - binary/varbinary need special
// handling and must not be charset-decoded
boolean isBinary = false;
if (fieldMeta != null) {
if (StringUtils.containsIgnoreCase(fieldMeta.getColumnType(), "VARBINARY")) {
isBinary = true;
} else if (StringUtils.containsIgnoreCase(fieldMeta.getColumnType(), "BINARY")) {
isBinary = true;
}
}

buffer.nextValue(columnBuilder.getName(), i, info.type, info.meta, isBinary);
int javaType = buffer.getJavaType();
if (buffer.isNull()) {
columnBuilder.setIsNull(true);

// adjust the reported type for null values
switch (javaType) {
case Types.BINARY:
case Types.VARBINARY:
case Types.LONGVARBINARY:

// https://github.com/alibaba/canal/issues/4652
// the mysql binlog stores blob and text both as blob, so the table meta has
// to be consulted to tell text columns apart
if (fieldMeta != null && isText(fieldMeta.getColumnType())) {
javaType = Types.CLOB;
} else {
javaType = Types.BLOB;
}
break;
}
} else {
final Serializable value = buffer.getValue();
// convert the value according to its type
switch (javaType) {
case Types.INTEGER:
case Types.TINYINT:
case Types.SMALLINT:
case Types.BIGINT:
// handle unsigned types: a negative number read for an unsigned column is
// shifted by the matching *_MAX_VALUE constant
Number number = (Number) value;
boolean isUnsigned = (fieldMeta != null ? fieldMeta.isUnsigned() : (existOptionalMetaData ? info.unsigned : false));
if (isUnsigned && number.longValue() < 0) {
switch (buffer.getLength()) {
case 1: /* MYSQL_TYPE_TINY */
columnBuilder.setValue(String.valueOf(Integer.valueOf(TINYINT_MAX_VALUE
+ number.intValue())));
javaType = Types.SMALLINT; // promote to the next larger type
break;

case 2: /* MYSQL_TYPE_SHORT */
columnBuilder.setValue(String.valueOf(Integer.valueOf(SMALLINT_MAX_VALUE
+ number.intValue())));
javaType = Types.INTEGER; // promote to the next larger type
break;

case 3: /* MYSQL_TYPE_INT24 */
columnBuilder.setValue(String.valueOf(Integer.valueOf(MEDIUMINT_MAX_VALUE
+ number.intValue())));
javaType = Types.INTEGER; // promote to the next larger type
break;

case 4: /* MYSQL_TYPE_LONG */
columnBuilder.setValue(String.valueOf(Long.valueOf(INTEGER_MAX_VALUE
+ number.longValue())));
javaType = Types.BIGINT; // promote to the next larger type
break;

case 8: /* MYSQL_TYPE_LONGLONG */
columnBuilder.setValue(BIGINT_MAX_VALUE.add(BigInteger.valueOf(number.longValue()))
.toString());
javaType = Types.DECIMAL; // promote to the next larger type to avoid errors downstream
break;
}
} else {
// the value is a number, String.valueOf is sufficient
columnBuilder.setValue(String.valueOf(value));
}
break;
case Types.REAL: // float
case Types.DOUBLE: // double
// the value is a number, String.valueOf is sufficient
columnBuilder.setValue(String.valueOf(value));
break;
case Types.BIT:// bit
// the value is a number
columnBuilder.setValue(String.valueOf(value));
break;
case Types.DECIMAL:
columnBuilder.setValue(((BigDecimal) value).toPlainString());
break;
case Types.TIMESTAMP:
// fix for the time boundary value (disabled); TIMESTAMP falls through to
// the TIME/DATE handling below
// String v = value.toString();
// v = v.substring(0, v.length() - 2);
// columnBuilder.setValue(v);
// break;
case Types.TIME:
case Types.DATE:
// year needs handling here as well
columnBuilder.setValue(value.toString());
break;
case Types.BINARY:
case Types.VARBINARY:
case Types.LONGVARBINARY:
// fixed text encoding
// https://github.com/AlibabaTech/canal/issues/18
// the mysql binlog stores blob and text both as blob, so the table meta has
// to be consulted and text is decoded with the configured charset
if (fieldMeta != null && isText(fieldMeta.getColumnType())) {
columnBuilder.setValue(new String((byte[]) value, charset));
javaType = Types.CLOB;
} else {
// byte array: decode as iso-8859-1 to keep the bytes intact (wastes memory)
columnBuilder.setValue(new String((byte[]) value, ISO_8859_1));
// columnBuilder.setValueBytes(ByteString.copyFrom((byte[])
// value));
javaType = Types.BLOB;
}
break;
case Types.CHAR:
case Types.VARCHAR:
columnBuilder.setValue(value.toString());
break;
default:
columnBuilder.setValue(value.toString());
}
}

columnBuilder.setSqlType(javaType);
// set the "updated" flag: only after-columns whose value differs from the
// before-column at the same index are marked
columnBuilder.setUpdated(isAfter
&& isUpdate(rowDataBuilder.getBeforeColumnsList(),
columnBuilder.getIsNull() ? null : columnBuilder.getValue(),
i));
// apply the field white/black list before adding the column
if (needField(fieldList, blackFieldList, columnBuilder.getName())) {
if (isAfter) {
rowDataBuilder.addAfterColumns(columnBuilder.build());
} else {
rowDataBuilder.addBeforeColumns(columnBuilder.build());
}
}
}

return tableError;

}

/**
 * Builds a ROWDATA entry of type QUERY that carries the given SQL text.
 *
 * @param queryString the SQL text stored in the row change
 * @param logHeader the header of the originating event
 * @param tableName the table name put into the entry header; may be {@code null}
 * @return the query entry (the schema name of the header is left empty)
 */
private Entry buildQueryEntry(String queryString, LogHeader logHeader, String tableName) {
    Header queryHeader = createHeader(logHeader, "", tableName, EventType.QUERY);
    RowChange queryChange = RowChange.newBuilder()
        .setSql(queryString)
        .setEventType(EventType.QUERY)
        .build();
    return createEntry(queryHeader, EntryType.ROWDATA, queryChange.toByteString());
}

/**
 * Builds a ROWDATA entry of type QUERY that carries the given SQL text, with
 * both schema and table name of the header left empty.
 *
 * @param queryString the SQL text stored in the row change
 * @param logHeader the header of the originating event
 * @return the query entry
 */
private Entry buildQueryEntry(String queryString, LogHeader logHeader) {
    // same as the three-argument overload with an empty table name
    return buildQueryEntry(queryString, logHeader, "");
}

/**
 * Builds an entry header without a rows-count property.
 *
 * @param logHeader the header of the originating event
 * @param schemaName the schema name, may be {@code null}
 * @param tableName the table name, may be {@code null}
 * @param eventType the event type, may be {@code null}
 * @return the entry header
 */
private Header createHeader(LogHeader logHeader, String schemaName, String tableName, EventType eventType) {
    // a non-positive count makes the five-argument overload skip "rowsCount"
    final int noRowsCount = -1;
    return createHeader(logHeader, schemaName, tableName, eventType, noRowsCount);
}

/**
 * Builds an entry header from the binlog event header. The header duplicates
 * some information on purpose so that entries can be searched or filtered
 * later on.
 *
 * @param logHeader the header of the originating event
 * @param schemaName the schema name; not set when {@code null}
 * @param tableName the table name; not set when {@code null}
 * @param eventType the event type; not set when {@code null}
 * @param rowsCount number of rows of the event; the "rowsCount" property is
 *        only added when this is non-null and greater than zero
 * @return the entry header
 */
private Header createHeader(LogHeader logHeader, String schemaName, String tableName, EventType eventType,
                            Integer rowsCount) {
    Header.Builder headerBuilder = Header.newBuilder();
    headerBuilder.setVersion(version);
    headerBuilder.setLogfileName(logHeader.getLogFileName());
    // the recorded offset is the start offset of the event in the binlog
    headerBuilder.setLogfileOffset(logHeader.getLogPos() - logHeader.getEventLen());
    headerBuilder.setServerId(logHeader.getServerId());
    // after passing through java every string is unicode, so UTF_8 is reported
    headerBuilder.setServerenCode(UTF_8);
    headerBuilder.setExecuteTime(logHeader.getWhen() * 1000L);
    headerBuilder.setSourceType(Type.MYSQL);
    if (eventType != null) {
        headerBuilder.setEventType(eventType);
    }
    if (schemaName != null) {
        headerBuilder.setSchemaName(schemaName);
    }
    if (tableName != null) {
        headerBuilder.setTableName(tableName);
    }
    headerBuilder.setEventLength(logHeader.getEventLen());
    // enable gtid position
    if (StringUtils.isNotEmpty(logHeader.getGtidSetStr())) {
        headerBuilder.setGtid(logHeader.getGtidSetStr());
    }
    // add current gtid
    if (StringUtils.isNotEmpty(logHeader.getCurrentGtid())) {
        headerBuilder.addProps(createSpecialPair("curtGtid", logHeader.getCurrentGtid()));
    }
    // add current gtid sequence no
    if (StringUtils.isNotEmpty(logHeader.getCurrentGtidSn())) {
        headerBuilder.addProps(createSpecialPair("curtGtidSn", logHeader.getCurrentGtidSn()));
    }
    // add current gtid last committed
    if (StringUtils.isNotEmpty(logHeader.getCurrentGtidLastCommit())) {
        headerBuilder.addProps(createSpecialPair("curtGtidLct", logHeader.getCurrentGtidLastCommit()));
    }

    // add rowsCount support; the parameter is a boxed Integer, so guard against
    // null before the comparison unboxes it
    if (rowsCount != null && rowsCount > 0) {
        headerBuilder.addProps(createSpecialPair("rowsCount", String.valueOf(rowsCount)));
    }
    return headerBuilder.build();
}

/**
 * Tells whether the new value of a column differs from its before image.
 *
 * @param bfColumns the before-columns of the row
 * @param newValue the new value, {@code null} when the new value is SQL NULL
 * @param index the column index to compare
 * @return {@code false} when the index is negative or a before-column with
 *         the same index holds an equal value (or both are null);
 *         {@code true} otherwise, including when no before-column is found
 * @throws CanalParseException when {@code bfColumns} is {@code null}
 */
private boolean isUpdate(List<Column> bfColumns, String newValue, int index) {
    if (bfColumns == null) {
        throw new CanalParseException("ERROR ## the bfColumns is null");
    }

    if (index >= 0) {
        for (Column before : bfColumns) {
            // compare the before / after column by index
            if (before.getIndex() != index) {
                continue;
            }

            boolean beforeIsNull = before.getIsNull();
            if (newValue == null) {
                if (beforeIsNull) {
                    // both sides are null
                    return false;
                }
            } else if (!beforeIsNull && before.getValue().equals(newValue)) {
                // fixed issue #135, old column is Null
                // both sides are non-null and equal
                return false;
            }
        }

        // e.g. in noblob/minimal mode the before record may be missing, which is
        // treated as a change
        return true;
    }

    return false;
}

/**
 * Looks up a table meta through the table meta cache.
 *
 * @param dbName the schema name
 * @param tbName the table name
 * @param useCache passed through to the cache lookup
 * @param position the binlog position passed through to the cache lookup
 * @return the table meta, or {@code null} when {@code filterTableError} is
 *         enabled and the lookup failed with MySQL error 1146 (table doesn't
 *         exist) or 1142 (command denied)
 * @throws CanalParseException for every other lookup failure
 */
private TableMeta getTableMeta(String dbName, String tbName, boolean useCache, EntryPosition position) {
    try {
        return tableMetaCache.getTableMeta(dbName, tbName, useCache, position);
    } catch (Throwable e) {
        if (filterTableError) {
            String rootMessage = ExceptionUtils.getRootCauseMessage(e);
            boolean tableMissing = StringUtils.contains(rootMessage, "errorNumber=1146")
                                   && StringUtils.contains(rootMessage, "doesn't exist");
            boolean accessDenied = StringUtils.contains(rootMessage, "errorNumber=1142")
                                   && StringUtils.contains(rootMessage, "command denied");
            if (tableMissing || accessDenied) {
                return null;
            }
        }

        throw new CanalParseException(e);
    }
}

/**
 * Tells whether a mysql column type is one of the text types.
 *
 * @param columnType the column type, compared case-insensitively
 * @return {@code true} for LONGTEXT, MEDIUMTEXT, TEXT and TINYTEXT
 */
private boolean isText(String columnType) {
    for (String textType : new String[] { "LONGTEXT", "MEDIUMTEXT", "TEXT", "TINYTEXT" }) {
        if (textType.equalsIgnoreCase(columnType)) {
            return true;
        }
    }
    return false;
}

/**
 * Tells whether the table is the AliSQL heartbeat table {@code test.heartbeat}
 * (compared case-insensitively).
 */
private boolean isAliSQLHeartBeat(String schema, String table) {
    if (!"test".equalsIgnoreCase(schema)) {
        return false;
    }
    return "heartbeat".equalsIgnoreCase(table);
}

/**
 * Tells whether the table is the RDS heartbeat table
 * {@code mysql.ha_health_check} (compared case-insensitively).
 */
private boolean isRDSHeartBeat(String schema, String table) {
    if (!"mysql".equalsIgnoreCase(schema)) {
        return false;
    }
    return "ha_health_check".equalsIgnoreCase(table);
}

/**
 * Field filter check.
 * <p>
 * A non-empty white list takes precedence: the column is kept only when the
 * white list contains its upper-cased name. Without a white list the column
 * is kept unless a non-empty black list contains its upper-cased name.
 *
 * @param fieldList the white list, may be {@code null} or empty
 * @param blackFieldList the black list, may be {@code null} or empty
 * @param columnName the column name to check
 * @return {@code true} when the column should be emitted
 */
private boolean needField(List<String> fieldList, List<String> blackFieldList, String columnName) {
    boolean hasWhiteList = fieldList != null && !fieldList.isEmpty();
    if (hasWhiteList) {
        return fieldList.contains(columnName.toUpperCase());
    }

    boolean hasBlackList = blackFieldList != null && !blackFieldList.isEmpty();
    return !hasBlackList || !blackFieldList.contains(columnName.toUpperCase());
}

/**
 * Creates a {@code TransactionBegin} message for the given thread id.
 *
 * @param threadId the thread id stored in the message
 * @return the built message
 */
public static TransactionBegin createTransactionBegin(long threadId) {
    return TransactionBegin.newBuilder().setThreadId(threadId).build();
}

/**
 * Creates a {@code TransactionEnd} message for the given transaction id.
 *
 * @param transactionId the transaction id, stored in its decimal string form
 * @return the built message
 */
public static TransactionEnd createTransactionEnd(long transactionId) {
    String idText = Long.toString(transactionId);
    return TransactionEnd.newBuilder().setTransactionId(idText).build();
}

/**
 * Creates a key/value {@code Pair} message.
 *
 * @param key the pair key
 * @param value the pair value
 * @return the built pair
 */
public static Pair createSpecialPair(String key, String value) {
    return Pair.newBuilder().setKey(key).setValue(value).build();
}

/**
 * Assembles an {@code Entry} from its header, type and serialized payload.
 *
 * @param header the entry header
 * @param entryType the entry type
 * @param storeValue the serialized payload
 * @return the built entry
 */
public static Entry createEntry(Header header, EntryType entryType, ByteString storeValue) {
    return Entry.newBuilder()
        .setHeader(header)
        .setEntryType(entryType)
        .setStoreValue(storeValue)
        .build();
}

/**
 * Sets the charset used to decode query text, table-map names and row values.
 *
 * @param charset the charset to use
 */
public void setCharset(Charset charset) {
this.charset = charset;
}

/**
 * Sets the table name white-list filter and logs it.
 *
 * @param nameFilter the filter applied to "schema.table" names; {@code null}
 *        disables white-list filtering
 */
public void setNameFilter(AviaterRegexFilter nameFilter) {
    this.nameFilter = nameFilter;
    // String concatenation is null-safe, whereas calling toString() on a null
    // filter would throw after the field has already been assigned
    logger.warn("--> init table filter : " + nameFilter);
}

/**
 * Sets the table name black-list filter and logs it.
 *
 * @param nameBlackFilter the filter applied to "schema.table" names;
 *        {@code null} disables black-list filtering
 */
public void setNameBlackFilter(AviaterRegexFilter nameBlackFilter) {
    this.nameBlackFilter = nameBlackFilter;
    // String concatenation is null-safe, whereas calling toString() on a null
    // filter would throw after the field has already been assigned
    logger.warn("--> init table black filter : " + nameBlackFilter);
}

/**
 * Sets the per-table field white lists and logs every entry.
 *
 * @param fieldFilterMap the white lists keyed by table; {@code null} is
 *        replaced by an empty map
 */
public void setFieldFilterMap(Map<String, List<String>> fieldFilterMap) {
    this.fieldFilterMap = (fieldFilterMap == null) ? new HashMap<String, List<String>>() : fieldFilterMap;

    this.fieldFilterMap.forEach((table, fields) -> logger.warn("--> init field filter : " + table + "->" + fields));
}

/**
 * Sets the per-table field black lists and logs every entry.
 *
 * @param fieldBlackFilterMap the black lists keyed by table; {@code null} is
 *        replaced by an empty map
 */
public void setFieldBlackFilterMap(Map<String, List<String>> fieldBlackFilterMap) {
    this.fieldBlackFilterMap = (fieldBlackFilterMap == null) ? new HashMap<String, List<String>>()
                                                             : fieldBlackFilterMap;

    this.fieldBlackFilterMap.forEach((table, fields) -> logger.warn("--> init field black filter : " + table + "->"
                                                                    + fields));
}

/**
 * Sets the cache used to look up and invalidate table metadata.
 *
 * @param tableMetaCache the table meta cache
 */
public void setTableMetaCache(TableMetaCache tableMetaCache) {
this.tableMetaCache = tableMetaCache;
}

/**
 * Sets the flag that makes processFilter drop query statements that are
 * neither one of the handled DDL types nor INSERT/UPDATE/DELETE.
 *
 * @param filterQueryDcl {@code true} to drop such statements
 */
public void setFilterQueryDcl(boolean filterQueryDcl) {
this.filterQueryDcl = filterQueryDcl;
}

/**
 * Sets the flag that drops DML query statements; when enabled the
 * rows-query, annotate-rows, user-var, intvar and rand events produce no
 * entry either.
 *
 * @param filterQueryDml {@code true} to drop DML query statements
 */
public void setFilterQueryDml(boolean filterQueryDml) {
this.filterQueryDml = filterQueryDml;
}

/**
 * Sets the flag that suppresses entry generation for DDL query statements.
 *
 * @param filterQueryDdl {@code true} to ignore all DDL statements
 */
public void setFilterQueryDdl(boolean filterQueryDdl) {
this.filterQueryDdl = filterQueryDdl;
}

/**
 * Sets the flag that tolerates table metadata errors: when enabled, a missing
 * or mismatching table structure makes the affected rows be skipped instead
 * of raising a CanalParseException.
 *
 * @param filterTableError {@code true} to tolerate table errors
 */
public void setFilterTableError(boolean filterTableError) {
this.filterTableError = filterTableError;
}

/**
 * Sets the flag that makes parseRowsEvent return {@code null} for every rows
 * event.
 *
 * @param filterRows {@code true} to drop all rows events
 */
public void setFilterRows(boolean filterRows) {
this.filterRows = filterRows;
}

/**
 * Chooses the statement parser: {@code true} uses DruidDdlParser,
 * {@code false} uses SimpleDdlParser for query events.
 *
 * @param useDruidDdlFilter whether the druid based parser is used
 */
public void setUseDruidDdlFilter(boolean useDruidDdlFilter) {
this.useDruidDdlFilter = useDruidDdlFilter;
}
}

2.2 CanalLogPositionManager

1
2
3
4
5
6
7
8
9
10
/**
 * Created by yinxiu on 17/3/17. Email: marklin.hz@gmail.com
 * <p>
 * Contract for reading and storing the latest parse position of a
 * destination.
 */
public interface CanalLogPositionManager extends CanalLifeCycle {

/**
 * Returns the latest position known for the destination.
 *
 * @param destination the destination name
 * @return the position; the implementations in this file return {@code null}
 *         when nothing is stored for the destination
 */
LogPosition getLatestIndexBy(String destination);

/**
 * Stores the position of the destination.
 *
 * @param destination the destination name
 * @param logPosition the position to store
 * @throws CanalParseException declared for implementations that fail to store
 */
void persistLogPosition(String destination, LogPosition logPosition) throws CanalParseException;

}

2.2.1 AbstractLogPositionManager

1
2
3
4
5
/**
 * Created by yinxiu on 17/3/17. Email: marklin.hz@gmail.com
 * <p>
 * Common base type of the log position managers: it combines
 * AbstractCanalLifeCycle with the CanalLogPositionManager contract and
 * declares no members of its own.
 */
public abstract class AbstractLogPositionManager extends AbstractCanalLifeCycle implements CanalLogPositionManager {
}

2.2.2 MemoryLogPositionManager

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
 * Created by yinxiu on 17/3/17. Email: marklin.hz@gmail.com
 * <p>
 * Log position manager that keeps the position of every destination in an
 * in-memory map. Nothing survives {@link #stop()}.
 */
public class MemoryLogPositionManager extends AbstractLogPositionManager {

    // Created eagerly so that the accessors never hit a null map when they are
    // invoked before start().
    private final Map<String, LogPosition> positions = new MapMaker().makeMap();

    /**
     * Starts the manager with an empty position map.
     */
    @Override
    public void start() {
        super.start();
        // start() always begins from an empty map
        positions.clear();
    }

    /**
     * Stops the manager and forgets every stored position.
     */
    @Override
    public void stop() {
        super.stop();
        positions.clear();
    }

    /**
     * @return the stored position, or {@code null} when none is stored
     */
    @Override
    public LogPosition getLatestIndexBy(String destination) {
        return positions.get(destination);
    }

    /**
     * Stores (or replaces) the position of the destination in memory.
     */
    @Override
    public void persistLogPosition(String destination, LogPosition logPosition) throws CanalParseException {
        positions.put(destination, logPosition);
    }

    /**
     * @return the key set of the position map, i.e. every destination that
     *         currently has a stored position
     */
    public Set<String> destinations() {
        return positions.keySet();
    }

}

2.2.3 ZooKeeperLogPositionManager

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
 * Created by yinxiu on 17/3/17. Email: marklin.hz@gmail.com
 * <p>
 * Log position manager that reads and writes the position of a destination
 * as JSON bytes under the destination's parse path in ZooKeeper.
 */
public class ZooKeeperLogPositionManager extends AbstractLogPositionManager {

    private final ZkClientx zkClientx;

    /**
     * @param zkClient the ZooKeeper client to use
     * @throws NullPointerException when {@code zkClient} is {@code null}
     */
    public ZooKeeperLogPositionManager(ZkClientx zkClient){
        this.zkClientx = java.util.Objects.requireNonNull(zkClient, "null zkClient");
    }

    /**
     * @return the position read from ZooKeeper, or {@code null} when the node
     *         holds no data
     */
    @Override
    public LogPosition getLatestIndexBy(String destination) {
        byte[] stored = zkClientx.readData(ZookeeperPathUtils.getParsePath(destination), true);
        if (stored != null && stored.length > 0) {
            return JsonUtils.unmarshalFromByte(stored, LogPosition.class);
        }

        return null;
    }

    /**
     * Writes the position to the destination's node, creating the node when
     * it does not exist yet.
     */
    @Override
    public void persistLogPosition(String destination, LogPosition logPosition) throws CanalParseException {
        String nodePath = ZookeeperPathUtils.getParsePath(destination);
        byte[] payload = JsonUtils.marshalToByte(logPosition);
        try {
            zkClientx.writeData(nodePath, payload);
        } catch (ZkNoNodeException e) {
            // the node is missing: create it with the data instead
            zkClientx.createPersistent(nodePath, payload, true);
        }
    }

}

2.2.4 MetaLogPositionManager

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/**
* Created by yinxiu on 17/3/18. Email: marklin.hz@gmail.com
*/
public class MetaLogPositionManager extends AbstractLogPositionManager {

private final static Logger logger = LoggerFactory.getLogger(MetaLogPositionManager.class);

private final CanalMetaManager metaManager;

public MetaLogPositionManager(CanalMetaManager metaManager){
if (metaManager == null) {
throw new NullPointerException("null metaManager");
}

this.metaManager = metaManager;
}

@Override
public void stop() {
super.stop();

if (metaManager.isStart()) {
metaManager.stop();
}
}

@Override
public void start() {
super.start();

if (!metaManager.isStart()) {
metaManager.start();
}
}

@Override
public LogPosition getLatestIndexBy(String destination) {
List<ClientIdentity> clientIdentities = metaManager.listAllSubscribeInfo(destination);
LogPosition result = null;
if (!CollectionUtils.isEmpty(clientIdentities)) {
// 尝试找到一个最小的logPosition
for (ClientIdentity clientIdentity : clientIdentities) {
LogPosition position = (LogPosition) metaManager.getCursor(clientIdentity);
if (position == null) {
continue;
}

if (result == null) {
result = position;
} else {
result = CanalEventUtils.min(result, position);
}
}
}

return result;
}

@Override
public void persistLogPosition(String destination, LogPosition logPosition) throws CanalParseException {
// do nothing
logger.info("destination [{}] persist LogPosition:{}", destination, logPosition);
}

2.2.5 MixedLogPositionManager

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
/**
 * Created by yinxiu on 17/3/17. Email: marklin.hz@gmail.com Memory first.
 * Asynchronous commit position info to ZK.
 * <p>
 * Reads are served from memory and fall back to ZooKeeper; writes go to
 * memory immediately and are handed to a single-thread executor for the
 * ZooKeeper write.
 */
public class MixedLogPositionManager extends AbstractLogPositionManager {

    // static like the loggers of the sibling position managers: one logger per
    // class instead of one per instance
    private static final Logger               logger = LoggerFactory.getLogger(MixedLogPositionManager.class);

    private final MemoryLogPositionManager    memoryLogPositionManager;
    private final ZooKeeperLogPositionManager zooKeeperLogPositionManager;

    private final ExecutorService             executor;

    /**
     * @param zkClient the ZooKeeper client used for the persistent copy
     * @throws NullPointerException when {@code zkClient} is {@code null}
     */
    public MixedLogPositionManager(ZkClientx zkClient){
        if (zkClient == null) {
            throw new NullPointerException("null zkClient");
        }

        this.memoryLogPositionManager = new MemoryLogPositionManager();
        this.zooKeeperLogPositionManager = new ZooKeeperLogPositionManager(zkClient);

        this.executor = Executors.newFixedThreadPool(1);
    }

    /**
     * Starts this manager and both delegates that are not running yet.
     */
    @Override
    public void start() {
        super.start();

        if (!memoryLogPositionManager.isStart()) {
            memoryLogPositionManager.start();
        }

        if (!zooKeeperLogPositionManager.isStart()) {
            zooKeeperLogPositionManager.start();
        }
    }

    /**
     * Stops this manager, shuts the executor down and stops both delegates.
     * The executor is created in the constructor only, so it is not available
     * again after this call.
     */
    @Override
    public void stop() {
        super.stop();

        executor.shutdown();
        zooKeeperLogPositionManager.stop();
        memoryLogPositionManager.stop();
    }

    /**
     * @return the in-memory position when present; otherwise the position
     *         read from ZooKeeper (which is then cached in memory), or
     *         {@code null} when neither holds one
     */
    @Override
    public LogPosition getLatestIndexBy(String destination) {
        LogPosition logPosition = memoryLogPositionManager.getLatestIndexBy(destination);
        if (logPosition != null) {
            return logPosition;
        }
        logPosition = zooKeeperLogPositionManager.getLatestIndexBy(destination);
        // kept consistent with the logic before the refactoring: put the position
        // back into memory
        if (logPosition != null) {
            memoryLogPositionManager.persistLogPosition(destination, logPosition);
        }
        return logPosition;
    }

    /**
     * Stores the position in memory and submits the ZooKeeper write to the
     * executor; failures of that write are logged, not propagated.
     */
    @Override
    public void persistLogPosition(final String destination, final LogPosition logPosition) throws CanalParseException {
        memoryLogPositionManager.persistLogPosition(destination, logPosition);
        executor.submit(() -> {
            try {
                zooKeeperLogPositionManager.persistLogPosition(destination, logPosition);
            } catch (Exception e) {
                logger.error("ERROR # persist to zookeeper has an error", e);
            }
        });
    }
}

2.2.6 PeriodMixedLogPositionManager

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
/**
 * Created by yinxiu on 17/3/18. Email: marklin.hz@gmail.com
 *
 * <p>
 * Positions are written to the in-memory manager immediately; a scheduled task
 * copies the latest in-memory value of every changed destination to the
 * ZooKeeper manager once per {@code period} milliseconds, so several updates
 * within one period cause a single ZooKeeper write.
 */
public class PeriodMixedLogPositionManager extends AbstractLogPositionManager {

    private static final Logger         logger       = LoggerFactory.getLogger(PeriodMixedLogPositionManager.class);

    private MemoryLogPositionManager    memoryLogPositionManager;
    private ZooKeeperLogPositionManager zooKeeperLogPositionManager;
    private ScheduledExecutorService    executorService;

    private long                        period;
    // destinations changed since their last successful flush
    private Set<String>                 persistTasks;

    @SuppressWarnings("serial")
    private final LogPosition           nullPosition = new LogPosition() {
                                                     };

    /**
     * @param memoryLogPositionManager in-memory delegate, must not be null
     * @param zooKeeperLogPositionManager ZooKeeper delegate, must not be null
     * @param period flush interval in milliseconds, must be positive
     * @throws NullPointerException if a delegate is null
     * @throws IllegalArgumentException if {@code period <= 0}
     */
    public PeriodMixedLogPositionManager(MemoryLogPositionManager memoryLogPositionManager,
                                         ZooKeeperLogPositionManager zooKeeperLogPositionManager, long period){
        if (memoryLogPositionManager == null) {
            throw new NullPointerException("null memoryLogPositionManager");
        }

        if (zooKeeperLogPositionManager == null) {
            throw new NullPointerException("null zooKeeperLogPositionManager");
        }

        if (period <= 0) {
            throw new IllegalArgumentException("period must be positive, given: " + period);
        }

        this.memoryLogPositionManager = memoryLogPositionManager;
        this.zooKeeperLogPositionManager = zooKeeperLogPositionManager;
        this.period = period;
        this.persistTasks = Collections.synchronizedSet(new HashSet<>());
        this.executorService = Executors.newScheduledThreadPool(1);
    }

    /**
     * Stops this manager: the scheduler is shut down first, then the delegates
     * that are still running are stopped.
     */
    @Override
    public void stop() {
        super.stop();

        // Shut the scheduler down before the delegates so that no further flush
        // is started against a delegate that is being stopped.
        executorService.shutdown();

        if (zooKeeperLogPositionManager.isStart()) {
            zooKeeperLogPositionManager.stop();
        }

        if (memoryLogPositionManager.isStart()) {
            memoryLogPositionManager.stop();
        }
    }

    /**
     * Starts this manager and both delegates (each only if not already started)
     * and schedules the periodic flush to ZooKeeper.
     */
    @Override
    public void start() {
        super.start();

        if (!memoryLogPositionManager.isStart()) {
            memoryLogPositionManager.start();
        }

        if (!zooKeeperLogPositionManager.isStart()) {
            zooKeeperLogPositionManager.start();
        }

        // Periodically push the latest in-memory value to ZooKeeper; several
        // changes within one period are flushed once.
        executorService.scheduleAtFixedRate(() -> {
            List<String> tasks = new ArrayList<>(persistTasks);
            for (String destination : tasks) {
                // Clear the marker BEFORE reading the position. An update that
                // arrives while the write is in flight re-adds the marker and is
                // flushed by the next run instead of being silently dropped.
                persistTasks.remove(destination);
                try {
                    zooKeeperLogPositionManager.persistLogPosition(destination, getLatestIndexBy(destination));
                } catch (Throwable e) {
                    // keep the destination marked so the next run retries it
                    persistTasks.add(destination);
                    logger.error("period update " + destination + " cursor failed!", e);
                }
            }
        }, period, period, TimeUnit.MILLISECONDS);
    }

    /**
     * Returns the position held by the in-memory delegate; ZooKeeper is not
     * consulted here.
     *
     * @param destination destination to look up
     * @return the in-memory position, or {@code null} if there is none
     */
    @Override
    public LogPosition getLatestIndexBy(String destination) {
        LogPosition logPosition = memoryLogPositionManager.getLatestIndexBy(destination);
        if (logPosition == nullPosition) {
            return null;
        } else {
            return logPosition;
        }
    }

    /**
     * Stores the position in memory and marks the destination for the next
     * periodic flush.
     *
     * @param destination destination the position belongs to
     * @param logPosition position to store
     */
    @Override
    public void persistLogPosition(String destination, LogPosition logPosition) throws CanalParseException {
        // Update memory first, mark second: whenever the flush task sees the
        // marker, the value it then reads is at least as new as this update.
        memoryLogPositionManager.persistLogPosition(destination, logPosition);
        persistTasks.add(destination);
    }
}

2.2.7 FileMixedLogPositionManager

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
/**
 * Created by yinxiu on 17/3/18. Email: marklin.hz@gmail.com
 * File-backed log position manager.
 *
 * <pre>
 * Strategy:
 * 1. write to memory first, then periodically flush the data to a file
 * 2. the file is overwritten on every flush (only the last value is kept)
 * </pre>
 */
public class FileMixedLogPositionManager extends AbstractLogPositionManager {

    private final static Logger      logger  = LoggerFactory.getLogger(FileMixedLogPositionManager.class);
    private final static Charset     charset = Charset.forName("UTF-8");

    private File                     dataDir;

    // destination -> data file, computed on first access by getDataFile(String)
    private Map<String, File>        dataFileCaches;

    private ScheduledExecutorService executorService;

    private MemoryLogPositionManager memoryLogPositionManager;

    private long                     period;
    // destinations changed since their last successful flush
    private Set<String>              persistTasks;

    /**
     * @param dataDir root directory for the per-destination data files, must not be null
     * @param period flush interval in milliseconds, must be positive
     * @param memoryLogPositionManager in-memory delegate, must not be null
     * @throws NullPointerException if {@code dataDir} or the delegate is null
     * @throws IllegalArgumentException if {@code period <= 0}
     */
    public FileMixedLogPositionManager(File dataDir, long period, MemoryLogPositionManager memoryLogPositionManager){
        if (dataDir == null) {
            throw new NullPointerException("null dataDir");
        }
        if (period <= 0) {
            throw new IllegalArgumentException("period must be positive, given: " + period);
        }
        if (memoryLogPositionManager == null) {
            throw new NullPointerException("null memoryLogPositionManager");
        }
        this.dataDir = dataDir;
        this.period = period;
        this.memoryLogPositionManager = memoryLogPositionManager;

        this.dataFileCaches = MigrateMap.makeComputingMap(this::getDataFile);

        this.executorService = Executors.newScheduledThreadPool(1);
        this.persistTasks = Collections.synchronizedSet(new HashSet<>());
    }

    /**
     * Starts this manager: makes sure the data directory exists and is
     * readable/writable, starts the in-memory delegate and schedules the
     * periodic flush.
     *
     * @throws CanalMetaManagerException if the directory cannot be created or is not readable/writable
     */
    @Override
    public void start() {
        super.start();

        if (!dataDir.exists()) {
            try {
                FileUtils.forceMkdir(dataDir);
            } catch (IOException e) {
                throw new CanalMetaManagerException(e);
            }
        }

        if (!dataDir.canRead() || !dataDir.canWrite()) {
            throw new CanalMetaManagerException("dir[" + dataDir.getPath() + "] can not read/write");
        }

        if (!memoryLogPositionManager.isStart()) {
            memoryLogPositionManager.start();
        }

        // Periodically flush the latest in-memory value to the file; several
        // changes within one period are flushed once.
        executorService.scheduleAtFixedRate(() -> {
            List<String> tasks = new ArrayList<>(persistTasks);
            for (String destination : tasks) {
                // Clear the marker BEFORE reading the position. An update that
                // arrives while the write is in flight re-adds the marker and is
                // flushed by the next run instead of being silently dropped.
                persistTasks.remove(destination);
                try {
                    flushDataToFile(destination);
                } catch (Throwable e) {
                    // keep the destination marked so the next run retries it
                    persistTasks.add(destination);
                    logger.error("period update " + destination + " cursor failed!", e);
                }
            }
        }, period, period, TimeUnit.MILLISECONDS);

    }

    /**
     * Stops this manager: shuts the scheduler down, flushes every destination
     * known to the in-memory delegate one last time and stops that delegate.
     */
    @Override
    public void stop() {
        super.stop();

        // No further periodic flush is started once the scheduler is shut down.
        executorService.shutdown();
        try {
            flushDataToFile();
        } finally {
            // A failing final flush must not leave the delegate running.
            memoryLogPositionManager.stop();
        }
    }

    /**
     * Looks the position up in memory first and in the destination's data file
     * second.
     *
     * @param destination destination to look up
     * @return the position, or {@code null} when neither memory nor the file has one
     */
    @Override
    public LogPosition getLatestIndexBy(String destination) {
        LogPosition logPosition = memoryLogPositionManager.getLatestIndexBy(destination);
        if (logPosition != null) {
            return logPosition;
        }
        // Return null, not an empty placeholder object, when nothing is stored:
        // callers rely on a null check to detect a missing position.
        return loadDataFromFile(dataFileCaches.get(destination));
    }

    /**
     * Stores the position in memory and marks the destination for the next
     * periodic flush.
     *
     * @param destination destination the position belongs to
     * @param logPosition position to store
     */
    @Override
    public void persistLogPosition(String destination, LogPosition logPosition) throws CanalParseException {
        // Update memory first, mark second: whenever the flush task sees the
        // marker, the value it then reads is at least as new as this update.
        memoryLogPositionManager.persistLogPosition(destination, logPosition);
        persistTasks.add(destination);
    }

    // ============================ helper method ======================

    /**
     * Resolves {@code <dataDir>/<destination>/parse.dat}, creating the
     * destination directory when it does not exist yet.
     */
    private File getDataFile(String destination) {
        File destinationMetaDir = new File(dataDir, destination);
        if (!destinationMetaDir.exists()) {
            try {
                FileUtils.forceMkdir(destinationMetaDir);
            } catch (IOException e) {
                throw new CanalMetaManagerException(e);
            }
        }

        String dataFileName = "parse.dat";
        return new File(destinationMetaDir, dataFileName);
    }

    /** Flushes every destination the in-memory delegate knows about. */
    private void flushDataToFile() {
        for (String destination : memoryLogPositionManager.destinations()) {
            flushDataToFile(destination);
        }
    }

    /** Flushes one destination to its cached data file. */
    private void flushDataToFile(String destination) {
        flushDataToFile(destination, dataFileCaches.get(destination));
    }

    /**
     * Writes the in-memory position of the destination to the file as JSON;
     * does nothing when memory holds no position for it.
     */
    private void flushDataToFile(String destination, File dataFile) {
        LogPosition position = memoryLogPositionManager.getLatestIndexBy(destination);
        if (position != null) {
            String json = JsonUtils.marshalToString(position);
            try {
                // Write with the same charset loadDataFromFile reads with, so the
                // round trip does not depend on the platform default encoding.
                FileUtils.writeStringToFile(dataFile, json, charset);
            } catch (IOException e) {
                throw new CanalMetaManagerException(e);
            }
        }
    }

    /**
     * Reads a position from the file.
     *
     * @return the parsed position, or {@code null} if the file does not exist
     */
    private LogPosition loadDataFromFile(File dataFile) {
        try {
            if (!dataFile.exists()) {
                return null;
            }

            String json = FileUtils.readFileToString(dataFile, charset);
            return JsonUtils.unmarshalFromString(json, LogPosition.class);
        } catch (IOException e) {
            throw new CanalMetaManagerException(e);
        }
    }
}

2.2.8 FailbackLogPositionManager

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
/**
 * Created by yinxiu on 17/3/18. Email: marklin.hz@gmail.com
 * Performs the position operations with a failover lookup between two managers.
 *
 * <pre>
 * Use case: with an in-memory buffer, after an HA switch first try to find the
 * latest position in the memory buffer; only if it is missing, look at the
 * consumption info kept in the meta.
 * </pre>
 */
public class FailbackLogPositionManager extends AbstractLogPositionManager {

    private final static Logger           logger = LoggerFactory.getLogger(FailbackLogPositionManager.class);

    private final CanalLogPositionManager primary;
    private final CanalLogPositionManager secondary;

    /**
     * @param primary manager consulted and written first, must not be null
     * @param secondary manager used when the primary has no position or fails to persist, must not be null
     * @throws NullPointerException if either manager is null
     */
    public FailbackLogPositionManager(CanalLogPositionManager primary, CanalLogPositionManager secondary){
        if (primary == null) {
            throw new NullPointerException("null primary LogPositionManager");
        }
        if (secondary == null) {
            throw new NullPointerException("null secondary LogPositionManager");
        }

        this.primary = primary;
        this.secondary = secondary;
    }

    /**
     * Starts this manager, then the primary and the secondary (each only if not
     * already started).
     */
    @Override
    public void start() {
        super.start();

        if (!primary.isStart()) {
            primary.start();
        }

        if (!secondary.isStart()) {
            secondary.start();
        }
    }

    /**
     * Stops this manager, then the secondary and the primary (each only if it
     * is still started).
     */
    @Override
    public void stop() {
        super.stop();

        if (secondary.isStart()) {
            secondary.stop();
        }

        if (primary.isStart()) {
            primary.stop();
        }
    }

    /**
     * Asks the primary first; the secondary is consulted only when the primary
     * returns {@code null}.
     *
     * @param destination destination to look up
     * @return the position found, or whatever the secondary returns (possibly {@code null})
     */
    @Override
    public LogPosition getLatestIndexBy(String destination) {
        LogPosition logPosition = primary.getLatestIndexBy(destination);
        if (logPosition != null) {
            return logPosition;
        }
        return secondary.getLatestIndexBy(destination);
    }

    /**
     * Persists through the primary; if that throws a {@code CanalParseException}
     * the failure is logged and the position is persisted through the secondary.
     *
     * @param destination destination the position belongs to
     * @param logPosition position to store
     * @throws CanalParseException if the secondary fails as well
     */
    @Override
    public void persistLogPosition(String destination, LogPosition logPosition) throws CanalParseException {
        try {
            primary.persistLogPosition(destination, logPosition);
        } catch (CanalParseException e) {
            logger.warn("persistLogPosition use primary log position manager exception. destination: {}, logPosition: {}",
                destination,
                logPosition,
                e);
            secondary.persistLogPosition(destination, logPosition);
        }
    }
}

3. AbstractMysqlEventParser

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
/**
* Base class for MySQL event parsers. It holds the connection charset, the
* query/row/DML filter flags and the optional table-meta TSDB settings, and it
* builds the {@code LogEventConvert} binlog parser and the MySQL multi-stage
* coprocessor from those settings.
*/
public abstract class AbstractMysqlEventParser extends AbstractEventParser {

// NOTE(review): presumably the offset of the first event in a binlog file; not referenced in this class
protected static final long BINLOG_START_OFFEST = 4L;

// table-meta TSDB wiring; only used by start()/stop() when enableTsdb is true
protected TableMetaTSDBFactory tableMetaTSDBFactory = new DefaultTableMetaTSDBFactory();
protected boolean enableTsdb = false;
protected String tsdbJdbcUrl;
protected String tsdbJdbcUserName;
protected String tsdbJdbcPassword;
// NOTE(review): units of the two snapshot settings are not visible here - confirm
protected int tsdbSnapshotInterval = 24;
protected int tsdbSnapshotExpire = 360;
protected String tsdbSpringXml;
protected TableMetaTSDB tableMetaTSDB;

// charset and filter settings handed to the LogEventConvert in buildParser()
protected Charset connectionCharset = Charset.forName("UTF-8");
protected boolean filterQueryDcl = false;
protected boolean filterQueryDml = false;
protected boolean filterQueryDdl = false;
protected boolean filterRows = false;
protected boolean filterTableError = false;
protected boolean useDruidDdlFilter = true;

// DML type filters handed to the coprocessor in buildMultiStageCoprocessor()
protected boolean filterDmlInsert = false;
protected boolean filterDmlUpdate = false;
protected boolean filterDmlDelete = false;
// instance received binlog bytes
protected final AtomicLong receivedBinlogBytes = new AtomicLong(0L);
// counter shared with the coprocessor built in buildMultiStageCoprocessor()
private final AtomicLong eventsPublishBlockingTime = new AtomicLong(0L);

/**
* Builds a {@code LogEventConvert} configured with the current name filters
* (only when they are {@code AviaterRegexFilter}s), field filter maps, charset
* and filter flags.
*
* @return the configured converter
*/
protected BinlogParser buildParser() {
LogEventConvert convert = new LogEventConvert();
if (eventFilter != null && eventFilter instanceof AviaterRegexFilter) {
convert.setNameFilter((AviaterRegexFilter) eventFilter);
}

if (eventBlackFilter != null && eventBlackFilter instanceof AviaterRegexFilter) {
convert.setNameBlackFilter((AviaterRegexFilter) eventBlackFilter);
}

convert.setFieldFilterMap(getFieldFilterMap());
convert.setFieldBlackFilterMap(getFieldBlackFilterMap());

convert.setCharset(connectionCharset);
convert.setFilterQueryDcl(filterQueryDcl);
convert.setFilterQueryDml(filterQueryDml);
convert.setFilterQueryDdl(filterQueryDdl);
convert.setFilterRows(filterRows);
convert.setFilterTableError(filterTableError);
convert.setUseDruidDdlFilter(useDruidDdlFilter);
return convert;
}

/**
* Sets the event filter and, when it is an {@code AviaterRegexFilter}, pushes
* it to the current {@code LogEventConvert} and {@code DatabaseTableMeta}.
*/
public void setEventFilter(CanalEventFilter eventFilter) {
super.setEventFilter(eventFilter);

// propagate the filter change to the objects already built
if (eventFilter != null && eventFilter instanceof AviaterRegexFilter) {
if (binlogParser instanceof LogEventConvert) {
((LogEventConvert) binlogParser).setNameFilter((AviaterRegexFilter) eventFilter);
}

if (tableMetaTSDB != null && tableMetaTSDB instanceof DatabaseTableMeta) {
((DatabaseTableMeta) tableMetaTSDB).setFilter(eventFilter);
}
}
}

/**
* Sets the event black filter and, when it is an {@code AviaterRegexFilter},
* pushes it to the current {@code LogEventConvert} and {@code DatabaseTableMeta}.
*/
public void setEventBlackFilter(CanalEventFilter eventBlackFilter) {
super.setEventBlackFilter(eventBlackFilter);

// propagate the filter change to the objects already built
if (eventBlackFilter != null && eventBlackFilter instanceof AviaterRegexFilter) {
if (binlogParser instanceof LogEventConvert) {
((LogEventConvert) binlogParser).setNameBlackFilter((AviaterRegexFilter) eventBlackFilter);
}

if (tableMetaTSDB != null && tableMetaTSDB instanceof DatabaseTableMeta) {
((DatabaseTableMeta) tableMetaTSDB).setBlackFilter(eventBlackFilter);
}
}
}

/**
* Sets the field filter and pushes the resulting field filter map to the
* current {@code LogEventConvert} and {@code DatabaseTableMeta}.
*/
@Override
public void setFieldFilter(String fieldFilter) {
super.setFieldFilter(fieldFilter);

// propagate the filter change to the objects already built
if (binlogParser instanceof LogEventConvert) {
((LogEventConvert) binlogParser).setFieldFilterMap(getFieldFilterMap());
}

if (tableMetaTSDB != null && tableMetaTSDB instanceof DatabaseTableMeta) {
((DatabaseTableMeta) tableMetaTSDB).setFieldFilterMap(getFieldFilterMap());
}
}

/**
* Sets the field black filter and pushes the resulting map to the current
* {@code LogEventConvert} and {@code DatabaseTableMeta}.
*/
@Override
public void setFieldBlackFilter(String fieldBlackFilter) {
super.setFieldBlackFilter(fieldBlackFilter);

// propagate the filter change to the objects already built
if (binlogParser instanceof LogEventConvert) {
((LogEventConvert) binlogParser).setFieldBlackFilterMap(getFieldBlackFilterMap());
}

if (tableMetaTSDB != null && tableMetaTSDB instanceof DatabaseTableMeta) {
((DatabaseTableMeta) tableMetaTSDB).setFieldBlackFilterMap(getFieldBlackFilterMap());
}
}

/**
* Rolls the table-meta TSDB back to the given position.
*
* @param position target position; its timestamp must be greater than 0 when a TSDB is present
* @return the result of {@code tableMetaTSDB.rollback(position)}, or {@code true} when no TSDB is present
* @throws CanalParseException if a TSDB is present and the position has no positive timestamp
*/
protected boolean processTableMeta(EntryPosition position) {
if (tableMetaTSDB != null) {
if (position.getTimestamp() == null || position.getTimestamp() <= 0) {
throw new CanalParseException("use gtid and TableMeta TSDB should be config timestamp > 0");
}

return tableMetaTSDB.rollback(position);
}

return true;
}

/**
* Builds the table-meta TSDB first (when enabled and not built yet) and then
* starts the parser via {@code super.start()}.
*/
public void start() throws CanalParseException {
if (enableTsdb) {
if (tableMetaTSDB == null) {
// class-wide lock: buildTableMetaTSDB sets and removes JVM-wide system properties
synchronized (CanalEventParser.class) {
buildTableMetaTSDB(tsdbSpringXml);
}
}
}

super.start();
}

/**
* Destroys the table-meta TSDB of this destination (when enabled), clears the
* reference and then stops the parser via {@code super.stop()}.
*/
public void stop() throws CanalParseException {
if (enableTsdb) {
tableMetaTSDBFactory.destory(destination);
tableMetaTSDB = null;
}

super.stop();
}

/**
* Builds {@code tableMetaTSDB} through the factory unless it already exists.
* The tsdb JDBC url/user/password are published as system properties for the
* duration of the build and removed again afterwards.
*
* @param tsdbSpringXml spring xml passed to the factory
* @throws CanalParseException wrapping any failure of the build
*/
protected synchronized void buildTableMetaTSDB(String tsdbSpringXml) {
if (tableMetaTSDB != null) {
return;
}

try {
// publish the tsdb JDBC settings as system properties
// NOTE(review): presumably resolved as placeholders while the spring xml is loaded - confirm
System.setProperty("canal.instance.tsdb.url", tsdbJdbcUrl);
System.setProperty("canal.instance.tsdb.dbUsername", tsdbJdbcUserName);
System.setProperty("canal.instance.tsdb.dbPassword", tsdbJdbcPassword);
// initialize
this.tableMetaTSDB = tableMetaTSDBFactory.build(destination, tsdbSpringXml);
} catch (Throwable e) {
logger.warn("failed to build TableMetaTSDB ",e);
throw new CanalParseException(e);
} finally {
// reset
Properties props = System.getProperties();
props.remove("canal.instance.tsdb.url");
props.remove("canal.instance.tsdb.dbUsername");
props.remove("canal.instance.tsdb.dbPassword");
}
}

/**
* Builds a {@code MysqlMultiStageCoprocessor} from the parallel settings, the
* binlog parser (cast to {@code LogEventConvert}), the transaction buffer,
* the destination and the DML filter flags, and hands it the
* events-publish-blocking-time counter.
*/
protected MultiStageCoprocessor buildMultiStageCoprocessor() {
MysqlMultiStageCoprocessor mysqlMultiStageCoprocessor = new MysqlMultiStageCoprocessor(parallelBufferSize,
parallelThreadSize,
(LogEventConvert) binlogParser,
transactionBuffer,
destination, filterDmlInsert, filterDmlUpdate, filterDmlDelete);
mysqlMultiStageCoprocessor.setEventsPublishBlockingTime(eventsPublishBlockingTime);
return mysqlMultiStageCoprocessor;
}

// ============================ setter / getter =========================

/** Sets the connection charset from a {@code Charset} object. */
public void setConnectionCharsetStd(Charset connectionCharset) {
this.connectionCharset = connectionCharset;
}

/**
* Sets the connection charset by name; the name {@code UTF8MB4} (any case) is
* mapped to {@code UTF-8} before the lookup.
*/
public void setConnectionCharset(String connectionCharset) {
if ("UTF8MB4".equalsIgnoreCase(connectionCharset)) {
connectionCharset = "UTF-8";
}

this.connectionCharset = Charset.forName(connectionCharset);
}

public void setFilterQueryDcl(boolean filterQueryDcl) {
this.filterQueryDcl = filterQueryDcl;
}

public void setFilterQueryDml(boolean filterQueryDml) {
this.filterQueryDml = filterQueryDml;
}

public void setFilterQueryDdl(boolean filterQueryDdl) {
this.filterQueryDdl = filterQueryDdl;
}

public void setFilterRows(boolean filterRows) {
this.filterRows = filterRows;
}

public void setFilterTableError(boolean filterTableError) {
this.filterTableError = filterTableError;
}

public boolean isUseDruidDdlFilter() {
return useDruidDdlFilter;
}

public void setUseDruidDdlFilter(boolean useDruidDdlFilter) {
this.useDruidDdlFilter = useDruidDdlFilter;
}

public boolean isFilterDmlInsert() {
return filterDmlInsert;
}

public void setFilterDmlInsert(boolean filterDmlInsert) {
this.filterDmlInsert = filterDmlInsert;
}

public boolean isFilterDmlUpdate() {
return filterDmlUpdate;
}

public void setFilterDmlUpdate(boolean filterDmlUpdate) {
this.filterDmlUpdate = filterDmlUpdate;
}

public boolean isFilterDmlDelete() {
return filterDmlDelete;
}

public void setFilterDmlDelete(boolean filterDmlDelete) {
this.filterDmlDelete = filterDmlDelete;
}

public void setEnableTsdb(boolean enableTsdb) {
this.enableTsdb = enableTsdb;
}

public void setTsdbSpringXml(String tsdbSpringXml) {
this.tsdbSpringXml = tsdbSpringXml;
}

public void setTableMetaTSDBFactory(TableMetaTSDBFactory tableMetaTSDBFactory) {
this.tableMetaTSDBFactory = tableMetaTSDBFactory;
}

/** Returns the live counter object, not a snapshot of its value. */
public AtomicLong getEventsPublishBlockingTime() {
return this.eventsPublishBlockingTime;
}

/** Returns the live counter object, not a snapshot of its value. */
public AtomicLong getReceivedBinlogBytes() {
return this.receivedBinlogBytes;
}

public int getTsdbSnapshotInterval() {
return tsdbSnapshotInterval;
}

public void setTsdbSnapshotInterval(int tsdbSnapshotInterval) {
this.tsdbSnapshotInterval = tsdbSnapshotInterval;
}

public int getTsdbSnapshotExpire() {
return tsdbSnapshotExpire;
}

public void setTsdbSnapshotExpire(int tsdbSnapshotExpire) {
this.tsdbSnapshotExpire = tsdbSnapshotExpire;
}

public void setTsdbJdbcUrl(String tsdbJdbcUrl) {
this.tsdbJdbcUrl = tsdbJdbcUrl;
}

public void setTsdbJdbcUserName(String tsdbJdbcUserName) {
this.tsdbJdbcUserName = tsdbJdbcUserName;
}

public void setTsdbJdbcPassword(String tsdbJdbcPassword) {
this.tsdbJdbcPassword = tsdbJdbcPassword;
}
}

4. MysqlEventParser

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
/**
* 基于向mysql server复制binlog实现
*
* <pre>
* 1. 自身不控制mysql主备切换,由ha机制来控制. 比如接入tddl/cobar/自身心跳包成功率
* 2. 切换机制
* </pre>
*
* @author jianghang 2012-6-21 下午04:06:32
* @version 1.0.0
*/
public class MysqlEventParser extends AbstractMysqlEventParser implements CanalEventParser, CanalHASwitchable {

private CanalHAController haController = null;
private int defaultConnectionTimeoutInSeconds = 30;
private int receiveBufferSize = 64 * 1024;
private int sendBufferSize = 64 * 1024;
// 数据库信息
protected AuthenticationInfo masterInfo; // 主库
protected AuthenticationInfo standbyInfo; // 备库
// binlog信息
protected EntryPosition masterPosition;
protected EntryPosition standbyPosition;
private long slaveId; // 链接到mysql的slave
// 心跳检查信息
private String detectingSQL; // 心跳sql
private MysqlConnection metaConnection; // 查询meta信息的链接
private TableMetaCache tableMetaCache; // 对应meta
private int fallbackIntervalInSeconds = 60; // 切换回退时间
private BinlogFormat[] supportBinlogFormats; // 支持的binlogFormat,如果设置会执行强校验
private BinlogImage[] supportBinlogImages; // 支持的binlogImage,如果设置会执行强校验

// update by yishun.chen,特殊异常处理参数
private int dumpErrorCount = 0; // binlogDump失败异常计数
private int dumpErrorCountThreshold = 2; // binlogDump失败异常计数阀值
private boolean rdsOssMode = false;
private boolean autoResetLatestPosMode = false; // binlog被删除之后,自动按最新的数据订阅
private boolean multiStreamEnable; // support for polardbx binlog-x

/**
* Builds a new MySQL connection for the currently selected {@code runningInfo}.
*
* @return the connection produced by {@code buildMysqlConnection}
*/
protected ErosaConnection buildErosaConnection() {
return buildMysqlConnection(this.runningInfo);
}

/**
 * Prepares the parser before the dump starts. When the binlog parser is a
 * {@code LogEventConvert}, a forked meta connection is opened, the server's
 * binlog format and image are checked against the supported lists (if any are
 * configured), a {@code DatabaseTableMeta} TSDB is initialised, and a fresh
 * {@code TableMetaCache} is installed on the parser.
 *
 * @param connection the dump connection; must be a {@code MysqlConnection}
 * @throws CanalParseException on an unsupported connection type, a failed meta
 *         connect, or an unsupported binlog format/image
 */
protected void preDump(ErosaConnection connection) {
    if (!(connection instanceof MysqlConnection)) {
        throw new CanalParseException("Unsupported connection type : " + connection.getClass().getSimpleName());
    }

    if (!(binlogParser instanceof LogEventConvert)) {
        return;
    }

    metaConnection = (MysqlConnection) connection.fork();
    try {
        metaConnection.connect();
    } catch (IOException e) {
        throw new CanalParseException(e);
    }

    // strict check of the binlog format, only when a supported list is configured
    if (supportBinlogFormats != null && supportBinlogFormats.length > 0) {
        BinlogFormat actualFormat = metaConnection.getBinlogFormat();
        boolean formatAccepted = false;
        for (int i = 0; i < supportBinlogFormats.length && !formatAccepted; i++) {
            BinlogFormat candidate = supportBinlogFormats[i];
            formatAccepted = candidate != null && actualFormat == candidate;
        }

        if (!formatAccepted) {
            throw new CanalParseException("Unsupported BinlogFormat " + actualFormat);
        }
    }

    // strict check of the binlog image, only when a supported list is configured
    if (supportBinlogImages != null && supportBinlogImages.length > 0) {
        BinlogImage actualImage = metaConnection.getBinlogImage();
        boolean imageAccepted = false;
        for (int i = 0; i < supportBinlogImages.length && !imageAccepted; i++) {
            BinlogImage candidate = supportBinlogImages[i];
            imageAccepted = candidate != null && actualImage == candidate;
        }

        if (!imageAccepted) {
            throw new CanalParseException("Unsupported BinlogImage " + actualImage);
        }
    }

    if (tableMetaTSDB instanceof DatabaseTableMeta) {
        DatabaseTableMeta databaseTableMeta = (DatabaseTableMeta) tableMetaTSDB;
        databaseTableMeta.setConnection(metaConnection);
        databaseTableMeta.setFilter(eventFilter);
        databaseTableMeta.setBlackFilter(eventBlackFilter);
        databaseTableMeta.setSnapshotInterval(tsdbSnapshotInterval);
        databaseTableMeta.setSnapshotExpire(tsdbSnapshotExpire);
        databaseTableMeta.init(destination);
    }

    tableMetaCache = new TableMetaCache(metaConnection, tableMetaTSDB);
    ((LogEventConvert) binlogParser).setTableMetaCache(tableMetaCache);
}

/**
 * Runs the superclass hook, validates the connection and closes the meta
 * connection if one exists. A failure to disconnect is logged, not thrown.
 *
 * @param connection the dump connection; must be a non-null {@code MysqlConnection}
 * @throws CanalParseException if the connection is null or of another type
 */
protected void afterDump(ErosaConnection connection) {
    super.afterDump(connection);

    if (connection == null) {
        throw new CanalParseException("illegal connection is null");
    }

    boolean isMysqlConnection = connection instanceof MysqlConnection;
    if (!isMysqlConnection) {
        throw new CanalParseException("Unsupported connection type : " + connection.getClass().getSimpleName());
    }

    if (metaConnection == null) {
        return;
    }

    try {
        metaConnection.disconnect();
    } catch (IOException e) {
        logger.error("ERROR # disconnect meta connection for address:{}", metaConnection.getConnector()
            .getAddress(), e);
    }
}

/**
* Selects the master as the running target when none has been chosen yet and
* then starts the parser via {@code super.start()}.
*/
public void start() throws CanalParseException {
if (runningInfo == null) { // first connection: use the master
runningInfo = masterInfo;
}

super.start();
}

/**
* Closes the meta connection (a failure is logged, not thrown), clears the
* table meta cache and then stops the parser via {@code super.stop()}.
*/
public void stop() throws CanalParseException {
if (metaConnection != null) {
try {
metaConnection.disconnect();
} catch (IOException e) {
// best effort: a failed disconnect must not prevent the rest of the shutdown
logger.error("ERROR # disconnect meta connection for address:{}", metaConnection.getConnector()
.getAddress(), e);
}
}

if (tableMetaCache != null) {
tableMetaCache.clearTableMeta();
}

super.stop();
}

/**
 * Chooses the heartbeat task: a SQL-based {@code MysqlDetectingTimeTask} on a
 * forked connection when detection is enabled and a detecting SQL is
 * configured, otherwise the superclass default.
 *
 * @param connection the dump connection; must be a {@code MysqlConnection}
 * @return the heartbeat task to schedule
 * @throws CanalParseException on an unsupported connection type
 */
protected TimerTask buildHeartBeatTimeTask(ErosaConnection connection) {
    if (!(connection instanceof MysqlConnection)) {
        throw new CanalParseException("Unsupported connection type : " + connection.getClass().getSimpleName());
    }

    boolean useSqlHeartbeat = detectingEnable && StringUtils.isNotBlank(detectingSQL);
    if (!useSqlHeartbeat) {
        return super.buildHeartBeatTimeTask(connection);
    }

    // the SQL heartbeat runs on its own forked connection
    return new MysqlDetectingTimeTask((MysqlConnection) connection.fork());
}

/**
 * Stops the heartbeat via the superclass and, if the task was a
 * {@code MysqlDetectingTimeTask}, closes the connection that task used.
 * A failure to disconnect is logged, not thrown.
 */
protected void stopHeartBeat() {
    // read the field before delegating to super.stopHeartBeat()
    // NOTE(review): presumably the superclass resets the field - confirm
    TimerTask taskBeforeStop = this.heartBeatTimerTask;
    super.stopHeartBeat();

    if (!(taskBeforeStop instanceof MysqlDetectingTimeTask)) {
        return;
    }

    MysqlConnection heartbeatConnection = ((MysqlDetectingTimeTask) taskBeforeStop).getMysqlConnection();
    try {
        heartbeatConnection.disconnect();
    } catch (IOException e) {
        logger.error("ERROR # disconnect heartbeat connection for address:{}", heartbeatConnection.getConnector()
            .getAddress(), e);
    }
}

/**
 * Heartbeat task: runs the configured detecting SQL on its own connection and
 * reports the outcome to the HA controller when that controller is a
 * {@code HeartBeatCallback}.
 *
 * @author jianghang 2012-7-6 下午02:50:15
 * @version 1.0.0
 */
class MysqlDetectingTimeTask extends TimerTask {

    // set after a failed run so that the next run reconnects first
    private boolean         reconnect = false;
    private MysqlConnection mysqlConnection;

    public MysqlDetectingTimeTask(MysqlConnection mysqlConnection){
        this.mysqlConnection = mysqlConnection;
    }

    /**
     * Runs one heartbeat: (re)connects if needed, executes the detecting SQL
     * and reports success with the elapsed milliseconds, or failure with the
     * throwable. Nothing is propagated out of this method.
     */
    public void run() {
        try {
            if (reconnect) {
                reconnect = false;
                mysqlConnection.reconnect();
            } else if (!mysqlConnection.isConnected()) {
                mysqlConnection.connect();
            }

            final long begin = System.currentTimeMillis();
            // the heartbeat sql may be a plain read such as "select 1"
            if (isReadStatement(detectingSQL.trim())) {
                mysqlConnection.query(detectingSQL);
            } else {
                mysqlConnection.update(detectingSQL);
            }
            final long elapsed = System.currentTimeMillis() - begin;

            if (haController instanceof HeartBeatCallback) {
                ((HeartBeatCallback) haController).onSuccess(elapsed);
            }
        } catch (Throwable e) {
            if (haController instanceof HeartBeatCallback) {
                ((HeartBeatCallback) haController).onFailed(e);
            }
            reconnect = true;
            logger.warn("connect failed by ", e);
        }
    }

    /** True when the trimmed sql starts (ignoring case) with select, show, explain or desc. */
    private boolean isReadStatement(String trimmedSql) {
        return StringUtils.startsWithIgnoreCase(trimmedSql, "select")
               || StringUtils.startsWithIgnoreCase(trimmedSql, "show")
               || StringUtils.startsWithIgnoreCase(trimmedSql, "explain")
               || StringUtils.startsWithIgnoreCase(trimmedSql, "desc");
    }

    public MysqlConnection getMysqlConnection() {
        return mysqlConnection;
    }
}

/**
 * Master/standby switch: targets the standby when the master is currently
 * running, and the master otherwise.
 */
public void doSwitch() {
    final AuthenticationInfo target;
    if (runningInfo.equals(masterInfo)) {
        target = standbyInfo;
    } else {
        target = masterInfo;
    }
    this.doSwitch(target);
}

/**
 * Switches replication to the given target: stops the parser, replaces
 * {@code runningInfo} and starts the parser again. Nothing happens besides a
 * warning when the target equals the current one; a null target produces a
 * warning and an alarm only.
 *
 * @param newRunningInfo the target to switch to; may be null when no standby is configured
 */
public void doSwitch(AuthenticationInfo newRunningInfo) {
    // already running against the requested target
    if (this.runningInfo.equals(newRunningInfo)) {
        String sameTargetMessage = "same runingInfo switch again : " + runningInfo.getAddress().toString();
        logger.warn(sameTargetMessage);
        return;
    }

    // nothing to switch to
    if (newRunningInfo == null) {
        String noStandbyMessage = "no standby config, just do nothing, will continue try:"
                                  + runningInfo.getAddress().toString();
        logger.warn(noStandbyMessage);
        sendAlarm(destination, noStandbyMessage);
        return;
    }

    // stop the running replication, swap the target, start again
    stop();
    String switchMessage = "try to ha switch, old:" + runningInfo.getAddress().toString() + ", new:"
                           + newRunningInfo.getAddress().toString();
    logger.warn(switchMessage);
    sendAlarm(destination, switchMessage);
    runningInfo = newRunningInfo;
    start();
}

// =================== helper method =================

/**
 * Creates a {@code MysqlConnection} for the given data source and applies this parser's
 * socket buffer sizes, socket timeout, charset, received-bytes counter and slave id to it.
 * When no positive slave id has been set yet, one is generated and kept for later calls.
 *
 * @param runningInfo address, credentials, default database and SSL settings to connect with
 * @return the configured connection
 */
private MysqlConnection buildMysqlConnection(AuthenticationInfo runningInfo) {
    final MysqlConnection conn = new MysqlConnection(runningInfo.getAddress(),
        runningInfo.getUsername(),
        runningInfo.getPassword(),
        runningInfo.getDefaultDatabaseName(),
        runningInfo.getSslInfo());

    // socket-level settings
    conn.getConnector().setReceiveBufferSize(receiveBufferSize);
    conn.getConnector().setSendBufferSize(sendBufferSize);
    conn.getConnector().setSoTimeout(defaultConnectionTimeoutInSeconds * 1000);

    // connection-level settings
    conn.setCharset(connectionCharset);
    conn.setReceivedBinlogBytes(receivedBinlogBytes);

    // generate a slave id when none was configured
    if (this.slaveId <= 0) {
        this.slaveId = generateUniqueServerId();
    }
    conn.setSlaveId(this.slaveId);

    return conn;
}

/**
 * Derives a server id from the local host address and the destination name.
 *
 * <pre>
 * bits 24-30 : low 7 bits of destination.hashCode() (0 when destination is null)
 * bits 16-23 : address byte [1]
 * bits  8-15 : address byte [2]
 * bits  0-7  : address byte [3]
 * </pre>
 *
 * This follows the shell recipe {@code server_id = b * 256 * 256 + c * 256 + d} for an
 * address {@code a.b.c.d}, with the destination hash as an extra high-order salt.
 *
 * @return the generated id
 * @throws CanalParseException when the local host address cannot be resolved
 */
private final long generateUniqueServerId() {
    final byte[] localAddress;
    try {
        localAddress = InetAddress.getLocalHost().getAddress();
    } catch (UnknownHostException e) {
        throw new CanalParseException("Unknown host", e);
    }

    final int salt = (destination == null) ? 0 : destination.hashCode();
    final int saltBits = (salt & 0x7f) << 24;
    final int second = (localAddress[1] & 0xff) << 16;
    final int third = (localAddress[2] & 0xff) << 8;
    final int fourth = localAddress[3] & 0xff;

    // The four fields occupy disjoint bit ranges, so OR-ing them equals adding them.
    return saltBits | second | third | fourth;
}

/**
 * Determines the position from which dumping should start.
 *
 * <p>
 * In GTID mode the latest position stored in the log position manager is used when it
 * carries a GTID; when nothing is stored, the configured master position is used if it
 * carries a GTID. In every other case the position is resolved by
 * {@code findStartPositionInternal} and, when a transaction-aligned position has been
 * requested through {@code needTransactionPosition}, moved back to the offset reported by
 * {@code findTransactionBeginPosition}.
 *
 * @param connection connection used for the lookups
 * @return the start position, or {@code null} when {@code findStartPositionInternal} found none
 * @throws IOException when scanning the binlog for the transaction begin fails
 */
protected EntryPosition findStartPosition(ErosaConnection connection) throws IOException {
    if (isGTIDMode()) {
        // GTID mode: take the last gtid from the log position manager, falling back to the
        // one configured for the instance
        LogPosition logPosition = getLogPositionManager().getLatestIndexBy(destination);
        if (logPosition != null) {
            // The stored position may predate a switch from non-GTID to GTID mode, so for
            // compatibility only use it when it really carries a gtid
            if (StringUtils.isNotEmpty(logPosition.getPostion().getGtid())) {
                return logPosition.getPostion();
            }
        } else {
            if (masterPosition != null && StringUtils.isNotEmpty(masterPosition.getGtid())) {
                return masterPosition;
            }
        }
    }

    EntryPosition startPosition = findStartPositionInternal(connection);
    if (startPosition == null) {
        // findStartPositionInternal returns null on purpose in RDS OSS mode; hand that to the
        // caller instead of dereferencing it below and failing with a NullPointerException.
        return null;
    }

    if (needTransactionPosition.get()) {
        logger.warn("prepare to find last position : {}", startPosition.toString());
        Long preTransactionStartPosition = findTransactionBeginPosition(connection, startPosition);
        if (!preTransactionStartPosition.equals(startPosition.getPosition())) {
            logger.warn("find new start Transaction Position , old : {} , new : {}",
                startPosition.getPosition(),
                preTransactionStartPosition);
            startPosition.setPosition(preTransactionStartPosition);
        }
        needTransactionPosition.compareAndSet(true, false);
    }
    return startPosition;
}

/**
 * Returns the current end position of the binlog as reported by the server.
 *
 * @param connection must be a {@code MysqlConnection}; it is cast unconditionally
 * @return the position built from the server's status row
 * @throws IOException declared by the signature
 */
protected EntryPosition findEndPosition(ErosaConnection connection) throws IOException {
    return findEndPosition((MysqlConnection) connection);
}

/**
 * Returns the current end position of the binlog. When a table-meta TSDB is configured or
 * GTID mode is on, the end position's file is additionally scanned (with the current time
 * as upper bound) and the position produced by that scan is returned instead.
 *
 * @param connection connection used for the lookups
 * @return the plain end position, or the result of the scan (which may be {@code null})
 */
protected EntryPosition findEndPositionWithMasterIdAndTimestamp(MysqlConnection connection) {
    final EntryPosition latest = findEndPosition(connection);
    if (tableMetaTSDB == null && !isGTIDMode()) {
        return latest;
    }

    return findAsPerTimestampInSpecificLogFile(connection,
        System.currentTimeMillis(),
        latest,
        latest.getJournalName(),
        true);
}

/**
 * Completes a configured file + offset position. When a table-meta TSDB is configured and
 * the position has no positive timestamp, the position's file is scanned up to the fixed
 * position and the position produced by that scan is returned; otherwise the fixed position
 * is returned unchanged.
 *
 * @param connection connection used for the scan
 * @param fixedPosition configured position
 * @return the position to start from
 * @throws CanalParseException when the scan yields no position
 */
protected EntryPosition findPositionWithMasterIdAndTimestamp(MysqlConnection connection, EntryPosition fixedPosition) {
    if (tableMetaTSDB == null) {
        return fixedPosition;
    }
    final Long knownTimestamp = fixedPosition.getTimestamp();
    if (knownTimestamp != null && knownTimestamp > 0) {
        return fixedPosition;
    }

    // Use a timestamp far in the future (now + 102 years) so the scan is bounded by the
    // position only, never by time.
    final long farFuture = System.currentTimeMillis() + 102L * 365 * 24 * 3600 * 1000;
    final EntryPosition located = findAsPerTimestampInSpecificLogFile(connection,
        farFuture,
        fixedPosition,
        fixedPosition.getJournalName(),
        true);
    if (located != null) {
        return located;
    }
    throw new CanalParseException("[fixed timestamp] can't found begin/commit position before with fixed position "
                                  + fixedPosition.getJournalName() + ":" + fixedPosition.getPosition());
}

/**
 * Resolves the start position from the stored log position or, when none is stored, from
 * the configured master/standby position.
 *
 * <p>
 * Without a stored position: the configured position matching the connected address is
 * used (the current end of the binlog when none is configured), then refined by timestamp
 * or offset depending on which of journal name / offset / timestamp are present.
 *
 * <p>
 * With a stored position from the same address: the stored position is returned, unless
 * repeated dump errors or an incomplete stored position trigger one of the recovery paths
 * below. With a stored position from a different address (a switch happened) the position
 * is searched by the stored timestamp minus the fallback interval.
 *
 * @param connection must be a {@code MysqlConnection}; it is cast unconditionally
 * @return the resolved position, or {@code null} in RDS OSS mode when the binlog position
 *         is not available on the server
 */
protected EntryPosition findStartPositionInternal(ErosaConnection connection) {
    MysqlConnection mysqlConnection = (MysqlConnection) connection;
    LogPosition logPosition = logPositionManager.getLatestIndexBy(destination);
    if (logPosition == null) {// no previously stored position
        EntryPosition entryPosition = null;
        if (masterInfo != null && mysqlConnection.getConnector().getAddress().equals(masterInfo.getAddress())) {
            entryPosition = masterPosition;
        } else if (standbyInfo != null
                   && mysqlConnection.getConnector().getAddress().equals(standbyInfo.getAddress())) {
            entryPosition = standbyPosition;
        }

        if (entryPosition == null) {
            entryPosition =
                    findEndPositionWithMasterIdAndTimestamp(mysqlConnection); // default: consume from the current end
        }

        // decide whether to subscribe by timestamp
        if (StringUtils.isEmpty(entryPosition.getJournalName())) {
            // no binlog name given: try to locate the position by timestamp
            if (entryPosition.getTimestamp() != null && entryPosition.getTimestamp() > 0L) {
                logger.warn("prepare to find start position {}:{}:{}",
                    new Object[] { "", "", entryPosition.getTimestamp() });
                return findByStartTimeStamp(mysqlConnection, entryPosition.getTimestamp());
            } else {
                logger.warn("prepare to find start position just show master status");
                return findEndPositionWithMasterIdAndTimestamp(mysqlConnection); // default: consume from the current end
            }
        } else {
            if (entryPosition.getPosition() != null && entryPosition.getPosition() > 0L) {
                // binlog name + offset given: use it directly
                entryPosition = findPositionWithMasterIdAndTimestamp(mysqlConnection, entryPosition);
                logger.warn("prepare to find start position {}:{}:{}",
                    new Object[] { entryPosition.getJournalName(), entryPosition.getPosition(),
                            entryPosition.getTimestamp() });
                return entryPosition;
            } else {
                EntryPosition specificLogFilePosition = null;
                if (entryPosition.getTimestamp() != null && entryPosition.getTimestamp() > 0L) {
                    // binlog name + timestamp given but no offset: try to find the offset
                    // from the timestamp
                    EntryPosition endPosition = findEndPosition(mysqlConnection);
                    if (endPosition != null) {
                        logger.warn("prepare to find start position {}:{}:{}",
                            new Object[] { entryPosition.getJournalName(), "", entryPosition.getTimestamp() });
                        specificLogFilePosition = findAsPerTimestampInSpecificLogFile(mysqlConnection,
                            entryPosition.getTimestamp(),
                            endPosition,
                            entryPosition.getJournalName(),
                            true);
                    }
                }

                if (specificLogFilePosition == null) {
                    if (isRdsOssMode()) {
                        // the binlog position does not exist on the server: return null so
                        // the OSS binlog handling takes over
                        return null;
                    }
                    // no offset known: start from the head of the file
                    entryPosition.setPosition(BINLOG_START_OFFEST);
                    return entryPosition;
                } else {
                    return specificLogFilePosition;
                }
            }
        }
    } else {
        if (logPosition.getIdentity().getSourceAddress().equals(mysqlConnection.getConnector().getAddress())) {
            if (dumpErrorCountThreshold >= 0 && dumpErrorCount > dumpErrorCountThreshold) {
                // Locating the binlog position failed repeatedly. Two possible causes:
                // 1. the binlog position has been purged
                // 2. MySQL behind a VIP had a master/standby switch; check whether the
                // serverId changed, and if so search a suitable position by timestamp
                boolean case2 = (standbyInfo == null || standbyInfo.getAddress() == null)
                                && logPosition.getPostion().getServerId() != null
                                && !logPosition.getPostion().getServerId().equals(findServerId(mysqlConnection));
                if (case2) {
                    EntryPosition findPosition = fallbackFindByStartTimestamp(logPosition, mysqlConnection);
                    dumpErrorCount = 0;
                    return findPosition;
                }
                // Purged binlog position: optionally reset to the current position.
                // Intended for unstable test environments where positions are purged often.
                // Strongly discouraged in production: jumping to the latest position after
                // losing binlog means losing data.
                if (isAutoResetLatestPosMode()) {
                    dumpErrorCount = 0;
                    return findEndPosition(mysqlConnection);
                }
                Long timestamp = logPosition.getPostion().getTimestamp();
                if (isRdsOssMode() && (timestamp != null && timestamp > 0)) {
                    // the binlog position does not exist but a timestamp is known: return
                    // null so the OSS binlog handling takes over
                    return null;
                }
            } else if (StringUtils.isBlank(logPosition.getPostion().getJournalName())
                       && logPosition.getPostion().getPosition() <= 0
                       && logPosition.getPostion().getTimestamp() > 0) {
                return fallbackFindByStartTimestamp(logPosition, mysqlConnection);
            }
            // every other case: continue from the stored position
            logger.warn("prepare to find start position just last position\n {}",
                JsonUtils.marshalToString(logPosition));
            return logPosition.getPostion();
        } else {
            // A switch happened: go back in time by the fallback interval. The multiplication
            // uses a long literal so a large interval cannot overflow int arithmetic.
            long newStartTimestamp = logPosition.getPostion().getTimestamp() - fallbackIntervalInSeconds * 1000L;
            logger.warn("prepare to find start position by switch {}:{}:{}", new Object[] { "", "",
                    logPosition.getPostion().getTimestamp() });
            return findByStartTimeStamp(mysqlConnection, newStartTimestamp);
        }
    }
}

/**
 * Finds a position by timestamp, going back from the stored position's timestamp by the
 * configured fallback interval.
 *
 * @param logPosition stored position; its timestamp must be non-null (it is unboxed)
 * @param mysqlConnection connection used for the search
 * @return the position found by {@code findByStartTimeStamp}, or {@code null} when it finds none
 */
protected EntryPosition fallbackFindByStartTimestamp(LogPosition logPosition, MysqlConnection mysqlConnection) {
    long timestamp = logPosition.getPostion().getTimestamp();
    // long literal: the seconds-to-millis conversion must not overflow int arithmetic
    long newStartTimestamp = timestamp - fallbackIntervalInSeconds * 1000L;
    logger.warn("prepare to find start position by last position {}:{}:{}", new Object[] { "", "",
            logPosition.getPostion().getTimestamp() });
    return findByStartTimeStamp(mysqlConnection, newStartTimestamp);
}

// The requested position may point at a row-data record in the middle of a transaction, so
// the transaction head has to be located to avoid losing data.
// A transaction may run for several seconds; matching on the timestamp alone could drop the
// first half of the transaction.
//
// Scans the binlog file named by entryPosition from offset 4 and returns the offset of the
// last TRANSACTIONBEGIN entry seen before entryPosition's offset (0 when none was seen).
// Throws CanalParseException when the recorded offset is greater than entryPosition's offset.
private Long findTransactionBeginPosition(ErosaConnection mysqlConnection, final EntryPosition entryPosition)
throws IOException {
// The first record at the start position may not be a Begin, so scan this binlog from its head
final java.util.concurrent.atomic.AtomicLong preTransactionStartPosition = new java.util.concurrent.atomic.AtomicLong(0L);
mysqlConnection.reconnect();
mysqlConnection.seek(entryPosition.getJournalName(), 4L, entryPosition.getGtid(), new SinkFunction<LogEvent>() {

// last position built from a parsed entry; passed to processSinkError on failure
private LogPosition lastPosition;

public boolean sink(LogEvent event) {
try {
CanalEntry.Entry entry = parseAndProfilingIfNecessary(event, true);
if (entry == null) {
// nothing parsed for this event: keep scanning
return true;
}

// Check each parsed entry for a transaction Begin
// and remember the transaction begin position
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
&& entry.getHeader().getLogfileOffset() < entryPosition.getPosition()) {
preTransactionStartPosition.set(entry.getHeader().getLogfileOffset());
}

if (entry.getHeader().getLogfileOffset() >= entryPosition.getPosition()) {
return false;// reached the requested position: stop scanning
}

lastPosition = buildLastPosition(entry);
} catch (Exception e) {
processSinkError(e, lastPosition, entryPosition.getJournalName(), entryPosition.getPosition());
return false;
}

// continue only while the parser is still running
return running;
}
});

// Sanity-check the transaction head closest to the requested position
if (preTransactionStartPosition.get() > entryPosition.getPosition()) {
logger.error("preTransactionEndPosition greater than startPosition from zk or localconf, maybe lost data");
throw new CanalParseException("preTransactionStartPosition greater than startPosition from zk or localconf, maybe lost data");
}
return preTransactionStartPosition.get();
}

/**
 * Finds a binlog position by timestamp. Starting with the newest binlog file (taken from
 * the server's current end position), each file is searched with
 * {@code findAsPerTimestampInSpecificLogFile}; when a file yields nothing, or searching it
 * fails, the search steps back to the previous file until the oldest file reported by the
 * server (or sequence number 1) has been tried or the parser stops running.
 *
 * @param mysqlConnection connection used for the lookups
 * @param startTimestamp timestamp in milliseconds to search for
 * @return the position found, or {@code null} when no file yielded one
 */
private EntryPosition findByStartTimeStamp(MysqlConnection mysqlConnection, Long startTimestamp) {
    EntryPosition endPosition = findEndPosition(mysqlConnection);
    EntryPosition startPosition = findStartPosition(mysqlConnection);
    String maxBinlogFileName = endPosition.getJournalName();
    String minBinlogFileName = startPosition.getJournalName();
    logger.info("show master status to set search end condition:{} ", endPosition);
    String startSearchBinlogFile = endPosition.getJournalName();
    boolean shouldBreak = false;
    while (running && !shouldBreak) {
        boolean stepBack;
        try {
            EntryPosition entryPosition = findAsPerTimestampInSpecificLogFile(mysqlConnection,
                startTimestamp,
                endPosition,
                startSearchBinlogFile,
                false);
            if (entryPosition != null) {
                logger.info("found and return:{} in findByStartTimeStamp operation.", entryPosition);
                return entryPosition;
            }

            if (StringUtils.equalsIgnoreCase(minBinlogFileName, startSearchBinlogFile)) {
                // already at the oldest binlog: no point in going further back
                shouldBreak = true;
                stepBack = false;
                logger.warn("Didn't find the corresponding binlog files from {} to {}",
                    minBinlogFileName,
                    maxBinlogFileName);
            } else {
                // keep searching in the previous file
                stepBack = true;
            }
        } catch (Exception e) {
            logger.warn(String.format("the binlogfile:%s doesn't exist, to continue to search the next binlogfile , caused by",
                startSearchBinlogFile),
                e);
            stepBack = true;
        }

        if (stepBack) {
            String previousBinlogFile = previousBinlogFileName(startSearchBinlogFile);
            if (previousBinlogFile == null) {
                logger.warn("Didn't find the corresponding binlog files");
                shouldBreak = true;
            } else {
                startSearchBinlogFile = previousBinlogFile;
            }
        }
    }
    // not found
    return null;
}

/**
 * Computes the name of the binlog file preceding {@code binlogFileName}. The sequence number
 * is the part after the last dot, so base names that themselves contain dots are handled;
 * the new sequence number is zero-padded to six digits.
 *
 * @param binlogFileName name of the form {@code <base>.<sequence>}
 * @return the previous file name, or {@code null} when the sequence number is 1 or lower
 * @throws NumberFormatException when the part after the last dot is not a number
 */
private static String previousBinlogFileName(String binlogFileName) {
    int separator = binlogFileName.lastIndexOf('.');
    int binlogSeqNum = Integer.parseInt(binlogFileName.substring(separator + 1));
    if (binlogSeqNum <= 1) {
        return null;
    }
    return binlogFileName.substring(0, separator + 1) + String.format("%06d", binlogSeqNum - 1);
}

/**
 * Queries the server_id of the database behind the given connection.
 *
 * @param mysqlConnection connection to run {@code show variables like 'server_id'} on
 * @return the value of the second field of the result row, parsed as a {@code Long}
 * @throws CanalParseException when the query fails or returns no fields
 */
private Long findServerId(MysqlConnection mysqlConnection) {
    final List<String> row;
    try {
        row = mysqlConnection.query("show variables like 'server_id'").getFieldValues();
    } catch (IOException e) {
        throw new CanalParseException("command : show variables like 'server_id' has an error!", e);
    }

    if (CollectionUtils.isEmpty(row)) {
        throw new CanalParseException("command : show variables like 'server_id' has an error! pls check. you need (at least one of) the SUPER,REPLICATION CLIENT privilege(s) for this operation");
    }
    // the row is (Variable_name, Value); the id is the second field
    return Long.valueOf(row.get(1));
}

/**
 * Queries the current (end) binlog position of the server.
 *
 * <p>
 * The statement is {@code show binary log status} on MySQL 8.4 or later, otherwise
 * {@code show master status}, with {@code with <destination>} appended when multi-stream
 * is enabled. In GTID mode the GTID is taken from the fifth field of the row when present,
 * and on MariaDB from {@code @@global.gtid_binlog_pos}.
 *
 * @param mysqlConnection connection to query
 * @return the position built from the first two fields of the status row
 * @throws CanalParseException when the query fails or returns no fields
 */
private EntryPosition findEndPosition(MysqlConnection mysqlConnection) {
    String statusSql = "show master status";
    try {
        if (mysqlConnection.atLeastMySQL84()) {
            // new syntax in 8.4
            statusSql = "show binary log status";
        } else if (multiStreamEnable) {
            // multi-stream binlog of polardb-x
            statusSql = "show master status with " + destination;
        }

        final List<String> statusRow = mysqlConnection.query(statusSql).getFieldValues();
        if (CollectionUtils.isEmpty(statusRow)) {
            throw new CanalParseException(
                "command : '" + statusSql + "' has an error! pls check. you need (at least one of) the SUPER,REPLICATION CLIENT privilege(s) for this operation");
        }

        final EntryPosition current = new EntryPosition(statusRow.get(0), Long.valueOf(statusRow.get(1)));
        if (isGTIDMode() && statusRow.size() > 4) {
            current.setGtid(statusRow.get(4));
        }

        // MariaDB does not expose the gtid through `show master status`
        if (mysqlConnection.isMariaDB() && isGTIDMode()) {
            final List<String> gtidRow = mysqlConnection.query("SELECT @@global.gtid_binlog_pos").getFieldValues();
            if (!CollectionUtils.isEmpty(gtidRow)) {
                current.setGtid(gtidRow.get(0));
            }
        }
        return current;
    } catch (IOException e) {
        throw new CanalParseException("command : '" + statusSql + "' has an error!", e);
    }
}

/**
 * Queries the first binlog event the server reports, i.e. the earliest available position.
 *
 * <p>
 * The statement is {@code show binlog events limit 1}, or
 * {@code show binlog events with <destination> limit 1} when multi-stream is enabled. Error
 * messages quote the statement that was actually executed.
 *
 * @param mysqlConnection connection to query
 * @return the position built from the first two fields of the result row
 * @throws CanalParseException when the query fails or returns no fields
 */
private EntryPosition findStartPosition(MysqlConnection mysqlConnection) {
    String showSql = "show binlog events limit 1";
    if (multiStreamEnable) {
        showSql = "show binlog events with " + destination + " limit 1";
    }
    try {
        ResultSetPacket packet = mysqlConnection.query(showSql);
        List<String> fields = packet.getFieldValues();
        if (CollectionUtils.isEmpty(fields)) {
            throw new CanalParseException(
                "command : '" + showSql + "' has an error! pls check. you need (at least one of) the SUPER,REPLICATION CLIENT privilege(s) for this operation");
        }
        return new EntryPosition(fields.get(0), Long.valueOf(fields.get(1)));
    } catch (IOException e) {
        throw new CanalParseException("command : '" + showSql + "' has an error!", e);
    }
}

/**
 * Queries the binlog position as seen from the replica (slave) view of the server.
 *
 * <p>
 * Runs {@code show slave status}, or {@code show replica status} on MySQL 8.4 or later.
 * The replica statement names its columns {@code Replica_*} / {@code Source_*} instead of
 * {@code Slave_*} / {@code Master_*}, so each value is looked up under the legacy name first
 * and under the new name as a fallback.
 *
 * @param mysqlConnection connection to query
 * @return the executed master position, or {@code null} when the server returns no row, the
 *         replica reports an error or a stopped thread, or the query fails
 */
@SuppressWarnings("unused")
private SlaveEntryPosition findSlavePosition(MysqlConnection mysqlConnection) {
    try {
        String showSql = "show slave status";
        if (mysqlConnection.atLeastMySQL84()) {
            // MySQL 8.4 only accepts the replica syntax
            showSql = "show replica status";
        }
        ResultSetPacket packet = mysqlConnection.query(showSql);
        List<FieldPacket> names = packet.getFieldDescriptors();
        List<String> fields = packet.getFieldValues();
        if (CollectionUtils.isEmpty(fields)) {
            return null;
        }

        // index the single result row by column name
        int i = 0;
        Map<String, String> maps = new HashMap<>(names.size(), 1f);
        for (FieldPacket name : names) {
            maps.put(name.getName(), fields.get(i));
            i++;
        }

        String errno = maps.get("Last_Errno");
        String slaveIORunning = maps.getOrDefault("Slave_IO_Running", maps.get("Replica_IO_Running"));
        String slaveSQLRunning = maps.getOrDefault("Slave_SQL_Running", maps.get("Replica_SQL_Running"));
        if ((!"0".equals(errno)) || (!"Yes".equalsIgnoreCase(slaveIORunning))
            || (!"Yes".equalsIgnoreCase(slaveSQLRunning))) {
            logger.warn("Ignoring failed slave: " + mysqlConnection.getConnector().getAddress() + ", Last_Errno = "
                        + errno + ", Slave_IO_Running = " + slaveIORunning + ", Slave_SQL_Running = "
                        + slaveSQLRunning);
            return null;
        }

        String masterHost = maps.getOrDefault("Master_Host", maps.get("Source_Host"));
        String masterPort = maps.getOrDefault("Master_Port", maps.get("Source_Port"));
        String binlog = maps.getOrDefault("Master_Log_File", maps.get("Source_Log_File"));
        String position = maps.getOrDefault("Exec_Master_Log_Pos", maps.get("Exec_Source_Log_Pos"));
        return new SlaveEntryPosition(binlog, Long.valueOf(position), masterHost, masterPort);
    } catch (IOException e) {
        logger.error("find slave position error", e);
    }

    return null;
}

/**
 * Given a timestamp, finds in the specified binlog file the transaction boundary closest to
 * (and strictly earlier than) that timestamp. An endPosition is supplied so that scanning
 * the last binlog file does not go on forever.
 *
 * @param mysqlConnection connection to scan with; it is reconnected before the scan
 * @param startTimestamp upper bound in milliseconds; scanning stops at the first event whose
 *        timestamp is equal to or later than it
 * @param endPosition scanning stops once an event in endPosition's file ends at or after
 *        endPosition's offset; its gtid is passed to the seek call
 * @param searchBinlogFile binlog file to scan, from offset 4
 * @param justForPositionTimestamp when true, the first event with a positive timestamp is
 *        recorded as the initial result before any transaction boundary is seen
 * @return the last position recorded during the scan, or null when none was recorded
 */
private EntryPosition findAsPerTimestampInSpecificLogFile(MysqlConnection mysqlConnection,
final Long startTimestamp,
final EntryPosition endPosition,
final String searchBinlogFile,
final Boolean justForPositionTimestamp) {

// holder the sink callback writes the best position found so far into
final LogPosition logPosition = new LogPosition();
try {
mysqlConnection.reconnect();
// start walking through the file
mysqlConnection.seek(searchBinlogFile, 4L, endPosition.getGtid(), new SinkFunction<LogEvent>() {

// last position built from a parsed entry; passed to processSinkError on failure
private LogPosition lastPosition;

public boolean sink(LogEvent event) {
EntryPosition entryPosition = null;
try {
CanalEntry.Entry entry = parseAndProfilingIfNecessary(event, true);
if (justForPositionTimestamp && logPosition.getPostion() == null && event.getWhen() > 0) {
// initial position: start offset of this event (end offset minus event length)
entryPosition = new EntryPosition(searchBinlogFile,
event.getLogPos() - event.getEventLen(),
event.getWhen() * 1000,
event.getServerId());
entryPosition.setGtid(event.getHeader().getGtidSetStr());
logPosition.setPostion(entryPosition);
}

// Work with the raw event's position directly; this fixes an endless loop that
// could not exit when a binlog file contains no events
String logfilename = event.getHeader().getLogFileName();
// This is the binlog end offset,
// because the offset it is compared with is the end offset from show master status
Long logfileoffset = event.getHeader().getLogPos();
Long logposTimestamp = event.getHeader().getWhen() * 1000;
Long serverId = event.getHeader().getServerId();

// If even the earliest record does not satisfy the condition, stop right away
if (logposTimestamp >= startTimestamp) {
return false;
}

// reached the end position in the end position's file: stop
if (StringUtils.equals(endPosition.getJournalName(), logfilename)
&& endPosition.getPosition() <= logfileoffset) {
return false;
}

if (entry == null) {
return true;
}

// Remember where the previous transaction ended, i.e. the next transaction's position
// position = current +
// data.length, the offset right after this transaction, which avoids replaying an extra transaction
if (CanalEntry.EntryType.TRANSACTIONEND.equals(entry.getEntryType())) {
entryPosition = new EntryPosition(logfilename, logfileoffset, logposTimestamp, serverId);
if (logger.isDebugEnabled()) {
logger.debug("set {} to be pending start position before finding another proper one...",
entryPosition);
}
logPosition.setPostion(entryPosition);
entryPosition.setGtid(entry.getHeader().getGtid());
} else if (CanalEntry.EntryType.TRANSACTIONBEGIN.equals(entry.getEntryType())) {
// position of the current transaction's begin event
entryPosition = new EntryPosition(logfilename, logfileoffset, logposTimestamp, serverId);
if (logger.isDebugEnabled()) {
logger.debug("set {} to be pending start position before finding another proper one...",
entryPosition);
}
entryPosition.setGtid(entry.getHeader().getGtid());
logPosition.setPostion(entryPosition);
}

lastPosition = buildLastPosition(entry);
} catch (Throwable e) {
// no return here: after reporting the error, control falls through to "return running"
processSinkError(e, lastPosition, searchBinlogFile, 4L);
}

// continue only while the parser is still running
return running;
}
});

} catch (IOException e) {
// the failure is only logged; whatever position was recorded so far is still returned
logger.error("ERROR ## findAsPerTimestampInSpecificLogFile has an error", e);
}

if (logPosition.getPostion() != null) {
return logPosition.getPostion();
} else {
return null;
}
}

/**
 * Counts dump failures caused by error 1236 (ER_MASTER_FATAL_ERROR_READING_BINLOG) before
 * delegating to the superclass. An error is counted when it is an {@code IOException} whose
 * message contains {@code "errno = 1236"}.
 *
 * @param e the error raised while dumping
 */
@Override
protected void processDumpError(Throwable e) {
    final boolean binlogReadFatal = e instanceof IOException
                                    && StringUtils.contains(e.getMessage(), "errno = 1236");
    if (binlogReadFatal) {
        dumpErrorCount++;
    }

    super.processDumpError(e);
}

/**
 * Sets the supported binlog formats from a comma-separated list. Each item is converted
 * with {@code BinlogFormat.valuesOf}. Nothing changes when splitting the input yields
 * {@code null}.
 *
 * @param formatStrs comma-separated format names
 */
public void setSupportBinlogFormats(String formatStrs) {
    final String[] names = StringUtils.split(formatStrs, ',');
    if (names == null) {
        return;
    }

    final BinlogFormat[] parsed = new BinlogFormat[names.length];
    for (int idx = 0; idx < names.length; idx++) {
        parsed[idx] = BinlogFormat.valuesOf(names[idx]);
    }
    this.supportBinlogFormats = parsed;
}

/**
 * Sets the supported binlog row images from a comma-separated list. Each item is converted
 * with {@code BinlogImage.valuesOf}. Nothing changes when splitting the input yields
 * {@code null}.
 *
 * @param imageStrs comma-separated image names
 */
public void setSupportBinlogImages(String imageStrs) {
    final String[] names = StringUtils.split(imageStrs, ',');
    if (names == null) {
        return;
    }

    final BinlogImage[] parsed = new BinlogImage[names.length];
    for (int idx = 0; idx < names.length; idx++) {
        parsed[idx] = BinlogImage.valuesOf(names[idx]);
    }
    this.supportBinlogImages = parsed;
}

// ===================== setter / getter ========================

/**
 * Sets the connection timeout in seconds. buildMysqlConnection multiplies it by 1000 and
 * passes the result to the connector's setSoTimeout.
 */
public void setDefaultConnectionTimeoutInSeconds(int defaultConnectionTimeoutInSeconds) {
this.defaultConnectionTimeoutInSeconds = defaultConnectionTimeoutInSeconds;
}

/** Sets the receive buffer size that buildMysqlConnection applies to the connector. */
public void setReceiveBufferSize(int receiveBufferSize) {
this.receiveBufferSize = receiveBufferSize;
}

/** Sets the send buffer size that buildMysqlConnection applies to the connector. */
public void setSendBufferSize(int sendBufferSize) {
this.sendBufferSize = sendBufferSize;
}

/** Sets the master data source; doSwitch() toggles between it and the standby. */
public void setMasterInfo(AuthenticationInfo masterInfo) {
this.masterInfo = masterInfo;
}

/** Sets the standby data source; doSwitch() toggles between it and the master. */
public void setStandbyInfo(AuthenticationInfo standbyInfo) {
this.standbyInfo = standbyInfo;
}

/**
 * Sets the configured start position for the master. findStartPositionInternal uses it when
 * no position is stored and the connection points at the master address.
 */
public void setMasterPosition(EntryPosition masterPosition) {
this.masterPosition = masterPosition;
}

/**
 * Sets the configured start position for the standby. findStartPositionInternal uses it when
 * no position is stored and the connection points at the standby address.
 */
public void setStandbyPosition(EntryPosition standbyPosition) {
this.standbyPosition = standbyPosition;
}

/**
 * Sets the slave id applied to new connections. A value of 0 or lower makes
 * buildMysqlConnection generate one.
 */
public void setSlaveId(long slaveId) {
this.slaveId = slaveId;
}

/** Sets the {@code detectingSQL} field (the heartbeat statement). */
public void setDetectingSQL(String detectingSQL) {
this.detectingSQL = detectingSQL;
}

/** Sets the {@code detectingIntervalInSeconds} field. */
public void setDetectingIntervalInSeconds(Integer detectingIntervalInSeconds) {
this.detectingIntervalInSeconds = detectingIntervalInSeconds;
}

/** Sets the {@code detectingEnable} field. */
public void setDetectingEnable(boolean detectingEnable) {
this.detectingEnable = detectingEnable;
}

/**
 * Sets the fallback interval in seconds. It is converted to milliseconds and subtracted from
 * a stored timestamp before a position is searched by timestamp.
 */
public void setFallbackIntervalInSeconds(int fallbackIntervalInSeconds) {
this.fallbackIntervalInSeconds = fallbackIntervalInSeconds;
}

/** Returns the HA controller held in the {@code haController} field. */
public CanalHAController getHaController() {
return haController;
}

/** Sets the HA controller. */
public void setHaController(CanalHAController haController) {
this.haController = haController;
}

/**
 * Sets the dump error threshold. When it is 0 or higher and the dump error count exceeds it,
 * findStartPositionInternal takes its recovery paths.
 */
public void setDumpErrorCountThreshold(int dumpErrorCountThreshold) {
this.dumpErrorCountThreshold = dumpErrorCountThreshold;
}

/** Returns whether RDS OSS mode is on. */
public boolean isRdsOssMode() {
return rdsOssMode;
}

/**
 * Turns RDS OSS mode on or off. When on, findStartPositionInternal may return null for a
 * binlog position that is not available on the server.
 */
public void setRdsOssMode(boolean rdsOssMode) {
this.rdsOssMode = rdsOssMode;
}

/** Sets the dump error count, the counter that processDumpError increments on error 1236. */
public void setDumpErrorCount(int dumpErrorCount) {
this.dumpErrorCount = dumpErrorCount;
}

/** Returns whether automatic reset to the latest position is on. */
public boolean isAutoResetLatestPosMode() {
return autoResetLatestPosMode;
}

/**
 * Turns automatic reset to the latest position on or off. When on, findStartPositionInternal
 * returns the current end position once the dump error threshold is exceeded.
 */
public void setAutoResetLatestPosMode(boolean autoResetLatestPosMode) {
this.autoResetLatestPosMode = autoResetLatestPosMode;
}

/**
 * Turns multi-stream binlog on or off. When on, the show statements built in this class get
 * a "with <destination>" clause.
 */
public void setMultiStreamEnable(boolean multiStreamEnable) {
this.multiStreamEnable = multiStreamEnable;
}
}

5. RdsBinlogEventParserProxy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
/**
 * Binlog parser support for Aliyun RDS.
 *
 * <pre>
 * Note: Aliyun purges binlogs periodically and backs them up to OSS; this class implements
 * automatic downloading of the OSS + RDS binlogs.
 * </pre>
 *
 * <p>
 * When access key, secret key and instance id are all configured, the first {@link #start()}
 * creates an {@code RdsLocalBinlogEventParser} configured like this parser. If this parser then
 * reports a {@code PositionNotFoundException}, it is stopped and the local parser is started;
 * when the local parser finishes, it is stopped and this parser is started again. Both
 * hand-overs run on a single daemon thread.
 *
 * @author chengjin.lyf on 2018/7/20 10:52 AM
 * @since 1.0.25
 */
public class RdsBinlogEventParserProxy extends MysqlEventParser {

    private String                    rdsOpenApiUrl             = "https://rds.aliyuncs.com/"; // open api url
    private String                    accesskey;                                                // access key of the cloud account
    private String                    secretkey;                                                // secret key of the cloud account
    private String                    instanceId;                                               // rds instance id
    private String                    directory;                                                // binlog directory
    private int                       batchFileSize             = 4;                            // max number of binlog files to download

    private RdsLocalBinlogEventParser rdsLocalBinlogEventParser = null;
    // single daemon thread on which the hand-over between the two parsers runs
    private final ExecutorService     executorService           = Executors.newSingleThreadExecutor(r -> {
                                                                    Thread t = new Thread(r,
                                                                        "rds-binlog-daemon-thread");
                                                                    t.setDaemon(true);
                                                                    return t;
                                                                });

    /**
     * Starts this parser. On the first call with complete RDS credentials the local binlog
     * parser is created and configured, RDS OSS mode is switched on, and the exception handler
     * is wrapped so that position-not-found errors trigger the hand-over.
     */
    @Override
    public void start() {
        if (rdsLocalBinlogEventParser == null && StringUtils.isNotEmpty(accesskey) && StringUtils.isNotEmpty(secretkey)
            && StringUtils.isNotEmpty(instanceId)) {
            rdsLocalBinlogEventParser = new RdsLocalBinlogEventParser();
            // rds oss mode
            setRdsOssMode(true);
            // keep the handler configured so far; it is still invoked after the RDS handling
            final ParserExceptionHandler targetHandler = this.getParserExceptionHandler();
            if (directory == null) {
                directory = System.getProperty("java.io.tmpdir", "/tmp") + "/" + destination;
            }
            rdsLocalBinlogEventParser.setLogPositionManager(this.getLogPositionManager());
            rdsLocalBinlogEventParser.setDestination(destination);
            rdsLocalBinlogEventParser.setAlarmHandler(this.getAlarmHandler());
            rdsLocalBinlogEventParser.setConnectionCharsetStd(this.connectionCharset);
            rdsLocalBinlogEventParser.setEnableTsdb(this.enableTsdb);
            rdsLocalBinlogEventParser.setEventBlackFilter(this.eventBlackFilter);
            rdsLocalBinlogEventParser.setFilterQueryDcl(this.filterQueryDcl);
            rdsLocalBinlogEventParser.setFilterQueryDdl(this.filterQueryDdl);
            rdsLocalBinlogEventParser.setFilterQueryDml(this.filterQueryDml);
            rdsLocalBinlogEventParser.setFilterRows(this.filterRows);
            rdsLocalBinlogEventParser.setFilterTableError(this.filterTableError);
            // rdsLocalBinlogEventParser.setIsGTIDMode(this.isGTIDMode);
            rdsLocalBinlogEventParser.setMasterInfo(this.masterInfo);
            rdsLocalBinlogEventParser.setEventFilter(this.eventFilter);
            rdsLocalBinlogEventParser.setMasterPosition(this.masterPosition);
            rdsLocalBinlogEventParser.setTransactionSize(this.transactionSize);
            rdsLocalBinlogEventParser.setUrl(this.rdsOpenApiUrl);
            rdsLocalBinlogEventParser.setAccesskey(this.accesskey);
            rdsLocalBinlogEventParser.setSecretkey(this.secretkey);
            rdsLocalBinlogEventParser.setInstanceId(this.instanceId);
            rdsLocalBinlogEventParser.setEventSink(eventSink);
            rdsLocalBinlogEventParser.setDirectory(directory);
            rdsLocalBinlogEventParser.setBatchFileSize(batchFileSize);
            rdsLocalBinlogEventParser.setParallel(this.parallel);
            rdsLocalBinlogEventParser.setParallelBufferSize(this.parallelBufferSize);
            rdsLocalBinlogEventParser.setParallelThreadSize(this.parallelThreadSize);
            rdsLocalBinlogEventParser.setFinishListener(() -> executorService.execute(() -> {
                rdsLocalBinlogEventParser.stop();
                // reset the dump error count, otherwise the parser would enter local binlog
                // mode again with the failing position and never get out; fixed by bucketli
                RdsBinlogEventParserProxy.this.setDumpErrorCount(0);
                RdsBinlogEventParserProxy.this.start();
            }));
            this.setParserExceptionHandler(e -> {
                handleMysqlParserException(e);
                if (targetHandler != null) {
                    targetHandler.handle(e);
                }
            });
        }

        super.start();
    }

    /**
     * Reacts to a {@code PositionNotFoundException} by stopping this parser and starting the
     * local binlog parser on the daemon thread. If the local parser fails to start, it is
     * stopped and this parser is started again. Other errors are ignored here. Failures of
     * either step are logged at error level so they stay visible under a WARN log threshold.
     *
     * @param throwable error reported by the parser
     */
    private void handleMysqlParserException(Throwable throwable) {
        if (throwable instanceof PositionNotFoundException) {
            logger.info("remove rds not found position, try download rds binlog!");
            executorService.execute(() -> {
                try {
                    logger.info("stop mysql parser!");
                    RdsBinlogEventParserProxy rdsBinlogEventParserProxy = RdsBinlogEventParserProxy.this;
                    long serverId = rdsBinlogEventParserProxy.getServerId();
                    rdsLocalBinlogEventParser.setServerId(serverId);
                    rdsBinlogEventParserProxy.stop();
                } catch (Throwable e) {
                    logger.error("handle exception failed", e);
                }

                try {
                    logger.info("start rds mysql binlog parser!");
                    rdsLocalBinlogEventParser.start();
                } catch (Throwable e) {
                    logger.error("handle exception failed", e);
                    rdsLocalBinlogEventParser.stop();
                    RdsBinlogEventParserProxy rdsBinlogEventParserProxy = RdsBinlogEventParserProxy.this;
                    rdsBinlogEventParserProxy.start();// keep retrying
                }
            });
        }
    }

    /** Stops this parser; delegates to the superclass. */
    @Override
    public void stop() {
        super.stop();
    }

    /** Reports whether this parser is started; delegates to the superclass. */
    @Override
    public boolean isStart() {
        return super.isStart();
    }

    /** Sets the RDS open api url handed to the local binlog parser. */
    public void setRdsOpenApiUrl(String rdsOpenApiUrl) {
        this.rdsOpenApiUrl = rdsOpenApiUrl;
    }

    /** Sets the access key; the local binlog parser is only created when it is non-empty. */
    public void setAccesskey(String accesskey) {
        this.accesskey = accesskey;
    }

    /** Sets the secret key; the local binlog parser is only created when it is non-empty. */
    public void setSecretkey(String secretkey) {
        this.secretkey = secretkey;
    }

    /** Sets the RDS instance id; the local binlog parser is only created when it is non-empty. */
    public void setInstanceId(String instanceId) {
        this.instanceId = instanceId;
    }

    /**
     * Sets the binlog directory. When left {@code null}, {@link #start()} uses
     * {@code <java.io.tmpdir>/<destination>}.
     */
    public void setDirectory(String directory) {
        this.directory = directory;
    }

    /** Sets the maximum number of binlog files to download, handed to the local binlog parser. */
    public void setBatchFileSize(int batchFileSize) {
        this.batchFileSize = batchFileSize;
    }

}

本站由 卡卡龙 使用 Stellar 1.29.1主题创建

本站访问量 次. 本文阅读量 次.