1. MigrateMap

1.1 源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
public class MigrateMap {

public static <K, V> ConcurrentMap<K, V> makeComputingMap(CacheBuilder<Object, Object> builder,
Function<? super K, ? extends V> computingFunction) {
final Function<? super K, ? extends V> function = computingFunction;
LoadingCache<K, V> computingCache = builder.build(new CacheLoader<K, V>() {

@Override
public V load(K key) throws Exception {
return function.apply(key);
}
});

return new MigrateConcurrentMap<>(computingCache);
}

public static <K, V> ConcurrentMap<K, V> makeComputingMap(Function<? super K, ? extends V> computingFunction) {
return makeComputingMap(CacheBuilder.newBuilder(), computingFunction);
}

final static class MigrateConcurrentMap<K, V> implements ConcurrentMap<K, V> {

private final LoadingCache<K, V> computingCache;

private final ConcurrentMap<K, V> cacheView;

MigrateConcurrentMap(LoadingCache<K, V> computingCache){
this.computingCache = computingCache;
this.cacheView = computingCache.asMap();
}

@Override
public int size() {
return cacheView.size();
}

@Override
public boolean isEmpty() {
return cacheView.isEmpty();
}

@Override
public boolean containsKey(Object key) {
return cacheView.containsKey(key);
}

@Override
public boolean containsValue(Object value) {
return cacheView.containsValue(value);
}

@Override
public V get(Object key) {
try {
return computingCache.get((K) key);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}

@Override
public V put(K key, V value) {
return cacheView.put(key, value);
}

@Override
public V remove(Object key) {
return cacheView.remove(key);
}

@Override
public void putAll(Map<? extends K, ? extends V> m) {
cacheView.putAll(m);
}

@Override
public void clear() {
cacheView.clear();
}

@Override
public Set<K> keySet() {
return cacheView.keySet();
}

@Override
public Collection<V> values() {
return cacheView.values();
}

@Override
public Set<Entry<K, V>> entrySet() {
return cacheView.entrySet();
}

@Override
public V putIfAbsent(K key, V value) {
return cacheView.putIfAbsent(key, value);
}

@Override
public boolean remove(Object key, Object value) {
return cacheView.remove(key, value);
}

@Override
public boolean replace(K key, V oldValue, V newValue) {
return cacheView.replace(key, oldValue, newValue);
}

@Override
public V replace(K key, V value) {
return cacheView.replace(key, value);
}

@Override
public String toString() {
return cacheView.toString();
}
}
}

1.2 使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
ServerRunningMonitors.setRunningMonitors(MigrateMap.makeComputingMap((Function<String, ServerRunningMonitor>) destination -> {
ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
runningMonitor.setDestination(destination);
runningMonitor.setListener(new ServerRunningListener() {

public void processActiveEnter() {
try {
MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
embeddedCanalServer.start(destination);
if (canalMQStarter != null) {
canalMQStarter.startDestination(destination);
}
} finally {
MDC.remove(CanalConstants.MDC_DESTINATION);
}
}

public void processActiveExit() {
try {
MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
if (canalMQStarter != null) {
canalMQStarter.stopDestination(destination);
}
embeddedCanalServer.stop(destination);
} finally {
MDC.remove(CanalConstants.MDC_DESTINATION);
}
}

public void processStart() {
try {
if (zkclientx != null) {
final String path = ZookeeperPathUtils.getDestinationClusterNode(destination,
registerIp + ":" + port);
initCid(path);
zkclientx.subscribeStateChanges(new IZkStateListener() {

public void handleStateChanged(KeeperState state) throws Exception {

}

public void handleNewSession() throws Exception {
initCid(path);
}

@Override
public void handleSessionEstablishmentError(Throwable error) throws Exception {
logger.error("failed to connect to zookeeper", error);
}
});
}
} finally {
MDC.remove(CanalConstants.MDC_DESTINATION);
}
}

public void processStop() {
try {
MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
if (zkclientx != null) {
final String path = ZookeeperPathUtils.getDestinationClusterNode(destination,
registerIp + ":" + port);
releaseCid(path);
}
} finally {
MDC.remove(CanalConstants.MDC_DESTINATION);
}
}

});
if (zkclientx != null) {
runningMonitor.setZkClient(zkclientx);
}
// 触发创建一下cid节点
runningMonitor.init();
return runningMonitor;
}));

2. 基于AQS实现的BooleanMutex

2.1 源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
public class BooleanMutex {

private final Sync sync;

public BooleanMutex(){
sync = new Sync();
set(false);
}

public BooleanMutex(Boolean mutex){
sync = new Sync();
set(mutex);
}

/**
* 阻塞等待Boolean为true
*
* @throws InterruptedException if the current thread is interrupted
*/
public void get() throws InterruptedException {
sync.innerGet();
}

/**
* 阻塞等待Boolean为true,允许设置超时时间
*
* @param timeout
* @param unit
* @throws InterruptedException
* @throws TimeoutException
*/
public void get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
sync.innerGet(unit.toNanos(timeout));
}

/**
* 重新设置对应的Boolean mutex
*
* @param mutex
*/
public void set(Boolean mutex) {
if (mutex) {
sync.innerSetTrue();
} else {
sync.innerSetFalse();
}
}

public boolean state() {
return sync.innerState();
}

/**
* Synchronization control for BooleanMutex. Uses AQS sync state to
* represent run status
*/
private static final class Sync extends AbstractQueuedSynchronizer {

private static final long serialVersionUID = 2559471934544126329L;
/** State value representing that TRUE */
private static final int TRUE = 1;
/** State value representing that FALSE */
private static final int FALSE = 2;

private boolean isTrue(int state) {
return (state & TRUE) != 0;
}

/**
* 实现AQS的接口,获取共享锁的判断
*/
protected int tryAcquireShared(int state) {
// 如果为true,直接允许获取锁对象
// 如果为false,进入阻塞队列,等待被唤醒
return isTrue(getState()) ? 1 : -1;
}

/**
* 实现AQS的接口,释放共享锁的判断
*/
protected boolean tryReleaseShared(int ignore) {
// 始终返回true,代表可以release
return true;
}

boolean innerState() {
return isTrue(getState());
}

void innerGet() throws InterruptedException {
acquireSharedInterruptibly(0);
}

void innerGet(long nanosTimeout) throws InterruptedException, TimeoutException {
if (!tryAcquireSharedNanos(0, nanosTimeout)) throw new TimeoutException();
}

void innerSetTrue() {
for (;;) {
int s = getState();
if (s == TRUE) {
return; // 直接退出
}
if (compareAndSetState(s, TRUE)) {// cas更新状态,避免并发更新true操作
releaseShared(0);// 释放一下锁对象,唤醒一下阻塞的Thread
return;
}
}
}

void innerSetFalse() {
for (;;) {
int s = getState();
if (s == FALSE) {
return; // 直接退出
}
if (compareAndSetState(s, FALSE)) {// cas更新状态,避免并发更新false操作
return;
}
}
}

}

}

