1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private CanalMQProducer canalMQProducer = null;


public synchronized void start() throws Throwable {
String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
if (!"tcp".equalsIgnoreCase(serverMode)) {
ExtensionLoader<CanalMQProducer> loader = ExtensionLoader.getExtensionLoader(CanalMQProducer.class);
canalMQProducer = loader
.getExtension(serverMode.toLowerCase(), CONNECTOR_SPI_DIR, CONNECTOR_STANDBY_SPI_DIR);
if (canalMQProducer != null) {
canalMQProducer = new ProxyCanalMQProducer(canalMQProducer);
canalMQProducer.init(properties);
}
}
}

1. ExtensionLoader

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
/**
* SPI 类加载器
*
* @author rewerma 2018-8-19 下午11:30:49
* @version 1.0.0
*/
public class ExtensionLoader<T> {

private static final Logger logger = LoggerFactory.getLogger(ExtensionLoader.class);

private static final String SERVICES_DIRECTORY = "META-INF/services/";

private static final String CANAL_DIRECTORY = "META-INF/canal/";

private static final String DEFAULT_CLASSLOADER_POLICY = "internal";

private static final Pattern NAME_SEPARATOR = Pattern.compile("\\s*[,]+\\s*");

private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<>();

private static final ConcurrentMap<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<>();

private static final ConcurrentMap<String, Object> EXTENSION_KEY_INSTANCE = new ConcurrentHashMap<>();

private final Class<?> type;

private final String classLoaderPolicy;

private final ConcurrentMap<Class<?>, String> cachedNames = new ConcurrentHashMap<>();

private final Holder<Map<String, Class<?>>> cachedClasses = new Holder<>();

private final ConcurrentMap<String, Holder<Object>> cachedInstances = new ConcurrentHashMap<>();

private String cachedDefaultName;

private ConcurrentHashMap<String, IllegalStateException> exceptions = new ConcurrentHashMap<>();

private static <T> boolean withExtensionAnnotation(Class<T> type) {
return type.isAnnotationPresent(SPI.class);
}

public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
return getExtensionLoader(type, DEFAULT_CLASSLOADER_POLICY);
}

@SuppressWarnings("unchecked")
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type, String classLoaderPolicy) {
if (type == null) throw new IllegalArgumentException("Extension type == null");
if (!type.isInterface()) {
throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
}
if (!withExtensionAnnotation(type)) {
throw new IllegalArgumentException("Extension type(" + type + ") is not extension, because WITHOUT @"
+ SPI.class.getSimpleName() + " Annotation!");
}

ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
if (loader == null) {
EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type, classLoaderPolicy));
loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
}
return loader;
}

public ExtensionLoader(Class<?> type){
this.type = type;
this.classLoaderPolicy = DEFAULT_CLASSLOADER_POLICY;
}

public ExtensionLoader(Class<?> type, String classLoaderPolicy){
this.type = type;
this.classLoaderPolicy = classLoaderPolicy;
}

/**
* 返回指定名字的扩展
*
* @param name
* @return
*/
@SuppressWarnings("unchecked")
public T getExtension(String name, String spiDir, String standbyDir) {
if (name == null || name.length() == 0) throw new IllegalArgumentException("Extension name == null");
if ("true".equals(name)) {
return getDefaultExtension(spiDir, standbyDir);
}
Holder<Object> holder = cachedInstances.get(name);
if (holder == null) {
cachedInstances.putIfAbsent(name, new Holder<>());
holder = cachedInstances.get(name);
}
Object instance = holder.get();
if (instance == null) {
synchronized (holder) {
instance = holder.get();
if (instance == null) {
instance = createExtension(name, spiDir, standbyDir);
holder.set(instance);
}
}
}
return (T) instance;
}

@SuppressWarnings("unchecked")
public T getExtension(String name, String key, String spiDir, String standbyDir) {
if (name == null || name.length() == 0) throw new IllegalArgumentException("Extension name == null");
if ("true".equals(name)) {
return getDefaultExtension(spiDir, standbyDir);
}
String extKey = name + "-" + StringUtils.trimToEmpty(key);
Holder<Object> holder = cachedInstances.get(extKey);
if (holder == null) {
cachedInstances.putIfAbsent(extKey, new Holder<>());
holder = cachedInstances.get(extKey);
}
Object instance = holder.get();
if (instance == null) {
synchronized (holder) {
instance = holder.get();
if (instance == null) {
instance = createExtension(name, key, spiDir, standbyDir);
holder.set(instance);
}
}
}
return (T) instance;
}

/**
* 返回缺省的扩展,如果没有设置则返回<code>null</code>
*/
public T getDefaultExtension(String spiDir, String standbyDir) {
getExtensionClasses(spiDir, standbyDir);
if (null == cachedDefaultName || cachedDefaultName.length() == 0 || "true".equals(cachedDefaultName)) {
return null;
}
return getExtension(cachedDefaultName, spiDir, standbyDir);
}

@SuppressWarnings("unchecked")
public T createExtension(String name, String spiDir, String standbyDir) {
Class<?> clazz = getExtensionClasses(spiDir, standbyDir).get(name);
if (clazz == null) {
throw new IllegalStateException("Extension instance(name: " + name + ", class: " + type
+ ") could not be instantiated: class could not be found");
}
try {
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance(name: " + name + ", class: " + type
+ ") could not be instantiated: " + t.getMessage(), t);
}
}

@SuppressWarnings("unchecked")
private T createExtension(String name, String key, String spiDir, String standbyDir) {
Class<?> clazz = getExtensionClasses(spiDir, standbyDir).get(name);
if (clazz == null) {
throw new IllegalStateException("Extension instance(name: " + name + ", class: " + type
+ ") could not be instantiated: class could not be found");
}
try {
T instance = (T) EXTENSION_KEY_INSTANCE.get(name + "-" + key);
if (instance == null) {
EXTENSION_KEY_INSTANCE.putIfAbsent(name + "-" + key, clazz.newInstance());
instance = (T) EXTENSION_KEY_INSTANCE.get(name + "-" + key);
}
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance(name: " + name + ", class: " + type
+ ") could not be instantiated: " + t.getMessage(), t);
}
}

private Map<String, Class<?>> getExtensionClasses(String spiDir, String standbyDir) {
Map<String, Class<?>> classes = cachedClasses.get();
if (classes == null) {
synchronized (cachedClasses) {
classes = cachedClasses.get();
if (classes == null) {
classes = loadExtensionClasses(spiDir, standbyDir);
cachedClasses.set(classes);
}
}
}

return classes;
}

private String getJarDirectoryPath() {
URL url = Thread.currentThread().getContextClassLoader().getResource("");
String dirtyPath;
if (url != null) {
dirtyPath = url.toString();
} else {
File file = new File("");
dirtyPath = file.getAbsolutePath();
}
String jarPath = dirtyPath.replaceAll("^.*file:/", ""); // removes
// file:/ and
// everything
// before it
jarPath = jarPath.replaceAll("jar!.*", "jar"); // removes everything
// after .jar, if .jar
// exists in dirtyPath
jarPath = jarPath.replaceAll("%20", " "); // necessary if path has
// spaces within
if (!jarPath.endsWith(".jar")) { // this is needed if you plan to run
// the app using Spring Tools Suit play
// button.
jarPath = jarPath.replaceAll("/classes/.*", "/classes/");
}
Path path = Paths.get(jarPath).getParent(); // Paths - from java 8
if (path != null) {
return path.toString();
}
return null;
}

private Map<String, Class<?>> loadExtensionClasses(String spiDir, String standbyDir) {
final SPI defaultAnnotation = type.getAnnotation(SPI.class);
if (defaultAnnotation != null) {
String value = defaultAnnotation.value();
if ((value = value.trim()).length() > 0) {
String[] names = NAME_SEPARATOR.split(value);
if (names.length > 1) {
throw new IllegalStateException("more than 1 default extension name on extension " + type.getName()
+ ": " + Arrays.toString(names));
}
if (names.length == 1) cachedDefaultName = names[0];
}
}

Map<String, Class<?>> extensionClasses = new HashMap<>();

if (spiDir != null && standbyDir != null) {
// 1. plugin folder,customized extension classLoader
// (jar_dir/plugin)
String dir = File.separator + this.getJarDirectoryPath() + spiDir; // +
// "plugin";

File externalLibDir = new File(dir);
if (!externalLibDir.exists()) {
externalLibDir = new File(File.separator + this.getJarDirectoryPath() + standbyDir);
}
logger.info("extension classpath dir: " + externalLibDir.getAbsolutePath());
if (externalLibDir.exists()) {
File[] files = externalLibDir.listFiles((dir1, name) -> name.endsWith(".jar"));
if (files != null) {
for (File f : files) {
URL url;
try {
url = f.toURI().toURL();
} catch (MalformedURLException e) {
throw new RuntimeException("load extension jar failed!", e);
}

ClassLoader parent = Thread.currentThread().getContextClassLoader();
URLClassLoader localClassLoader;
if (classLoaderPolicy == null || "".equals(classLoaderPolicy)
|| DEFAULT_CLASSLOADER_POLICY.equalsIgnoreCase(classLoaderPolicy)) {
localClassLoader = new URLClassExtensionLoader(new URL[] { url });
} else {
localClassLoader = new URLClassLoader(new URL[] { url }, parent);
}

loadFile(extensionClasses, CANAL_DIRECTORY, localClassLoader);
loadFile(extensionClasses, SERVICES_DIRECTORY, localClassLoader);
}
}
}
}

// 2. load inner extension class with default classLoader
ClassLoader classLoader = findClassLoader();
loadFile(extensionClasses, CANAL_DIRECTORY, classLoader);
loadFile(extensionClasses, SERVICES_DIRECTORY, classLoader);

return extensionClasses;
}

private void loadFile(Map<String, Class<?>> extensionClasses, String dir, ClassLoader classLoader) {
String fileName = dir + type.getName();
try {
Enumeration<URL> urls;
if (classLoader != null) {
urls = classLoader.getResources(fileName);
} else {
urls = ClassLoader.getSystemResources(fileName);
}
if (urls != null) {
while (urls.hasMoreElements()) {
URL url = urls.nextElement();
try {
BufferedReader reader = null;
try {
reader = new BufferedReader(new InputStreamReader(url.openStream(), StandardCharsets.UTF_8));
String line = null;
while ((line = reader.readLine()) != null) {
final int ci = line.indexOf('#');
if (ci >= 0) line = line.substring(0, ci);
line = line.trim();
if (line.length() > 0) {
try {
String name = null;
int i = line.indexOf('=');
if (i > 0) {
name = line.substring(0, i).trim();
line = line.substring(i + 1).trim();
}
if (line.length() > 0) {
Class<?> clazz = classLoader.loadClass(line);
// Class<?> clazz =
// Class.forName(line, true,
// classLoader);
if (!type.isAssignableFrom(clazz)) {
throw new IllegalStateException("Error when load extension class(interface: "
+ type
+ ", class line: "
+ clazz.getName()
+ "), class "
+ clazz.getName()
+ "is not subtype of interface.");
} else {
try {
clazz.getConstructor(type);
} catch (NoSuchMethodException e) {
clazz.getConstructor();
String[] names = NAME_SEPARATOR.split(name);
if (names != null && names.length > 0) {
for (String n : names) {
if (!cachedNames.containsKey(clazz)) {
cachedNames.put(clazz, n);
}
Class<?> c = extensionClasses.get(n);
if (c == null) {
extensionClasses.put(n, clazz);
} else if (c != clazz) {
cachedNames.remove(clazz);
throw new IllegalStateException("Duplicate extension "
+ type.getName()
+ " name " + n + " on "
+ c.getName() + " and "
+ clazz.getName());
}
}
}
}
}
}
} catch (Throwable t) {
IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: "
+ type
+ ", class line: "
+ line
+ ") in "
+ url
+ ", cause: "
+ t.getMessage(),
t);
exceptions.put(line, e);
}
}
} // end of while read lines
} finally {
if (reader != null) {
reader.close();
}
}
} catch (Throwable t) {
logger.error("Exception when load extension class(interface: " + type + ", class file: " + url
+ ") in " + url, t);
}
} // end of while urls
}
} catch (Throwable t) {
logger.error("Exception when load extension class(interface: " + type + ", description file: " + fileName
+ ").", t);
}
}

