
1. CanalMetaManager

/**
* Meta information manager
*
* @author jianghang 2012-6-14 09:28:48 PM
* @author zebin.xuzb
* @version 1.0.0
*/
public interface CanalMetaManager extends CanalLifeCycle {

/**
* Register a client subscription <br/>
* If the client already exists, nothing is changed
*/
void subscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException;

/**
* Check whether the client has subscribed
*/
boolean hasSubscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException;

/**
* Cancel a client subscription
*/
void unsubscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException;

/**
* Get the cursor position
*/
Position getCursor(ClientIdentity clientIdentity) throws CanalMetaManagerException;

/**
* Update the cursor position
*/
void updateCursor(ClientIdentity clientIdentity, Position position) throws CanalMetaManagerException;

/**
* List all clientIdentity info for the given destination
*/
List<ClientIdentity> listAllSubscribeInfo(String destination) throws CanalMetaManagerException;

/**
* Get the earliest (first) batch position for this client
*/
PositionRange getFirstBatch(ClientIdentity clientIdentity) throws CanalMetaManagerException;

/**
* Get the latest batch position for this client
*/
PositionRange getLastestBatch(ClientIdentity clientIdentity) throws CanalMetaManagerException;

/**
* Record a batch and generate a unique, increasing batch id for the client
*/
Long addBatch(ClientIdentity clientIdentity, PositionRange positionRange) throws CanalMetaManagerException;

/**
* Insert batch data under the specified batchId
*/
void addBatch(ClientIdentity clientIdentity, PositionRange positionRange, Long batchId)
throws CanalMetaManagerException;

/**
* Look up the position range recorded for the given unique batchId
*/
PositionRange getBatch(ClientIdentity clientIdentity, Long batchId) throws CanalMetaManagerException;

/**
* Acknowledge (and remove) a batch
*/
PositionRange removeBatch(ClientIdentity clientIdentity, Long batchId) throws CanalMetaManagerException;

/**
* List all current batch info
*/
Map<Long, PositionRange> listAllBatchs(ClientIdentity clientIdentity) throws CanalMetaManagerException;

/**
* Clear all batch info for this client
*/
void clearAllBatchs(ClientIdentity clientIdentity) throws CanalMetaManagerException;

}
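
To make the intended call order concrete, here is a minimal usage sketch (not part of the canal source): metaManager stands for any started CanalMetaManager implementation, positionRange for a PositionRange handed out by the event store, and PositionRange.getEnd() is assumed here to expose the end position of the range.

// Illustrative sketch only.
ClientIdentity client = new ClientIdentity("example", (short) 1001, ".*\\..*");

metaManager.subscribe(client);                                  // idempotent registration
Long batchId = metaManager.addBatch(client, positionRange);     // remember the batch handed to the client

// ... the client consumes the batch ...

PositionRange acked = metaManager.removeBatch(client, batchId); // ack, must follow hand-out order
metaManager.updateCursor(client, acked.getEnd());               // advance the consumption cursor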

2. ZooKeeperMetaManager

/**
* ZooKeeper-based canal meta manager. Storage layout:
*
* <pre>
* /otter
*   canal
*     destinations
*       dest1
*         client1
*           filter
*           batch_mark
*             1
*             2
*             3
* </pre>
*
* @author zebin.xuzb @ 2012-6-21
* @author jianghang
* @version 1.0.0
*/
public class ZooKeeperMetaManager extends AbstractCanalLifeCycle implements CanalMetaManager {

private static final String ENCODE = "UTF-8";
private ZkClientx zkClientx;

public void start() {
super.start();

Assert.notNull(zkClientx);
}

public void stop() {
zkClientx = null; // clear the reference on shutdown
super.stop();
}

public void subscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException {
String path = ZookeeperPathUtils.getClientIdNodePath(clientIdentity.getDestination(),
clientIdentity.getClientId());

try {
zkClientx.createPersistent(path, true);
} catch (ZkNodeExistsException e) {
// ignore
}
if (clientIdentity.hasFilter()) {
String filterPath = ZookeeperPathUtils.getFilterPath(clientIdentity.getDestination(),
clientIdentity.getClientId());

byte[] bytes = null;
try {
bytes = clientIdentity.getFilter().getBytes(ENCODE);
} catch (UnsupportedEncodingException e) {
throw new CanalMetaManagerException(e);
}

try {
zkClientx.createPersistent(filterPath, bytes);
} catch (ZkNodeExistsException e) {
// ignore
zkClientx.writeData(filterPath, bytes);
}
}
}

public boolean hasSubscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException {
String path = ZookeeperPathUtils.getClientIdNodePath(clientIdentity.getDestination(),
clientIdentity.getClientId());
return zkClientx.exists(path);
}

public void unsubscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException {
String path = ZookeeperPathUtils.getClientIdNodePath(clientIdentity.getDestination(),
clientIdentity.getClientId());
zkClientx.deleteRecursive(path); // recursively delete all metadata for this client
}

public List<ClientIdentity> listAllSubscribeInfo(String destination) throws CanalMetaManagerException {
if (zkClientx == null) { // may be null during a reload
return new ArrayList<>();
}
String path = ZookeeperPathUtils.getDestinationPath(destination);
List<String> childs = null;
try {
childs = zkClientx.getChildren(path);
} catch (ZkNoNodeException e) {
// ignore
}

if (CollectionUtils.isEmpty(childs)) {
return new ArrayList<>();
}
List<Short> clientIds = new ArrayList<>();
for (String child : childs) {
if (StringUtils.isNumeric(child)) {
clientIds.add(ZookeeperPathUtils.getClientId(child));
}
}

Collections.sort(clientIds); // sort the client ids
List<ClientIdentity> clientIdentities = Lists.newArrayList();
for (Short clientId : clientIds) {
path = ZookeeperPathUtils.getFilterPath(destination, clientId);
byte[] bytes = zkClientx.readData(path, true);
String filter = null;
if (bytes != null) {
try {
filter = new String(bytes, ENCODE);
} catch (UnsupportedEncodingException e) {
throw new CanalMetaManagerException(e);
}
}
clientIdentities.add(new ClientIdentity(destination, clientId, filter));
}

return clientIdentities;
}

public Position getCursor(ClientIdentity clientIdentity) throws CanalMetaManagerException {
String path = ZookeeperPathUtils.getCursorPath(clientIdentity.getDestination(), clientIdentity.getClientId());

byte[] data = zkClientx.readData(path, true);
if (data == null || data.length == 0) {
return null;
}

return JsonUtils.unmarshalFromByte(data, Position.class);
}

public void updateCursor(ClientIdentity clientIdentity, Position position) throws CanalMetaManagerException {
String path = ZookeeperPathUtils.getCursorPath(clientIdentity.getDestination(), clientIdentity.getClientId());
byte[] data = JsonUtils.marshalToByte(position, JSONWriter.Feature.WriteClassName);
try {
zkClientx.writeData(path, data);
} catch (ZkNoNodeException e) {
zkClientx.createPersistent(path, data, true); // node missing on first write, create it
}
}

public Long addBatch(ClientIdentity clientIdentity, PositionRange positionRange) throws CanalMetaManagerException {
String path = ZookeeperPathUtils.getBatchMarkPath(clientIdentity.getDestination(),
clientIdentity.getClientId());
byte[] data = JsonUtils.marshalToByte(positionRange, JSONWriter.Feature.WriteClassName);
String batchPath = zkClientx
.createPersistentSequential(path + ZookeeperPathUtils.ZOOKEEPER_SEPARATOR, data, true);
String batchIdString = StringUtils.substringAfterLast(batchPath, ZookeeperPathUtils.ZOOKEEPER_SEPARATOR);
return ZookeeperPathUtils.getBatchMarkId(batchIdString);
}

public void addBatch(ClientIdentity clientIdentity, PositionRange positionRange,
Long batchId) throws CanalMetaManagerException {
String path = ZookeeperPathUtils
.getBatchMarkWithIdPath(clientIdentity.getDestination(), clientIdentity.getClientId(), batchId);
byte[] data = JsonUtils.marshalToByte(positionRange, JSONWriter.Feature.WriteClassName);
zkClientx.createPersistent(path, data, true);
}

public PositionRange removeBatch(ClientIdentity clientIdentity, Long batchId) throws CanalMetaManagerException {
String batchsPath = ZookeeperPathUtils.getBatchMarkPath(clientIdentity.getDestination(),
clientIdentity.getClientId());
List<String> nodes = zkClientx.getChildren(batchsPath);
if (CollectionUtils.isEmpty(nodes)) {
// no batch records
return null;
}

// find the smallest batch id
ArrayList<Long> batchIds = new ArrayList<>(nodes.size());
for (String batchIdString : nodes) {
batchIds.add(Long.valueOf(batchIdString));
}
Long minBatchId = Collections.min(batchIds);
if (!minBatchId.equals(batchId)) {
// ack/rollback must be committed in the order batchIds were handed out, otherwise data may be lost
throw new CanalMetaManagerException(String.format("batchId:%d is not the firstly:%d", batchId, minBatchId));
}

if (!batchIds.contains(batchId)) {
// no such batchId
return null;
}
PositionRange positionRange = getBatch(clientIdentity, batchId);
if (positionRange != null) {
String path = ZookeeperPathUtils
.getBatchMarkWithIdPath(clientIdentity.getDestination(), clientIdentity.getClientId(), batchId);
zkClientx.delete(path);
}

return positionRange;
}

public PositionRange getBatch(ClientIdentity clientIdentity, Long batchId) throws CanalMetaManagerException {
String path = ZookeeperPathUtils
.getBatchMarkWithIdPath(clientIdentity.getDestination(), clientIdentity.getClientId(), batchId);
byte[] data = zkClientx.readData(path, true);
if (data == null) {
return null;
}

PositionRange positionRange = JsonUtils.unmarshalFromByte(data, PositionRange.class);
return positionRange;
}

public void clearAllBatchs(ClientIdentity clientIdentity) throws CanalMetaManagerException {
String path = ZookeeperPathUtils.getBatchMarkPath(clientIdentity.getDestination(),
clientIdentity.getClientId());
List<String> batchChilds = zkClientx.getChildren(path);

for (String batchChild : batchChilds) {
String batchPath = path + ZookeeperPathUtils.ZOOKEEPER_SEPARATOR + batchChild;
zkClientx.delete(batchPath);
}
}

public PositionRange getLastestBatch(ClientIdentity clientIdentity) {
String path = ZookeeperPathUtils.getBatchMarkPath(clientIdentity.getDestination(),
clientIdentity.getClientId());
List<String> nodes = null;
try {
nodes = zkClientx.getChildren(path);
} catch (ZkNoNodeException e) {
// ignore
}

if (CollectionUtils.isEmpty(nodes)) {
return null;
}
// find the largest batch id
ArrayList<Long> batchIds = new ArrayList<>(nodes.size());
for (String batchIdString : nodes) {
batchIds.add(Long.valueOf(batchIdString));
}
Long maxBatchId = Collections.max(batchIds);
PositionRange result = getBatch(clientIdentity, maxBatchId);
if (result == null) { // null means the zk nodes changed concurrently, fetch again
return getLastestBatch(clientIdentity);
} else {
return result;
}
}

public PositionRange getFirstBatch(ClientIdentity clientIdentity) {
String path = ZookeeperPathUtils.getBatchMarkPath(clientIdentity.getDestination(),
clientIdentity.getClientId());
List<String> nodes = null;
try {
nodes = zkClientx.getChildren(path);
} catch (ZkNoNodeException e) {
// ignore
}

if (CollectionUtils.isEmpty(nodes)) {
return null;
}
// find the smallest batch id
ArrayList<Long> batchIds = new ArrayList<>(nodes.size());
for (String batchIdString : nodes) {
batchIds.add(Long.valueOf(batchIdString));
}
Long minBatchId = Collections.min(batchIds);
PositionRange result = getBatch(clientIdentity, minBatchId);
if (result == null) { // null means the zk nodes changed concurrently, fetch again
return getFirstBatch(clientIdentity);
} else {
return result;
}
}

public Map<Long, PositionRange> listAllBatchs(ClientIdentity clientIdentity) {
String path = ZookeeperPathUtils.getBatchMarkPath(clientIdentity.getDestination(),
clientIdentity.getClientId());
List<String> nodes = null;
try {
nodes = zkClientx.getChildren(path);
} catch (ZkNoNodeException e) {
// ignore
}

if (CollectionUtils.isEmpty(nodes)) {
return Maps.newHashMap();
}
// collect all batch ids
ArrayList<Long> batchIds = new ArrayList<>(nodes.size());
for (String batchIdString : nodes) {
batchIds.add(Long.valueOf(batchIdString));
}

Collections.sort(batchIds); // sort ascending
Map<Long, PositionRange> positionRanges = Maps.newLinkedHashMap();
for (Long batchId : batchIds) {
PositionRange result = getBatch(clientIdentity, batchId);
if (result == null) { // null means the zk nodes changed concurrently, fetch again
return listAllBatchs(clientIdentity);
} else {
positionRanges.put(batchId, result);
}
}

return positionRanges;
}

// =========== setter ==========

public void setZkClientx(ZkClientx zkClientx) {
this.zkClientx = zkClientx;
}

}
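
Wiring the manager up only requires injecting a ZkClientx before start(); a minimal sketch, assuming zkClientx is an already-constructed ZkClientx pointing at the ZooKeeper ensemble used by canal:

// Illustrative sketch only.
ZooKeeperMetaManager metaManager = new ZooKeeperMetaManager();
metaManager.setZkClientx(zkClientx);
metaManager.start();   // start() asserts that zkClientx has been set

// From here on every call reads/writes persistent znodes under
// /otter/canal/destinations/<destination>/<clientId>/...
metaManager.subscribe(new ClientIdentity("example", (short) 1001, ".*\\..*"));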

3. MemoryMetaManager

/**
* In-memory implementation
*
* @author zebin.xuzb @ 2012-7-2
* @version 1.0.0
*/
public class MemoryMetaManager extends AbstractCanalLifeCycle implements CanalMetaManager {

protected Map<String, List<ClientIdentity>> destinations;
protected Map<ClientIdentity, MemoryClientIdentityBatch> batches;
protected Map<ClientIdentity, Position> cursors;

public void start() {
super.start();

batches = MigrateMap.makeComputingMap(MemoryClientIdentityBatch::create);

cursors = new MapMaker().makeMap();

destinations = MigrateMap.makeComputingMap(destination -> new ArrayList<>());
}

public void stop() {
super.stop();

destinations.clear();
cursors.clear();
for (MemoryClientIdentityBatch batch : batches.values()) {
batch.clearPositionRanges();
}
}

public synchronized void subscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException {
List<ClientIdentity> clientIdentitys = destinations.get(clientIdentity.getDestination());

if (clientIdentitys.contains(clientIdentity)) {
clientIdentitys.remove(clientIdentity);
}

clientIdentitys.add(clientIdentity);
}

public synchronized boolean hasSubscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException {
List<ClientIdentity> clientIdentitys = destinations.get(clientIdentity.getDestination());
return clientIdentitys != null && clientIdentitys.contains(clientIdentity);
}

public synchronized void unsubscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException {
List<ClientIdentity> clientIdentitys = destinations.get(clientIdentity.getDestination());
if (clientIdentitys != null && clientIdentitys.contains(clientIdentity)) {
clientIdentitys.remove(clientIdentity);
}
}

public synchronized List<ClientIdentity> listAllSubscribeInfo(String destination) throws CanalMetaManagerException {
// fixed issue #657, fixed ConcurrentModificationException
return Lists.newArrayList(destinations.get(destination));
}

public Position getCursor(ClientIdentity clientIdentity) throws CanalMetaManagerException {
return cursors.get(clientIdentity);
}

public void updateCursor(ClientIdentity clientIdentity, Position position) throws CanalMetaManagerException {
cursors.put(clientIdentity, position);
}

public Long addBatch(ClientIdentity clientIdentity, PositionRange positionRange) throws CanalMetaManagerException {
return batches.get(clientIdentity).addPositionRange(positionRange);
}

public void addBatch(ClientIdentity clientIdentity, PositionRange positionRange, Long batchId)
throws CanalMetaManagerException {
batches.get(clientIdentity).addPositionRange(positionRange, batchId); // record under the specified batchId
}

public PositionRange removeBatch(ClientIdentity clientIdentity, Long batchId) throws CanalMetaManagerException {
return batches.get(clientIdentity).removePositionRange(batchId);
}

public PositionRange getBatch(ClientIdentity clientIdentity, Long batchId) throws CanalMetaManagerException {
return batches.get(clientIdentity).getPositionRange(batchId);
}

public PositionRange getLastestBatch(ClientIdentity clientIdentity) throws CanalMetaManagerException {
return batches.get(clientIdentity).getLastestPositionRange();
}

public PositionRange getFirstBatch(ClientIdentity clientIdentity) throws CanalMetaManagerException {
return batches.get(clientIdentity).getFirstPositionRange();
}

public Map<Long, PositionRange> listAllBatchs(ClientIdentity clientIdentity) throws CanalMetaManagerException {
return batches.get(clientIdentity).listAllPositionRange();
}

public void clearAllBatchs(ClientIdentity clientIdentity) throws CanalMetaManagerException {
batches.get(clientIdentity).clearPositionRanges();
}

// ============================

public static class MemoryClientIdentityBatch {

private ClientIdentity clientIdentity;
private Map<Long, PositionRange> batches = new MapMaker().makeMap();
private AtomicLong atomicMaxBatchId = new AtomicLong(1);

public static MemoryClientIdentityBatch create(ClientIdentity clientIdentity) {
return new MemoryClientIdentityBatch(clientIdentity);
}

public MemoryClientIdentityBatch(){

}

protected MemoryClientIdentityBatch(ClientIdentity clientIdentity){
this.clientIdentity = clientIdentity;
}

public synchronized void addPositionRange(PositionRange positionRange, Long batchId) {
updateMaxId(batchId);
batches.put(batchId, positionRange);
}

public synchronized Long addPositionRange(PositionRange positionRange) {
Long batchId = atomicMaxBatchId.getAndIncrement();
batches.put(batchId, positionRange);
return batchId;
}

public synchronized PositionRange removePositionRange(Long batchId) {
if (batches.containsKey(batchId)) {
Long minBatchId = Collections.min(batches.keySet());
if (!minBatchId.equals(batchId)) {
// ack/rollback must be committed in the order batchIds were handed out, otherwise data may be lost
throw new CanalMetaManagerException(String.format("batchId:%d is not the firstly:%d",
batchId,
minBatchId));
}
return batches.remove(batchId);
} else {
return null;
}
}

public synchronized PositionRange getPositionRange(Long batchId) {
return batches.get(batchId);
}

public synchronized PositionRange getLastestPositionRange() {
if (batches.size() == 0) {
return null;
} else {
Long batchId = Collections.max(batches.keySet());
return batches.get(batchId);
}
}

public synchronized PositionRange getFirstPositionRange() {
if (batches.size() == 0) {
return null;
} else {
Long batchId = Collections.min(batches.keySet());
return batches.get(batchId);
}
}

public synchronized Map<Long, PositionRange> listAllPositionRange() {
Set<Long> batchIdSets = batches.keySet();
List<Long> batchIds = new ArrayList<>(batchIdSets);
Collections.sort(new ArrayList<>(batchIds));
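// note: the sort above operates on a throwaway copy and the HashMap returned below is unordered, so it has no effect on the result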

return Maps.newHashMap(batches);
}

public synchronized void clearPositionRanges() {
batches.clear();
}

private synchronized void updateMaxId(Long batchId) {
if (atomicMaxBatchId.get() < batchId + 1) {
atomicMaxBatchId.set(batchId + 1);
}
}

// ============ setter & getter =========

public ClientIdentity getClientIdentity() {
return clientIdentity;
}

public void setClientIdentity(ClientIdentity clientIdentity) {
this.clientIdentity = clientIdentity;
}

}

}
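
A quick sketch of the ack-ordering rule enforced by removePositionRange (illustrative only; rangeA and rangeB stand for PositionRange instances produced elsewhere):

// Illustrative sketch only.
MemoryMetaManager metaManager = new MemoryMetaManager();
metaManager.start();

ClientIdentity client = new ClientIdentity("example", (short) 1001, null);
metaManager.subscribe(client);

Long first = metaManager.addBatch(client, rangeA);   // batchId 1
Long second = metaManager.addBatch(client, rangeB);  // batchId 2

// Acking out of order is rejected to avoid losing data:
// metaManager.removeBatch(client, second);          // throws "batchId:2 is not the firstly:1"
metaManager.removeBatch(client, first);              // ok
metaManager.removeBatch(client, second);             // ok, now the smallest outstanding id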

4. PeriodMixedMetaManager

/**
* Mixed implementation based on a periodic-flush strategy
*
* <pre>
* Optimizations:
* 1. Batch data is no longer flushed to zk; on a switchover the batch data can be discarded and re-fetched from the start
* 2. Cursor updates are flushed on a timer, merging multiple requests; if nothing changed recently, no write is issued
* </pre>
*
* @author jianghang 2012-9-11 02:41:15 PM
* @version 1.0.0
*/
public class PeriodMixedMetaManager extends MemoryMetaManager implements CanalMetaManager {

private static final Logger logger = LoggerFactory.getLogger(PeriodMixedMetaManager.class);
private ScheduledExecutorService executor;
private ZooKeeperMetaManager zooKeeperMetaManager;
@SuppressWarnings("serial")
private final Position nullCursor = new Position() {
};
private long period = 1000; // in milliseconds
private Set<ClientIdentity> updateCursorTasks;

public void start() {
super.start();
Assert.notNull(zooKeeperMetaManager);
if (!zooKeeperMetaManager.isStart()) {
zooKeeperMetaManager.start();
}

executor = Executors.newScheduledThreadPool(1);
destinations = MigrateMap.makeComputingMap(destination -> zooKeeperMetaManager.listAllSubscribeInfo(destination));

cursors = MigrateMap.makeComputingMap(clientIdentity -> {
Position position = zooKeeperMetaManager.getCursor(clientIdentity);
if (position == null) {
return nullCursor; // return a null-object marker to avoid exceptions
} else {
return position;
}
});

batches = MigrateMap.makeComputingMap(clientIdentity -> {
// read the zookeeper info once to initialize
MemoryClientIdentityBatch batches = MemoryClientIdentityBatch.create(clientIdentity);
Map<Long, PositionRange> positionRanges = zooKeeperMetaManager.listAllBatchs(clientIdentity);
for (Map.Entry<Long, PositionRange> entry : positionRanges.entrySet()) {
batches.addPositionRange(entry.getValue(), entry.getKey()); // record under the specified batchId
}
return batches;
});

updateCursorTasks = Collections.synchronizedSet(new HashSet<>());

// start the periodic flush task
executor.scheduleAtFixedRate(() -> {
List<ClientIdentity> tasks = new ArrayList<>(updateCursorTasks);
for (ClientIdentity clientIdentity : tasks) {
try {
updateCursorTasks.remove(clientIdentity);

// periodically flush the latest in-memory value to zookeeper; multiple changes flush only once
zooKeeperMetaManager.updateCursor(clientIdentity, getCursor(clientIdentity));
} catch (Throwable e) {
// ignore
logger.error("period update" + clientIdentity.toString() + " curosr failed!", e);
}
}
}, period, period, TimeUnit.MILLISECONDS);
}

public void stop() {
super.stop();

if (zooKeeperMetaManager.isStart()) {
zooKeeperMetaManager.stop();
}

executor.shutdownNow();
destinations.clear();
batches.clear();
}

public void subscribe(final ClientIdentity clientIdentity) throws CanalMetaManagerException {
super.subscribe(clientIdentity);

// subscription changes are infrequent, so no periodic merge is needed
executor.submit(() -> zooKeeperMetaManager.subscribe(clientIdentity));
}

public void unsubscribe(final ClientIdentity clientIdentity) throws CanalMetaManagerException {
super.unsubscribe(clientIdentity);

// subscription changes are infrequent, so no periodic merge is needed
executor.submit(() -> zooKeeperMetaManager.unsubscribe(clientIdentity));
}

public void updateCursor(ClientIdentity clientIdentity, Position position) throws CanalMetaManagerException {
super.updateCursor(clientIdentity, position);
updateCursorTasks.add(clientIdentity); // add to the task set so the periodic flush picks it up
}

public Position getCursor(ClientIdentity clientIdentity) throws CanalMetaManagerException {
Position position = super.getCursor(clientIdentity);
if (position == nullCursor) {
return null;
} else {
return position;
}
}

// =============== setter / getter ================

public void setZooKeeperMetaManager(ZooKeeperMetaManager zooKeeperMetaManager) {
this.zooKeeperMetaManager = zooKeeperMetaManager;
}

public void setPeriod(long period) {
this.period = period;
}

}
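
Typical wiring, as a minimal sketch (the ZooKeeperMetaManager is assumed to be configured as in section 2, with zkClientx obtained elsewhere):

// Illustrative sketch only.
ZooKeeperMetaManager zkMetaManager = new ZooKeeperMetaManager();
zkMetaManager.setZkClientx(zkClientx);

PeriodMixedMetaManager metaManager = new PeriodMixedMetaManager();
metaManager.setZooKeeperMetaManager(zkMetaManager);
metaManager.setPeriod(1000);   // flush merged cursor updates to zookeeper every 1000 ms
metaManager.start();           // also starts zkMetaManager if it is not started yet

// Reads and writes hit memory first; only cursor changes are pushed to zookeeper, on the timer.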

5. FileMixedMetaManager

/**
* File-backed metaManager implementation
*
* <pre>
* Strategy:
* 1. Write to memory first, then periodically flush the data to a file
* 2. The file is written in overwrite mode (only the last state is kept); history is recorded in append mode via the logger
* </pre>
*
* @author jianghang 2013-4-15 05:55:57 PM
* @version 1.0.4
*/
public class FileMixedMetaManager extends MemoryMetaManager implements CanalMetaManager {

private static final Logger logger = LoggerFactory.getLogger(FileMixedMetaManager.class);
private static final Charset charset = StandardCharsets.UTF_8;
private File dataDir;
private String dataFileName = "meta.dat";
private Map<String, File> dataFileCaches;
private ScheduledExecutorService executor;
@SuppressWarnings("serial")
private final Position nullCursor = new Position() {
};
private long period = 1000; // in milliseconds
private Set<ClientIdentity> updateCursorTasks;

public void start() {
super.start();
Assert.notNull(dataDir);
if (!dataDir.exists()) {
try {
FileUtils.forceMkdir(dataDir);
} catch (IOException e) {
throw new CanalMetaManagerException(e);
}
}

if (!dataDir.canRead() || !dataDir.canWrite()) {
throw new CanalMetaManagerException("dir[" + dataDir.getPath() + "] can not read/write");
}

dataFileCaches = MigrateMap.makeComputingMap(this::getDataFile);

executor = Executors.newScheduledThreadPool(1);
destinations = MigrateMap.makeComputingMap(this::loadClientIdentity);

cursors = MigrateMap.makeComputingMap(clientIdentity -> {
Position position = loadCursor(clientIdentity.getDestination(), clientIdentity);
if (position == null) {
return nullCursor; // return a null-object marker to avoid exceptions
} else {
return position;
}
});

updateCursorTasks = Collections.synchronizedSet(new HashSet<>());

// start the periodic flush task
executor.scheduleAtFixedRate(() -> {
List<ClientIdentity> tasks = new ArrayList<>(updateCursorTasks);
for (ClientIdentity clientIdentity : tasks) {
MDC.put("destination", String.valueOf(clientIdentity.getDestination()));
try {
updateCursorTasks.remove(clientIdentity);

// periodically flush the latest in-memory value to the file; multiple changes flush only once
if (logger.isInfoEnabled()) {
LogPosition cursor = (LogPosition) getCursor(clientIdentity);
logger.info("clientId:{} cursor:[{},{},{},{},{}] address[{}]", clientIdentity.getClientId(), cursor.getPostion().getJournalName(),
cursor.getPostion().getPosition(), cursor.getPostion().getTimestamp(),
cursor.getPostion().getServerId(), cursor.getPostion().getGtid(),
cursor.getIdentity().getSourceAddress().toString());
}
flushDataToFile(clientIdentity.getDestination());
} catch (Throwable e) {
// ignore
logger.error("period update" + clientIdentity.toString() + " curosr failed!", e);
}
}
},
period,
period,
TimeUnit.MILLISECONDS);
}

public void stop() {
flushDataToFile(); // flush data before shutdown

super.stop();
executor.shutdownNow();
destinations.clear();
batches.clear();
}

public void subscribe(final ClientIdentity clientIdentity) throws CanalMetaManagerException {
super.subscribe(clientIdentity);

// subscription changes are infrequent, so no periodic merge is needed
executor.submit(() -> flushDataToFile(clientIdentity.getDestination()));
}

public void unsubscribe(final ClientIdentity clientIdentity) throws CanalMetaManagerException {
super.unsubscribe(clientIdentity);

// subscription changes are infrequent, so no periodic merge is needed
executor.submit(() -> flushDataToFile(clientIdentity.getDestination()));
}

public void updateCursor(ClientIdentity clientIdentity, Position position) throws CanalMetaManagerException {
updateCursorTasks.add(clientIdentity); // add to the task set so the periodic flush picks it up
super.updateCursor(clientIdentity, position);
}

public Position getCursor(ClientIdentity clientIdentity) throws CanalMetaManagerException {
Position position = super.getCursor(clientIdentity);
if (position == nullCursor) {
return null;
} else {
return position;
}
}

// ============================ helper method ======================

private File getDataFile(String destination) {
File destinationMetaDir = new File(dataDir, destination);
if (!destinationMetaDir.exists()) {
try {
FileUtils.forceMkdir(destinationMetaDir);
} catch (IOException e) {
throw new CanalMetaManagerException(e);
}
}

return new File(destinationMetaDir, dataFileName);
}

private FileMetaInstanceData loadDataFromFile(File dataFile) {
try {
if (!dataFile.exists()) {
return null;
}

String json = FileUtils.readFileToString(dataFile, charset);
return JsonUtils.unmarshalFromString(json, FileMetaInstanceData.class);
} catch (IOException e) {
throw new CanalMetaManagerException(e);
}
}

private void flushDataToFile() {
for (String destination : destinations.keySet()) {
flushDataToFile(destination);
}
}

private void flushDataToFile(String destination) {
flushDataToFile(destination, dataFileCaches.get(destination));
}

private void flushDataToFile(String destination, File dataFile) {
FileMetaInstanceData data = new FileMetaInstanceData();
if (destinations.containsKey(destination)) {
synchronized (destination.intern()) { // serialize concurrent updates per destination
data.setDestination(destination);

List<FileMetaClientIdentityData> clientDatas = new ArrayList<>();
List<ClientIdentity> clientIdentitys = destinations.get(destination);
for (ClientIdentity clientIdentity : clientIdentitys) {
FileMetaClientIdentityData clientData = new FileMetaClientIdentityData();
clientData.setClientIdentity(clientIdentity);
Position position = cursors.get(clientIdentity);
if (position != null && position != nullCursor) {
clientData.setCursor((LogPosition) position);
}

clientDatas.add(clientData);
}

data.setClientDatas(clientDatas);
}
//fixed issue https://github.com/alibaba/canal/issues/4312
// do not overwrite the file when client data is empty (suited to the single-client case)
if(data.getClientDatas().isEmpty()){
return;
}
String json = JsonUtils.marshalToString(data);
try {
FileUtils.writeStringToFile(dataFile, json);
} catch (IOException e) {
throw new CanalMetaManagerException(e);
}
}
}

private List<ClientIdentity> loadClientIdentity(String destination) {
List<ClientIdentity> result = Lists.newArrayList();

FileMetaInstanceData data = loadDataFromFile(dataFileCaches.get(destination));
if (data == null) {
return result;
}

List<FileMetaClientIdentityData> clientDatas = data.getClientDatas();
if (clientDatas == null) {
return result;
}

for (FileMetaClientIdentityData clientData : clientDatas) {
if (clientData.getClientIdentity().getDestination().equals(destination)) {
result.add(clientData.getClientIdentity());
}
}

return result;
}

private Position loadCursor(String destination, ClientIdentity clientIdentity) {
FileMetaInstanceData data = loadDataFromFile(dataFileCaches.get(destination));
if (data == null) {
return null;
}

List<FileMetaClientIdentityData> clientDatas = data.getClientDatas();
if (clientDatas == null) {
return null;
}

for (FileMetaClientIdentityData clientData : clientDatas) {
if (clientData.getClientIdentity() != null && clientData.getClientIdentity().equals(clientIdentity)) {
return clientData.getCursor();
}
}

return null;
}

/**
* Data object describing one clientIdentity
*
* @author jianghang 2013-4-15 06:19:40 PM
* @version 1.0.4
*/
public static class FileMetaClientIdentityData {

private ClientIdentity clientIdentity;
private LogPosition cursor;

public FileMetaClientIdentityData(){

}

public FileMetaClientIdentityData(ClientIdentity clientIdentity, MemoryClientIdentityBatch batch,
LogPosition cursor){
this.clientIdentity = clientIdentity;
this.cursor = cursor;
}

public ClientIdentity getClientIdentity() {
return clientIdentity;
}

public void setClientIdentity(ClientIdentity clientIdentity) {
this.clientIdentity = clientIdentity;
}

public Position getCursor() {
return cursor;
}

public void setCursor(LogPosition cursor) {
this.cursor = cursor;
}

}

/**
* Data object describing an entire canal instance
*
* @author jianghang 2013-4-15 06:20:22 PM
* @version 1.0.4
*/
public static class FileMetaInstanceData {

private String destination;
private List<FileMetaClientIdentityData> clientDatas;

public FileMetaInstanceData(){

}

public FileMetaInstanceData(String destination, List<FileMetaClientIdentityData> clientDatas){
this.destination = destination;
this.clientDatas = clientDatas;
}

public String getDestination() {
return destination;
}

public void setDestination(String destination) {
this.destination = destination;
}

public List<FileMetaClientIdentityData> getClientDatas() {
return clientDatas;
}

public void setClientDatas(List<FileMetaClientIdentityData> clientDatas) {
this.clientDatas = clientDatas;
}

}

public void setDataDir(String dataDir) {
this.dataDir = new File(dataDir);
}

public void setDataDirByFile(File dataDir) {
this.dataDir = dataDir;
}

public void setPeriod(long period) {
this.period = period;
}

}
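
A minimal wiring sketch (the directory path is just an example): everything is served from memory, and cursors are flushed to <dataDir>/<destination>/meta.dat every period milliseconds.

// Illustrative sketch only.
FileMixedMetaManager metaManager = new FileMixedMetaManager();
metaManager.setDataDir("/tmp/canal/conf");   // each destination gets <dataDir>/<destination>/meta.dat
metaManager.setPeriod(1000);                 // flush merged cursor updates to file every 1000 ms
metaManager.start();

// Batches stay purely in memory; only subscriptions and cursors survive a restart via meta.dat.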

6. MixedMetaManager

/**
* Combined memory + zookeeper usage mode
*
* @author jianghang 2012-7-11 03:58:00 PM
* @version 1.0.0
*/

public class MixedMetaManager extends MemoryMetaManager implements CanalMetaManager {

private ExecutorService executor;
private ZooKeeperMetaManager zooKeeperMetaManager;
@SuppressWarnings("serial")
private final Position nullCursor = new Position() {
};

public void start() {
super.start();
Assert.notNull(zooKeeperMetaManager);
if (!zooKeeperMetaManager.isStart()) {
zooKeeperMetaManager.start();
}

executor = Executors.newFixedThreadPool(1);
destinations = MigrateMap.makeComputingMap(destination -> zooKeeperMetaManager.listAllSubscribeInfo(destination));

cursors = MigrateMap.makeComputingMap(clientIdentity -> {
Position position = zooKeeperMetaManager.getCursor(clientIdentity);
if (position == null) {
return nullCursor; // return a null-object marker to avoid exceptions
} else {
return position;
}
});

batches = MigrateMap.makeComputingMap(clientIdentity -> {
// read the zookeeper info once to initialize
MemoryClientIdentityBatch batches = MemoryClientIdentityBatch.create(clientIdentity);
Map<Long, PositionRange> positionRanges = zooKeeperMetaManager.listAllBatchs(clientIdentity);
for (Map.Entry<Long, PositionRange> entry : positionRanges.entrySet()) {
batches.addPositionRange(entry.getValue(), entry.getKey()); // record under the specified batchId
}
return batches;
});
}

public void stop() {
super.stop();

if (zooKeeperMetaManager.isStart()) {
zooKeeperMetaManager.stop();
}

executor.shutdownNow();
destinations.clear();
batches.clear();
}

public void subscribe(final ClientIdentity clientIdentity) throws CanalMetaManagerException {
super.subscribe(clientIdentity);

executor.submit(() -> zooKeeperMetaManager.subscribe(clientIdentity));
}

public void unsubscribe(final ClientIdentity clientIdentity) throws CanalMetaManagerException {
super.unsubscribe(clientIdentity);

executor.submit(() -> zooKeeperMetaManager.unsubscribe(clientIdentity));
}

public void updateCursor(final ClientIdentity clientIdentity, final Position position)
throws CanalMetaManagerException {
super.updateCursor(clientIdentity, position);

// flush asynchronously
executor.submit(() -> zooKeeperMetaManager.updateCursor(clientIdentity, position));
}

@Override
public Position getCursor(ClientIdentity clientIdentity) throws CanalMetaManagerException {
Position position = super.getCursor(clientIdentity);
if (position == nullCursor) {
return null;
} else {
return position;
}
}

public Long addBatch(final ClientIdentity clientIdentity, final PositionRange positionRange)
throws CanalMetaManagerException {
final Long batchId = super.addBatch(clientIdentity, positionRange);
// flush asynchronously
executor.submit(() -> zooKeeperMetaManager.addBatch(clientIdentity, positionRange, batchId));
return batchId;
}

public void addBatch(final ClientIdentity clientIdentity, final PositionRange positionRange, final Long batchId)
throws CanalMetaManagerException {
super.addBatch(clientIdentity, positionRange, batchId);
// flush asynchronously
executor.submit(() -> zooKeeperMetaManager.addBatch(clientIdentity, positionRange, batchId));
}

public PositionRange removeBatch(final ClientIdentity clientIdentity, final Long batchId)
throws CanalMetaManagerException {
PositionRange positionRange = super.removeBatch(clientIdentity, batchId);
// flush asynchronously
executor.submit(() -> {
zooKeeperMetaManager.removeBatch(clientIdentity, batchId);
});

return positionRange;
}

public void clearAllBatchs(final ClientIdentity clientIdentity) throws CanalMetaManagerException {
super.clearAllBatchs(clientIdentity);

// flush asynchronously
executor.submit(() -> zooKeeperMetaManager.clearAllBatchs(clientIdentity));
}

// =============== setter / getter ================
public void setZooKeeperMetaManager(ZooKeeperMetaManager zooKeeperMetaManager) {
this.zooKeeperMetaManager = zooKeeperMetaManager;
}
}
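
Wiring is the same as for PeriodMixedMetaManager, except that every cursor and batch change is pushed to zookeeper asynchronously right away instead of being merged on a timer; a minimal sketch, reusing the zkMetaManager and client variables from the earlier sketches, with position standing for a Position to record:

// Illustrative sketch only.
MixedMetaManager metaManager = new MixedMetaManager();
metaManager.setZooKeeperMetaManager(zkMetaManager);
metaManager.start();

metaManager.updateCursor(client, position);   // updates memory, then submits an async zookeeper write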