2.2 使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// 1. 
private BooleanMutex mutex = new BooleanMutex(false);

// 2. 初始化
if (!isMine(runningData.getAddress())) {
mutex.set(false);
}

// 3. 启动
private void initRunning() {
if (!isStart()) {
return;
}

String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
// 序列化
byte[] bytes = JsonUtils.marshalToByte(serverData);
try {
mutex.set(false);
zkClient.create(path, bytes, CreateMode.EPHEMERAL);
activeData = serverData;
processActiveEnter();// 触发一下事件
mutex.set(true);
release = false;
} catch (ZkNodeExistsException e) {
bytes = zkClient.readData(path, true);
if (bytes == null) {// 如果不存在节点,立即尝试一次
initRunning();
} else {
activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class);
}
} catch (ZkNoNodeException e) {
zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true); // 尝试创建父节点
initRunning();
}
}

// 4. 真正启动

/**
* 阻塞等待自己成为active,如果自己成为active,立马返回
*
* @throws InterruptedException
*/
public void waitForActive() throws InterruptedException {
initRunning();
mutex.get();
}

3. 对zk的封装

3.1 继承ZkConnection

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
/**
* 封装了ZooKeeper,使其支持节点的优先顺序,比如美国机房的节点会优先加载美国对应的zk集群列表,都失败后才会选择加载杭州的zk集群列表 *
*
* @author jianghang 2012-7-10 下午02:31:42
* @version 1.0.0
*/
public class ZooKeeperx extends ZkConnection {

private static final String SERVER_COMMA = ";";
private static final Logger logger = LoggerFactory.getLogger(ZooKeeperx.class);
private static final Field clientCnxnField = ReflectionUtils.findField(ZooKeeper.class, "cnxn");
private static final Field hostProviderField = ReflectionUtils.findField(ClientCnxn.class, "hostProvider");
private static final Field serverAddressesField = ReflectionUtils.findField(StaticHostProvider.class,
"serverAddresses");
private static final Field zookeeperLockField = ReflectionUtils.findField(ZkConnection.class,
"_zookeeperLock");
private static final Field zookeeperFiled = ReflectionUtils.findField(ZkConnection.class, "_zk");
private static final int DEFAULT_SESSION_TIMEOUT = 90000;

private final List<String> _serversList;
private final int _sessionTimeOut;

public ZooKeeperx(String zkServers){
this(zkServers, DEFAULT_SESSION_TIMEOUT);
}

public ZooKeeperx(String zkServers, int sessionTimeOut){
super(zkServers, sessionTimeOut);
_serversList = Arrays.asList(StringUtils.split(this.getServers(), SERVER_COMMA));
_sessionTimeOut = sessionTimeOut;
}

@Override
public void connect(Watcher watcher) {
ReflectionUtils.makeAccessible(zookeeperLockField);
ReflectionUtils.makeAccessible(zookeeperFiled);
Lock _zookeeperLock = (ReentrantLock) ReflectionUtils.getField(zookeeperLockField, this);
ZooKeeper _zk = (ZooKeeper) ReflectionUtils.getField(zookeeperFiled, this);

_zookeeperLock.lock();
try {
if (_zk != null) {
throw new IllegalStateException("zk client has already been started");
}
String zkServers = _serversList.get(0);

try {
logger.debug("Creating new ZookKeeper instance to connect to " + zkServers + ".");
_zk = new ZooKeeper(zkServers, _sessionTimeOut, watcher);
configMutliCluster(_zk);
ReflectionUtils.setField(zookeeperFiled, this, _zk);
} catch (IOException e) {
throw new ZkException("Unable to connect to " + zkServers, e);
}
} finally {
_zookeeperLock.unlock();
}
}

// ===============================

public void configMutliCluster(ZooKeeper zk) {
if (_serversList.size() == 1) {
return;
}
String cluster1 = _serversList.get(0);
try {
if (_serversList.size() > 1) {
// 强制的声明accessible
ReflectionUtils.makeAccessible(clientCnxnField);
ReflectionUtils.makeAccessible(hostProviderField);
ReflectionUtils.makeAccessible(serverAddressesField);

// 添加第二组集群列表
for (int i = 1; i < _serversList.size(); i++) {
String cluster = _serversList.get(i);
// 强制获取zk中的地址信息
ClientCnxn cnxn = (ClientCnxn) ReflectionUtils.getField(clientCnxnField, zk);
HostProvider hostProvider = (HostProvider) ReflectionUtils.getField(hostProviderField, cnxn);
List<InetSocketAddress> serverAddrs = (List<InetSocketAddress>) ReflectionUtils.getField(serverAddressesField,
hostProvider);
// 添加第二组集群列表
serverAddrs.addAll(new ConnectStringParser(cluster).getServerAddresses());
}
}
} catch (Exception e) {
try {
if (zk != null) {
zk.close();
}
} catch (InterruptedException ie) {
// ignore interrupt
}
throw new ZkException("zookeeper_create_error, serveraddrs=" + cluster1, e);
}

}
}