private static ClassLoader findClassLoader() {
return ExtensionLoader.class.getClassLoader();
}

@Override
public String toString() {
return this.getClass().getName() + "[" + type.getName() + "]";
}

private static class Holder<T> {

private volatile T value;

private void set(T value) {
this.value = value;
}

private T get() {
return value;
}

}
}

2. URLClassExtensionLoader

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
public class URLClassExtensionLoader extends URLClassLoader {

public URLClassExtensionLoader(URL[] urls){
super(urls);
}

@Override
public Class<?> loadClass(String name) throws ClassNotFoundException {
Class<?> c = findLoadedClass(name);
if (c != null) {
return c;
}

if (name.startsWith("java.") || name.startsWith("org.slf4j.") || name.startsWith("org.apache.logging")
|| name.startsWith("org.apache.zookeeper.") || name.startsWith("org.I0Itec.zkclient.")
|| name.startsWith("org.apache.commons.logging.")) {
// || name.startsWith("org.apache.hadoop."))
// {
c = super.loadClass(name);
}
if (c != null) return c;

try {
// 先加载jar内的class,可避免jar冲突
c = findClass(name);
} catch (ClassNotFoundException e) {
c = null;
}
if (c != null) {
return c;
}

return super.loadClass(name);
}

@Override
public Enumeration<URL> getResources(String name) throws IOException {
@SuppressWarnings("unchecked")
Enumeration<URL>[] tmp = (Enumeration<URL>[]) new Enumeration<?>[2];

tmp[0] = findResources(name); // local class
// path first
// tmp[1] = super.getResources(name);

return new CompoundEnumeration<>(tmp);
}

private static class CompoundEnumeration<E> implements Enumeration<E> {

private Enumeration<E>[] enums;
private int index = 0;

public CompoundEnumeration(Enumeration<E>[] enums){
this.enums = enums;
}

private boolean next() {
while (this.index < this.enums.length) {
if (this.enums[this.index] != null && this.enums[this.index].hasMoreElements()) {
return true;
}

++this.index;
}

return false;
}

public boolean hasMoreElements() {
return this.next();
}

public E nextElement() {
if (!this.next()) {
throw new NoSuchElementException();
} else {
return this.enums[this.index].nextElement();
}
}
}
}

3. SPI

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* SPI装载器注解
*
* @author rewerma @ 2018-10-20
* @version 1.0.0
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.TYPE })
public @interface SPI {

// Default SPI name
String value() default "";
}

4. CanalMQProducer

image-20250602221043198

4.1 AbstractMQProducer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
/**
* MQ producer 抽象类
*
* @author rewerma 2020-01-27
* @version 1.0.0
*/
public abstract class AbstractMQProducer implements CanalMQProducer {

protected MQProperties mqProperties;

protected ThreadPoolExecutor sendExecutor;
protected ThreadPoolExecutor buildExecutor;

@Override
public void init(Properties properties) {
// parse canal mq properties
loadCanalMqProperties(properties);

int parallelBuildThreadSize = mqProperties.getParallelBuildThreadSize();
buildExecutor = new ThreadPoolExecutor(parallelBuildThreadSize,
parallelBuildThreadSize,
0,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(parallelBuildThreadSize * 2),
new NamedThreadFactory("MQ-Parallel-Builder"),
new ThreadPoolExecutor.CallerRunsPolicy());

int parallelSendThreadSize = mqProperties.getParallelSendThreadSize();
sendExecutor = new ThreadPoolExecutor(parallelSendThreadSize,
parallelSendThreadSize,
0,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(parallelSendThreadSize * 2),
new NamedThreadFactory("MQ-Parallel-Sender"),
new ThreadPoolExecutor.CallerRunsPolicy());
}

@Override
public MQProperties getMqProperties() {
return this.mqProperties;
}

@Override
public void stop() {
if (buildExecutor != null) {
buildExecutor.shutdownNow();
}

if (sendExecutor != null) {
sendExecutor.shutdownNow();
}
}

/**
* 初始化配置
* <p>
* canal.mq.flat.message = true <br/>
* canal.mq.database.hash = true <br/>
* canal.mq.filter.transaction.entry = true <br/>
* canal.mq.parallel.build.thread.size = 8 <br/>
* canal.mq.parallel.send.thread.size = 8 <br/>
* canal.mq.batch.size = 50 <br/>
* canal.mq.timeout = 100 <br/>
* canal.mq.access.channel = local <br/>
* </p>
*
* @param properties 总配置对象
*/
private void loadCanalMqProperties(Properties properties) {
String flatMessage = PropertiesUtils.getProperty(properties, CanalConstants.CANAL_MQ_FLAT_MESSAGE);
if (!StringUtils.isEmpty(flatMessage)) {
mqProperties.setFlatMessage(Boolean.parseBoolean(flatMessage));
}

String databaseHash = PropertiesUtils.getProperty(properties, CanalConstants.CANAL_MQ_DATABASE_HASH);
if (!StringUtils.isEmpty(databaseHash)) {
mqProperties.setDatabaseHash(Boolean.parseBoolean(databaseHash));
}
String filterTranEntry = PropertiesUtils.getProperty(properties, CanalConstants.CANAL_FILTER_TRANSACTION_ENTRY);
if (!StringUtils.isEmpty(filterTranEntry)) {
mqProperties.setFilterTransactionEntry(Boolean.parseBoolean(filterTranEntry));
}
String parallelBuildThreadSize = PropertiesUtils.getProperty(properties, CanalConstants.CANAL_MQ_BUILD_THREAD_SIZE);
if (!StringUtils.isEmpty(parallelBuildThreadSize)) {
mqProperties.setParallelBuildThreadSize(Integer.parseInt(parallelBuildThreadSize));
}
String parallelSendThreadSize = PropertiesUtils.getProperty(properties, CanalConstants.CANAL_MQ_SEND_THREAD_SIZE);
if (!StringUtils.isEmpty(parallelSendThreadSize)) {
mqProperties.setParallelSendThreadSize(Integer.parseInt(parallelSendThreadSize));
}
String batchSize = PropertiesUtils.getProperty(properties, CanalConstants.CANAL_MQ_CANAL_BATCH_SIZE);
if (!StringUtils.isEmpty(batchSize)) {
mqProperties.setBatchSize(Integer.parseInt(batchSize));
}
String timeOut = PropertiesUtils.getProperty(properties, CanalConstants.CANAL_MQ_CANAL_GET_TIMEOUT);
if (!StringUtils.isEmpty(timeOut)) {
mqProperties.setFetchTimeout(Integer.parseInt(timeOut));
}
String accessChannel = PropertiesUtils.getProperty(properties, CanalConstants.CANAL_MQ_ACCESS_CHANNEL);
if (!StringUtils.isEmpty(accessChannel)) {
mqProperties.setAccessChannel(accessChannel);
}
String aliyunAccessKey = PropertiesUtils.getProperty(properties, CanalConstants.CANAL_ALIYUN_ACCESS_KEY);
if (!StringUtils.isEmpty(aliyunAccessKey)) {
mqProperties.setAliyunAccessKey(aliyunAccessKey);
}
String aliyunSecretKey = PropertiesUtils.getProperty(properties, CanalConstants.CANAL_ALIYUN_SECRET_KEY);
if (!StringUtils.isEmpty(aliyunSecretKey)) {
mqProperties.setAliyunSecretKey(aliyunSecretKey);
}
String aliyunUid = PropertiesUtils.getProperty(properties, CanalConstants.CANAL_ALIYUN_UID);
if (!StringUtils.isEmpty(aliyunUid)) {
mqProperties.setAliyunUid(Integer.parseInt(aliyunUid));
}
}

/**
* 兼容下<=1.1.4的mq配置项
*/
protected void doMoreCompatibleConvert(String oldKey, String newKey, Properties properties) {
String value = PropertiesUtils.getProperty(properties, oldKey);
if (StringUtils.isNotEmpty(value)) {
properties.setProperty(newKey, value);
}
}
}

