1. 简介

这是canal的存储模块com.alibaba.otter.canal.store,它将binlog解析后的Event存储在内存中。由于内存有限,存储容量始终先按事件条数(bufferSize)进行限制;在MEMSIZE模式下,还会在此基础上额外按占用的内存大小进行限制。MemoryEventStoreWithBuffer是该模块的核心实现类,基于ReentrantLock以及notFull、notEmpty两个等待条件,提供阻塞、限时和非阻塞三类get、put方法。

  • 数据存储在哪里?内存中

  • 存储的对象是什么?EVENT

2. 核心类图

img.png

3. 实体类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
 * One parsed binlog entry as it is held by the in-memory event store.
 *
 * <p>This excerpt lists the fields only; constructors and accessors used elsewhere
 * (for example {@code getEventType()}) appear to be omitted here.
 */
public class Event implements Serializable {

private static final long serialVersionUID = 1333330351758762739L;

private LogIdentity logIdentity; // records the source that produced the data
private ByteString rawEntry;

private long executeTime;
// Entry type after the stream has been split up; mainly marks transaction begin, row data change, and transaction end
private EntryType entryType;
private String journalName;
private long position;
private long serverId;
// Event type, e.g. insert / delete / update / query
private EventType eventType;
private String gtid;
private long rawLength;
private int rowsCount;

// ==== https://github.com/alibaba/canal/issues/1019
private CanalEntry.Entry entry;
}

4. MemoryEventStoreWithBuffer

以下是该类提供的核心方法:

image-20250520224138644

4.1 put(List<Event> data)

  1. 加锁
  2. while循环检查是否有足够的空位:没有空位时调用notFull.await()等待被唤醒;有空位时跳出循环,执行doPut
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
/**
 * Appends the given events to the store, waiting as long as necessary for free slots.
 *
 * <p>A {@code null} or empty list is a no-op.
 *
 * @param data events to append, in list order
 * @throws InterruptedException if the thread is interrupted while acquiring the lock or
 *         while waiting for space, or if its interrupt flag is found set right after the
 *         events have been written
 * @throws CanalStoreException declared by the signature; not thrown directly in this method
 */
public void put(List<Event> data) throws InterruptedException, CanalStoreException {
    if (data == null || data.isEmpty()) {
        return;
    }

    final ReentrantLock storeLock = this.lock;
    storeLock.lockInterruptibly();
    try {
        try {
            // Wait on notFull until the whole batch fits behind the current put cursor.
            for (;;) {
                if (checkFreeSlotAt(putSequence.get() + data.size())) {
                    break;
                }
                notFull.await();
            }
        } catch (InterruptedException interrupted) {
            // Pass the wake-up on to a waiter that was not interrupted.
            notFull.signal();
            throw interrupted;
        }

        doPut(data);

        // Thread.interrupted() also clears the flag before the exception is raised.
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
    } finally {
        storeLock.unlock();
    }
}

/**
 * Returns the smaller of the current get sequence and ack sequence.
 */
private long getMinimumGetOrAck() {
    // Read the get cursor first, then the ack cursor, as the callers have always done.
    final long getCursor = getSequence.get();
    final long ackCursor = ackSequence.get();
    return Math.min(ackCursor, getCursor);
}

/**
 * Tells whether the ring buffer can accept entries up to and including {@code sequence}.
 *
 * <p>The slot-count limit always applies. When the batch mode reports mem-size mode, the
 * not-yet-acked memory size (put minus ack) must additionally be below
 * {@code bufferSize * bufferMemUnit}.
 *
 * @param sequence the put sequence the caller wants to advance to
 * @return {@code true} if writing up to {@code sequence} is allowed
 */
private boolean checkFreeSlotAt(final long sequence) {
    final long wrapPoint = sequence - bufferSize;
    final long minPoint = getMinimumGetOrAck();
    if (wrapPoint > minPoint) {
        // The writer would lap the slowest of the get/ack cursors.
        return false;
    }

    if (!batchMode.isMemSize()) {
        return true;
    }

    // On top of the slot limit, also enforce the memory-size limit.
    final long pendingMemSize = putMemSize.get() - ackMemSize.get();
    // Widen before multiplying so the limit cannot overflow if both factors are ints.
    final long memSizeLimit = (long) bufferSize * bufferMemUnit;
    return pendingMemSize < memSizeLimit;
}