3.2 继承ZkClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
public class ZkClientx extends ZkClient {

// 对于zkclient进行一次缓存,避免一个jvm内部使用多个zk connection
private static Map<String, ZkClientx> clients = MigrateMap.makeComputingMap(ZkClientx::new);

public static ZkClientx getZkClient(String servers) {
return clients.get(servers);
}

public static void clearClients() {
clients.clear();
}

public ZkClientx(String serverstring){
this(serverstring, Integer.MAX_VALUE);
}

public ZkClientx(String zkServers, int connectionTimeout){
this(new ZooKeeperx(zkServers), connectionTimeout);
}

public ZkClientx(String zkServers, int sessionTimeout, int connectionTimeout){
this(new ZooKeeperx(zkServers, sessionTimeout), connectionTimeout);
}

public ZkClientx(String zkServers, int sessionTimeout, int connectionTimeout, ZkSerializer zkSerializer){
this(new ZooKeeperx(zkServers, sessionTimeout), connectionTimeout, zkSerializer);
}

private ZkClientx(IZkConnection connection, int connectionTimeout){
this(connection, connectionTimeout, new ByteSerializer());
}

private ZkClientx(IZkConnection zkConnection, int connectionTimeout, ZkSerializer zkSerializer){
super(zkConnection, connectionTimeout, zkSerializer);
}

/**
* Create a persistent Sequential node.
*
* @param path
* @param createParents if true all parent dirs are created as well and no
* {@link ZkNodeExistsException} is thrown in case the path already exists
* @throws ZkInterruptedException if operation was interrupted, or a
* required reconnection got interrupted
* @throws IllegalArgumentException if called from anything except the
* ZooKeeper event thread
* @throws ZkException if any ZooKeeper exception occurred
* @throws RuntimeException if any other exception occurs
*/
public String createPersistentSequential(String path, boolean createParents) throws ZkInterruptedException,
IllegalArgumentException, ZkException,
RuntimeException {
try {
return create(path, null, CreateMode.PERSISTENT_SEQUENTIAL);
} catch (ZkNoNodeException e) {
if (!createParents) {
throw e;
}
String parentDir = path.substring(0, path.lastIndexOf('/'));
createPersistent(parentDir, createParents);
return createPersistentSequential(path, createParents);
}
}

/**
* Create a persistent Sequential node.
*
* @param path
* @param data
* @param createParents if true all parent dirs are created as well and no
* {@link ZkNodeExistsException} is thrown in case the path already exists
* @throws ZkInterruptedException if operation was interrupted, or a
* required reconnection got interrupted
* @throws IllegalArgumentException if called from anything except the
* ZooKeeper event thread
* @throws ZkException if any ZooKeeper exception occurred
* @throws RuntimeException if any other exception occurs
*/
public String createPersistentSequential(String path, Object data, boolean createParents)
throws ZkInterruptedException,
IllegalArgumentException,
ZkException,
RuntimeException {
try {
return create(path, data, CreateMode.PERSISTENT_SEQUENTIAL);
} catch (ZkNoNodeException e) {
if (!createParents) {
throw e;
}
String parentDir = path.substring(0, path.lastIndexOf('/'));
createPersistent(parentDir, createParents);
return createPersistentSequential(path, data, createParents);
}
}

/**
* Create a persistent Sequential node.
*
* @param path
* @param data
* @param createParents if true all parent dirs are created as well and no
* {@link ZkNodeExistsException} is thrown in case the path already exists
* @throws ZkInterruptedException if operation was interrupted, or a
* required reconnection got interrupted
* @throws IllegalArgumentException if called from anything except the
* ZooKeeper event thread
* @throws ZkException if any ZooKeeper exception occurred
* @throws RuntimeException if any other exception occurs
*/
public void createPersistent(String path, Object data, boolean createParents) throws ZkInterruptedException,
IllegalArgumentException, ZkException,
RuntimeException {
try {
create(path, data, CreateMode.PERSISTENT);
} catch (ZkNodeExistsException e) {
if (!createParents) {
throw e;
}
} catch (ZkNoNodeException e) {
if (!createParents) {
throw e;
}
String parentDir = path.substring(0, path.lastIndexOf('/'));
createPersistent(parentDir, createParents);
createPersistent(path, data, createParents);
}
}
}