4.2 CanalKafkaProducer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
/**
* kafka producer SPI 实现
*
* @author rewerma 2018-6-11 下午05:30:49
* @version 1.0.0
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
@SPI("kafka")
public class CanalKafkaProducer extends AbstractMQProducer implements CanalMQProducer {

private static final Logger logger = LoggerFactory.getLogger(CanalKafkaProducer.class);

private static final String PREFIX_KAFKA_CONFIG = "kafka.";

private Producer<String, byte[]> producer;

@Override
public void init(Properties properties) {
KafkaProducerConfig kafkaProducerConfig = new KafkaProducerConfig();
this.mqProperties = kafkaProducerConfig;
super.init(properties);
// load properties
this.loadKafkaProperties(properties);

Properties kafkaProperties = new Properties();
kafkaProperties.putAll(kafkaProducerConfig.getKafkaProperties());
kafkaProperties.put("max.in.flight.requests.per.connection", 1);
kafkaProperties.put("key.serializer", StringSerializer.class);
if (kafkaProducerConfig.isKerberosEnabled()) {
File krb5File = new File(kafkaProducerConfig.getKrb5File());
File jaasFile = new File(kafkaProducerConfig.getJaasFile());
if (krb5File.exists() && jaasFile.exists()) {
// 配置kerberos认证,需要使用绝对路径
System.setProperty("java.security.krb5.conf", krb5File.getAbsolutePath());
System.setProperty("java.security.auth.login.config", jaasFile.getAbsolutePath());
System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
kafkaProperties.put("security.protocol", "SASL_PLAINTEXT");
kafkaProperties.put("sasl.kerberos.service.name", "kafka");
} else {
String errorMsg = "ERROR # The kafka kerberos configuration file does not exist! please check it";
logger.error(errorMsg);
throw new RuntimeException(errorMsg);
}
}
kafkaProperties.put("value.serializer", KafkaMessageSerializer.class);
producer = new KafkaProducer<>(kafkaProperties);
}

private void loadKafkaProperties(Properties properties) {
KafkaProducerConfig kafkaProducerConfig = (KafkaProducerConfig) this.mqProperties;
Map<String, Object> kafkaProperties = kafkaProducerConfig.getKafkaProperties();
// 兼容下<=1.1.4的mq配置
doMoreCompatibleConvert("canal.mq.servers", "kafka.bootstrap.servers", properties);
doMoreCompatibleConvert("canal.mq.acks", "kafka.acks", properties);
doMoreCompatibleConvert("canal.mq.compressionType", "kafka.compression.type", properties);
doMoreCompatibleConvert("canal.mq.retries", "kafka.retries", properties);
doMoreCompatibleConvert("canal.mq.batchSize", "kafka.batch.size", properties);
doMoreCompatibleConvert("canal.mq.lingerMs", "kafka.linger.ms", properties);
doMoreCompatibleConvert("canal.mq.maxRequestSize", "kafka.max.request.size", properties);
doMoreCompatibleConvert("canal.mq.bufferMemory", "kafka.buffer.memory", properties);
doMoreCompatibleConvert("canal.mq.kafka.kerberos.enable", "kafka.kerberos.enable", properties);
doMoreCompatibleConvert("canal.mq.kafka.kerberos.krb5.file", "kafka.kerberos.krb5.file", properties);
doMoreCompatibleConvert("canal.mq.kafka.kerberos.jaas.file", "kafka.kerberos.jaas.file", properties);

for (Map.Entry<Object, Object> entry : properties.entrySet()) {
String key = (String) entry.getKey();
Object value = entry.getValue();
if (key.startsWith(PREFIX_KAFKA_CONFIG) && value != null) {
// check env config
value = PropertiesUtils.getProperty(properties, key);
key = key.substring(PREFIX_KAFKA_CONFIG.length());
kafkaProperties.put(key, value);
}
}
String kerberosEnabled = PropertiesUtils.getProperty(properties, KafkaConstants.CANAL_MQ_KAFKA_KERBEROS_ENABLE);
if (!StringUtils.isEmpty(kerberosEnabled)) {
kafkaProducerConfig.setKerberosEnabled(Boolean.parseBoolean(kerberosEnabled));
}
String krb5File = PropertiesUtils.getProperty(properties, KafkaConstants.CANAL_MQ_KAFKA_KERBEROS_KRB5_FILE);
if (!StringUtils.isEmpty(krb5File)) {
kafkaProducerConfig.setKrb5File(krb5File);
}
String jaasFile = PropertiesUtils.getProperty(properties, KafkaConstants.CANAL_MQ_KAFKA_KERBEROS_JAAS_FILE);
if (!StringUtils.isEmpty(jaasFile)) {
kafkaProducerConfig.setJaasFile(jaasFile);
}
}

@Override
public void stop() {
try {
logger.info("## stop the kafka producer");
if (producer != null) {
producer.close();
}
super.stop();
} catch (Throwable e) {
logger.warn("##something goes wrong when stopping kafka producer:", e);
} finally {
logger.info("## kafka producer is down.");
}
}

@Override
public void send(MQDestination mqDestination, Message message, Callback callback) {
ExecutorTemplate template = new ExecutorTemplate(sendExecutor);

try {
List result;
if (!StringUtils.isEmpty(mqDestination.getDynamicTopic())) {
// 动态topic路由计算,只是基于schema/table,不涉及proto数据反序列化
Map<String, Message> messageMap = MQMessageUtils.messageTopics(message,
mqDestination.getTopic(),
mqDestination.getDynamicTopic());

// 针对不同的topic,引入多线程提升效率
for (Map.Entry<String, Message> entry : messageMap.entrySet()) {
final String topicName = entry.getKey().replace('.', '_');
final Message messageSub = entry.getValue();
template.submit((Callable) () -> {
try {
return send(mqDestination, topicName, messageSub, mqProperties.isFlatMessage());
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}

result = template.waitForResult();
} else {
result = new ArrayList();
List<Future> futures = send(mqDestination,
mqDestination.getTopic(),
message,
mqProperties.isFlatMessage());
result.add(futures);
}

// 一个批次的所有topic和分区的队列,都采用异步的模式进行多线程批量发送
// 最后在集结点进行flush等待,确保所有数据都写出成功
// 注意:kafka的异步模式如果要保证顺序性,需要设置max.in.flight.requests.per.connection=1,确保在网络异常重试时有排他性
producer.flush();
// flush操作也有可能是发送失败,这里需要异步关注一下发送结果,针对有异常的直接出发rollback
for (Object obj : result) {
List<Future> futures = (List<Future>) obj;
for (Future future : futures) {
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
}

callback.commit();
} catch (Throwable e) {
logger.error(e.getMessage(), e);
callback.rollback();
} finally {
template.clear();
}
}

private List<Future> send(MQDestination mqDestination, String topicName, Message message, boolean flat) {
List<ProducerRecord<String, byte[]>> records = new ArrayList<>();
// 获取当前topic的分区数
Integer partitionNum = MQMessageUtils.parseDynamicTopicPartition(topicName, mqDestination.getDynamicTopicPartitionNum());
if (partitionNum == null) {
partitionNum = mqDestination.getPartitionsNum();
}
if (!flat) {
if (mqDestination.getPartitionHash() != null && !mqDestination.getPartitionHash().isEmpty()) {
// 并发构造
EntryRowData[] datas = MQMessageUtils.buildMessageData(message, buildExecutor);
// 串行分区
Message[] messages = MQMessageUtils.messagePartition(datas,
message.getId(),
partitionNum,
mqDestination.getPartitionHash(),
this.mqProperties.isDatabaseHash());
int length = messages.length;
for (int i = 0; i < length; i++) {
Message messagePartition = messages[i];
if (messagePartition != null) {
records.add(new ProducerRecord<>(topicName,
i,
null,
CanalMessageSerializerUtil.serializer(messagePartition,
mqProperties.isFilterTransactionEntry())));
}
}
} else {
final int partition = mqDestination.getPartition() != null ? mqDestination.getPartition() : 0;
records.add(new ProducerRecord<>(topicName,
partition,
null,
CanalMessageSerializerUtil.serializer(message, mqProperties.isFilterTransactionEntry())));
}
} else {
// 发送扁平数据json
// 并发构造
EntryRowData[] datas = MQMessageUtils.buildMessageData(message, buildExecutor);
// 串行分区
List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(datas, message.getId());
for (FlatMessage flatMessage : flatMessages) {
if (mqDestination.getPartitionHash() != null && !mqDestination.getPartitionHash().isEmpty()) {
FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
partitionNum,
mqDestination.getPartitionHash(),
this.mqProperties.isDatabaseHash());
int length = partitionFlatMessage.length;
for (int i = 0; i < length; i++) {
FlatMessage flatMessagePart = partitionFlatMessage[i];
if (flatMessagePart != null) {
records.add(new ProducerRecord<>(topicName, i, null, JSON.toJSONBytes(flatMessagePart,
JSONWriter.Feature.WriteNulls,
JSONWriter.Feature.LargeObject)));
}
}
} else {
final int partition = mqDestination.getPartition() != null ? mqDestination.getPartition() : 0;
records.add(new ProducerRecord<>(topicName, partition, null, JSON.toJSONBytes(flatMessage,
JSONWriter.Feature.WriteNulls,
JSONWriter.Feature.LargeObject)));
}
}
}

return produce(records);
}

private List<Future> produce(List<ProducerRecord<String, byte[]>> records) {
List<Future> futures = new ArrayList<>();
// 异步发送,因为在partition hash的时候已经按照每个分区合并了消息,走到这一步不需要考虑单个分区内的顺序问题
for (ProducerRecord record : records) {
futures.add(producer.send(record));
}

return futures;
}

}

4.3 CanalPulsarMQProducer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
/**
* PulsarMQ Producer SPI 实现
*
* @author chad 2021/9/2
* @version 1.0.0
*/
@SPI("pulsarmq")
public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQProducer {

/**
* 消息体分区属性名称
*/
public static final String MSG_PROPERTY_PARTITION_NAME = "partitionNum";
private static final Logger logger = LoggerFactory
.getLogger(CanalPulsarMQProducer.class);
private static final Map<String, Producer<byte[]>> PRODUCERS = new HashMap<>();
protected ThreadPoolExecutor sendPartitionExecutor;
/**
* pulsar客户端,管理连接
*/
protected PulsarClient client;
/**
* Pulsar admin 客户端
*/
protected PulsarAdmin pulsarAdmin;

@Override
public void init(Properties properties) {
// 加载配置
PulsarMQProducerConfig pulsarMQProducerConfig = new PulsarMQProducerConfig();
this.mqProperties = pulsarMQProducerConfig;
super.init(properties);
loadPulsarMQProperties(properties);

// 初始化连接客户端
try {
ClientBuilder builder = PulsarClient.builder()
// 填写pulsar的连接地址
.serviceUrl(pulsarMQProducerConfig.getServerUrl());
if (StringUtils.isNotEmpty(pulsarMQProducerConfig.getRoleToken())) {
// 角色权限认证的token
builder.authentication(AuthenticationFactory.token(pulsarMQProducerConfig.getRoleToken()));
}
if (StringUtils.isNotEmpty(pulsarMQProducerConfig.getListenerName())) {
// listener name
builder.listenerName(pulsarMQProducerConfig.getListenerName());
}

client = builder.build();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}

// 初始化Pulsar admin
if (StringUtils.isNotEmpty(pulsarMQProducerConfig.getAdminServerUrl())) {
try {
pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(pulsarMQProducerConfig.getAdminServerUrl()).build();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}

// 加载所有生产者 --> topic可能有正则或表名,无法确认所有topic,在使用时再加载
int parallelPartitionSendThreadSize = mqProperties.getParallelSendThreadSize();
sendPartitionExecutor = new ThreadPoolExecutor(parallelPartitionSendThreadSize,
parallelPartitionSendThreadSize,
0,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(parallelPartitionSendThreadSize * 2),
new NamedThreadFactory("MQ-Parallel-Sender-Partition"),
new ThreadPoolExecutor.CallerRunsPolicy());
}

/**
* 加载配置
*
* @param properties
* @return void
* @date 2021/9/15 11:22
* @author chad
* @since 1 by chad at 2021/9/15 新增
*/
private void loadPulsarMQProperties(Properties properties) {
PulsarMQProducerConfig tmpProperties = (PulsarMQProducerConfig) this.mqProperties;
String serverUrl = PropertiesUtils.getProperty(properties, PulsarMQConstants.PULSARMQ_SERVER_URL);
if (!StringUtils.isEmpty(serverUrl)) {
tmpProperties.setServerUrl(serverUrl);
}

String roleToken = PropertiesUtils.getProperty(properties, PulsarMQConstants.PULSARMQ_ROLE_TOKEN);
if (!StringUtils.isEmpty(roleToken)) {
tmpProperties.setRoleToken(roleToken);
}
String topicTenantPrefix = PropertiesUtils.getProperty(properties,
PulsarMQConstants.PULSARMQ_TOPIC_TENANT_PREFIX);
if (!StringUtils.isEmpty(topicTenantPrefix)) {
tmpProperties.setTopicTenantPrefix(topicTenantPrefix);
}
String adminServerUrl = PropertiesUtils.getProperty(properties, PulsarMQConstants.PULSARMQ_ADMIN_SERVER_URL);
if (!StringUtils.isEmpty(adminServerUrl)) {
tmpProperties.setAdminServerUrl(adminServerUrl);
}
String listenerName = PropertiesUtils.getProperty(properties, PulsarMQConstants.PULSARMQ_LISTENER_NAME);
if (!StringUtils.isEmpty(listenerName)) {
tmpProperties.setListenerName(listenerName);
}

String enableChunkingStr = PropertiesUtils.getProperty(properties, PulsarMQConstants.PULSARMQ_ENABLE_CHUNKING);
if (!StringUtils.isEmpty(enableChunkingStr)) {
tmpProperties.setEnableChunking(Boolean.parseBoolean(enableChunkingStr));
}

String compressionType = PropertiesUtils.getProperty(properties, PulsarMQConstants.PULSARMQ_COMPRESSION_TYPE);
if (!StringUtils.isEmpty(compressionType)) {
tmpProperties.setCompressionType(compressionType);
}

if (logger.isDebugEnabled()) {
logger.debug("Load pulsar properties ==> {}", JSON.toJSON(this.mqProperties));
}
}

/**
* 发送消息,处理的任务:
* <p>
* 1. 动态 Topic,根据schema.table或schema来匹配topic配置,将改变发送到指定的一个或多个具体的Topic<br/>
* 2. 使用线程池发送多个消息,单个消息不使用线程池
* </p>
*
* @param destination 消息目标信息
* @param message 消息
* @param callback 消息发送结果回调
* @return void
* @date 2021/9/2 22:01
* @author chad
* @since 1.0.0 by chad at 2021/9/2: 新增
*/
@Override
public void send(MQDestination destination, com.alibaba.otter.canal.protocol.Message message, Callback callback) {

ExecutorTemplate template = new ExecutorTemplate(sendExecutor);
try {
if (!StringUtils.isEmpty(destination.getDynamicTopic())) {
// 动态topic
Map<String, com.alibaba.otter.canal.protocol.Message> messageMap = MQMessageUtils
.messageTopics(message, destination.getTopic(), destination.getDynamicTopic());

for (Map.Entry<String, com.alibaba.otter.canal.protocol.Message> entry : messageMap.entrySet()) {
String topicName = entry.getKey().replace('.', '_');
com.alibaba.otter.canal.protocol.Message messageSub = entry.getValue();
template.submit(() -> {
try {
send(destination, topicName, messageSub);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}

template.waitForResult();
} else {
send(destination, destination.getTopic(), message);
}

callback.commit();
} catch (Throwable e) {
logger.error(e.getMessage(), e);
callback.rollback();
} finally {
template.clear();
}
}

/**
* 发送单条消息到指定topic。区分是否发送扁平消息
*
* @param destination
* @param topicName
* @param message
* @return void
* @date 2021/9/2 22:05
* @author chad
* @since 1.0.0 by chad at 2021/9/2: 新增
*/
public void send(final MQDestination destination, String topicName,
com.alibaba.otter.canal.protocol.Message message) {

// 获取当前topic的分区数
Integer partitionNum = MQMessageUtils.parseDynamicTopicPartition(topicName,
destination.getDynamicTopicPartitionNum());
if (partitionNum == null) {
partitionNum = destination.getPartitionsNum();
}
// 创建多分区topic
if (pulsarAdmin != null && partitionNum != null && partitionNum > 0 && PRODUCERS.get(topicName) == null) {
createMultipleTopic(topicName, partitionNum);
}

ExecutorTemplate template = new ExecutorTemplate(sendPartitionExecutor);
// 并发构造
MQMessageUtils.EntryRowData[] datas = MQMessageUtils.buildMessageData(message, buildExecutor);
if (!mqProperties.isFlatMessage()) {
// 动态计算目标分区
if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
for (MQMessageUtils.EntryRowData r : datas) {
CanalEntry.Entry entry = r.entry;
if (null == entry) {
continue;
}
// 串行分区
com.alibaba.otter.canal.protocol.Message[] messages = MQMessageUtils.messagePartition(datas,
message.getId(),
partitionNum,
destination.getPartitionHash(),
mqProperties.isDatabaseHash());
// 发送
int len = messages.length;
for (int i = 0; i < len; i++) {
final int partition = i;
com.alibaba.otter.canal.protocol.Message m = messages[i];
template.submit(() -> {
sendMessage(topicName, partition, m);
});
}
}
} else {
// 默认分区
final int partition = destination.getPartition() != null ? destination.getPartition() : 0;
sendMessage(topicName, partition, message);
}
} else {
// 串行分区
List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(datas, message.getId());

// 初始化分区合并队列
if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
List<List<FlatMessage>> partitionFlatMessages = new ArrayList<>();
int len = partitionNum;
for (int i = 0; i < len; i++) {
partitionFlatMessages.add(new ArrayList<>());
}

for (FlatMessage flatMessage : flatMessages) {
FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
partitionNum,
destination.getPartitionHash(),
mqProperties.isDatabaseHash());
int length = partitionFlatMessage.length;
for (int i = 0; i < length; i++) {
// 增加null判断,issue #3267
if (partitionFlatMessage[i] != null) {
partitionFlatMessages.get(i).add(partitionFlatMessage[i]);
}
}
}

for (int i = 0; i < len; i++) {
final List<FlatMessage> flatMessagePart = partitionFlatMessages.get(i);
if (flatMessagePart != null && flatMessagePart.size() > 0) {
final int partition = i;
template.submit(() -> {
// 批量发送
sendMessage(topicName, partition, flatMessagePart);
});
}
}

// 批量等所有分区的结果
template.waitForResult();
} else {
// 默认分区
final int partition = destination.getPartition() != null ? destination.getPartition() : 0;
sendMessage(topicName, partition, flatMessages);
}
}
}

/**
* 发送原始消息,需要做分区处理
*
* @param topic topic
* @param partitionNum 目标分区
* @param msg 原始消息内容
* @return void
* @date 2021/9/10 17:55
* @author chad
* @since 1 by chad at 2021/9/10 新增
*/
private void sendMessage(String topic, int partitionNum, com.alibaba.otter.canal.protocol.Message msg) {
Producer<byte[]> producer = getProducer(topic);
byte[] msgBytes = CanalMessageSerializerUtil.serializer(msg, mqProperties.isFilterTransactionEntry());
try {
MessageId msgResultId = producer.newMessage()
.property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(partitionNum))
.value(msgBytes)
.send();
// todo 判断发送结果
if (logger.isDebugEnabled()) {
logger.debug("Send Message to topic:{} Result: {}", topic, msgResultId);
}
} catch (Throwable e) {
throw new RuntimeException(e);
}
}

/**
* 发送扁平消息
*
* @param topic topic主题
* @param flatMessages 扁平消息
* @return void
* @date 2021/9/10 18:22
* @author chad
* @since 1 by chad at 2021/9/10 新增
*/
private void sendMessage(String topic, int partition, List<FlatMessage> flatMessages) {
Producer<byte[]> producer = getProducer(topic);
for (FlatMessage f : flatMessages) {
try {
MessageId msgResultId = producer.newMessage()
.property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(partition))
.value(JSON.toJSONBytes(f, Feature.WriteNulls, JSONWriter.Feature.LargeObject))
.send()
//
;
if (logger.isDebugEnabled()) {
logger.debug("Send Messages to topic:{} Result: {}", topic, msgResultId);
}
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}

/**
* 创建多分区topic
*
* @param topic
* @param partitionNum
*/
private void createMultipleTopic(String topic, Integer partitionNum) {
// 拼接topic前缀
PulsarMQProducerConfig pulsarMQProperties = (PulsarMQProducerConfig) this.mqProperties;
String prefix = pulsarMQProperties.getTopicTenantPrefix();
String fullTopic = topic;
if (!StringUtils.isEmpty(prefix)) {
if (!prefix.endsWith("/")) {
fullTopic = "/" + fullTopic;
}
fullTopic = pulsarMQProperties.getTopicTenantPrefix() + fullTopic;
}

// 创建分区topic
try {
pulsarAdmin.topics().createPartitionedTopic(fullTopic, partitionNum);
} catch (PulsarAdminException e) {
// TODO 无论是否报错,都继续后续的操作,此处不进行阻塞
}
}

/**
* 获取topic
*
* @param topic
* @return
*/
private Producer<byte[]> getProducer(String topic) {
Producer producer = PRODUCERS.get(topic);
if (null == producer || !producer.isConnected()) {
try {
synchronized (PRODUCERS) {
producer = PRODUCERS.get(topic);
if (null != producer && producer.isConnected()) {
return producer;
}

// 拼接topic前缀
PulsarMQProducerConfig pulsarMQProperties = (PulsarMQProducerConfig) this.mqProperties;
String prefix = pulsarMQProperties.getTopicTenantPrefix();
String fullTopic = topic;
if (!StringUtils.isEmpty(prefix)) {
if (!prefix.endsWith("/")) {
fullTopic = "/" + fullTopic;
}
fullTopic = pulsarMQProperties.getTopicTenantPrefix() + fullTopic;
}

// 创建指定topic的生产者
ProducerBuilder producerBuilder = client.newProducer();
if (pulsarMQProperties.getEnableChunking()) {
producerBuilder.enableChunking(true);
producerBuilder.enableBatching(false);
}

if (!StringUtils.isEmpty(pulsarMQProperties.getCompressionType())) {
switch (pulsarMQProperties.getCompressionType().toLowerCase()) {
case "lz4":
producerBuilder.compressionType(CompressionType.LZ4);
break;
case "zlib":
producerBuilder.compressionType(CompressionType.ZLIB);
break;
case "zstd":
producerBuilder.compressionType(CompressionType.ZSTD);
break;
case "snappy":
producerBuilder.compressionType(CompressionType.SNAPPY);
break;
}
}

producer = producerBuilder.topic(fullTopic)
// 指定路由器
.messageRouter(new MessageRouterImpl(topic))
.create();

// 放入缓存
PRODUCERS.put(topic, producer);
}
} catch (PulsarClientException e) {
logger.error("create producer failed for topic: " + topic, e);
throw new RuntimeException(e);
}
}

return producer;
}

@Override
public void stop() {
logger.info("## Stop PulsarMQ producer##");

for (Producer p : PRODUCERS.values()) {
try {
if (null != p && p.isConnected()) {
p.close();
}
} catch (PulsarClientException e) {
logger.warn("close producer name: {}, topic: {}, error: {}",
p.getProducerName(),
p.getTopic(),
e.getMessage());
}
}

super.stop();
}

/**
* Pulsar自定义路由策略
*
* @author chad
* @version 1
* @since 1 by chad at 2021/9/10 新增
* @since 2 by chad at 2021/9/17 修改为msg自带目标分区
*/
private static class MessageRouterImpl implements MessageRouter {

private String topicLocal;

public MessageRouterImpl(String topicLocal){
this.topicLocal = topicLocal;
}

@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
String partitionStr = msg.getProperty(MSG_PROPERTY_PARTITION_NAME);
int partition = 0;
if (!StringUtils.isEmpty(partitionStr)) {
try {
partition = Integer.parseInt(partitionStr);
} catch (NumberFormatException e) {
logger
.warn("Parse msg {} property failed for value: {}", MSG_PROPERTY_PARTITION_NAME, partitionStr);
}
}
// topic创建时设置的分区数
Integer partitionNum = metadata.numPartitions();
// 如果 partition 超出 partitionNum,取余数
if (null != partitionNum && partition >= partitionNum) {
partition = partition % partitionNum;
}
return partition;
}
}
}

