image-20250521224825190

1. CanalEventSink

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* Consumer of events.
*
* <pre>
* 1. filter and sink are split into two independent actions, which makes it easy to decide quickly
*    whether a piece of data is valid
* </pre>
*
* @author jianghang 2012-6-21 17:03:40
* @version 1.0.0
*/
public interface CanalEventSink<T> extends CanalLifeCycle {

/**
* Submits data to the sink.
*
* @param event the data to submit
* @param remoteAddress address associated with the data; presumably the source the data was read
*        from - TODO confirm against callers
* @param destination presumably the name of the target instance/pipeline - TODO confirm
* @return a success flag; NOTE(review): the exact meaning of {@code false} is defined by the
*         implementations, not by this interface
* @throws CanalSinkException declared by the contract for sink failures
* @throws InterruptedException if the calling thread is interrupted while sinking
*/
boolean sink(T event, InetSocketAddress remoteAddress, String destination) throws CanalSinkException,
InterruptedException;

/**
* Interrupts consumption. For example, when the parsing module has switched over and wants to
* temporarily abort the current merge request and clean up the associated context state; see
* {@linkplain GroupEventSink}.
*/
void interrupt();

}

2. AbstractCanalEventSink

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
 * Base class for {@link CanalEventSink} implementations.
 *
 * <p>Holds an optional {@link CanalEventFilter} and an ordered list of
 * {@link CanalEventDownStreamHandler}s, exposes accessors and mutators for both, and provides a
 * no-op {@link #interrupt()}. The handler list is a plain {@link ArrayList} and is returned to
 * callers without copying.
 *
 * @author jianghang 2012-7-23 13:02:45
 */
public abstract class AbstractCanalEventSink<T> extends AbstractCanalLifeCycle implements CanalEventSink<T> {

    /** Filter consulted by subclasses; may be {@code null} when none has been set. */
    protected CanalEventFilter filter;

    /** Downstream handlers, kept in registration order. */
    protected List<CanalEventDownStreamHandler> handlers = new ArrayList<>();

    // ---------------------------------------------------------------- filter

    /** @return the currently configured filter, or {@code null} if none was set */
    public CanalEventFilter getFilter() {
        return this.filter;
    }

    /** Replaces the current filter. */
    public void setFilter(CanalEventFilter filter) {
        this.filter = filter;
    }

    // -------------------------------------------------------------- handlers

    /** @return the live (uncopied) list of handlers */
    public List<CanalEventDownStreamHandler> getHandlers() {
        return this.handlers;
    }

    /** @return the handler stored at position {@code index} */
    public CanalEventDownStreamHandler getHandler(int index) {
        return handlers.get(index);
    }

    /** Appends {@code handler} to the end of the handler list. */
    public void addHandler(CanalEventDownStreamHandler handler) {
        handlers.add(handler);
    }

    /** Inserts {@code handler} at position {@code index}, shifting later handlers back. */
    public void addHandler(CanalEventDownStreamHandler handler, int index) {
        handlers.add(index, handler);
    }

    /** Removes the handler stored at position {@code index}. */
    public void removeHandler(int index) {
        handlers.remove(index);
    }

    /** Removes the first occurrence of {@code handler}, if it is registered. */
    public void removeHandler(CanalEventDownStreamHandler handler) {
        handlers.remove(handler);
    }

    // ------------------------------------------------------------- lifecycle

    /** Default implementation: there is nothing to interrupt at this level. */
    public void interrupt() {
        // intentionally empty
    }

}

3. EntryEventSink

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
/**
* Sink that outputs MySQL binlog entry objects: it wraps each accepted entry in an {@code Event}
* and puts the resulting batch into the configured {@code CanalEventStore}.
*
* @author jianghang 2012-7-4 15:23:16
* @version 1.0.0
*/
public class EntryEventSink extends AbstractCanalEventSink<List<CanalEntry.Entry>> implements CanalEventSink<List<CanalEntry.Entry>> {

private static final Logger logger = LoggerFactory.getLogger(EntryEventSink.class);
// upper bound on the multiplier used by applyWait when parking between put attempts
private static final int maxFullTimes = 10;
private CanalEventStore<Event> eventStore;
protected boolean filterTransactionEntry = false; // whether to filter transaction begin/end entries as much as possible
protected boolean filterEmtryTransactionEntry = true; // whether to filter empty transaction begin/end entries
protected long emptyTransactionInterval = 5 * 1000; // how often an empty transaction is still emitted (presumably milliseconds - TODO confirm unit of getExecuteTime)
protected long emptyTransctionThresold = 8192; // after more than 8192 transaction headers, emit one

// execute time of the last transaction end that was let through in sinkData
protected volatile long lastTransactionTimestamp = 0L;
// number of transaction begin/end entries seen since the last one that was let through
protected AtomicLong lastTransactionCount = new AtomicLong(0L);
// execute time of the last empty batch that was forwarded to doSink
protected volatile long lastEmptyTransactionTimestamp = 0L;
// number of empty batches skipped since the last one that was forwarded
protected AtomicLong lastEmptyTransactionCount = new AtomicLong(0L);
// accumulated nanoseconds spent in doSink waiting for eventStore.tryPut to succeed
protected AtomicLong eventsSinkBlockingTime = new AtomicLong(0L);
// passed to every Event constructed in sinkData; copied from the store in start() when it is a MemoryEventStoreWithBuffer
protected boolean raw;

/**
* Creates the sink and registers a {@link HeartBeatEntryEventHandler} as its first handler.
*/
public EntryEventSink(){
addHandler(new HeartBeatEntryEventHandler());
}

/**
* Starts the sink: requires an event store to have been set, picks up the store's raw flag when
* the store is a {@code MemoryEventStoreWithBuffer}, and starts every handler that is not
* already started.
*/
public void start() {
super.start();
Assert.notNull(eventStore);

if (eventStore instanceof MemoryEventStoreWithBuffer) {
this.raw = ((MemoryEventStoreWithBuffer) eventStore).isRaw();
}

for (CanalEventDownStreamHandler handler : getHandlers()) {
if (!handler.isStart()) {
handler.start();
}
}
}

/**
* Stops the sink and then every handler that is currently started.
*/
public void stop() {
super.stop();

for (CanalEventDownStreamHandler handler : getHandlers()) {
if (handler.isStart()) {
handler.stop();
}
}
}

/**
* Always returns {@code false}; none of the arguments are inspected.
* NOTE(review): nothing in this class calls this method - confirm whether it is still needed.
*/
public boolean filter(List<Entry> event, InetSocketAddress remoteAddress, String destination) {

return false;
}

/**
* Sinks a batch of entries by delegating to {@link #sinkData}. The {@code destination} argument
* is not used.
*
* @param entrys the entries to sink
* @param remoteAddress address recorded in the {@code LogIdentity} of every created event
* @param destination unused here
* @return the result of {@code sinkData}
*/
public boolean sink(List<CanalEntry.Entry> entrys, InetSocketAddress remoteAddress, String destination)
throws CanalSinkException,
InterruptedException {
return sinkData(entrys, remoteAddress);
}

/**
* Converts the entries that survive filtering into events and decides whether the batch is
* forwarded to {@link #doSink}.
*
* <p>A batch containing row data or a heartbeat is always forwarded. Any other batch (only
* transaction begin/end and similar entries) is forwarded at most once per
* {@code emptyTransactionInterval} or once per {@code emptyTransctionThresold} batches when
* {@code filterEmtryTransactionEntry} is set; otherwise it is dropped and {@code true} is
* returned.
*
* @return the result of {@code doSink} when the batch is forwarded, otherwise {@code true}
*/
private boolean sinkData(List<CanalEntry.Entry> entrys, InetSocketAddress remoteAddress)
throws InterruptedException {
boolean hasRowData = false;
boolean hasHeartBeat = false;
List<Event> events = new ArrayList<>();
for (CanalEntry.Entry entry : entrys) {
if (!doFilter(entry)) {
continue;
}

if (filterTransactionEntry
&& (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND)) {
long currentTimestamp = entry.getHeader().getExecuteTime();
// Policy-controlled: let some empty transaction begin/end entries through so that the
// database position is updated in time and the pipeline is seen to be working.
// Note that the counter is incremented on every evaluation because incrementAndGet is the
// first operand of the condition.
if (lastTransactionCount.incrementAndGet() <= emptyTransctionThresold
&& Math.abs(currentTimestamp - lastTransactionTimestamp) <= emptyTransactionInterval) {
continue;
} else {
// fixed issue https://github.com/alibaba/canal/issues/2616
// Root cause: for an empty transaction only the begin was sent, without the matching commit.
// The counter/timestamp are now updated only on the commit event so that begin/commit are
// emitted as a pair.
if (entry.getEntryType() == EntryType.TRANSACTIONEND) {
lastTransactionCount.set(0L);
lastTransactionTimestamp = currentTimestamp;
}
}
}

hasRowData |= (entry.getEntryType() == EntryType.ROWDATA);
hasHeartBeat |= (entry.getEntryType() == EntryType.HEARTBEAT);
Event event = new Event(new LogIdentity(remoteAddress, -1L), entry, raw);
events.add(event);
}

if (hasRowData || hasHeartBeat) {
// row data or a heartbeat is present: hand the batch straight to downstream processing
return doSink(events);
} else {
// batch that is a candidate for being dropped
if (filterEmtryTransactionEntry && !CollectionUtils.isEmpty(events)) {
long currentTimestamp = events.get(0).getExecuteTime();
// Policy-controlled: let some empty transaction begin/end entries through so that the
// database position is updated in time and the pipeline is seen to be working.
// The counter is only incremented when the time condition did not already match.
if (Math.abs(currentTimestamp - lastEmptyTransactionTimestamp) > emptyTransactionInterval
|| lastEmptyTransactionCount.incrementAndGet() > emptyTransctionThresold) {
lastEmptyTransactionCount.set(0L);
lastEmptyTransactionTimestamp = currentTimestamp;
return doSink(events);
}
}

// return true directly, ignoring the empty transaction begin/end
return true;
}
}

/**
* Applies the configured filter to row-data entries, keyed by {@code schema.table}.
*
* @return {@code true} when the entry should be kept: either no filter is set, the entry is not
*         row data, or the filter accepted its name
*/
protected boolean doFilter(CanalEntry.Entry entry) {
if (filter != null && entry.getEntryType() == EntryType.ROWDATA) {
String name = getSchemaNameAndTableName(entry);
boolean need = filter.filter(name);
if (!need) {
logger.debug("filter name[{}] entry : {}:{}",
name,
entry.getHeader().getLogfileName(),
entry.getHeader().getLogfileOffset());
}

return need;
} else {
return true;
}
}

/**
* Runs the handlers' {@code before} hooks, then repeatedly tries to put the events into the
* store until it succeeds, backing off via {@link #applyWait} and invoking the handlers'
* {@code retry} hooks after each failed attempt. On success the {@code after} hooks run.
*
* <p>Time spent waiting is accumulated into {@code eventsSinkBlockingTime} in nanoseconds, both
* on success and every 100 failed attempts.
*
* @return {@code true} once the store accepted the events; {@code false} when the loop ends
*         because {@code running} is false or the thread was interrupted. Note that
*         {@code Thread.interrupted()} clears the thread's interrupt status.
*/
protected boolean doSink(List<Event> events) {
for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {
events = handler.before(events);
}
long blockingStart = 0L;
int fullTimes = 0;
do {
if (eventStore.tryPut(events)) {
if (fullTimes > 0) {
eventsSinkBlockingTime.addAndGet(System.nanoTime() - blockingStart);
}
for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {
events = handler.after(events);
}
return true;
} else {
if (fullTimes == 0) {
blockingStart = System.nanoTime();
}
applyWait(++fullTimes);
if (fullTimes % 100 == 0) {
long nextStart = System.nanoTime();
eventsSinkBlockingTime.addAndGet(nextStart - blockingStart);
blockingStart = nextStart;
}
}

for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {
events = handler.retry(events);
}

} while (running && !Thread.interrupted());
return false;
}

// Backs off between failed put attempts so the retry loop does not spin at full speed.
private void applyWait(int fullTimes) {
int newFullTimes = fullTimes > maxFullTimes ? maxFullTimes : fullTimes;
if (fullTimes <= 3) { // within the first 3 attempts: just yield
Thread.yield();
} else { // more than 3 attempts: park for at most 10ms (1ms * min(fullTimes, maxFullTimes))
LockSupport.parkNanos(1000 * 1000L * newFullTimes);
}

}

// Builds the "schema.table" name that is handed to the filter.
private String getSchemaNameAndTableName(CanalEntry.Entry entry) {
return entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName();
}

/** Sets the store that receives the events; must be set before {@link #start()}. */
public void setEventStore(CanalEventStore<Event> eventStore) {
this.eventStore = eventStore;
}

/** Sets whether transaction begin/end entries are filtered as much as possible. */
public void setFilterTransactionEntry(boolean filterTransactionEntry) {
this.filterTransactionEntry = filterTransactionEntry;
}

/** Sets whether batches without row data or heartbeat are filtered. */
public void setFilterEmtryTransactionEntry(boolean filterEmtryTransactionEntry) {
this.filterEmtryTransactionEntry = filterEmtryTransactionEntry;
}

/** Sets the time interval after which a filtered transaction entry/batch is let through again. */
public void setEmptyTransactionInterval(long emptyTransactionInterval) {
this.emptyTransactionInterval = emptyTransactionInterval;
}

/** Sets the count after which a filtered transaction entry/batch is let through again. */
public void setEmptyTransctionThresold(long emptyTransctionThresold) {
this.emptyTransctionThresold = emptyTransctionThresold;
}

/** @return the live counter of nanoseconds spent blocked in {@link #doSink} */
public AtomicLong getEventsSinkBlockingTime() {
return eventsSinkBlockingTime;
}

}

4. GroupEventSink

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/**
 * Sink that merges several sources in timeline (merge-sort) order.
 *
 * <pre>
 * Design notes:
 * 1. When merging several databases, as long as fewer than groupSize sources are delivering data
 *    the merge of the other databases is blocked (for example right after start-up, where merging
 *    only begins once all channels work, or during a master/standby switch).
 * 2. If parsing of one database fails without a master/standby switch, a
 *    {@linkplain CanalEventDownStreamHandler} has to be used to periodically monitor the time
 *    between merged data being produced:
 *    a. once a database fails to parse it no longer sinks data, so groupSize is never reached,
 *       the other databases are blocked, and nothing is written to the store.
 * </pre>
 *
 * @author jianghang 2012-10-15 21:54:18
 * @version 1.0.0
 */
public class GroupEventSink extends EntryEventSink {

    private int groupSize;

    /**
     * The merge needs to know the group size up front so that it can tell whether every sink in the
     * group has started delivering data. Created in {@link #start()}; {@code null} before that.
     */
    private GroupBarrier<Event> barrier;

    /** Creates a sink for a group of size 1. */
    public GroupEventSink(){
        this(1);
    }

    /**
     * @param groupSize number of sinks taking part in the merge
     */
    public GroupEventSink(int groupSize){
        super();
        this.groupSize = groupSize;
    }

    /**
     * Starts the sink and creates the barrier: a plain {@link TimelineBarrier} when transaction
     * entries are filtered, otherwise a {@link TimelineTransactionBarrier} that keeps transactions
     * together.
     */
    public void start() {
        super.start();

        if (filterTransactionEntry) {
            barrier = new TimelineBarrier(groupSize);
        } else {
            barrier = new TimelineTransactionBarrier(groupSize);// keeps transactions intact
        }
    }

    /**
     * Passes every event through the barrier and writes the data to the store.
     *
     * <p>When transaction entries are filtered each event is sunk on its own as soon as it passes
     * the barrier. Otherwise the whole batch is sunk once its last event has passed the barrier.
     *
     * @return {@code true} when the data was written to the store; {@code false} when the thread
     *         was interrupted while waiting on the barrier, when the underlying sink reported a
     *         failure, or when {@code events} is empty
     */
    protected boolean doSink(List<Event> events) {
        int size = events.size();
        boolean sunk = false;
        for (int i = 0; i < size; i++) {
            Event event = events.get(i);
            try {
                barrier.await(event);// timeline merge scheduling
                if (filterTransactionEntry) {
                    // Previously the result was discarded and the method always ended up returning
                    // false in this mode, so a successful sink was reported as a failure.
                    if (!super.doSink(Arrays.asList(event))) {
                        return false;
                    }
                    sunk = true;
                } else if (i == size - 1) {
                    // For transactional data the sink only happens after the last event has passed,
                    // to keep the batch atomic. The batch must also be written before the last
                    // event's barrier state is released (the finally below), otherwise there is a
                    // concurrency problem.
                    return super.doSink(events);
                }
            } catch (InterruptedException e) {
                // NOTE(review): the interrupt status is not restored here, matching the existing
                // behaviour of the parent class' retry loop.
                return false;
            } finally {
                barrier.clear(event);
            }
        }

        return sunk;
    }

    /**
     * Interrupts the merge. Safe to call before {@link #start()}, in which case there is no barrier
     * to interrupt yet.
     */
    public void interrupt() {
        super.interrupt();
        GroupBarrier<Event> current = barrier;
        if (current != null) {
            current.interrupt();
        }
    }

}

5. GroupBarrier

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* Barrier interface for group merging; controls how several sink operations are merged.
*
* @author jianghang 2012-10-18 17:07:35
* @version 1.0.0
*/
public interface GroupBarrier<T> {

/**
* Waits until the given data object is allowed to pass.
*
* @param event the data object that wants to pass the barrier
* @throws InterruptedException if the calling thread is interrupted while waiting
*/
public void await(T event) throws InterruptedException;

/**
* Waits until the given data object is allowed to pass, with a timeout.
*
* @param event the data object that wants to pass the barrier
* @param timeout maximum time to wait, in units of {@code unit}
* @param unit time unit of {@code timeout}
* @throws InterruptedException if the calling thread is interrupted while waiting
* @throws TimeoutException declared for the case where the wait times out
*/
public void await(T event, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException;

/**
* Called after a successful sink to clean up the barrier state belonging to {@code event}.
*
* @param event the data object whose state should be cleared
*/
public void clear(T event);

/**
* Called when a switch-over happens: issues an interrupt and cleans up the associated context.
*/
public void interrupt();
}

6. TimelineBarrier

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
/**
 * Timeline-based merge control.
 *
 * <pre>
 * Rough design:
 * 1. Every queue submits a timestamp; the smallest submitted timestamp becomes the pass condition
 *    and the threads whose timestamp is &lt;= that minimum are woken up and pass.
 * 2. Only once every queue has submitted a timestamp can anything pass; a single missing
 *    submission blocks all other queues. (This covers a database starting slowly, or delays
 *    during a master/standby switch.)
 *
 * The design assumes submitted timestamps increase monotonically, but time can go backwards in
 * two cases:
 * a. In a large transaction the transaction header's time is later than the time of the data
 *    inside the transaction, which looks like time going backwards.
 * b. On a master/standby switch the data coming from the standby goes back by a few seconds.
 * </pre>
 *
 * @author jianghang 2012-10-15 22:01:53
 * @version 1.0.0
 */
public class TimelineBarrier implements GroupBarrier<Event> {

    protected int groupSize;
    protected ReentrantLock lock = new ReentrantLock();
    protected Condition condition = lock.newCondition();
    /** Largest timestamp currently allowed to pass; {@code Long.MIN_VALUE} blocks everything. */
    protected volatile long threshold;
    /** Timestamps most recently submitted by the channels, smallest first. */
    protected BlockingQueue<Long> lastTimestamps = new PriorityBlockingQueue<>();

    /**
     * @param groupSize number of timestamps that must be pending before anything may pass
     */
    public TimelineBarrier(int groupSize){
        this.groupSize = groupSize;
        threshold = Long.MIN_VALUE;
    }

    /**
     * Submits the event's timestamp and blocks until it is allowed to pass.
     *
     * @throws InterruptedException if the thread is interrupted while acquiring the lock or while
     *         waiting
     */
    public void await(Event event) throws InterruptedException {
        long timestamp = getTimestamp(event);
        // Acquire the lock outside the try block: if acquisition is interrupted the lock is not
        // held, and unlocking it in finally would throw IllegalMonitorStateException and hide the
        // InterruptedException from callers that rely on catching it.
        lock.lockInterruptibly();
        try {
            single(timestamp);
            while (!isPermit(event, timestamp)) {
                condition.await();
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Submits the event's timestamp and blocks until it is allowed to pass or the timeout elapses.
     *
     * <p>The timeout bounds the total time spent waiting for permission; it does not cover the
     * time needed to acquire the internal lock. When the timeout elapses the submitted timestamp
     * is still registered, so callers should invoke {@link #clear(Event)} as they do after a
     * successful pass.
     *
     * @throws InterruptedException if the thread is interrupted while acquiring the lock or while
     *         waiting
     * @throws TimeoutException if the event was not permitted within the given time
     */
    public void await(Event event, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        long timestamp = getTimestamp(event);
        long remainingNanos = unit.toNanos(timeout);
        lock.lockInterruptibly(); // see await(Event) for why this is outside the try block
        try {
            single(timestamp);
            while (!isPermit(event, timestamp)) {
                if (remainingNanos <= 0L) {
                    throw new TimeoutException("timestamp " + timestamp + " was not permitted within "
                                               + timeout + " " + unit);
                }
                // awaitNanos returns the time still left, so spurious or unrelated wake-ups do not
                // restart the full timeout.
                remainingNanos = condition.awaitNanos(remainingNanos);
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes the event's timestamp from the pending timestamps.
     */
    public void clear(Event event) {
        // An interrupt can have two causes:
        // 1. a master/standby switch: the time slot has to be removed from the timeline (the merge
        //    then has fewer than groupSize entries and does not schedule until the switch is done)
        // 2. a shutdown: simply exit
        lastTimestamps.remove(getTimestamp(event));
    }

    public void interrupt() {
        // do nothing, there is no context state to clean up
    }

    /**
     * @return the current pass threshold
     */
    public long state() {
        return threshold;
    }

    /**
     * Decides whether the given timestamp may pass: it must not exceed the current threshold.
     */
    protected boolean isPermit(Event event, long state) {
        return state <= state();
    }

    /**
     * Wakes up the waiting threads.
     */
    protected void notify(long minTimestamp) {
        // signalAll is used so that several threads whose timestamps are equal pass in one go
        condition.signalAll();
    }

    /**
     * Registers a timestamp and, once enough timestamps are pending, releases the smallest one.
     * Must be called with {@link #lock} held.
     *
     * @throws InterruptedException declared for symmetry with the callers; not thrown here
     */
    private void single(long timestamp) throws InterruptedException {
        lastTimestamps.add(timestamp);

        if (timestamp < state()) {
            // MySQL transactions can contain time jumps, for example:
            // 2012-08-08 16:24:26 transaction begin
            // 2012-08-08 16:24:24 row change
            // 2012-08-08 16:24:25 row change
            // 2012-08-08 16:24:26 transaction end

            // When a timestamp goes backwards the threshold is lowered straight away so that other
            // operations are blocked until the smallest data has been processed first.
            // NOTE(review): the block below normally reassigns threshold right after this.
            threshold = timestamp; // lower to the new minimum
        }

        if (lastTimestamps.size() >= groupSize) {// enough submissions to trigger
            // release the next (smallest) timestamp
            Long minTimestamp = this.lastTimestamps.peek();
            if (minTimestamp != null) {
                threshold = minTimestamp;
                notify(minTimestamp);
            }
        } else {
            threshold = Long.MIN_VALUE;// not enough submissions yet: everyone has to wait
        }
    }

    private Long getTimestamp(Event event) {
        return event.getExecuteTime();
    }

}

7. TimelineTransactionBarrier

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
/**
 * Variant of {@linkplain TimelineBarrier} that is transaction aware: sources are merged one
 * transaction at a time rather than one event at a time.
 *
 * @author jianghang 2012-10-18 17:18:38
 * @version 1.0.0
 */
public class TimelineTransactionBarrier extends TimelineBarrier {

    /** Per-thread marker: {@code true} while the current thread is inside a permitted transaction. */
    private ThreadLocal<Boolean> inTransaction = ThreadLocal.withInitial(() -> false);

    /**
     * <pre>
     * Possible states:
     * 0: initial state, everybody may compete
     * 1: transactional data is being processed
     * 2: non-transactional data is being processed
     * </pre>
     */
    private AtomicInteger txState = new AtomicInteger(0);

    public TimelineTransactionBarrier(int groupSize){
        super(groupSize);
    }

    /**
     * Same as the parent's wait, but resets the transaction state before re-throwing an interrupt.
     */
    public void await(Event event) throws InterruptedException {
        try {
            super.await(event);
        } catch (InterruptedException interrupted) {
            // The thread was interrupted, possibly because of a shutdown or a master/standby switch.
            // After a switch the transaction end is never delivered, so the transaction has to be
            // force-ended to let the other queues pass.
            reset();
            throw interrupted;
        }
    }

    /**
     * Timed variant of {@link #await(Event)} with the same interrupt handling.
     */
    public void await(Event event, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        try {
            super.await(event, timeout, unit);
        } catch (InterruptedException interrupted) {
            // The thread was interrupted, possibly because of a shutdown or a master/standby switch.
            // After a switch the transaction end is never delivered, so the transaction has to be
            // force-ended to let the other queues pass.
            reset();
            throw interrupted;
        }
    }

    /**
     * Clears the event's timestamp and moves the state machine back to 0 when the event finishes
     * a non-transactional pass (state 2) or is a transaction end (state 1).
     *
     * @throws CanalSinkException if the state changed underneath the expected transition
     */
    public void clear(Event event) {
        super.clear(event);

        // State 2 has to be checked before the transaction-end check, because a transaction end can
        // itself have put txState into 2. Checking the transaction end first could leave state 2
        // with no chance of ever being changed, deadlocking the system.
        // (Original author's note: the CanalSinkException throws used to be commented out; they
        // were enabled for internal use because both code analysis and practice suggest throwing.)
        if (txState.intValue() == 2) {// non-transactional pass in progress
            if (!txState.compareAndSet(2, 0)) {
                throw new CanalSinkException("state is not correct in non-transaction");
            }
            return;
        }

        if (event.getEntryType() != EntryType.TRANSACTIONEND) {
            return;
        }

        // The transaction ended and was written to the store: clear the marker so that queuing
        // starts over and a new transaction may enter.
        inTransaction.set(false);
        if (!txState.compareAndSet(1, 0)) {
            throw new CanalSinkException("state is not correct in transaction");
        }
    }

    /**
     * Permits everything that belongs to the transaction currently owned by this thread; otherwise
     * applies the timeline check and tries to claim the state machine for this event.
     */
    protected boolean isPermit(Event event, long state) {
        // Inside a transaction: pass straight through, the transaction begin was already checked.
        if (txState.intValue() == 1 && inTransaction.get()) {
            return true;
        }

        // Somebody else holds the state, or the timeline does not allow this timestamp yet.
        if (txState.intValue() != 0 || !super.isPermit(event, state)) {
            return false;
        }

        // The first event delivered may not be a begin (for example a DDL statement), so both
        // cases are handled: transactional and non-transactional.
        if (event.getEntryType() == EntryType.TRANSACTIONBEGIN) {
            if (!txState.compareAndSet(0, 1)) {
                return false;
            }
            inTransaction.set(true);
            return true; // transaction may pass
        }

        // Non-transactional protection. When starting from a zk cursor the first event received is
        // a transaction end, so DDL/DCL/transaction end are all allowed through here.
        return txState.compareAndSet(0, 2);
    }

    /**
     * Interrupts the barrier and resets the transaction state.
     */
    public void interrupt() {
        super.interrupt();
        reset();
    }

    // Puts the state machine back into its initial state; only the calling thread's marker is removed.
    private void reset() {
        inTransaction.remove();
        txState.set(0);
    }

}

8. CanalEventDownStreamHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* Hook for processing the data stream while it is being sunk.
*
* @author jianghang 2012-7-31 15:06:26
* @version 1.0.0
*/
public interface CanalEventDownStreamHandler<T> extends CanalLifeCycle {

/**
* Called before the events are submitted to the store; may replace the events.
*
* @param events the events about to be submitted
* @return the events to continue with
*/
public T before(T events);

/**
* Called on a retry after the store was found to be full.
*
* @param events the events being retried
* @return the events to continue with
*/
public T retry(T events);

/**
* Called after the events were submitted to the store successfully.
*
* @param events the events that were submitted
* @return the events to continue with
*/
public T after(T events);
}

9. AbstractCanalEventDownStreamHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
 * Default {@link CanalEventDownStreamHandler}: every hook hands its argument back untouched, so
 * subclasses only need to override the hooks they care about.
 *
 * @author jianghang 2013-10-8 20:35:29
 * @since 1.0.12
 */
public class AbstractCanalEventDownStreamHandler<T> extends AbstractCanalLifeCycle implements CanalEventDownStreamHandler<T> {

    /**
     * Pre-submit hook.
     *
     * @return {@code events}, unchanged
     */
    public T before(T events) {
        return events;
    }

    /**
     * Retry hook.
     *
     * @return {@code events}, unchanged
     */
    public T retry(T events) {
        return events;
    }

    /**
     * Post-submit hook.
     *
     * @return {@code events}, unchanged
     */
    public T after(T events) {
        return events;
    }

}

10. HeartBeatEntryEventHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
 * Handler that strips heartbeat events out of a batch before it is submitted.
 *
 * @author jianghang 2013-10-8 18:03:53
 * @since 1.0.12
 */
public class HeartBeatEntryEventHandler extends AbstractCanalEventDownStreamHandler<List<Event>> {

    /**
     * Removes heartbeat events from the batch.
     *
     * @param events the batch about to be submitted
     * @return the very same list when it contains no heartbeat; otherwise a new list holding the
     *         non-heartbeat events in their original order
     */
    public List<Event> before(List<Event> events) {
        if (!containsHeartBeat(events)) {
            return events;
        }

        // Heartbeats are currently delivered separately from other events; this filtering is a
        // safety net in case they ever arrive mixed.
        List<Event> withoutHeartBeats = new ArrayList<>();
        for (Event candidate : events) {
            if (candidate.getEntryType() == EntryType.HEARTBEAT) {
                continue;
            }
            withoutHeartBeats.add(candidate);
        }
        return withoutHeartBeats;
    }

    // Tells whether at least one event of the batch is a heartbeat; stops at the first hit.
    private static boolean containsHeartBeat(List<Event> events) {
        for (Event candidate : events) {
            if (candidate.getEntryType() == EntryType.HEARTBEAT) {
                return true;
            }
        }
        return false;
    }

}

本站由 卡卡龙 使用 Stellar 1.29.1主题创建

本站访问量 次. 本文阅读量 次.