Kafka Connect + Zookepeer 未连接
Kafka Connect + Zookepeer not connecting
我有一个部署在kafka集群中的kafka connect插件(在独立模式下,只是为了测试,想法是分布式的)。这个 kafka connect 插件使用 curator 连接到集群的 zookeper,并从中提取一些信息来决定如何处理消息。
代码是这样的:
protected CuratorFramework getCurator(final String zkConnection) {
final CuratorFramework curator = CuratorFrameworkFactory.newClient(zkConnection,
new ExponentialBackoffRetry(1000, 3));
curator.start();
return curator;
}
treeCache = new TreeCache(curator, settings.getConfigurationRoot());
...
treeCache.start()
它在 treeCache 启动时超时,本地 zookeeper 中存在配置根路径(确认在 zookeeper 中执行 ls-shell,对于我尝试过的 zkConnection 字符串:
- localhost:2181(zookeeper 在那个端口 运行)
- localhost:2181, localhost:2182, localhost:2183
- :2181
- :2181,:2182,:2183
- 127.0.0.1:2181
...等等
云中的 kafka 流模块 运行 使用同一段代码与该动物园管理员连接...知道这里发生了什么吗?
谢谢!!
发现错误!我在 try-with-resources 语句中调用了 getCurator,这使馆长在启动 treeCache 之前关闭,一旦我将馆长移出 try-with-resources 语句并在 SinkTask 的停止方法中关闭它, 一切正常。
干杯!
我有一个部署在kafka集群中的kafka connect插件(在独立模式下,只是为了测试,想法是分布式的)。这个 kafka connect 插件使用 curator 连接到集群的 zookeper,并从中提取一些信息来决定如何处理消息。
代码是这样的:
protected CuratorFramework getCurator(final String zkConnection) {
final CuratorFramework curator = CuratorFrameworkFactory.newClient(zkConnection,
new ExponentialBackoffRetry(1000, 3));
curator.start();
return curator;
}
treeCache = new TreeCache(curator, settings.getConfigurationRoot());
...
treeCache.start()
它在 treeCache 启动时超时,本地 zookeeper 中存在配置根路径(确认在 zookeeper 中执行 ls-shell,对于我尝试过的 zkConnection 字符串:
- localhost:2181(zookeeper 在那个端口 运行)
- localhost:2181, localhost:2182, localhost:2183
- :2181
- :2181,:2182,:2183
- 127.0.0.1:2181 ...等等
云中的 kafka 流模块 运行 使用同一段代码与该动物园管理员连接...知道这里发生了什么吗?
谢谢!!
发现错误!我在 try-with-resources 语句中调用了 getCurator,这使馆长在启动 treeCache 之前关闭,一旦我将馆长移出 try-with-resources 语句并在 SinkTask 的停止方法中关闭它, 一切正常。
干杯!