4.4 CanalRabbitMQProducer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
/**
* RabbitMQ Producer SPI 实现
*
* @author rewerma 2020-01-27
* @version 1.0.0
*/
@SPI("rabbitmq")
public class CanalRabbitMQProducer extends AbstractMQProducer implements CanalMQProducer {

private static final Logger logger = LoggerFactory.getLogger(CanalRabbitMQProducer.class);

private Connection connect;
private Channel channel;

@Override
public void init(Properties properties) {
RabbitMQProducerConfig rabbitMQProperties = new RabbitMQProducerConfig();
this.mqProperties = rabbitMQProperties;
super.init(properties);
loadRabbitMQProperties(properties);

ConnectionFactory factory = new ConnectionFactory();
String servers = rabbitMQProperties.getHost();
if (servers.startsWith("amqp")) {
try {
factory.setUri(servers);
} catch (URISyntaxException | NoSuchAlgorithmException | KeyManagementException ex) {
throw new CanalException("failed to parse host", ex);
}
} else if (servers.contains(":")) {
String[] serverHostAndPort = AddressUtils.splitIPAndPort(servers);
factory.setHost(serverHostAndPort[0]);
factory.setPort(Integer.parseInt(serverHostAndPort[1]));
} else {
factory.setHost(servers);
}

if (mqProperties.getAliyunAccessKey().length() > 0 && mqProperties.getAliyunSecretKey().length() > 0
&& mqProperties.getAliyunUid() > 0) {
factory.setCredentialsProvider(new AliyunCredentialsProvider(mqProperties.getAliyunAccessKey(),
mqProperties.getAliyunSecretKey(),
mqProperties.getAliyunUid()));
} else {
factory.setUsername(rabbitMQProperties.getUsername());
factory.setPassword(rabbitMQProperties.getPassword());
}
factory.setVirtualHost(rabbitMQProperties.getVirtualHost());
try {
connect = factory.newConnection();
channel = connect.createChannel();
String queue = rabbitMQProperties.getQueue();
String exchange = rabbitMQProperties.getExchange();
String deliveryMode = rabbitMQProperties.getDeliveryMode();
String routingKey = rabbitMQProperties.getRoutingKey();
if (!StringUtils.isEmpty(queue)) {
channel.queueDeclare(queue, true, false, false, null);
}
if (!StringUtils.isEmpty(queue) && !StringUtils.isEmpty(exchange) && !StringUtils.isEmpty(deliveryMode)
&& !StringUtils.isEmpty(routingKey)) {
channel.exchangeDeclare(exchange, deliveryMode, true, false, false, null);
channel.queueBind(queue, exchange, routingKey);
}
} catch (IOException | TimeoutException ex) {
throw new CanalException("Start RabbitMQ producer error", ex);
}
}

private void loadRabbitMQProperties(Properties properties) {
RabbitMQProducerConfig rabbitMQProperties = (RabbitMQProducerConfig) this.mqProperties;
// 兼容下<=1.1.4的mq配置
doMoreCompatibleConvert("canal.mq.servers", "rabbitmq.host", properties);

String host = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_HOST);
if (!StringUtils.isEmpty(host)) {
rabbitMQProperties.setHost(host);
}
String vhost = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_VIRTUAL_HOST);
if (!StringUtils.isEmpty(vhost)) {
rabbitMQProperties.setVirtualHost(vhost);
}
String exchange = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_EXCHANGE);
if (!StringUtils.isEmpty(exchange)) {
rabbitMQProperties.setExchange(exchange);
}
String username = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_USERNAME);
if (!StringUtils.isEmpty(username)) {
rabbitMQProperties.setUsername(username);
}
String password = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_PASSWORD);
if (!StringUtils.isEmpty(password)) {
rabbitMQProperties.setPassword(password);
}
String queue = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_QUEUE);
if (!StringUtils.isEmpty(queue)) {
rabbitMQProperties.setQueue(queue);
}
String routingKey = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_ROUTING_KEY);
if (!StringUtils.isEmpty(routingKey)) {
rabbitMQProperties.setRoutingKey(routingKey);
}
String deliveryMode = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_DELIVERY_MODE);
if (!StringUtils.isEmpty(deliveryMode)) {
rabbitMQProperties.setDeliveryMode(deliveryMode);
}
}

