1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
| public synchronized void start() throws Throwable { String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE); if (!"tcp".equalsIgnoreCase(serverMode)) { ExtensionLoader<CanalMQProducer> loader = ExtensionLoader.getExtensionLoader(CanalMQProducer.class); canalMQProducer = loader .getExtension(serverMode.toLowerCase(), CONNECTOR_SPI_DIR, CONNECTOR_STANDBY_SPI_DIR); if (canalMQProducer != null) { canalMQProducer = new ProxyCanalMQProducer(canalMQProducer); canalMQProducer.init(properties); } }
if (canalMQProducer != null) { MQProperties mqProperties = canalMQProducer.getMqProperties(); System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true"); if (mqProperties.isFlatMessage()) { System.setProperty("canal.instance.memory.rawEntry", "false"); } }
logger.info("## start the canal server."); controller = new CanalController(properties); controller.start(); logger.info("## the canal server is running now ......"); shutdownThread = new Thread(() -> { try { logger.info("## stop the canal server"); controller.stop(); CanalLauncher.runningLatch.countDown(); } catch (Throwable e) { logger.warn("##something goes wrong when stopping canal Server:", e); } finally { logger.info("## canal server is down."); } }); Runtime.getRuntime().addShutdownHook(shutdownThread);
if (canalMQProducer != null) { canalMQStarter = new CanalMQStarter(canalMQProducer); String destinations = CanalController.getDestinations(properties); canalMQStarter.start(destinations); controller.setCanalMQStarter(canalMQStarter); }
String port = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT); if (canalAdmin == null && StringUtils.isNotEmpty(port)) { String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER); String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD); CanalAdminController canalAdmin = new CanalAdminController(this); canalAdmin.setUser(user); canalAdmin.setPasswd(passwd); String ip = CanalController.getProperty(properties, CanalConstants.CANAL_IP);
logger.debug("canal admin port:{}, canal admin user:{}, canal admin password: {}, canal ip:{}", port, user, passwd, ip);
CanalAdminWithNetty canalAdminWithNetty = CanalAdminWithNetty.instance(); canalAdminWithNetty.setCanalAdmin(canalAdmin); canalAdminWithNetty.setPort(Integer.parseInt(port)); canalAdminWithNetty.setIp(ip); canalAdminWithNetty.start(); this.canalAdmin = canalAdminWithNetty; }
running = true; }
|