4. CanalToStringStyle

4.1 源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
* Otter项目内部使用的ToStringStyle
*
* <pre>
* 默认Style输出格式:
* Person[name=John Doe,age=33,smoker=false ,time=2010-04-01 00:00:00]
* </pre>
*
* @author jianghang 2010-6-18 上午11:35:27
*/
public class CanalToStringStyle extends ToStringStyle {

private static final long serialVersionUID = -6568177374288222145L;

private static final String DEFAULT_TIME = "yyyy-MM-dd HH:mm:ss";
private static final String DEFAULT_DAY = "yyyy-MM-dd";

/**
* <pre>
* 输出格式:
* Person[name=John Doe,age=33,smoker=false ,time=2010-04-01 00:00:00]
* </pre>
*/
public static final ToStringStyle TIME_STYLE = new OtterDateStyle(DEFAULT_TIME);

/**
* <pre>
* 输出格式:
* Person[name=John Doe,age=33,smoker=false ,day=2010-04-01]
* </pre>
*/
public static final ToStringStyle DAY_STYLE = new OtterDateStyle(DEFAULT_DAY);

/**
* <pre>
* 输出格式:
* Person[name=John Doe,age=33,smoker=false ,time=2010-04-01 00:00:00]
* </pre>
*/
public static final ToStringStyle DEFAULT_STYLE = CanalToStringStyle.TIME_STYLE;

// =========================== 自定义style =============================

/**
* 支持日期格式化的ToStringStyle
*
* @author li.jinl
*/
private static class OtterDateStyle extends ToStringStyle {

private static final long serialVersionUID = 5208917932254652886L;

// 日期format格式
private String pattern;

public OtterDateStyle(String pattern){
super();
this.setUseShortClassName(true);
this.setUseIdentityHashCode(false);
// 设置日期format格式
this.pattern = pattern;
}

protected void appendDetail(StringBuffer buffer, String fieldName, Object value) {
// 增加自定义的date对象处理
if (value instanceof Date) {
value = new SimpleDateFormat(pattern).format(value);
}
// 后续可以增加其他自定义对象处理
buffer.append(value);
}
}
}