@Override
public void send(final MQDestination destination, Message message, Callback callback) {
ExecutorTemplate template = new ExecutorTemplate(sendExecutor);
try {
if (!StringUtils.isEmpty(destination.getDynamicTopic())) {
// 动态topic
Map<String, Message> messageMap = MQMessageUtils
.messageTopics(message, destination.getTopic(), destination.getDynamicTopic());

for (Map.Entry<String, com.alibaba.otter.canal.protocol.Message> entry : messageMap.entrySet()) {
final String topicName = entry.getKey().replace('.', '_');
final com.alibaba.otter.canal.protocol.Message messageSub = entry.getValue();

template.submit(() -> send(destination, topicName, messageSub));
}

template.waitForResult();
} else {
send(destination, destination.getTopic(), message);
}
callback.commit();
} catch (Throwable e) {
logger.error(e.getMessage(), e);
callback.rollback();
} finally {
template.clear();
}
}

private void send(MQDestination canalDestination, String topicName, Message messageSub) {
if (!mqProperties.isFlatMessage()) {
byte[] message = CanalMessageSerializerUtil.serializer(messageSub, mqProperties.isFilterTransactionEntry());
if (logger.isDebugEnabled()) {
logger.debug("send message:{} to destination:{}", message, canalDestination.getCanalDestination());
}
sendMessage(topicName, message);
} else {
// 并发构造
MQMessageUtils.EntryRowData[] datas = MQMessageUtils.buildMessageData(messageSub, buildExecutor);
// 串行分区
List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(datas, messageSub.getId());
for (FlatMessage flatMessage : flatMessages) {
byte[] message = JSON
.toJSONBytes(flatMessage, JSONWriter.Feature.WriteNulls, JSONWriter.Feature.LargeObject);
if (logger.isDebugEnabled()) {
logger.debug("send message:{} to destination:{}", message, canalDestination.getCanalDestination());
}
sendMessage(topicName, message);
}
}

}

private void sendMessage(String queueName, byte[] message) {
// tips: 目前逻辑中暂不处理对exchange处理,请在Console后台绑定 才可使用routekey
try {
RabbitMQProducerConfig rabbitMQProperties = (RabbitMQProducerConfig) this.mqProperties;
channel.basicPublish(rabbitMQProperties.getExchange(),
queueName,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message);
} catch (Throwable e) {
throw new RuntimeException(e);
}
}

@Override
public void stop() {
logger.info("## Stop RabbitMQ producer##");
try {
this.channel.close();
this.connect.close();
super.stop();
} catch (AlreadyClosedException ex) {
logger.error("Connection is already closed", ex);
} catch (IOException | TimeoutException ex) {
throw new CanalException("Stop RabbitMQ producer error", ex);
}

super.stop();
}
}

4.5 CanalRocketMQProducer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
/**
* RocketMQ Producer SPI 实现
*
* @author rewerma 2020-01-27
* @version 1.0.0
*/
@SPI("rocketmq")
public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQProducer {

private static final Logger logger = LoggerFactory.getLogger(CanalRocketMQProducer.class);

private DefaultMQProducer defaultMQProducer;
private static final String CLOUD_ACCESS_CHANNEL = "cloud";
private static final String NAMESPACE_SEPARATOR = "%";
protected ThreadPoolExecutor sendPartitionExecutor;

@Override
public void init(Properties properties) {
RocketMQProducerConfig rocketMQProperties = new RocketMQProducerConfig();
this.mqProperties = rocketMQProperties;
super.init(properties);
loadRocketMQProperties(properties);

RPCHook rpcHook = null;
if (mqProperties.getAliyunAccessKey().length() > 0 && mqProperties.getAliyunSecretKey().length() > 0) {
SessionCredentials sessionCredentials = new SessionCredentials();
sessionCredentials.setAccessKey(mqProperties.getAliyunAccessKey());
sessionCredentials.setSecretKey(mqProperties.getAliyunSecretKey());
rpcHook = new AclClientRPCHook(sessionCredentials);
}

defaultMQProducer = new DefaultMQProducer(rocketMQProperties.getProducerGroup(),
rpcHook,
rocketMQProperties.isEnableMessageTrace(),
rocketMQProperties.getCustomizedTraceTopic());
if (CLOUD_ACCESS_CHANNEL.equals(rocketMQProperties.getAccessChannel())) {
defaultMQProducer.setAccessChannel(AccessChannel.CLOUD);
}
if (!StringUtils.isEmpty(rocketMQProperties.getNamespace())) {
defaultMQProducer.setNamespace(rocketMQProperties.getNamespace());
}
defaultMQProducer.setNamesrvAddr(rocketMQProperties.getNamesrvAddr());
defaultMQProducer.setRetryTimesWhenSendFailed(rocketMQProperties.getRetryTimesWhenSendFailed());
defaultMQProducer.setVipChannelEnabled(rocketMQProperties.isVipChannelEnabled());
logger.info("##Start RocketMQ producer##");
try {
defaultMQProducer.start();
} catch (MQClientException ex) {
throw new CanalException("Start RocketMQ producer error", ex);
}

int parallelPartitionSendThreadSize = mqProperties.getParallelSendThreadSize();
sendPartitionExecutor = new ThreadPoolExecutor(parallelPartitionSendThreadSize,
parallelPartitionSendThreadSize,
0,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(parallelPartitionSendThreadSize * 2),
new NamedThreadFactory("MQ-Parallel-Sender-Partition"),
new ThreadPoolExecutor.CallerRunsPolicy());
}

private void loadRocketMQProperties(Properties properties) {
RocketMQProducerConfig rocketMQProperties = (RocketMQProducerConfig) this.mqProperties;
// 兼容下<=1.1.4的mq配置
doMoreCompatibleConvert("canal.mq.servers", "rocketmq.namesrv.addr", properties);
doMoreCompatibleConvert("canal.mq.producerGroup", "rocketmq.producer.group", properties);
doMoreCompatibleConvert("canal.mq.namespace", "rocketmq.namespace", properties);
doMoreCompatibleConvert("canal.mq.retries", "rocketmq.retry.times.when.send.failed", properties);

String producerGroup = PropertiesUtils.getProperty(properties, RocketMQConstants.ROCKETMQ_PRODUCER_GROUP);
if (!StringUtils.isEmpty(producerGroup)) {
rocketMQProperties.setProducerGroup(producerGroup);
}
String enableMessageTrace = PropertiesUtils.getProperty(properties,
RocketMQConstants.ROCKETMQ_ENABLE_MESSAGE_TRACE);
if (!StringUtils.isEmpty(enableMessageTrace)) {
rocketMQProperties.setEnableMessageTrace(Boolean.parseBoolean(enableMessageTrace));
}
String customizedTraceTopic = PropertiesUtils.getProperty(properties,
RocketMQConstants.ROCKETMQ_CUSTOMIZED_TRACE_TOPIC);
if (!StringUtils.isEmpty(customizedTraceTopic)) {
rocketMQProperties.setCustomizedTraceTopic(customizedTraceTopic);
}
String namespace = PropertiesUtils.getProperty(properties, RocketMQConstants.ROCKETMQ_NAMESPACE);
if (!StringUtils.isEmpty(namespace)) {
rocketMQProperties.setNamespace(namespace);
}
String namesrvAddr = PropertiesUtils.getProperty(properties, RocketMQConstants.ROCKETMQ_NAMESRV_ADDR);
if (!StringUtils.isEmpty(namesrvAddr)) {
rocketMQProperties.setNamesrvAddr(namesrvAddr);
}
String retry = PropertiesUtils.getProperty(properties, RocketMQConstants.ROCKETMQ_RETRY_TIMES_WHEN_SEND_FAILED);
if (!StringUtils.isEmpty(retry)) {
rocketMQProperties.setRetryTimesWhenSendFailed(Integer.parseInt(retry));
}
String vipChannelEnabled = PropertiesUtils.getProperty(properties,
RocketMQConstants.ROCKETMQ_VIP_CHANNEL_ENABLED);
if (!StringUtils.isEmpty(vipChannelEnabled)) {
rocketMQProperties.setVipChannelEnabled(Boolean.parseBoolean(vipChannelEnabled));
}
String tag = PropertiesUtils.getProperty(properties, RocketMQConstants.ROCKETMQ_TAG);
if (!StringUtils.isEmpty(tag)) {
rocketMQProperties.setTag(tag);
}
}

@Override
public void send(MQDestination destination, com.alibaba.otter.canal.protocol.Message message, Callback callback) {
ExecutorTemplate template = new ExecutorTemplate(sendExecutor);
try {
if (!StringUtils.isEmpty(destination.getDynamicTopic())) {
// 动态topic
Map<String, com.alibaba.otter.canal.protocol.Message> messageMap = MQMessageUtils
.messageTopics(message, destination.getTopic(), destination.getDynamicTopic());

for (Map.Entry<String, com.alibaba.otter.canal.protocol.Message> entry : messageMap.entrySet()) {
String topicName = entry.getKey().replace('.', '_');
com.alibaba.otter.canal.protocol.Message messageSub = entry.getValue();
template.submit(() -> {
try {
send(destination, topicName, messageSub);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}

template.waitForResult();
} else {
send(destination, destination.getTopic(), message);
}

callback.commit();
} catch (Throwable e) {
logger.error(e.getMessage(), e);
callback.rollback();
} finally {
template.clear();
}
}

public void send(final MQDestination destination, String topicName,
com.alibaba.otter.canal.protocol.Message message) {
// 获取当前topic的分区数
Integer partitionNum = MQMessageUtils.parseDynamicTopicPartition(topicName,
destination.getDynamicTopicPartitionNum());

// 获取topic的队列数为分区数
if (partitionNum == null) {
partitionNum = getTopicDynamicQueuesSize(destination.getEnableDynamicQueuePartition(), topicName);
}

if (partitionNum == null) {
partitionNum = destination.getPartitionsNum();
}
if (!mqProperties.isFlatMessage()) {
if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
// 并发构造
MQMessageUtils.EntryRowData[] datas = MQMessageUtils.buildMessageData(message, buildExecutor);
// 串行分区
com.alibaba.otter.canal.protocol.Message[] messages = MQMessageUtils.messagePartition(datas,
message.getId(),
partitionNum,
destination.getPartitionHash(),
mqProperties.isDatabaseHash());
int length = messages.length;

ExecutorTemplate template = new ExecutorTemplate(sendPartitionExecutor);
for (int i = 0; i < length; i++) {
com.alibaba.otter.canal.protocol.Message dataPartition = messages[i];
if (dataPartition != null) {
final int index = i;
template.submit(() -> {
Message data = new Message(topicName,
((RocketMQProducerConfig) this.mqProperties).getTag(),
CanalMessageSerializerUtil.serializer(dataPartition,
mqProperties.isFilterTransactionEntry()));
sendMessage(data, index);
});
}
}
// 等所有分片发送完毕
template.waitForResult();
} else {
final int partition = destination.getPartition() != null ? destination.getPartition() : 0;
Message data = new Message(topicName,
((RocketMQProducerConfig) this.mqProperties).getTag(),
CanalMessageSerializerUtil.serializer(message, mqProperties.isFilterTransactionEntry()));
sendMessage(data, partition);
}
} else {
// 并发构造
MQMessageUtils.EntryRowData[] datas = MQMessageUtils.buildMessageData(message, buildExecutor);
// 串行分区
List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(datas, message.getId());
// 初始化分区合并队列
if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
List<List<FlatMessage>> partitionFlatMessages = new ArrayList<>();
for (int i = 0; i < partitionNum; i++) {
partitionFlatMessages.add(new ArrayList<>());
}

for (FlatMessage flatMessage : flatMessages) {
FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
partitionNum,
destination.getPartitionHash(),
mqProperties.isDatabaseHash());
int length = partitionFlatMessage.length;
for (int i = 0; i < length; i++) {
// 增加null判断,issue #3267
if (partitionFlatMessage[i] != null) {
partitionFlatMessages.get(i).add(partitionFlatMessage[i]);
}
}
}

ExecutorTemplate template = new ExecutorTemplate(sendPartitionExecutor);
for (int i = 0; i < partitionFlatMessages.size(); i++) {
final List<FlatMessage> flatMessagePart = partitionFlatMessages.get(i);
if (flatMessagePart != null && flatMessagePart.size() > 0) {
final int index = i;
template.submit(() -> {
List<Message> messages = flatMessagePart.stream()
.map(flatMessage -> new Message(topicName,
((RocketMQProducerConfig) this.mqProperties).getTag(),
JSON.toJSONBytes(flatMessage,
JSONWriter.Feature.WriteNulls,
JSONWriter.Feature.LargeObject)))
.collect(Collectors.toList());
// 批量发送
sendMessage(messages, index);
});
}
}

// 批量等所有分区的结果
template.waitForResult();
} else {
final int partition = destination.getPartition() != null ? destination.getPartition() : 0;
List<Message> messages = flatMessages.stream()
.map(flatMessage -> new Message(topicName,
((RocketMQProducerConfig) this.mqProperties).getTag(),
JSON.toJSONBytes(flatMessage, JSONWriter.Feature.WriteNulls, JSONWriter.Feature.LargeObject)))
.collect(Collectors.toList());
// 批量发送
sendMessage(messages, partition);
}
}
}

private void sendMessage(Message message, int partition) {
try {
SendResult sendResult = this.defaultMQProducer.send(message, (mqs, msg, arg) -> {
if (partition >= mqs.size()) {
return mqs.get(partition % mqs.size());
} else {
return mqs.get(partition);
}
}, null);

if (logger.isDebugEnabled()) {
logger.debug("Send Message Result: {}", sendResult);
}
} catch (Throwable e) {
throw new RuntimeException(e);
}
}

@SuppressWarnings("deprecation")
private void sendMessage(List<Message> messages, int partition) {
if (messages.isEmpty()) {
return;
}

// 获取一下messageQueue
DefaultMQProducerImpl innerProducer = this.defaultMQProducer.getDefaultMQProducerImpl();
String topic = messages.get(0).getTopic();
if (StringUtils.isNotBlank(this.defaultMQProducer.getNamespace())) {
topic = this.defaultMQProducer.getNamespace() + NAMESPACE_SEPARATOR + topic;
}
TopicPublishInfo topicInfo = innerProducer.getTopicPublishInfoTable().get(topic);
if (topicInfo == null) {
for (Message message : messages) {
sendMessage(message, partition);
}
} else {
// 批量发送
List<MessageQueue> queues = topicInfo.getMessageQueueList();
int size = queues.size();
if (size <= 0) {
// 可能是第一次创建
for (Message message : messages) {
sendMessage(message, partition);
}
} else {
MessageQueue queue;
if (partition >= size) {
queue = queues.get(partition % size);
} else {
queue = queues.get(partition);
}

try {
// 阿里云RocketMQ暂不支持批量发送消息,当canal.mq.flatMessage = true时,会发送失败
SendResult sendResult = this.defaultMQProducer.send(messages, queue);
if (logger.isDebugEnabled()) {
logger.debug("Send Message Result: {}", sendResult);
}
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}
}

@Override
public void stop() {
logger.info("## Stop RocketMQ producer##");
this.defaultMQProducer.shutdown();
if (sendPartitionExecutor != null) {
sendPartitionExecutor.shutdownNow();
}

super.stop();
}

private Integer getTopicDynamicQueuesSize(Boolean enable, String topicName) {
if (enable != null && enable) {
topicName = this.defaultMQProducer.withNamespace(topicName);
DefaultMQProducerImpl innerProducer = this.defaultMQProducer.getDefaultMQProducerImpl();
TopicPublishInfo topicInfo = innerProducer.getTopicPublishInfoTable().get(topicName);
if (topicInfo == null) {
return null;
} else {
return topicInfo.getMessageQueueList().size();
}
}
return null;
}
}

4.6 ProxyCanalMQProducer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class ProxyCanalMQProducer implements CanalMQProducer {

private CanalMQProducer canalMQProducer;

public ProxyCanalMQProducer(CanalMQProducer canalMQProducer) {
this.canalMQProducer = canalMQProducer;
}

private ClassLoader changeCL() {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(canalMQProducer.getClass().getClassLoader());
return cl;
}

private void revertCL(ClassLoader cl) {
Thread.currentThread().setContextClassLoader(cl);
}

@Override
public void init(Properties properties) {
ClassLoader cl = changeCL();
try {
canalMQProducer.init(properties);
} finally {
revertCL(cl);
}
}

@Override
public MQProperties getMqProperties() {
ClassLoader cl = changeCL();
try {
return canalMQProducer.getMqProperties();
} finally {
revertCL(cl);
}
}

@Override
public void send(MQDestination canalDestination, Message message, Callback callback) {
ClassLoader cl = changeCL();
try {
canalMQProducer.send(canalDestination, message, callback);
} finally {
revertCL(cl);
}
}

@Override
public void stop() {
ClassLoader cl = changeCL();
try {
canalMQProducer.stop();
} finally {
revertCL(cl);
}
}
}

5. CanalMsgConsumer

image-20250602222423227

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
* Canal/MQ consumer SPI 接口
*
* @author rewerma @ 2020-02-01
* @version 1.0.0
*/
@SPI("kafka")
public interface CanalMsgConsumer {

/**
* 初始化
*
* @param properties consumer properties
* @param topic topic/destination
* @param groupId mq group id
*/
void init(Properties properties, String topic, String groupId);

/**
* 连接Canal/MQ
*/
void connect();

/**
* 批量拉取数据
*
* @param timeout 超时时间
* @param unit 时间单位
* @return Message列表
*/
List<CommonMessage> getMessage(Long timeout, TimeUnit unit);

/**
* 提交
*/
void ack();

/**
* 回滚
*/
void rollback();

/**
* 断开连接
*/
void disconnect();
}

5.1 CanalKafkaConsumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
/**
* Kafka consumer SPI 实现
*
* @author rewerma @ 2020-02-01
* @version 1.0.0
*/
@SPI("kafka")
public class CanalKafkaConsumer implements CanalMsgConsumer {

private static final String PREFIX_KAFKA_CONFIG = "kafka.";

private KafkaConsumer<String, ?> kafkaConsumer;
private boolean flatMessage = true;
private String topic;

private Map<Integer, Long> currentOffsets = new ConcurrentHashMap<>();
private Properties kafkaProperties = new Properties();

@Override
public void init(Properties properties, String topic, String groupId) {
this.topic = topic;

Boolean flatMessage = (Boolean) properties.get(CanalConstants.CANAL_MQ_FLAT_MESSAGE);
if (flatMessage != null) {
this.flatMessage = flatMessage;
}
for (Map.Entry<Object, Object> entry : properties.entrySet()) {
String k = (String) entry.getKey();
Object v = entry.getValue();
if (k.startsWith(PREFIX_KAFKA_CONFIG) && v != null) {
// check env config
v = PropertiesUtils.getProperty(properties, k);
kafkaProperties.put(k.substring(PREFIX_KAFKA_CONFIG.length()), v);
}
}
kafkaProperties.put("group.id", groupId);
kafkaProperties.put("key.deserializer", StringDeserializer.class);
kafkaProperties.put("client.id", UUID.randomUUID().toString().substring(0, 6));
}

@Override
public void connect() {
if (this.flatMessage) {
kafkaProperties.put("value.deserializer", StringDeserializer.class);
this.kafkaConsumer = new KafkaConsumer<String, String>(kafkaProperties);
} else {
kafkaProperties.put("value.deserializer", KafkaMessageDeserializer.class);
this.kafkaConsumer = new KafkaConsumer<String, Message>(kafkaProperties);
}
kafkaConsumer.subscribe(Collections.singletonList(topic));
}

@SuppressWarnings("unchecked")
@Override
public List<CommonMessage> getMessage(Long timeout, TimeUnit unit) {
if (!flatMessage) {
ConsumerRecords<String, Message> records = (ConsumerRecords<String, Message>) kafkaConsumer.poll(unit.toMillis(timeout));
if (!records.isEmpty()) {
currentOffsets.clear();
List<CommonMessage> messages = new ArrayList<>();
for (ConsumerRecord<String, Message> record : records) {
if (currentOffsets.get(record.partition()) == null) {
currentOffsets.put(record.partition(), record.offset());
}
messages.addAll(MessageUtil.convert(record.value()));
}
return messages;
}
} else {
ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) kafkaConsumer.poll(unit.toMillis(timeout));

if (!records.isEmpty()) {
List<CommonMessage> messages = new ArrayList<>();
currentOffsets.clear();
for (ConsumerRecord<String, String> record : records) {
if (currentOffsets.get(record.partition()) == null) {
currentOffsets.put(record.partition(), record.offset());
}
String flatMessageJson = record.value();
CommonMessage flatMessages = JSON.parseObject(flatMessageJson, CommonMessage.class);
messages.add(flatMessages);
}
return messages;
}
}
return null;
}

@Override
public void rollback() {
// 回滚所有分区
if (kafkaConsumer != null) {
for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) {
kafkaConsumer.seek(new TopicPartition(topic, entry.getKey()), currentOffsets.get(entry.getKey()));
kafkaConsumer.commitSync();
}
}
}

@Override
public void ack() {
if (kafkaConsumer != null) {
kafkaConsumer.commitSync();
}
}

@Override
public void disconnect() {
if (kafkaConsumer != null) {
kafkaConsumer.unsubscribe();
}
if (kafkaConsumer != null) {
kafkaConsumer.close();
kafkaConsumer = null;
}
}
}