/**
 * Performs the actual write of {@code data} into the ring buffer.
 *
 * <p>Callers in this file invoke it while holding the store lock and after
 * {@code checkFreeSlotAt} has accepted the batch.
 *
 * @param data events to store, in list order
 */
private void doPut(List<Event> data) {
    final long base = putSequence.get();
    final int count = data.size();
    final long last = base + count;

    // Store the entries first and only then move the cursor: once putSequence is advanced a
    // concurrent get may see it, and must not read a stale entry from the ring buffer.
    for (int offset = 0; offset < count; offset++) {
        entries[getIndex(base + 1 + offset)] = data.get(offset);
    }

    putSequence.set(last);

    // Keep a running total of the put mem-size so later checks are cheap.
    if (batchMode.isMemSize()) {
        long batchMemSize = 0;
        for (Event event : data) {
            batchMemSize += calculateSize(event);
        }
        putMemSize.getAndAdd(batchMemSize);
    }

    profiling(data, OP.PUT);
    // Tell a waiting reader that the store is no longer empty.
    notEmpty.signal();
}

4.2 put(List<Event> data, long timeout, TimeUnit unit)

  1. 加锁
  2. for循环检查是否有空位:如果有,则执行doPut并返回true;如果没有且等待时间已用完,则返回false;否则调用notFull.awaitNanos继续等待
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
 * Appends the given events, waiting at most the given time for free slots.
 *
 * <p>A {@code null} or empty list is treated as success.
 *
 * @param data events to append, in list order
 * @param timeout maximum time to wait for space
 * @param unit unit of {@code timeout}
 * @return {@code true} if the events were stored, {@code false} if the wait time ran out
 * @throws InterruptedException if interrupted while acquiring the lock or while waiting
 * @throws CanalStoreException declared by the signature; not thrown directly in this method
 */
public boolean put(List<Event> data, long timeout, TimeUnit unit) throws InterruptedException, CanalStoreException {
    if (data == null || data.isEmpty()) {
        return true;
    }

    long remainingNanos = unit.toNanos(timeout);
    final ReentrantLock storeLock = this.lock;
    storeLock.lockInterruptibly();
    try {
        while (!checkFreeSlotAt(putSequence.get() + data.size())) {
            if (remainingNanos <= 0) {
                return false;
            }
            try {
                remainingNanos = notFull.awaitNanos(remainingNanos);
            } catch (InterruptedException interrupted) {
                // Pass the wake-up on to a waiter that was not interrupted.
                notFull.signal();
                throw interrupted;
            }
        }
        doPut(data);
        return true;
    } finally {
        storeLock.unlock();
    }
}

4.3 tryPut(List<Event> data)

  1. 加锁
  2. 只检查一次是否有空位:如果有,则执行doPut并返回true;否则不等待,直接返回false
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
 * Appends the given events only if there is room right now; never waits for space.
 *
 * <p>A {@code null} or empty list is treated as success.
 *
 * @param data events to append, in list order
 * @return {@code true} if the events were stored, {@code false} if there was no room
 * @throws CanalStoreException declared by the signature; not thrown directly in this method
 */
public boolean tryPut(List<Event> data) throws CanalStoreException {
    if (data == null || data.isEmpty()) {
        return true;
    }

    final ReentrantLock storeLock = this.lock;
    storeLock.lock();
    try {
        if (checkFreeSlotAt(putSequence.get() + data.size())) {
            doPut(data);
            return true;
        }
        return false;
    } finally {
        storeLock.unlock();
    }
}

