SimpleCanalConnector

1. RocketMQCanalConnector

1.1 属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

private String nameServer;
private String topic;
private String groupName;
private volatile boolean connected = false;
private DefaultMQPushConsumer rocketMQConsumer;
private BlockingQueue<ConsumerBatchMessage> messageBlockingQueue;
private int batchSize = -1;
private long batchProcessTimeout = 60 * 1000;
private boolean flatMessage;
private volatile ConsumerBatchMessage lastGetBatchMessage = null;
private String accessKey;
private String secretKey;
private String customizedTraceTopic;
private boolean enableMessageTrace = false;
private String accessChannel;
private String namespace;

1.2 CannalConnector实现方法

1.2.1 connect

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void connect() throws CanalClientException {
RPCHook rpcHook = null;
if (null != accessKey && accessKey.length() > 0 && null != secretKey && secretKey.length() > 0) {
SessionCredentials sessionCredentials = new SessionCredentials();
sessionCredentials.setAccessKey(accessKey);
sessionCredentials.setSecretKey(secretKey);
rpcHook = new AclClientRPCHook(sessionCredentials);
}

rocketMQConsumer = new DefaultMQPushConsumer(groupName, rpcHook, new AllocateMessageQueueAveragely(), enableMessageTrace, customizedTraceTopic);
rocketMQConsumer.setVipChannelEnabled(false);
if (CLOUD_ACCESS_CHANNEL.equals(this.accessChannel)) {
rocketMQConsumer.setAccessChannel(AccessChannel.CLOUD);
}

if (!StringUtils.isEmpty(this.namespace)) {
rocketMQConsumer.setNamespace(this.namespace);
}

if (!StringUtils.isBlank(nameServer)) {
rocketMQConsumer.setNamesrvAddr(nameServer);
}
if (batchSize != -1) {
rocketMQConsumer.setConsumeMessageBatchMaxSize(batchSize);
}
}

1.2.2 disconnect

1
2
3
4
public void disconnect() throws CanalClientException {
rocketMQConsumer.shutdown();
connected = false;
}

1.2.3 checkValid

1
2
3
public boolean checkValid() throws CanalClientException {
return connected;
}

1.2.4 subscribe

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
public synchronized void subscribe(String filter) throws CanalClientException {
if (connected) {
return;
}
try {
if (rocketMQConsumer == null) {
this.connect();
}
rocketMQConsumer.subscribe(this.topic, "*");
rocketMQConsumer.registerMessageListener(new MessageListenerOrderly() {

@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> messageExts, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
boolean isSuccess = process(messageExts);
if (isSuccess) {
return ConsumeOrderlyStatus.SUCCESS;
} else {
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
});
rocketMQConsumer.start();
connected = true;
} catch (MQClientException ex) {
throw new RuntimeException("Start RocketMQ consumer error", ex);
}
}

private boolean process(List<MessageExt> messageExts) {
if (logger.isDebugEnabled()) {
logger.debug("Get Message: {}", messageExts);
}
List messageList = new ArrayList<>();
for (MessageExt messageExt : messageExts) {
byte[] data = messageExt.getBody();
if (data != null) {
try {
if (!flatMessage) {
Message message = CanalMessageDeserializer.deserializer(data);
messageList.add(message);
} else {
FlatMessage flatMessage = JSON.parseObject(data, FlatMessage.class);
messageList.add(flatMessage);
}
} catch (Exception ex) {
logger.error("Add message error", ex);
throw new CanalClientException(ex);
}
} else {
logger.warn("Received message data is null");
}
}
ConsumerBatchMessage batchMessage;
if (!flatMessage) {
batchMessage = new ConsumerBatchMessage<Message>(messageList);
} else {
batchMessage = new ConsumerBatchMessage<FlatMessage>(messageList);
}
try {
messageBlockingQueue.put(batchMessage);
} catch (InterruptedException e) {
logger.error("Put message to queue error", e);
throw new RuntimeException(e);
}
boolean isCompleted;
try {
isCompleted = batchMessage.waitFinish(batchProcessTimeout);
} catch (InterruptedException e) {
logger.error("Interrupted when waiting messages to be finished.", e);
throw new RuntimeException(e);
}
boolean isSuccess = batchMessage.isSuccess();
return isCompleted && isSuccess;
}

1.2.5 unsubscribe

1
2
3
public void unsubscribe() throws CanalClientException {
this.rocketMQConsumer.unsubscribe(this.topic);
}

1.3 CannalMQConnector实现方法

1.3.1 getList

带阻塞时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Override
public List<Message> getList(Long timeout, TimeUnit unit) throws CanalClientException {
List<Message> messages = getListWithoutAck(timeout, unit);
if (messages != null && !messages.isEmpty()) {
ack();
}
return messages;
}

@Override
public List<Message> getListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {
try {
if (this.lastGetBatchMessage != null) {
throw new CanalClientException("mq get/ack not support concurrent & async ack");
}

ConsumerBatchMessage batchMessage = messageBlockingQueue.poll(timeout, unit);
if (batchMessage != null) {
this.lastGetBatchMessage = batchMessage;
return batchMessage.getData();
}
} catch (InterruptedException ex) {
logger.warn("Get message timeout", ex);
throw new CanalClientException("Failed to fetch the data after: " + timeout);
}
return Lists.newArrayList();
}

1.3.2 getFlatList

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Override
public List<FlatMessage> getFlatList(Long timeout, TimeUnit unit) throws CanalClientException {
List<FlatMessage> messages = getFlatListWithoutAck(timeout, unit);
if (messages != null && !messages.isEmpty()) {
ack();
}
return messages;
}

@Override
public List<FlatMessage> getFlatListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {
try {
if (this.lastGetBatchMessage != null) {
throw new CanalClientException("mq get/ack not support concurrent & async ack");
}

ConsumerBatchMessage batchMessage = messageBlockingQueue.poll(timeout, unit);
if (batchMessage != null) {
this.lastGetBatchMessage = batchMessage;
return batchMessage.getData();
}
} catch (InterruptedException ex) {
logger.warn("Get message timeout", ex);
throw new CanalClientException("Failed to fetch the data after: " + timeout);
}
return Lists.newArrayList();
}

1.3.3 ack

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Override
public void ack() throws CanalClientException {
try {
if (this.lastGetBatchMessage != null) {
this.lastGetBatchMessage.ack();
}
} catch (Throwable e) {
if (this.lastGetBatchMessage != null) {
this.lastGetBatchMessage.fail();
}
} finally {
this.lastGetBatchMessage = null;
}
}

@Override
public void ack(long batchId) throws CanalClientException {
throw new CanalClientException("mq not support this method");
}

1.3.4 rollback

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
public void rollback() throws CanalClientException {
try {
if (this.lastGetBatchMessage != null) {
this.lastGetBatchMessage.fail();
}
} finally {
this.lastGetBatchMessage = null;
}
}

@Override
public void rollback(long batchId) throws CanalClientException {
throw new CanalClientException("mq not support this method");
}

1.3.5 get

1
2
3
4
5
6
7
8
public Message get(int batchSize) throws CanalClientException {
throw new CanalClientException("mq not support this method");
}

@Override
public Message get(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
throw new CanalClientException("mq not support this method");
}

1.3.6 getWithoutAck

1
2
3
4
5
6
7
8
9
@Override
public Message getWithoutAck(int batchSize) throws CanalClientException {
throw new CanalClientException("mq not support this method");
}

@Override
public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
throw new CanalClientException("mq not support this method");
}