5.2 CanalPulsarMQConsumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
/**
* Pulsar consumer SPI 实现
*
* @author rewerma @ 2020-02-01
* @version 1.0.0
*/
@SPI("pulsarmq")
public class CanalPulsarMQConsumer implements CanalMsgConsumer {

/**
* 连接pulsar客户端
*/
private PulsarClient pulsarClient;
private Consumer<byte[]> pulsarMQConsumer;
/**
* 是否为扁平消息
*/
private boolean flatMessage = false;
/**
* 主题名称
*/
private String topic;
/**
* 单线程控制
*/
private volatile Messages<byte[]> lastGetBatchMessage;
/**
* 环境连接URL
*/
private String serviceUrl;
/**
* 角色认证token
*/
private String roleToken;

/**
* listener name
*/
private String listenerName;

/**
* 订阅客户端名称
*/
private String subscriptName;
/**
* 每次批量获取数据的最大条目数,默认30
*/
private int batchSize = 30;
/**
* 与{@code batchSize}一起决定批量获取的数据大小 当:
* <p>
* 1. {@code batchSize} 条消息未消费时<br/>
* 2. 距上一次批量消费时间达到{@code batchTimeoutSeconds}秒时
* </p>
* 任一条件满足,即执行批量消费
*/
private int getBatchTimeoutSeconds = 30;
/**
* 批量处理消息时,一次批量处理的超时时间
* <p>
* 该时间应该根据{@code batchSize}和{@code batchTimeoutSeconds}合理设置
* </p>
*/
private long batchProcessTimeout = 60 * 1000;
/**
* 消费失败后的重试秒数,默认60秒
*/
private int redeliveryDelaySeconds = 60;
/**
* 当客户端接收到消息,30秒还没有返回ack给服务端时,ack超时,会重新消费该消息
*/
private int ackTimeoutSeconds = 30;
/**
* 是否开启消息失败重试功能,默认开启
*/
private boolean isRetry = true;
/**
* <p>
* true重试(-RETRY)和死信队列(-DLQ)后缀为大写,有些地方创建的为小写,需确保正确
* </p>
*/
private boolean isRetryDLQUpperCase = false;
/**
* 最大重试次数
*/
private int maxRedeliveryCount = 128;

@Override
public void init(Properties properties, String topic, String groupId) {
this.topic = topic;
String flatMessageStr = properties.getProperty(CanalConstants.CANAL_MQ_FLAT_MESSAGE);
if (StringUtils.isNotEmpty(flatMessageStr)) {
this.flatMessage = Boolean.parseBoolean(flatMessageStr);
}
this.serviceUrl = properties.getProperty(PulsarMQConstants.PULSARMQ_SERVER_URL);
this.roleToken = properties.getProperty(PulsarMQConstants.PULSARMQ_ROLE_TOKEN);
this.listenerName = properties.getProperty(PulsarMQConstants.PULSARMQ_LISTENER_NAME);
this.subscriptName = properties.getProperty(PulsarMQConstants.PULSARMQ_SUBSCRIPT_NAME);
// 采用groupId作为subscriptName,避免所有的都是同一个订阅者名称
if (StringUtils.isEmpty(this.subscriptName)) {
this.subscriptName = groupId;
}

if (StringUtils.isEmpty(this.subscriptName)) {
throw new RuntimeException("Pulsar Consumer subscriptName required");
}
String batchSizeStr = properties.getProperty(CanalConstants.CANAL_MQ_CANAL_BATCH_SIZE);
if (StringUtils.isNotEmpty(batchSizeStr)) {
this.batchSize = Integer.parseInt(batchSizeStr);
}
String getBatchTimeoutSecondsStr = properties.getProperty(PulsarMQConstants.PULSARMQ_GET_BATCH_TIMEOUT_SECONDS);
if (StringUtils.isNotEmpty(getBatchTimeoutSecondsStr)) {
this.getBatchTimeoutSeconds = Integer.parseInt(getBatchTimeoutSecondsStr);
}
String batchProcessTimeoutStr = properties.getProperty(PulsarMQConstants.PULSARMQ_BATCH_PROCESS_TIMEOUT);
if (StringUtils.isNotEmpty(batchProcessTimeoutStr)) {
this.batchProcessTimeout = Integer.parseInt(batchProcessTimeoutStr);
}
String redeliveryDelaySecondsStr = properties.getProperty(PulsarMQConstants.PULSARMQ_REDELIVERY_DELAY_SECONDS);
if (StringUtils.isNotEmpty(redeliveryDelaySecondsStr)) {
this.redeliveryDelaySeconds = Integer.parseInt(redeliveryDelaySecondsStr);
}
String ackTimeoutSecondsStr = properties.getProperty(PulsarMQConstants.PULSARMQ_ACK_TIMEOUT_SECONDS);
if (StringUtils.isNotEmpty(ackTimeoutSecondsStr)) {
this.ackTimeoutSeconds = Integer.parseInt(ackTimeoutSecondsStr);
}
String isRetryStr = properties.getProperty(PulsarMQConstants.PULSARMQ_IS_RETRY);
if (StringUtils.isNotEmpty(isRetryStr)) {
this.isRetry = Boolean.parseBoolean(isRetryStr);
}
String isRetryDLQUpperCaseStr = properties.getProperty(PulsarMQConstants.PULSARMQ_IS_RETRY_DLQ_UPPERCASE);
if (StringUtils.isNotEmpty(isRetryDLQUpperCaseStr)) {
this.isRetryDLQUpperCase = Boolean.parseBoolean(isRetryDLQUpperCaseStr);
}
String maxRedeliveryCountStr = properties.getProperty(PulsarMQConstants.PULSARMQ_MAX_REDELIVERY_COUNT);
if (StringUtils.isNotEmpty(maxRedeliveryCountStr)) {
this.maxRedeliveryCount = Integer.parseInt(maxRedeliveryCountStr);
}
}

@Override
public void connect() {
if (isConsumerActive()) {
return;
}
// 连接创建客户端
try {
// AuthenticationDataProvider
ClientBuilder builder = PulsarClient.builder().serviceUrl(serviceUrl);
if (StringUtils.isNotEmpty(roleToken)) {
builder.authentication(AuthenticationFactory.token(roleToken));
}
if (StringUtils.isNotEmpty(listenerName)) {
builder.authentication(AuthenticationFactory.token(listenerName));
}
pulsarClient = builder.build();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
ConsumerBuilder<byte[]> builder = pulsarClient.newConsumer();
if (MQUtil.isPatternTopic(this.topic)) {
// 正则只支持一个
builder.topicsPattern(this.topic);
} else {// 多个topic
builder.topic(this.topic);
}
// 为保证消息的有序性,仅支持单消费实例模式
// 灾备模式,一个分区只能有一个消费者,如果当前消费者不可用,自动切换到其他消费者
builder.subscriptionType(SubscriptionType.Failover);

builder
// 调用consumer.negativeAcknowledge(message) (即nack)来表示消费失败的消息
// 在指定的时间进行重新消费,默认是1分钟。
.negativeAckRedeliveryDelay(this.redeliveryDelaySeconds, TimeUnit.SECONDS)
.subscriptionName(this.subscriptName);
if (this.isRetry) {
DeadLetterPolicy.DeadLetterPolicyBuilder dlqBuilder = DeadLetterPolicy.builder()
// 最大重试次数
.maxRedeliverCount(this.maxRedeliveryCount);
// 指定重试队列,不是多个或通配符topic才能判断重试队列
if (!MQUtil.isPatternTag(this.topic)) {
String retryTopic = this.topic + (this.isRetryDLQUpperCase ? "-RETRY" : "-retry");
dlqBuilder.retryLetterTopic(retryTopic);
String dlqTopic = this.topic + (this.isRetryDLQUpperCase ? "-DLQ" : "-dlq");
dlqBuilder.deadLetterTopic(dlqTopic);
}

// 默认关闭,如果需要重试则开启
builder.enableRetry(true).deadLetterPolicy(dlqBuilder.build());
}

// ack超时
builder.ackTimeout(this.ackTimeoutSeconds, TimeUnit.SECONDS);

// pulsar批量获取消息设置
builder.batchReceivePolicy(new BatchReceivePolicy.Builder().maxNumMessages(this.batchSize)
.timeout(this.getBatchTimeoutSeconds, TimeUnit.SECONDS)
.build());

try {
this.pulsarMQConsumer = builder.subscribe();
} catch (PulsarClientException e) {
throw new CanalClientException("Subscript pulsar consumer error", e);
}
}

@SuppressWarnings("unchecked")
@Override
public List<CommonMessage> getMessage(Long timeout, TimeUnit unit) {
List<CommonMessage> messageList = Lists.newArrayList();
try {
Messages<byte[]> messages = pulsarMQConsumer.batchReceive();
if (null == messages || messages.size() == 0) {
return messageList;
}
// 保存当前消费记录,用于ack和rollback
this.lastGetBatchMessage = messages;
for (org.apache.pulsar.client.api.Message<byte[]> msg : messages) {
byte[] data = msg.getData();
if (!this.flatMessage) {
Message message = CanalMessageSerializerUtil.deserializer(data);
List<CommonMessage> list = MessageUtil.convert(message);
messageList.addAll(list);
} else {
CommonMessage commonMessage = JSON.parseObject(data, CommonMessage.class);
messageList.add(commonMessage);
}
}
} catch (PulsarClientException e) {
throw new CanalClientException("Receive pulsar batch message error", e);
}

return messageList;
}

@Override
public void rollback() {
try {
if (isConsumerActive() && hasLastMessages()) {
// 回滚所有消息
this.pulsarMQConsumer.negativeAcknowledge(this.lastGetBatchMessage);
}
} finally {
this.lastGetBatchMessage = null;
}
}

@Override
public void ack() {
try {
if (isConsumerActive() && hasLastMessages()) {
// 确认所有消息
this.pulsarMQConsumer.acknowledge(this.lastGetBatchMessage);
}
} catch (PulsarClientException e) {
if (isConsumerActive() && hasLastMessages()) {
this.pulsarMQConsumer.negativeAcknowledge(this.lastGetBatchMessage);
}
} finally {
this.lastGetBatchMessage = null;
}
}

@Override
public void disconnect() {
if (null == this.pulsarMQConsumer || !this.pulsarMQConsumer.isConnected()) {
return;
}
try {
// 会导致暂停期间数据丢失
// this.pulsarMQConsumer.unsubscribe();
this.pulsarClient.close();
} catch (PulsarClientException e) {
throw new CanalClientException("Disconnect pulsar consumer error", e);
}
}

/**
* 是否消费可用
*
* @return true消费者可用
*/
private boolean isConsumerActive() {
return null != this.pulsarMQConsumer && this.pulsarMQConsumer.isConnected();
}

/**
* 是否有未确认消息
*
* @return true有正在消费的待确认消息
*/
private boolean hasLastMessages() {
return null != this.lastGetBatchMessage && this.lastGetBatchMessage.size() > 0;
}
}

5.3 CanalRabbitMQConsumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
/**
* RabbitMQ consumer SPI 实现
*
* @author rewerma 2020-02-01
* @version 1.0.0
*/
@SPI("rabbitmq")
public class CanalRabbitMQConsumer implements CanalMsgConsumer {

private static final Logger logger = LoggerFactory
.getLogger(CanalRabbitMQConsumer.class);

// 链接地址
private String nameServer;
// 主机名
private String vhost;
private String queueName;

// 一些鉴权信息
private String accessKey;
private String secretKey;
private Long resourceOwnerId;
private String username;
private String password;

private boolean flatMessage;

private Connection connect;
private Channel channel;

private long batchProcessTimeout = 60 * 1000;
private BlockingQueue<ConsumerBatchMessage<CommonMessage>> messageBlockingQueue;
private volatile ConsumerBatchMessage<CommonMessage> lastGetBatchMessage = null;

@Override
public void init(Properties properties, String topic, String groupId) {
this.nameServer = PropertiesUtils.getProperty(properties, "rabbitmq.host");
this.vhost = PropertiesUtils.getProperty(properties, "rabbitmq.virtual.host");
this.queueName = topic;
this.accessKey = PropertiesUtils.getProperty(properties, CanalConstants.CANAL_ALIYUN_ACCESS_KEY);
this.secretKey = PropertiesUtils.getProperty(properties, CanalConstants.CANAL_ALIYUN_SECRET_KEY);
this.username = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_USERNAME);
this.password = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_PASSWORD);
Long resourceOwnerIdPro = (Long) properties.get(RabbitMQConstants.RABBITMQ_RESOURCE_OWNERID);
if (resourceOwnerIdPro != null) {
this.resourceOwnerId = resourceOwnerIdPro;
}
this.flatMessage = (Boolean) properties.get(CanalConstants.CANAL_MQ_FLAT_MESSAGE);
this.messageBlockingQueue = new LinkedBlockingQueue<>(1024);
}