4.2 使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
public class Message implements Serializable {

private static final long serialVersionUID = 1234034768477580009L;
private long id;
private List<CanalEntry.Entry> entries = new ArrayList<>();
// row data for performance, see:
// https://github.com/alibaba/canal/issues/726
private boolean raw = true;
private List<ByteString> rawEntries = new ArrayList<>();

public Message(long id, List<Entry> entries){
this.id = id;
this.entries = entries == null ? new ArrayList<>() : entries;
this.raw = false;
}

public Message(long id, boolean raw, List entries){
this.id = id;
if (raw) {
this.rawEntries = entries == null ? new ArrayList<>() : entries;
} else {
this.entries = entries == null ? new ArrayList<>() : entries;
}
this.raw = raw;
}

public Message(long id){
this.id = id;
}

public long getId() {
return id;
}

public void setId(long id) {
this.id = id;
}

public List<Entry> getEntries() {
return entries;
}

public void setEntries(List<CanalEntry.Entry> entries) {
this.entries = entries;
}

public void addEntry(CanalEntry.Entry entry) {
this.entries.add(entry);
}

public void setRawEntries(List<ByteString> rawEntries) {
this.rawEntries = rawEntries;
}

public void addRawEntry(ByteString rawEntry) {
this.rawEntries.add(rawEntry);
}

public List<ByteString> getRawEntries() {
return rawEntries;
}

public boolean isRaw() {
return raw;
}

public void setRaw(boolean raw) {
this.raw = raw;
}

public String toString() {
return ToStringBuilder.reflectionToString(this, CanalToStringStyle.DEFAULT_STYLE);
}

}