4.4 get(Position start, int batchSize)

  1. 加锁
  2. while循环检查是否已有足够的数据可取:如果不足,则调用notEmpty.await()等待;否则执行doGet
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
/**
 * Fetches a batch of events, waiting until {@code checkUnGetSlotAt} reports that enough
 * data is available.
 *
 * @param start position to read from; cast to {@code LogPosition} for the availability check
 * @param batchSize requested batch size
 * @return the events produced by {@code doGet}
 * @throws InterruptedException if interrupted while acquiring the lock or while waiting
 * @throws CanalStoreException if {@code doGet} fails
 */
public Events<Event> get(Position start, int batchSize) throws InterruptedException, CanalStoreException {
    final ReentrantLock storeLock = this.lock;
    storeLock.lockInterruptibly();
    try {
        try {
            for (;;) {
                if (checkUnGetSlotAt((LogPosition) start, batchSize)) {
                    break;
                }
                notEmpty.await();
            }
        } catch (InterruptedException interrupted) {
            // Pass the wake-up on to a waiter that was not interrupted.
            notEmpty.signal();
            throw interrupted;
        }

        return doGet(start, batchSize);
    } finally {
        storeLock.unlock();
    }
}

/**
 * Checks whether there is data left to get and whether at least a full batch is available.
 *
 * <p>In item-size mode the batch is counted in events; otherwise the difference between the
 * put and get mem-size counters must reach {@code batchSize * bufferMemUnit}.
 *
 * @param startPosition position the caller reads from; may be {@code null}
 * @param batchSize requested batch size
 * @return {@code true} if a full batch can be served right now
 */
private boolean checkUnGetSlotAt(LogPosition startPosition, int batchSize) {
    if (batchMode.isItemSize()) {
        final long current = getSequence.get();
        final long maxAbleSequence = putSequence.get();
        long next = current;
        // Without a start position, or when the start position is not flagged as included,
        // reading begins one past the current get cursor.
        if (startPosition == null || !startPosition.getPostion().isIncluded()) {
            next = next + 1;
        }
        return current < maxAbleSequence && next + batchSize - 1 <= maxAbleSequence;
    }

    // Mem-size mode: compare the un-got memory size with the requested amount.
    final long currentSize = getMemSize.get();
    final long maxAbleSize = putMemSize.get();
    // Widen before multiplying so the threshold cannot overflow if bufferMemUnit is an int.
    final long requiredSize = (long) batchSize * bufferMemUnit;
    return maxAbleSize - currentSize >= requiredSize;
}

/**
 * Collects a batch of events starting after (or at) the current get cursor and advances
 * the get cursor past them.
 *
 * <p>Returns an empty {@code Events} when the get cursor has already reached the put cursor,
 * or when the final compare-and-set on the get cursor fails. With DDL isolation enabled a
 * DDL event is returned alone: it ends the batch before it, or forms a batch by itself.
 *
 * @param start position to read from; cast to {@code LogPosition}
 * @param batchSize batch size in events (item-size mode) or in mem units (mem-size mode)
 * @return the collected events with their position range
 * @throws CanalStoreException declared by the signature
 */