2. CanalConnectors

集群访问控制接口

CanalNodeAccessStrategy

源码:

1
2
3
4
5
6
7
8
9
10
11
12
/**
* 集群节点访问控制接口
*
* @author jianghang 2012-10-29 下午07:55:41
* @version 1.0.0
*/
public interface CanalNodeAccessStrategy {

SocketAddress currentNode();

SocketAddress nextNode();
}

2.1 SimpleNodeAccessStrategy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* 简单版本的node访问实现
*
* @author jianghang 2012-10-29 下午08:00:23
* @version 1.0.0
*/
public class SimpleNodeAccessStrategy implements CanalNodeAccessStrategy {

private List<SocketAddress> nodes = new ArrayList<>();
private int index = 0;

public SimpleNodeAccessStrategy(List<? extends SocketAddress> nodes){
if (nodes == null || nodes.size() < 1) {
throw new IllegalArgumentException("at least 1 node required.");
}
this.nodes.addAll(nodes);
}

@Override
public SocketAddress nextNode() {
try {
return nodes.get(index);
} finally {
index = (index + 1) % nodes.size();
}
}

@Override
public SocketAddress currentNode() {
return nodes.get(index);
}

}

2.2 ClusterNodeAccessStrategy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
/**
* 集群模式的调度策略
*
* @author jianghang 2012-12-3 下午10:01:04
* @version 1.0.0
*/
public class ClusterNodeAccessStrategy implements CanalNodeAccessStrategy {

private String destination;
private IZkChildListener childListener; // 监听所有的服务器列表
private IZkDataListener dataListener; // 监听当前的工作节点
private ZkClientx zkClient;
private volatile List<InetSocketAddress> currentAddress = new ArrayList<>();
private volatile InetSocketAddress runningAddress = null;

public ClusterNodeAccessStrategy(String destination, ZkClientx zkClient){
this.destination = destination;
this.zkClient = zkClient;
// handleChildChange
childListener = (parentPath, currentChilds) -> initClusters(currentChilds);

dataListener = new IZkDataListener() {

public void handleDataDeleted(String dataPath) throws Exception {
runningAddress = null;
}

public void handleDataChange(String dataPath, Object data) throws Exception {
initRunning(data);
}

};

String clusterPath = ZookeeperPathUtils.getDestinationClusterRoot(destination);
this.zkClient.subscribeChildChanges(clusterPath, childListener);
initClusters(this.zkClient.getChildren(clusterPath));

String runningPath = ZookeeperPathUtils.getDestinationServerRunning(destination);
this.zkClient.subscribeDataChanges(runningPath, dataListener);
initRunning(this.zkClient.readData(runningPath, true));
}

@Override
public SocketAddress currentNode() {
return nextNode();
}

@Override
public SocketAddress nextNode() {
if (runningAddress != null) {// 如果服务已经启动,直接选择当前正在工作的节点
return runningAddress;
} else if (!currentAddress.isEmpty()) { // 如果不存在已经启动的服务,可能服务是一种lazy启动,随机选择一台触发服务器进行启动
return currentAddress.get(0);// 默认返回第一个节点,之前已经做过shuffle
} else {
throw new ServerNotFoundException("no alive canal server for " + destination);
}
}

private void initClusters(List<String> currentChilds) {
if (currentChilds == null || currentChilds.isEmpty()) {
currentAddress = new ArrayList<>();
} else {
List<InetSocketAddress> addresses = new ArrayList<>();
for (String address : currentChilds) {
String[] strs = StringUtils.split(address, ":");
if (strs != null && strs.length == 2) {
addresses.add(new InetSocketAddress(strs[0], Integer.parseInt(strs[1])));
}
}

Collections.shuffle(addresses);
currentAddress = addresses;// 直接切换引用
}
}

private void initRunning(Object data) {
if (data == null) {
return;
}

ServerRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ServerRunningData.class);
String[] strs = StringUtils.split(runningData.getAddress(), ':');
if (strs.length == 2) {
runningAddress = new InetSocketAddress(strs[0], Integer.parseInt(strs[1]));
}
}