5. ExecutorTemplate

5.1 源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
/**
* 多线程执行器模板代码,otter中好多地方都写多线程,比较多的都是重复的逻辑代码,抽象一下做个模板把
*
* <pre>
* 示例代码:
* ExecutorTemplate template = new ExecutorTemplate(executor);
* ...
* try {
* for ( ....) {
* template.submit(new Runnable() {})
* }
*
* List<?> result = template.waitForResult();
* // do result
* } finally {
* template.clear();
* }
*
* 注意:该模板工程,不支持多业务并发调用,会出现数据混乱
* </pre>
*/
public class ExecutorTemplate {

private volatile ThreadPoolExecutor executor = null;
private volatile List<Future> futures = null;

public ExecutorTemplate(ThreadPoolExecutor executor){
this.futures = Collections.synchronizedList(new ArrayList<>());
this.executor = executor;
}

public void submit(Runnable task) {
Future future = executor.submit(task, null);
futures.add(future);
check(future);
}

public void submit(Callable task) {
Future future = executor.submit(task);
futures.add(future);
check(future);
}

private void check(Future future) {
if (future.isDone()) {
// 立即判断一次,因为使用了CallerRun可能当场跑出结果,针对有异常时快速响应,而不是等跑完所有的才抛异常
try {
future.get();
} catch (Throwable e) {
// 取消完之后立马退出
cacelAllFutures();
throw new RuntimeException(e);
}
}
}

public synchronized List<?> waitForResult() {
List result = new ArrayList();
RuntimeException exception = null;

for (Future future : futures) {
try {
result.add(future.get());
} catch (Throwable e) {
exception = new RuntimeException(e);
// 如何一个future出现了异常,就退出
break;
}
}

if (exception != null) {
cacelAllFutures();
throw exception;
} else {
return result;
}
}

public void cacelAllFutures() {
for (Future future : futures) {
if (!future.isDone() && !future.isCancelled()) {
future.cancel(true);
}
}
}

public void clear() {
futures.clear();
}

}

5.2 使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
ExecutorTemplate template = new ExecutorTemplate(sendPartitionExecutor);
for (int i = 0; i < length; i++) {
com.alibaba.otter.canal.protocol.Message dataPartition = messages[i];
if (dataPartition != null) {
final int index = i;
template.submit(() -> {
Message data = new Message(topicName,
((RocketMQProducerConfig) this.mqProperties).getTag(),
CanalMessageSerializerUtil.serializer(dataPartition,
mqProperties.isFilterTransactionEntry()));
sendMessage(data, index);
});
}
}
// 等所有分片发送完毕
template.waitForResult();

6. 针对server的running节点的封装

6.1 源码

6.1.1 ServerRunningData

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
* 服务端running状态信息
*
* @author jianghang 2012-11-22 下午03:11:30
* @version 1.0.0
*/
public class ServerRunningData implements Serializable {

private static final long serialVersionUID = 92260481691855281L;

@Deprecated
private Long cid;
private String address;
private boolean active = true;

public ServerRunningData(){
}

public ServerRunningData(String address){
this.address = address;
}

public Long getCid() {
return cid;
}

public void setCid(Long cid) {
this.cid = cid;
}

public String getAddress() {
return address;
}

public void setAddress(String address) {
this.address = address;
}

public boolean isActive() {
return active;
}

public void setActive(boolean active) {
this.active = active;
}

public String toString() {
return ToStringBuilder.reflectionToString(this, CanalToStringStyle.DEFAULT_STYLE);
}

}