private Events<Event> doGet(Position start, int batchSize) throws CanalStoreException {
LogPosition startPosition = (LogPosition) start;

long current = getSequence.get();
long maxAbleSequence = putSequence.get();
long next = current;
long end = current;
// A null startPosition means this is the first get, so move on by one by default
if (startPosition == null || !startPosition.getPostion().isIncluded()) { // original note: after the first subscription the start position has to be taken into account so the first record is not lost
next = next + 1;
}

if (current >= maxAbleSequence) {
return new Events<>();
}

Events<Event> result = new Events<>();
List<Event> entrys = result.getEvents();
long memsize = 0;
if (batchMode.isItemSize()) {
end = (next + batchSize - 1) < maxAbleSequence ? (next + batchSize - 1) : maxAbleSequence;
// Extract the data to return
for (; next <= end; next++) {
Event event = entries[getIndex(next)];
if (ddlIsolation && isDdl(event.getEventType())) {
// DDL isolation: stop and return right away
if (entrys.size() == 0) {
entrys.add(event);// nothing collected before it, so return this DDL event on its own
end = next; // move end to the current sequence
} else {
// Events were already collected; return them without the current one, so step back one position
end = next - 1; // original note: next-1 is always greater than current, so no check is needed
}
break;
} else {
entrys.add(event);
}
}
} else {
// NOTE(review): if bufferMemUnit is an int, this product is computed in int arithmetic
// before being widened to long — confirm it cannot overflow for large batch sizes.
long maxMemSize = batchSize * bufferMemUnit;
for (; memsize <= maxMemSize && next <= maxAbleSequence; next++) {
// Always allow the first record to be taken out, to avoid a deadlock
Event event = entries[getIndex(next)];
if (ddlIsolation && isDdl(event.getEventType())) {
// DDL isolation: stop and return right away
if (entrys.size() == 0) {
entrys.add(event);// nothing collected before it, so return this DDL event on its own
end = next; // move end to the current sequence
} else {
// Events were already collected; return them without the current one, so step back one position
end = next - 1; // original note: next-1 is always greater than current, so no check is needed
}
break;
} else {
entrys.add(event);
memsize += calculateSize(event);
end = next;// remember the end sequence
}
}

}

PositionRange<LogPosition> range = new PositionRange<>();
result.setPositionRange(range);

// NOTE(review): assumes at least one event was collected above; in item-size mode a
// batchSize <= 0 would leave the list empty and get(0) would throw — confirm callers never do that.
range.setStart(CanalEventUtils.createPosition(entrys.get(0)));
range.setEnd(CanalEventUtils.createPosition(entrys.get(result.getEvents().size() - 1)));
range.setEndSeq(end);
// Record whether the batch contains a point that can be acked

for (int i = entrys.size() - 1; i >= 0; i--) {
Event event = entrys.get(i);
// In GTID mode the ack position must be a transaction end: on the next subscription MySQL sends what follows this gtid, so recording it at the transaction head would lose that last transaction
if ((CanalEntry.EntryType.TRANSACTIONBEGIN == event.getEntryType() && StringUtils.isEmpty(event.getGtid()))
|| CanalEntry.EntryType.TRANSACTIONEND == event.getEntryType() || isDdl(event.getEventType())) {
// Use the transaction head/tail as the ack-able point
range.setAck(CanalEventUtils.createPosition(event));
break;
}
}

// Publish the new get cursor only if it has not moved since it was read at the top.
if (getSequence.compareAndSet(current, end)) {
getMemSize.addAndGet(memsize);
notFull.signal();
profiling(result.getEvents(), OP.GET);
return result;
} else {
return new Events<>();
}
}

4.5 get(Position start, int batchSize, long timeout, TimeUnit unit)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
 * Fetches a batch of events, waiting at most the given time for a full batch.
 *
 * <p>If the wait time runs out before a full batch is available, whatever {@code doGet}
 * can deliver at that moment is returned.
 *
 * @param start position to read from; cast to {@code LogPosition} for the availability check
 * @param batchSize requested batch size
 * @param timeout maximum time to wait for a full batch
 * @param unit unit of {@code timeout}
 * @return the events produced by {@code doGet}
 * @throws InterruptedException if interrupted while acquiring the lock or while waiting
 * @throws CanalStoreException if {@code doGet} fails
 */
public Events<Event> get(Position start, int batchSize, long timeout, TimeUnit unit) throws InterruptedException,
        CanalStoreException {
    long remainingNanos = unit.toNanos(timeout);
    final ReentrantLock storeLock = this.lock;
    storeLock.lockInterruptibly();
    try {
        for (;;) {
            // Either a full batch is ready, or time is up and we take whatever is there.
            if (checkUnGetSlotAt((LogPosition) start, batchSize) || remainingNanos <= 0) {
                return doGet(start, batchSize);
            }

            try {
                remainingNanos = notEmpty.awaitNanos(remainingNanos);
            } catch (InterruptedException interrupted) {
                // Pass the wake-up on to a waiter that was not interrupted.
                notEmpty.signal();
                throw interrupted;
            }
        }
    } finally {
        storeLock.unlock();
    }
}

4.6 tryGet(Position start, int batchSize)