public void setZkClient(ZkClientx zkClient) {
this.zkClient = zkClient;
}

public ZkClientx getZkClient() {
return zkClient;
}

}

2.3 CanalConnectors

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/**
* canal connectors创建工具类
*
* @author jianghang 2012-10-29 下午11:18:50
* @version 1.0.0
*/
public class CanalConnectors {

/**
* 创建单链接的客户端链接
*
* @param address
* @param destination
* @param username
* @param password
* @return
*/
public static CanalConnector newSingleConnector(SocketAddress address, String destination, String username,
String password) {
SimpleCanalConnector canalConnector = new SimpleCanalConnector(address, username, password, destination);
canalConnector.setSoTimeout(60 * 1000);
canalConnector.setIdleTimeout(60 * 60 * 1000);
return canalConnector;
}

/**
* 创建带cluster模式的客户端链接,自动完成failover切换
*
* @param addresses
* @param destination
* @param username
* @param password
* @return
*/
public static CanalConnector newClusterConnector(List<? extends SocketAddress> addresses, String destination,
String username, String password) {
ClusterCanalConnector canalConnector = new ClusterCanalConnector(username,
password,
destination,
new SimpleNodeAccessStrategy(addresses));
canalConnector.setSoTimeout(60 * 1000);
canalConnector.setIdleTimeout(60 * 60 * 1000);
return canalConnector;
}

/**
* 创建带cluster模式的客户端链接,自动完成failover切换,服务器列表自动扫描
*
* @param zkServers
* @param destination
* @param username
* @param password
* @return
*/
public static CanalConnector newClusterConnector(String zkServers, String destination, String username,
String password) {
ClusterCanalConnector canalConnector = new ClusterCanalConnector(username,
password,
destination,
new ClusterNodeAccessStrategy(destination, ZkClientx.getZkClient(zkServers)));
canalConnector.setSoTimeout(60 * 1000);
canalConnector.setIdleTimeout(60 * 60 * 1000);
return canalConnector;
}
}

3. ConsumerBatchMessage

3.1 源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class ConsumerBatchMessage<T> {

private final List<T> data;
private CountDownLatch latch;
private boolean hasFailure = false;

public ConsumerBatchMessage(List<T> data){
this.data = data;
latch = new CountDownLatch(1);
}

public boolean waitFinish(long timeout) throws InterruptedException {
return latch.await(timeout, TimeUnit.MILLISECONDS);
}

public boolean isSuccess() {
return !hasFailure;
}

public List<T> getData() {
return data;
}

/**
* Countdown if the sub message is successful.
*/
public void ack() {
latch.countDown();
}

/**
* Countdown and fail-fast if the sub message is failed.
*/
public void fail() {
hasFailure = true;
// fail fast
long count = latch.getCount();
for (int i = 0; i < count; i++) {
latch.countDown();
}
}
}

3.2 使用

1
2
3
4
5
6
7
// 1. 阻塞在这里
isCompleted = batchMessage.waitFinish(batchProcessTimeout);

// 2. 等待lastMsg ack
if (this.lastGetBatchMessage != null) {
this.lastGetBatchMessage.ack();
}

本站由 卡卡龙 使用 Stellar 1.29.1主题创建

本站访问量 次. 本文阅读量 次.