6.1.2 ServerRunningListener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 触发一下mainstem发生切换
*
* @author jianghang 2012-9-11 下午02:26:03
* @version 1.0.0
*/
public interface ServerRunningListener {

/**
* 启动时回调做点事情
*/
public void processStart();

/**
* 关闭时回调做点事情
*/
public void processStop();

/**
* 触发现在轮到自己做为active,需要载入上一个active的上下文数据
*/
public void processActiveEnter();

/**
* 触发一下当前active模式失败
*/
public void processActiveExit();

}

6.1.3 ServerRunningMonitor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
/**
* 针对server的running节点控制
*
* @author jianghang 2012-11-22 下午02:59:42
* @version 1.0.0
*/
public class ServerRunningMonitor extends AbstractCanalLifeCycle {

private static final Logger logger = LoggerFactory.getLogger(ServerRunningMonitor.class);
private ZkClientx zkClient;
private String destination;
private IZkDataListener dataListener;
private BooleanMutex mutex = new BooleanMutex(false);
private volatile boolean release = false;
// 当前服务节点状态信息
private ServerRunningData serverData;
// 当前实际运行的节点状态信息
private volatile ServerRunningData activeData;
private ScheduledExecutorService delayExecutor;
private int delayTime = 5;
private ServerRunningListener listener;

public ServerRunningMonitor(ServerRunningData serverData){
this();
this.serverData = serverData;
}

public ServerRunningMonitor(){
// 创建父节点
dataListener = new IZkDataListener() {

public void handleDataChange(String dataPath, Object data) throws Exception {
MDC.put("destination", destination);
ServerRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ServerRunningData.class);
if (!isMine(runningData.getAddress())) {
mutex.set(false);
}

if (!runningData.isActive() && isMine(runningData.getAddress())) { // 说明出现了主动释放的操作,并且本机之前是active
releaseRunning();// 彻底释放mainstem
}

activeData = (ServerRunningData) runningData;
}

public void handleDataDeleted(String dataPath) throws Exception {
MDC.put("destination", destination);
mutex.set(false);
if (!release && activeData != null && isMine(activeData.getAddress())) {
// 如果上一次active的状态就是本机,则即时触发一下active抢占
initRunning();
} else {
// 否则就是等待delayTime,避免因网络瞬端或者zk异常,导致出现频繁的切换操作
delayExecutor.schedule(() -> initRunning(), delayTime, TimeUnit.SECONDS);
}
}

};

}

public void init() {
processStart();
}

public synchronized void start() {
super.start();
try {
processStart();
if (zkClient != null) {
delayExecutor = Executors.newScheduledThreadPool(1);
// 如果需要尽可能释放instance资源,不需要监听running节点,不然即使stop了这台机器,另一台机器立马会start
String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
zkClient.subscribeDataChanges(path, dataListener);

initRunning();
} else {
processActiveEnter();// 没有zk,直接启动
}
} catch (Exception e) {
logger.error("start failed", e);
// 没有正常启动,重置一下状态,避免干扰下一次start
stop();
}

}

public boolean release() {
if (zkClient != null) {
releaseRunning(); // 尝试一下release
return true;
} else {
processActiveExit(); // 没有zk,直接退出
return false;
}
}

public synchronized void stop() {
super.stop();

if (zkClient != null) {
String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
zkClient.unsubscribeDataChanges(path, dataListener);
if (delayExecutor != null) {
delayExecutor.shutdown();
delayExecutor = null;
}

releaseRunning(); // 尝试一下release
} else {
processActiveExit(); // 没有zk,直接启动
}
processStop();
}