1
2
3
4
5
6
7
8
9
/**
 * Fetches whatever {@code doGet} can deliver right now, without waiting for a full batch.
 *
 * @param start position to read from
 * @param batchSize requested batch size
 * @return the events produced by {@code doGet}
 * @throws CanalStoreException if {@code doGet} fails
 */
public Events<Event> tryGet(Position start, int batchSize) throws CanalStoreException {
    final ReentrantLock storeLock = this.lock;
    storeLock.lock();
    try {
        final Events<Event> batch = doGet(start, batchSize);
        return batch;
    } finally {
        storeLock.unlock();
    }
}

5. Test

5.1 MemoryEventStoreBase

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
 * Shared helpers for the memory event store tests: a sleep that fails the test on
 * interruption, and factories for minimal {@link Event} instances.
 */
public class MemoryEventStoreBase {

    private static final String MYSQL_ADDRESS = "127.0.0.1";

    /** Event length used when the caller does not specify one. */
    private static final long DEFAULT_EVENT_LENGTH = 1024L;

    /**
     * Sleeps for the given number of milliseconds and fails the test if interrupted.
     *
     * @param time sleep duration in milliseconds
     */
    protected void sleep(Long time) {
        try {
            Thread.sleep(time);
        } catch (InterruptedException e) {
            Assert.fail();
        }
    }

    /**
     * Builds an event with the default event length of 1024.
     *
     * @param binlogFile value for the header's log file name
     * @param offset value for the header's log file offset
     * @param timestamp value for the header's execute time
     * @return a new event wrapping an entry that carries only this header
     */
    protected Event buildEvent(String binlogFile, long offset, long timestamp) {
        return buildEvent(binlogFile, offset, timestamp, DEFAULT_EVENT_LENGTH);
    }

    /**
     * Builds an event whose header carries the given file name, offset, execute time and
     * event length.
     *
     * @param binlogFile value for the header's log file name
     * @param offset value for the header's log file offset
     * @param timestamp value for the header's execute time
     * @param eventLength value for the header's event length
     * @return a new event wrapping an entry that carries only this header
     */
    protected Event buildEvent(String binlogFile, long offset, long timestamp, long eventLength) {
        Header.Builder headerBuilder = Header.newBuilder();
        headerBuilder.setLogfileName(binlogFile);
        headerBuilder.setLogfileOffset(offset);
        headerBuilder.setExecuteTime(timestamp);
        headerBuilder.setEventLength(eventLength);
        Entry.Builder entryBuilder = Entry.newBuilder();
        entryBuilder.setHeader(headerBuilder.build());
        Entry entry = entryBuilder.build();

        return new Event(new LogIdentity(new InetSocketAddress(MYSQL_ADDRESS, 3306), 1234L), entry);
    }
}

5.2 MemoryEventStoreMemBatchTest

  1. testOnePut
  2. testOnePutExceedLimit
  3. testFullPut
  4. testOnePutOneGet
  5. testFullPutBatchGet
  6. testBlockPutOneGet
  7. testRollback
  8. testAck
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
/**
 * Tests for {@code MemoryEventStoreWithBuffer} running in {@code BatchMode.MEMSIZE}.
 */
public class MemoryEventStoreMemBatchTest extends MemoryEventStoreBase {

    /** Puts one event through each put variant (blocking, timed, non-blocking); all must succeed. */
    @Test
    public void testOnePut() {
        // The default buffer is initialised to 16K
        MemoryEventStoreWithBuffer eventStore = new MemoryEventStoreWithBuffer();
        eventStore.setBatchMode(BatchMode.MEMSIZE);
        eventStore.start();
        // Blocking put
        try {
            eventStore.put(buildEvent("1", 1L, 1L, 1024));
        } catch (Exception e) {
            Assert.fail(e.getMessage());
        }
        // Blocking put with timeout
        boolean result = false;
        try {
            result = eventStore.put(buildEvent("1", 1L, 1L), 1000L, TimeUnit.MILLISECONDS);
            Assert.assertTrue(result);
        } catch (Exception e) {
            Assert.fail(e.getMessage());
        }
        // Non-blocking put
        result = eventStore.tryPut(buildEvent("1", 1L, 1L));
        Assert.assertTrue(result);

        eventStore.stop();
    }