@Override
public void connect() {
ConnectionFactory factory = new ConnectionFactory();
if (accessKey.length() > 0 && secretKey.length() > 0) {
factory.setCredentialsProvider(new AliyunCredentialsProvider(accessKey, secretKey, resourceOwnerId));
} else {
factory.setUsername(username);
factory.setPassword(password);
}
// 解析出端口 modified by 16075140
if (nameServer != null && nameServer.contains(":")) {
String[] serverHostAndPort = AddressUtils.splitIPAndPort(nameServer);
factory.setHost(serverHostAndPort[0]);
factory.setPort(Integer.parseInt(serverHostAndPort[1]));
} else {
factory.setHost(nameServer);
}

factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
factory.setVirtualHost(vhost);
try {
connect = factory.newConnection();
channel = connect.createChannel();
} catch (IOException | TimeoutException e) {
throw new CanalClientException("Start RabbitMQ producer error", e);
}

// 不存在连接 则重新连接
if (connect == null) {
this.connect();
}

Consumer consumer = new DefaultConsumer(channel) {

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {

if (body != null) {
channel.basicAck(envelope.getDeliveryTag(), process(body));
}
}
};
try {
channel.basicConsume(queueName, false, consumer);
} catch (IOException e) {
throw new CanalClientException("error", e);
}
}

private boolean process(byte[] messageData) {
if (logger.isDebugEnabled()) {
logger.debug("Get Message: {}", new String(messageData));
}
List<CommonMessage> messageList = new ArrayList<>();
if (!flatMessage) {
Message message = CanalMessageSerializerUtil.deserializer(messageData);
messageList.addAll(MessageUtil.convert(message));
} else {
CommonMessage commonMessage = JSON.parseObject(messageData, CommonMessage.class);
messageList.add(commonMessage);
}
ConsumerBatchMessage<CommonMessage> batchMessage = new ConsumerBatchMessage<>(messageList);
try {
messageBlockingQueue.put(batchMessage);
} catch (InterruptedException e) {
logger.error("Put message to queue error", e);
throw new RuntimeException(e);
}
boolean isCompleted;
try {
isCompleted = batchMessage.waitFinish(batchProcessTimeout);
} catch (InterruptedException e) {
logger.error("Interrupted when waiting messages to be finished.", e);
throw new RuntimeException(e);
}
boolean isSuccess = batchMessage.isSuccess();
return isCompleted && isSuccess;
}

@Override
public List<CommonMessage> getMessage(Long timeout, TimeUnit unit) {
try {
if (this.lastGetBatchMessage != null) {
throw new CanalClientException("mq get/ack not support concurrent & async ack");
}

ConsumerBatchMessage<CommonMessage> batchMessage = messageBlockingQueue.poll(timeout, unit);
if (batchMessage != null) {
this.lastGetBatchMessage = batchMessage;
return batchMessage.getData();
}
} catch (InterruptedException ex) {
logger.warn("Get message timeout", ex);
throw new CanalClientException("Failed to fetch the data after: " + timeout);
}
return null;
}