private void initRunning() {
if (!isStart()) {
return;
}

String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
// 序列化
byte[] bytes = JsonUtils.marshalToByte(serverData);
try {
mutex.set(false);
zkClient.create(path, bytes, CreateMode.EPHEMERAL);
activeData = serverData;
processActiveEnter();// 触发一下事件
mutex.set(true);
release = false;
} catch (ZkNodeExistsException e) {
bytes = zkClient.readData(path, true);
if (bytes == null) {// 如果不存在节点,立即尝试一次
initRunning();
} else {
activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class);
}
} catch (ZkNoNodeException e) {
zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true); // 尝试创建父节点
initRunning();
}
}

/**
* 阻塞等待自己成为active,如果自己成为active,立马返回
*
* @throws InterruptedException
*/
public void waitForActive() throws InterruptedException {
initRunning();
mutex.get();
}

/**
* 检查当前的状态
*/
public boolean check() {
String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
try {
byte[] bytes = zkClient.readData(path);
ServerRunningData eventData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class);
activeData = eventData;// 更新下为最新值
// 检查下nid是否为自己
boolean result = isMine(activeData.getAddress());
if (!result) {
logger.warn("canal is running in node[{}] , but not in node[{}]",
activeData.getAddress(),
serverData.getAddress());
}
return result;
} catch (ZkNoNodeException e) {
logger.warn("canal is not run any in node");
return false;
} catch (ZkInterruptedException e) {
logger.warn("canal check is interrupt");
Thread.interrupted();// 清除interrupt标记
return check();
} catch (ZkException e) {
logger.warn("canal check is failed");
return false;
}
}

private boolean releaseRunning() {
if (check()) {
release = true;
String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
zkClient.delete(path);
mutex.set(false);
processActiveExit();
return true;
}

return false;
}

// ====================== helper method ======================

private boolean isMine(String address) {
return address.equals(serverData.getAddress());
}

private void processStart() {
if (listener != null) {
try {
listener.processStart();
} catch (Exception e) {
logger.error("processStart failed", e);
}
}
}

private void processStop() {
if (listener != null) {
try {
listener.processStop();
} catch (Exception e) {
logger.error("processStop failed", e);
}
}
}

private void processActiveEnter() {
if (listener != null) {
listener.processActiveEnter();
}
}

private void processActiveExit() {
if (listener != null) {
try {
listener.processActiveExit();
} catch (Exception e) {
logger.error("processActiveExit failed", e);
}
}
}

public void setListener(ServerRunningListener listener) {
this.listener = listener;
}

// ===================== setter / getter =======================

public void setDelayTime(int delayTime) {
this.delayTime = delayTime;
}

public void setServerData(ServerRunningData serverData) {
this.serverData = serverData;
}

public void setDestination(String destination) {
this.destination = destination;
}

public void setZkClient(ZkClientx zkClient) {
this.zkClient = zkClient;
}

}

6.1.4 ServerRunningMonitors

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* {@linkplain ServerRunningMonitor}管理容器,使用static进行数据全局共享
*
* @author jianghang 2012-12-3 下午09:32:06
* @version 1.0.0
*/
public class ServerRunningMonitors {

private static ServerRunningData serverData;
private static Map runningMonitors; // <String,
// ServerRunningMonitor>

public static ServerRunningData getServerData() {
return serverData;
}

public static Map<String, ServerRunningMonitor> getRunningMonitors() {
return runningMonitors;
}

public static ServerRunningMonitor getRunningMonitor(String destination) {
return (ServerRunningMonitor) runningMonitors.get(destination);
}

public static void setServerData(ServerRunningData serverData) {
ServerRunningMonitors.serverData = serverData;
}

public static void setRunningMonitors(Map runningMonitors) {
ServerRunningMonitors.runningMonitors = runningMonitors;
}

}

6.2 使用

1
2
3
4
ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
if (!config.getLazy() && !runningMonitor.isStart()) {
runningMonitor.start();
}

本站由 卡卡龙 使用 Stellar 1.29.1主题创建

本站访问量 次. 本文阅读量 次.