    /** With a buffer of size 1, a first event of length 1025 must still be accepted. */
    @Test
    public void testOnePutExceedLimit() {
        MemoryEventStoreWithBuffer eventStore = new MemoryEventStoreWithBuffer();
        eventStore.setBufferSize(1);
        eventStore.setBatchMode(BatchMode.MEMSIZE);
        eventStore.start();
        // Non-blocking put
        try {
            // Only one record: the first one is allowed in even though it exceeds the limit
            boolean result = eventStore.tryPut(buildEvent("1", 1L, 1L, 1025));
            Assert.assertTrue(result);
        } catch (Exception e) {
            Assert.fail(e.getMessage());
        }

        eventStore.stop();
    }

    /** Fills the buffer, then expects both tryPut and a timed put to be rejected. */
    @Test
    public void testFullPut() {
        int bufferSize = 16;
        MemoryEventStoreWithBuffer eventStore = new MemoryEventStoreWithBuffer();
        eventStore.setBufferSize(bufferSize);
        eventStore.setBatchMode(BatchMode.MEMSIZE);
        eventStore.start();

        for (int i = 0; i < bufferSize; i++) {
            boolean result = eventStore.tryPut(buildEvent("1", 1L, 1L + i));
            Assert.assertTrue(result);
        }

        boolean result = eventStore.tryPut(buildEvent("1", 1L, 1L + bufferSize));
        Assert.assertFalse(result);

        try {
            result = eventStore.put(buildEvent("1", 1L, 1L + bufferSize), 1000L, TimeUnit.MILLISECONDS);
        } catch (CanalStoreException | InterruptedException e) {
            Assert.fail(e.getMessage());
        }

        Assert.assertFalse(result);

        eventStore.stop();
    }

    /** Puts one event and expects tryGet to return exactly that event. */
    @Test
    public void testOnePutOneGet() {
        MemoryEventStoreWithBuffer eventStore = new MemoryEventStoreWithBuffer();
        eventStore.setBatchMode(BatchMode.MEMSIZE);
        eventStore.start();

        boolean result = eventStore.tryPut(buildEvent("1", 1L, 1L));
        Assert.assertTrue(result);

        Position position = eventStore.getFirstPosition();
        Events<Event> entrys = eventStore.tryGet(position, 1);
        Assert.assertEquals(1, entrys.getEvents().size());
        Assert.assertEquals(position, entrys.getPositionRange().getStart());
        Assert.assertEquals(position, entrys.getPositionRange().getEnd());

        eventStore.stop();
    }

    /** Fills the buffer and expects a single tryGet to return every event in order. */
    @Test
    public void testFullPutBatchGet() {
        int bufferSize = 16;
        MemoryEventStoreWithBuffer eventStore = new MemoryEventStoreWithBuffer();
        eventStore.setBufferSize(bufferSize);
        eventStore.setBatchMode(BatchMode.MEMSIZE);
        eventStore.start();

        for (int i = 0; i < bufferSize; i++) {
            boolean result = eventStore.tryPut(buildEvent("1", 1L, 1L + i));
            sleep(100L);
            Assert.assertTrue(result);
        }

        Position first = eventStore.getFirstPosition();
        Position lastest = eventStore.getLatestPosition();
        Assert.assertEquals(first, CanalEventUtils.createPosition(buildEvent("1", 1L, 1L)));
        Assert.assertEquals(lastest, CanalEventUtils.createPosition(buildEvent("1", 1L, 1L + bufferSize - 1)));

        System.out.println("start get");
        Events<Event> entrys1 = eventStore.tryGet(first, bufferSize);
        System.out.println("first get size : " + entrys1.getEvents().size());

        Assert.assertEquals(bufferSize, entrys1.getEvents().size());
        Assert.assertEquals(first, entrys1.getPositionRange().getStart());
        Assert.assertEquals(lastest, entrys1.getPositionRange().getEnd());

        Assert.assertEquals(first, CanalEventUtils.createPosition(entrys1.getEvents().get(0)));
        Assert.assertEquals(lastest, CanalEventUtils.createPosition(entrys1.getEvents().get(bufferSize - 1)));
        eventStore.stop();
    }

