1. CanalConnector 重点介绍RocketMQCanalConnector。
1.1 RocketMQCanalConnector-属性 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private String nameServer;private String topic;private String groupName;private volatile boolean connected = false ;private DefaultMQPushConsumer rocketMQConsumer;private BlockingQueue<ConsumerBatchMessage> messageBlockingQueue;private int batchSize = -1 ;private long batchProcessTimeout = 60 * 1000 ;private boolean flatMessage;private volatile ConsumerBatchMessage lastGetBatchMessage = null ;private String accessKey;private String secretKey;private String customizedTraceTopic;private boolean enableMessageTrace = false ;private String accessChannel;private String namespace;
1.2 RocketMQCanalConnector-Connector方法 1.2.1 connect 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public void connect () throws CanalClientException { RPCHook rpcHook = null ; if (null != accessKey && accessKey.length() > 0 && null != secretKey && secretKey.length() > 0 ) { SessionCredentials sessionCredentials = new SessionCredentials (); sessionCredentials.setAccessKey(accessKey); sessionCredentials.setSecretKey(secretKey); rpcHook = new AclClientRPCHook (sessionCredentials); } rocketMQConsumer = new DefaultMQPushConsumer (groupName, rpcHook, new AllocateMessageQueueAveragely (), enableMessageTrace, customizedTraceTopic); rocketMQConsumer.setVipChannelEnabled(false ); if (CLOUD_ACCESS_CHANNEL.equals(this .accessChannel)) { rocketMQConsumer.setAccessChannel(AccessChannel.CLOUD); } if (!StringUtils.isEmpty(this .namespace)) { rocketMQConsumer.setNamespace(this .namespace); } if (!StringUtils.isBlank(nameServer)) { rocketMQConsumer.setNamesrvAddr(nameServer); } if (batchSize != -1 ) { rocketMQConsumer.setConsumeMessageBatchMaxSize(batchSize); } }
1.2.2 disconnect 1 2 3 4 public void disconnect () throws CanalClientException { rocketMQConsumer.shutdown(); connected = false ; }
1.2.3 checkValid 1 2 3 public boolean checkValid () throws CanalClientException { return connected; }
1.2.4 subscribe 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 public synchronized void subscribe (String filter) throws CanalClientException { if (connected) { return ; } try { if (rocketMQConsumer == null ) { this .connect(); } rocketMQConsumer.subscribe(this .topic, "*" ); rocketMQConsumer.registerMessageListener(new MessageListenerOrderly () { @Override public ConsumeOrderlyStatus consumeMessage (List<MessageExt> messageExts, ConsumeOrderlyContext context) { context.setAutoCommit(true ); boolean isSuccess = process(messageExts); if (isSuccess) { return ConsumeOrderlyStatus.SUCCESS; } else { return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } } }); rocketMQConsumer.start(); connected = true ; } catch (MQClientException ex) { throw new RuntimeException ("Start RocketMQ consumer error" , ex); } } private boolean process (List<MessageExt> messageExts) { if (logger.isDebugEnabled()) { logger.debug("Get Message: {}" , messageExts); } List messageList = new ArrayList <>(); for (MessageExt messageExt : messageExts) { byte [] data = messageExt.getBody(); if (data != null ) { try { if (!flatMessage) { Message message = CanalMessageDeserializer.deserializer(data); messageList.add(message); } else { FlatMessage flatMessage = JSON.parseObject(data, FlatMessage.class); messageList.add(flatMessage); } } catch (Exception ex) { logger.error("Add message error" , ex); throw new CanalClientException (ex); } } else { logger.warn("Received message data is null" ); } } ConsumerBatchMessage batchMessage; if (!flatMessage) { batchMessage = new ConsumerBatchMessage <Message>(messageList); } else { batchMessage = new ConsumerBatchMessage <FlatMessage>(messageList); } try { messageBlockingQueue.put(batchMessage); } catch (InterruptedException e) { logger.error("Put message to queue error" , e); throw new RuntimeException (e); } boolean isCompleted; try { isCompleted = batchMessage.waitFinish(batchProcessTimeout); } catch (InterruptedException e) { logger.error("Interrupted when waiting messages to be finished." , e); throw new RuntimeException (e); } boolean isSuccess = batchMessage.isSuccess(); return isCompleted && isSuccess; }
1.2.5 unsubscribe 1 2 3 public void unsubscribe () throws CanalClientException { this .rocketMQConsumer.unsubscribe(this .topic); }
1.3 RocketMQCanalConnector-Message方法 1.3.1 getList 带阻塞时间
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Override public List<Message> getList (Long timeout, TimeUnit unit) throws CanalClientException { List<Message> messages = getListWithoutAck(timeout, unit); if (messages != null && !messages.isEmpty()) { ack(); } return messages; } @Override public List<Message> getListWithoutAck (Long timeout, TimeUnit unit) throws CanalClientException { try { if (this .lastGetBatchMessage != null ) { throw new CanalClientException ("mq get/ack not support concurrent & async ack" ); } ConsumerBatchMessage batchMessage = messageBlockingQueue.poll(timeout, unit); if (batchMessage != null ) { this .lastGetBatchMessage = batchMessage; return batchMessage.getData(); } } catch (InterruptedException ex) { logger.warn("Get message timeout" , ex); throw new CanalClientException ("Failed to fetch the data after: " + timeout); } return Lists.newArrayList(); }
1.3.2 getFlatList 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Override public List<FlatMessage> getFlatList (Long timeout, TimeUnit unit) throws CanalClientException { List<FlatMessage> messages = getFlatListWithoutAck(timeout, unit); if (messages != null && !messages.isEmpty()) { ack(); } return messages; } @Override public List<FlatMessage> getFlatListWithoutAck (Long timeout, TimeUnit unit) throws CanalClientException { try { if (this .lastGetBatchMessage != null ) { throw new CanalClientException ("mq get/ack not support concurrent & async ack" ); } ConsumerBatchMessage batchMessage = messageBlockingQueue.poll(timeout, unit); if (batchMessage != null ) { this .lastGetBatchMessage = batchMessage; return batchMessage.getData(); } } catch (InterruptedException ex) { logger.warn("Get message timeout" , ex); throw new CanalClientException ("Failed to fetch the data after: " + timeout); } return Lists.newArrayList(); }
1.3.3 ack-不支持 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Override public void ack () throws CanalClientException { try { if (this .lastGetBatchMessage != null ) { this .lastGetBatchMessage.ack(); } } catch (Throwable e) { if (this .lastGetBatchMessage != null ) { this .lastGetBatchMessage.fail(); } } finally { this .lastGetBatchMessage = null ; } } @Override public void ack (long batchId) throws CanalClientException { throw new CanalClientException ("mq not support this method" ); }
1.3.4 rollback-不支持 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Override public void rollback () throws CanalClientException { try { if (this .lastGetBatchMessage != null ) { this .lastGetBatchMessage.fail(); } } finally { this .lastGetBatchMessage = null ; } } @Override public void rollback (long batchId) throws CanalClientException { throw new CanalClientException ("mq not support this method" ); }
1.3.5 get-不支持 1 2 3 4 5 6 7 8 public Message get (int batchSize) throws CanalClientException { throw new CanalClientException ("mq not support this method" ); } @Override public Message get (int batchSize, Long timeout, TimeUnit unit) throws CanalClientException { throw new CanalClientException ("mq not support this method" ); }
1.3.6 getWithoutAck-不支持 1 2 3 4 5 6 7 8 9 @Override public Message getWithoutAck (int batchSize) throws CanalClientException { throw new CanalClientException ("mq not support this method" ); } @Override public Message getWithoutAck (int batchSize, Long timeout, TimeUnit unit) throws CanalClientException { throw new CanalClientException ("mq not support this method" ); }
2. CanalConnectors
源码:
1 2 3 4 5 6 7 8 9 10 11 12 public interface CanalNodeAccessStrategy { SocketAddress currentNode () ; SocketAddress nextNode () ; }
2.1 CanalNodeAccessStrategy 2.1.1 SimpleNodeAccessStrategy 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public class SimpleNodeAccessStrategy implements CanalNodeAccessStrategy { private List<SocketAddress> nodes = new ArrayList <>(); private int index = 0 ; public SimpleNodeAccessStrategy (List<? extends SocketAddress> nodes) { if (nodes == null || nodes.size() < 1 ) { throw new IllegalArgumentException ("at least 1 node required." ); } this .nodes.addAll(nodes); } @Override public SocketAddress nextNode () { try { return nodes.get(index); } finally { index = (index + 1 ) % nodes.size(); } } @Override public SocketAddress currentNode () { return nodes.get(index); } }
2.1.2 ClusterNodeAccessStrategy 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 public class ClusterNodeAccessStrategy implements CanalNodeAccessStrategy { private String destination; private IZkChildListener childListener; private IZkDataListener dataListener; private ZkClientx zkClient; private volatile List<InetSocketAddress> currentAddress = new ArrayList <>(); private volatile InetSocketAddress runningAddress = null ; public ClusterNodeAccessStrategy (String destination, ZkClientx zkClient) { this .destination = destination; this .zkClient = zkClient; childListener = (parentPath, currentChilds) -> initClusters(currentChilds); dataListener = new IZkDataListener () { public void handleDataDeleted (String dataPath) throws Exception { runningAddress = null ; } public void handleDataChange (String dataPath, Object data) throws Exception { initRunning(data); } }; String clusterPath = ZookeeperPathUtils.getDestinationClusterRoot(destination); this .zkClient.subscribeChildChanges(clusterPath, childListener); initClusters(this .zkClient.getChildren(clusterPath)); String runningPath = ZookeeperPathUtils.getDestinationServerRunning(destination); this .zkClient.subscribeDataChanges(runningPath, dataListener); initRunning(this .zkClient.readData(runningPath, true )); } @Override public SocketAddress currentNode () { return nextNode(); } @Override public SocketAddress nextNode () { if (runningAddress != null ) { return runningAddress; } else if (!currentAddress.isEmpty()) { return currentAddress.get(0 ); } else { throw new ServerNotFoundException ("no alive canal server for " + destination); } } private void initClusters (List<String> currentChilds) { if (currentChilds == null || currentChilds.isEmpty()) { currentAddress = new ArrayList <>(); } else { List<InetSocketAddress> addresses = new ArrayList <>(); for (String address : currentChilds) { String[] strs = StringUtils.split(address, ":" ); if (strs != null && strs.length == 2 ) { addresses.add(new InetSocketAddress (strs[0 ], Integer.parseInt(strs[1 ]))); } } Collections.shuffle(addresses); currentAddress = addresses; } } private void initRunning (Object data) { if (data == null ) { return ; } ServerRunningData runningData = JsonUtils.unmarshalFromByte((byte []) data, ServerRunningData.class); String[] strs = StringUtils.split(runningData.getAddress(), ':' ); if (strs.length == 2 ) { runningAddress = new InetSocketAddress (strs[0 ], Integer.parseInt(strs[1 ])); } } public void setZkClient (ZkClientx zkClient) { this .zkClient = zkClient; } public ZkClientx getZkClient () { return zkClient; } }
2.2 CanalConnectors 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 public class CanalConnectors { public static CanalConnector newSingleConnector (SocketAddress address, String destination, String username, String password) { SimpleCanalConnector canalConnector = new SimpleCanalConnector (address, username, password, destination); canalConnector.setSoTimeout(60 * 1000 ); canalConnector.setIdleTimeout(60 * 60 * 1000 ); return canalConnector; } public static CanalConnector newClusterConnector (List<? extends SocketAddress> addresses, String destination, String username, String password) { ClusterCanalConnector canalConnector = new ClusterCanalConnector (username, password, destination, new SimpleNodeAccessStrategy (addresses)); canalConnector.setSoTimeout(60 * 1000 ); canalConnector.setIdleTimeout(60 * 60 * 1000 ); return canalConnector; } public static CanalConnector newClusterConnector (String zkServers, String destination, String username, String password) { ClusterCanalConnector canalConnector = new ClusterCanalConnector (username, password, destination, new ClusterNodeAccessStrategy (destination, ZkClientx.getZkClient(zkServers))); canalConnector.setSoTimeout(60 * 1000 ); canalConnector.setIdleTimeout(60 * 60 * 1000 ); return canalConnector; } }
3. ConsumerBatchMessage 3.1 源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 public class ConsumerBatchMessage <T> { private final List<T> data; private CountDownLatch latch; private boolean hasFailure = false ; public ConsumerBatchMessage (List<T> data) { this .data = data; latch = new CountDownLatch (1 ); } public boolean waitFinish (long timeout) throws InterruptedException { return latch.await(timeout, TimeUnit.MILLISECONDS); } public boolean isSuccess () { return !hasFailure; } public List<T> getData () { return data; } public void ack () { latch.countDown(); } public void fail () { hasFailure = true ; long count = latch.getCount(); for (int i = 0 ; i < count; i++) { latch.countDown(); } } }
3.2 使用 1 2 3 4 5 6 7 isCompleted = batchMessage.waitFinish(batchProcessTimeout); if (this .lastGetBatchMessage != null ) { this .lastGetBatchMessage.ack(); }