@Override
public void rollback() {
try {
if (this.lastGetBatchMessage != null) {
this.lastGetBatchMessage.fail();
}
} finally {
this.lastGetBatchMessage = null;
}
}

@Override
public void ack() {
try {
if (this.lastGetBatchMessage != null) {
this.lastGetBatchMessage.ack();
}
} catch (Throwable e) {
if (this.lastGetBatchMessage != null) {
this.lastGetBatchMessage.fail();
}
} finally {
this.lastGetBatchMessage = null;
}
}

@Override
public void disconnect() {
if (channel != null) {
try {
channel.close();
} catch (IOException | TimeoutException e) {
throw new CanalClientException("stop channel error", e);
}
}

if (connect != null) {
try {
connect.close();
} catch (IOException e) {
throw new CanalClientException("stop connect error", e);
}
}
}
}

5.4 CanalRocketMQConsumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
/**
* RocketMQ consumer SPI 实现
*
* @author rewerma 2020-02-01
* @version 1.0.0
*/
@SPI("rocketmq")
public class CanalRocketMQConsumer implements CanalMsgConsumer {

private static final Logger logger = LoggerFactory.getLogger(CanalRocketMQConsumer.class);
private static final String CLOUD_ACCESS_CHANNEL = "cloud";

private String nameServer;
private String topic;
private String groupName;
private DefaultMQPushConsumer rocketMQConsumer;
private BlockingQueue<ConsumerBatchMessage<CommonMessage>> messageBlockingQueue;
private int batchSize = -1;
private long batchProcessTimeout = 60 * 1000;
private boolean flatMessage;
private volatile ConsumerBatchMessage<CommonMessage> lastGetBatchMessage = null;
private String accessKey;
private String secretKey;
private String customizedTraceTopic;
private boolean enableMessageTrace = false;
private String accessChannel;
private String namespace;
private String filter = "*";

@Override
public void init(Properties properties, String topic, String groupName) {
this.topic = topic;
this.groupName = groupName;
this.flatMessage = (Boolean) properties.get(CanalConstants.CANAL_MQ_FLAT_MESSAGE);
this.messageBlockingQueue = new LinkedBlockingQueue<>(1024);
this.accessKey = properties.getProperty(CanalConstants.CANAL_ALIYUN_ACCESS_KEY);
this.secretKey = properties.getProperty(CanalConstants.CANAL_ALIYUN_SECRET_KEY);
String enableMessageTrace = properties.getProperty(RocketMQConstants.ROCKETMQ_ENABLE_MESSAGE_TRACE);
if (StringUtils.isNotEmpty(enableMessageTrace)) {
this.enableMessageTrace = Boolean.parseBoolean(enableMessageTrace);
}
this.customizedTraceTopic = properties.getProperty(RocketMQConstants.ROCKETMQ_CUSTOMIZED_TRACE_TOPIC);
this.accessChannel = properties.getProperty(RocketMQConstants.ROCKETMQ_ACCESS_CHANNEL);
this.namespace = properties.getProperty(RocketMQConstants.ROCKETMQ_NAMESPACE);
this.nameServer = properties.getProperty(RocketMQConstants.ROCKETMQ_NAMESRV_ADDR);
String batchSize = properties.getProperty(RocketMQConstants.ROCKETMQ_BATCH_SIZE);
if (StringUtils.isNotEmpty(batchSize)) {
this.batchSize = Integer.parseInt(batchSize);
}
String subscribeFilter = properties.getProperty(RocketMQConstants.ROCKETMQ_SUBSCRIBE_FILTER);
if (StringUtils.isNotEmpty(subscribeFilter)) {
this.filter = subscribeFilter;
}
}

@Override
public void connect() {
RPCHook rpcHook = null;
if (null != accessKey && accessKey.length() > 0 && null != secretKey && secretKey.length() > 0) {
SessionCredentials sessionCredentials = new SessionCredentials();
sessionCredentials.setAccessKey(accessKey);
sessionCredentials.setSecretKey(secretKey);
rpcHook = new AclClientRPCHook(sessionCredentials);
}

rocketMQConsumer = new DefaultMQPushConsumer(groupName,
rpcHook,
new AllocateMessageQueueAveragely(),
enableMessageTrace,
customizedTraceTopic);
rocketMQConsumer.setVipChannelEnabled(false);
if (CLOUD_ACCESS_CHANNEL.equals(this.accessChannel)) {
rocketMQConsumer.setAccessChannel(AccessChannel.CLOUD);
}

if (!StringUtils.isEmpty(this.namespace)) {
rocketMQConsumer.setNamespace(this.namespace);
}

if (!StringUtils.isBlank(nameServer)) {
rocketMQConsumer.setNamesrvAddr(nameServer);
}
if (batchSize != -1) {
rocketMQConsumer.setConsumeMessageBatchMaxSize(batchSize);
}

try {
if (rocketMQConsumer == null) {
this.connect();
}
rocketMQConsumer.subscribe(this.topic, this.filter);
rocketMQConsumer.registerMessageListener((MessageListenerOrderly) (messageExts, context) -> {
context.setAutoCommit(true);
boolean isSuccess = process(messageExts);
if (isSuccess) {
return ConsumeOrderlyStatus.SUCCESS;
} else {
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
});
rocketMQConsumer.start();
} catch (MQClientException ex) {
logger.error("Start RocketMQ consumer error", ex);
}
}

private boolean process(List<MessageExt> messageExts) {
if (logger.isDebugEnabled()) {
logger.debug("Get Message: {}", messageExts);
}
List<CommonMessage> messageList = new ArrayList<>();
for (MessageExt messageExt : messageExts) {
byte[] data = messageExt.getBody();
if (data != null) {
try {
if (!flatMessage) {
Message message = CanalMessageSerializerUtil.deserializer(data);
messageList.addAll(MessageUtil.convert(message));
} else {
CommonMessage commonMessage = JSON.parseObject(data, CommonMessage.class);
messageList.add(commonMessage);
}
} catch (Exception ex) {
logger.error("Add message error", ex);
throw new CanalClientException(ex);
}
} else {
logger.warn("Received message data is null");
}
}
ConsumerBatchMessage<CommonMessage> batchMessage = new ConsumerBatchMessage<>(messageList);
try {
messageBlockingQueue.put(batchMessage);
} catch (InterruptedException e) {
logger.error("Put message to queue error", e);
throw new RuntimeException(e);
}
boolean isCompleted;
try {
isCompleted = batchMessage.waitFinish(batchProcessTimeout);
} catch (InterruptedException e) {
logger.error("Interrupted when waiting messages to be finished.", e);
throw new RuntimeException(e);
}
boolean isSuccess = batchMessage.isSuccess();
return isCompleted && isSuccess;
}

@Override
public List<CommonMessage> getMessage(Long timeout, TimeUnit unit) {
try {
if (this.lastGetBatchMessage != null) {
throw new CanalClientException("mq get/ack not support concurrent & async ack");
}

ConsumerBatchMessage<CommonMessage> batchMessage = messageBlockingQueue.poll(timeout, unit);
if (batchMessage != null) {
this.lastGetBatchMessage = batchMessage;
return batchMessage.getData();
}
} catch (InterruptedException ex) {
logger.warn("Get message timeout", ex);
throw new CanalClientException("Failed to fetch the data after: " + timeout);
}
return null;
}

@Override
public void rollback() {
try {
if (this.lastGetBatchMessage != null) {
this.lastGetBatchMessage.fail();
}
} finally {
this.lastGetBatchMessage = null;
}
}

@Override
public void ack() {
try {
if (this.lastGetBatchMessage != null) {
this.lastGetBatchMessage.ack();
}
} catch (Throwable e) {
if (this.lastGetBatchMessage != null) {
this.lastGetBatchMessage.fail();
}
} finally {
this.lastGetBatchMessage = null;
}
}

@Override
public void disconnect() {
rocketMQConsumer.unsubscribe(topic);
rocketMQConsumer.shutdown();
}
}

5.5 CanalTCPConsumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/**
* TCP 消费者连接器, 一个destination对应一个SPI实例
*
* @author rewerma 2020-01-30
* @author XuDaojie
* @version 1.1.5
* @since 1.1.5
*/
@SPI("tcp")
public class CanalTCPConsumer implements CanalMsgConsumer {

private Long currentBatchId = null;
private CanalConnector canalConnector;
private int batchSize = 500;

@Override
public void init(Properties properties, String destination, String groupId) {
// load config
String host = properties.getProperty(TCPConstants.CANAL_TCP_HOST);
String username = properties.getProperty(TCPConstants.CANAL_TCP_USERNAME);
String password = properties.getProperty(TCPConstants.CANAL_TCP_PASSWORD);
String zkHosts = properties.getProperty(TCPConstants.CANAL_TCP_ZK_HOSTS);
String batchSizePro = properties.getProperty(TCPConstants.CANAL_TCP_BATCH_SIZE);
if (batchSizePro != null) {
batchSize = Integer.parseInt(batchSizePro);
}
if (StringUtils.isNotBlank(host)) {
String[] ipPort = AddressUtils.splitIPAndPort(host);
SocketAddress sa = new InetSocketAddress(ipPort[0], Integer.parseInt(ipPort[1]));
this.canalConnector = new SimpleCanalConnector(sa, username, password, destination);
} else {
this.canalConnector = new ClusterCanalConnector(username,
password,
destination,
new ClusterNodeAccessStrategy(destination, ZkClientx.getZkClient(zkHosts)));
}
}

@Override
public void connect() {
canalConnector.connect();
canalConnector.subscribe();
}

@Override
public List<CommonMessage> getMessage(Long timeout, TimeUnit unit) {
try {
Message message = canalConnector.getWithoutAck(batchSize, timeout, unit);
long batchId = message.getId();
currentBatchId = batchId;
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
return null;
} else {
return MessageUtil.convert(message);
}
} catch (Throwable e) {
throw new RuntimeException(e);
}
}

@Override
public void rollback() {
if (currentBatchId != null && currentBatchId != -1) {
canalConnector.rollback(currentBatchId);
currentBatchId = null;
}
}

@Override
public void ack() {
if (currentBatchId != null) {
canalConnector.ack(currentBatchId);
currentBatchId = null;
}
}

@Override
public void disconnect() {
// tcp模式下,因为是单tcp消费,避免adapter异常断开时直接unsubscribe
// unsubscribe发送给canal-server会导致清理cursor位点,如果此时canal-server出现重启,就会丢失binlog数据
// canalConnector.unsubscribe();
canalConnector.disconnect();
}
}

5.6 ProxyCanalMsgConsumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
public class ProxyCanalMsgConsumer implements CanalMsgConsumer {

private CanalMsgConsumer canalMsgConsumer;

public ProxyCanalMsgConsumer(CanalMsgConsumer canalMsgConsumer) {
this.canalMsgConsumer = canalMsgConsumer;
}

private ClassLoader changeCL() {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(canalMsgConsumer.getClass().getClassLoader());
return cl;
}

private void revertCL(ClassLoader cl) {
Thread.currentThread().setContextClassLoader(cl);
}


@Override
public void init(Properties properties, String topic, String groupId) {
ClassLoader cl = changeCL();
try {
canalMsgConsumer.init(properties, topic, groupId);
} finally {
revertCL(cl);
}
}

@Override
public void connect() {
ClassLoader cl = changeCL();
try {
canalMsgConsumer.connect();
} finally {
revertCL(cl);
}
}

@Override
public List<CommonMessage> getMessage(Long timeout, TimeUnit unit) {
ClassLoader cl = changeCL();
try {
return canalMsgConsumer.getMessage(timeout, unit);
} finally {
revertCL(cl);
}
}

@Override
public void ack() {
ClassLoader cl = changeCL();
try {
canalMsgConsumer.ack();
} finally {
revertCL(cl);
}
}

@Override
public void rollback() {
ClassLoader cl = changeCL();
try {
canalMsgConsumer.rollback();
} finally {
revertCL(cl);
}
}

@Override
public void disconnect() {
ClassLoader cl = changeCL();
try {
canalMsgConsumer.disconnect();
} finally {
revertCL(cl);
}
}
}

本站由 卡卡龙 使用 Stellar 1.29.1主题创建

本站访问量 次. 本文阅读量 次.