    /**
     * Drains the store with a blocking get, then starts a second blocking get on a worker
     * thread and interrupts it via {@code shutdownNow()}.
     *
     * <p>Currently ignored. Note that the assertion inside the submitted task runs on the
     * worker thread, so a failure there is captured by the returned future and is not
     * reported to the test runner.
     */
    @Ignore
    @Test
    public void testBlockPutOneGet() {
        final MemoryEventStoreWithBuffer eventStore = new MemoryEventStoreWithBuffer();
        eventStore.setBufferSize(16);
        eventStore.setBatchMode(BatchMode.MEMSIZE);
        eventStore.start();

        final int batchSize = 10;
        for (int i = 0; i < batchSize; i++) {
            boolean result = eventStore.tryPut(buildEvent("1", 1L, 1L));
            Assert.assertTrue(result);
        }

        final Position position = eventStore.getFirstPosition();
        try {
            Events<Event> entrys = eventStore.get(position, batchSize);
            Assert.assertEquals(batchSize, entrys.getEvents().size());
            Assert.assertEquals(position, entrys.getPositionRange().getStart());
            Assert.assertEquals(position, entrys.getPositionRange().getEnd());
        } catch (CanalStoreException | InterruptedException e) {
            // The first get is expected to succeed; do not let a failure pass silently.
            Assert.fail(e.getMessage());
        }

        ExecutorService executor = Executors.newFixedThreadPool(1);
        executor.submit(() -> {
            boolean result = false;
            try {
                eventStore.get(position, batchSize);
            } catch (CanalStoreException ignored) {
                // Deliberately ignored: result stays false and the assertion below fails.
            } catch (InterruptedException e) {
                System.out.println("interrupt occured.");
                result = true;
            }
            Assert.assertTrue(result);
        });

        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            Assert.fail(e.getMessage());
        }
        executor.shutdownNow();

        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            Assert.fail(e.getMessage());
        }
        eventStore.stop();
    }

    /** Checks that after {@code rollback()} a repeated tryGet returns the events again. */
    @Test
    public void testRollback() {
        int bufferSize = 16;
        MemoryEventStoreWithBuffer eventStore = new MemoryEventStoreWithBuffer();
        eventStore.setBufferSize(bufferSize);
        eventStore.setBatchMode(BatchMode.MEMSIZE);
        eventStore.start();

        for (int i = 0; i < bufferSize / 2; i++) {
            boolean result = eventStore.tryPut(buildEvent("1", 1L, 1L + i));
            sleep(100L);
            Assert.assertTrue(result);
        }

        sleep(50L);
        Position first = eventStore.getFirstPosition();
        Position lastest = eventStore.getLatestPosition();
        Assert.assertEquals(first, CanalEventUtils.createPosition(buildEvent("1", 1L, 1L)));
        Assert.assertEquals(lastest, CanalEventUtils.createPosition(buildEvent("1", 1L, 1L + bufferSize / 2 - 1)));

        System.out.println("start get");
        Events<Event> entrys1 = eventStore.tryGet(first, bufferSize);
        System.out.println("first get size : " + entrys1.getEvents().size());

        eventStore.rollback();

        entrys1 = eventStore.tryGet(first, bufferSize);
        System.out.println("after rollback get size : " + entrys1.getEvents().size());
        Assert.assertEquals(bufferSize / 2, entrys1.getEvents().size());

        // Keep producing data
        for (int i = bufferSize / 2; i < bufferSize; i++) {
            boolean result = eventStore.tryPut(buildEvent("1", 1L, 1L + i));
            sleep(100L);
            Assert.assertTrue(result);
        }

        Events<Event> entrys2 = eventStore.tryGet(entrys1.getPositionRange().getEnd(), bufferSize);
        System.out.println("second get size : " + entrys2.getEvents().size());

        eventStore.rollback();

        entrys2 = eventStore.tryGet(entrys1.getPositionRange().getEnd(), bufferSize);
        System.out.println("after rollback get size : " + entrys2.getEvents().size());
        Assert.assertEquals(bufferSize, entrys2.getEvents().size());

        first = eventStore.getFirstPosition();
        lastest = eventStore.getLatestPosition();
        List<Event> entrys = new ArrayList<>(entrys2.getEvents());
        Assert.assertEquals(bufferSize, entrys.size());
        Assert.assertEquals(first, entrys2.getPositionRange().getStart());
        Assert.assertEquals(lastest, entrys2.getPositionRange().getEnd());

        Assert.assertEquals(first, CanalEventUtils.createPosition(entrys.get(0)));
        Assert.assertEquals(lastest, CanalEventUtils.createPosition(entrys.get(bufferSize - 1)));
        eventStore.stop();
    }

    /** Checks that after everything has been acked via {@code cleanUntil}, tryGet returns no events. */
    @Test
    public void testAck() {
        int bufferSize = 16;
        MemoryEventStoreWithBuffer eventStore = new MemoryEventStoreWithBuffer();
        eventStore.setBufferSize(bufferSize);
        eventStore.setBatchMode(BatchMode.MEMSIZE);
        eventStore.start();

        for (int i = 0; i < bufferSize / 2; i++) {
            boolean result = eventStore.tryPut(buildEvent("1", 1L, 1L + i));
            sleep(100L);
            Assert.assertTrue(result);
        }

        sleep(50L);
        Position first = eventStore.getFirstPosition();
        Position lastest = eventStore.getLatestPosition();
        Assert.assertEquals(first, CanalEventUtils.createPosition(buildEvent("1", 1L, 1L)));
        Assert.assertEquals(lastest, CanalEventUtils.createPosition(buildEvent("1", 1L, 1L + bufferSize / 2 - 1)));

        System.out.println("start get");
        Events<Event> entrys1 = eventStore.tryGet(first, bufferSize);
        System.out.println("first get size : " + entrys1.getEvents().size());

        eventStore.cleanUntil(entrys1.getPositionRange().getEnd());
        sleep(50L);

        // Keep producing data
        for (int i = bufferSize / 2; i < bufferSize; i++) {
            boolean result = eventStore.tryPut(buildEvent("1", 1L, 1L + i));
            sleep(100L);
            Assert.assertTrue(result);
        }

        Events<Event> entrys2 = eventStore.tryGet(entrys1.getPositionRange().getEnd(), bufferSize);
        System.out.println("second get size : " + entrys2.getEvents().size());

        eventStore.rollback();

        entrys2 = eventStore.tryGet(entrys1.getPositionRange().getEnd(), bufferSize);
        System.out.println("after rollback get size : " + entrys2.getEvents().size());

        first = eventStore.getFirstPosition();
        lastest = eventStore.getLatestPosition();
        List<Event> entrys = new ArrayList<>(entrys2.getEvents());
        // Assert.assertEquals(first, entrys2.getPositionRange().getStart());
        Assert.assertEquals(lastest, entrys2.getPositionRange().getEnd());

        // Assert.assertEquals(first,
        // CanalEventUtils.createPosition(entrys.get(0)));
        Assert.assertEquals(lastest, CanalEventUtils.createPosition(entrys.get(entrys.size() - 1)));

        // Ack everything
        eventStore.cleanUntil(entrys2.getPositionRange().getEnd());

        // After that no more data can be fetched
        Events<Event> entrys3 = eventStore.tryGet(entrys1.getPositionRange().getEnd(), bufferSize);
        System.out.println("third get size : " + entrys3.getEvents().size());
        Assert.assertEquals(0, entrys3.getEvents().size());

        eventStore.stop();
    }
}


本站由 卡卡龙 使用 Stellar 1.29.1主题创建

本站访问量 次. 本文阅读量 次.