无法连接到 Kafka docker 实例
Cannot connect to the Kafka docker instance
我正在使用 bitnami/kafka:最新和 bitnami/zookeper:最新图像。
我在 Github 中的示例项目:https://github.com/KostasD21/kafka-docker-demo
如果你 运行 命令 docker-compose up 然后 运行 应用程序,你会看到 Spring Boot 应用程序无法连接到 kafka 容器(目前 运行ning 在主机 kafka:9092 下)
来自应用程序的堆栈跟踪:
2022-05-31 08:59:18.255 WARN 144414 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-foo-1, groupId=foo] Error connecting to node kafka:9092 (id: 1001 rack: null)
java.net.UnknownHostException: kafka
at java.base/java.net.InetAddress$CachedAddresses.get(InetAddress.java:801) ~[na:na]
at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1509) ~[na:na]
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1367) ~[na:na]
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1301) ~[na:na]
at org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27) ~[kafka-clients-3.0.1.jar:na]
at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110) ~[kafka-clients-3.0.1.jar:na]
at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:511) ~[kafka-clients-3.0.1.jar:na]
at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access0(ClusterConnectionStates.java:468) ~[kafka-clients-3.0.1.jar:na]
at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:173) ~[kafka-clients-3.0.1.jar:na]
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:979) ~[kafka-clients-3.0.1.jar:na]
at org.apache.kafka.clients.NetworkClient.access0(NetworkClient.java:73) ~[kafka-clients-3.0.1.jar:na]
at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1152) ~[kafka-clients-3.0.1.jar:na]
at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1040) ~[kafka-clients-3.0.1.jar:na]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549) ~[kafka-clients-3.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265) ~[kafka-clients-3.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[kafka-clients-3.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:227) ~[kafka-clients-3.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164) ~[kafka-clients-3.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:258) ~[kafka-clients-3.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:483) ~[kafka-clients-3.0.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1262) ~[kafka-clients-3.0.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231) ~[kafka-clients-3.0.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[kafka-clients-3.0.1.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1520) ~[spring-kafka-2.8.4.jar:2.8.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1510) ~[spring-kafka-2.8.4.jar:2.8.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1338) ~[spring-kafka-2.8.4.jar:2.8.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1247) ~[spring-kafka-2.8.4.jar:2.8.4]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
来自 docker 容器的日志:
2022-05-31 05:51:24,236] INFO [RequestSendThread controllerId=1001] Starting (kafka.controller.RequestSendThread)[2022-05-31 05:51:24,236] INFO [Controller id=1001] Currently active brokers in the cluster: Set(1001) (kafka.controller.KafkaController)[2022-05-31 05:51:24,236] INFO [Controller id=1001] Currently shutting brokers in the cluster: Set() (kafka.controller.KafkaController)[2022-05-31 05:51:24,237] INFO [Controller id=1001] Current list of topics in the cluster: Set(__consumer_offsets, test-topic) (kafka.controller.KafkaController)[2022-05-31 05:51:24,237] INFO [Controller id=1001] Fetching topic deletions in progress (kafka.controller.KafkaController)[2022-05-31 05:51:24,241] INFO [Controller id=1001] List of topics to be deleted: (kafka.controller.KafkaController)[2022-05-31 05:51:24,241] INFO [Controller id=1001] List of topics ineligible for deletion: (kafka.controller.KafkaController)[2022-05-31 05:51:24,241] INFO [Controller id=1001] Initializing topic deletion manager (kafka.controller.KafkaController)[2022-05-31 05:51:24,242] INFO [Topic Deletion Manager 1001] Initializing manager with initial deletions: Set(), initial ineligible deletions: Set() (kafka.controller.TopicDeletionManager)[2022-05-31 05:51:24,242] INFO [Controller id=1001] Sending update metadata request (kafka.controller.KafkaController)[2022-05-31 05:51:24,243] INFO [ExpirationReaper--1-AlterAcls]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)[2022-05-31 05:51:24,244] INFO [Controller id=1001 epoch=3] Sending UpdateMetadata request to brokers Set(1001) for 0 partitions (state.change.logger)[2022-05-31 05:51:24,249] INFO [ReplicaStateMachine controllerId=1001] Initializing replica state (kafka.controller.ZkReplicaStateMachine)[2022-05-31 05:51:24,252] INFO [ReplicaStateMachine controllerId=1001] Triggering online replica state changes (kafka.controller.ZkReplicaStateMachine)[2022-05-31 05:51:24,253] INFO [RequestSendThread controllerId=1001] Controller 1001 connected to kafka:9092 (id: 1001 rack: null) for sending state change requests (kafka.controller.RequestSendThread)[2022-05-31 05:51:24,258] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
目前卡在这个通讯中。问题可能与 docker 网络有关,但我不是这方面的专家。
检查这个答案:
从您的应用程序的角度来看,Kafka 正在侦听 localhost
(kafka
主机名如果它是 运行 在同一个 docker 网络中)
本质上,您需要添加一个协议映射器和一个侦听器来宣布可以在本地主机访问 kafka(请参阅前面的答案),将您的 kafka 容器绑定到 29092
的端口更改为 docker-compose 文件)并让您的应用程序连接到 localhost:29092
我正在使用 bitnami/kafka:最新和 bitnami/zookeper:最新图像。
我在 Github 中的示例项目:https://github.com/KostasD21/kafka-docker-demo
如果你 运行 命令 docker-compose up 然后 运行 应用程序,你会看到 Spring Boot 应用程序无法连接到 kafka 容器(目前 运行ning 在主机 kafka:9092 下)
来自应用程序的堆栈跟踪:
2022-05-31 08:59:18.255 WARN 144414 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-foo-1, groupId=foo] Error connecting to node kafka:9092 (id: 1001 rack: null)
java.net.UnknownHostException: kafka
at java.base/java.net.InetAddress$CachedAddresses.get(InetAddress.java:801) ~[na:na]
at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1509) ~[na:na]
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1367) ~[na:na]
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1301) ~[na:na]
at org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27) ~[kafka-clients-3.0.1.jar:na]
at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110) ~[kafka-clients-3.0.1.jar:na]
at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:511) ~[kafka-clients-3.0.1.jar:na]
at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access0(ClusterConnectionStates.java:468) ~[kafka-clients-3.0.1.jar:na]
at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:173) ~[kafka-clients-3.0.1.jar:na]
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:979) ~[kafka-clients-3.0.1.jar:na]
at org.apache.kafka.clients.NetworkClient.access0(NetworkClient.java:73) ~[kafka-clients-3.0.1.jar:na]
at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1152) ~[kafka-clients-3.0.1.jar:na]
at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1040) ~[kafka-clients-3.0.1.jar:na]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549) ~[kafka-clients-3.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265) ~[kafka-clients-3.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[kafka-clients-3.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:227) ~[kafka-clients-3.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164) ~[kafka-clients-3.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:258) ~[kafka-clients-3.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:483) ~[kafka-clients-3.0.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1262) ~[kafka-clients-3.0.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231) ~[kafka-clients-3.0.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[kafka-clients-3.0.1.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1520) ~[spring-kafka-2.8.4.jar:2.8.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1510) ~[spring-kafka-2.8.4.jar:2.8.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1338) ~[spring-kafka-2.8.4.jar:2.8.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1247) ~[spring-kafka-2.8.4.jar:2.8.4]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
来自 docker 容器的日志:
2022-05-31 05:51:24,236] INFO [RequestSendThread controllerId=1001] Starting (kafka.controller.RequestSendThread)[2022-05-31 05:51:24,236] INFO [Controller id=1001] Currently active brokers in the cluster: Set(1001) (kafka.controller.KafkaController)[2022-05-31 05:51:24,236] INFO [Controller id=1001] Currently shutting brokers in the cluster: Set() (kafka.controller.KafkaController)[2022-05-31 05:51:24,237] INFO [Controller id=1001] Current list of topics in the cluster: Set(__consumer_offsets, test-topic) (kafka.controller.KafkaController)[2022-05-31 05:51:24,237] INFO [Controller id=1001] Fetching topic deletions in progress (kafka.controller.KafkaController)[2022-05-31 05:51:24,241] INFO [Controller id=1001] List of topics to be deleted: (kafka.controller.KafkaController)[2022-05-31 05:51:24,241] INFO [Controller id=1001] List of topics ineligible for deletion: (kafka.controller.KafkaController)[2022-05-31 05:51:24,241] INFO [Controller id=1001] Initializing topic deletion manager (kafka.controller.KafkaController)[2022-05-31 05:51:24,242] INFO [Topic Deletion Manager 1001] Initializing manager with initial deletions: Set(), initial ineligible deletions: Set() (kafka.controller.TopicDeletionManager)[2022-05-31 05:51:24,242] INFO [Controller id=1001] Sending update metadata request (kafka.controller.KafkaController)[2022-05-31 05:51:24,243] INFO [ExpirationReaper--1-AlterAcls]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)[2022-05-31 05:51:24,244] INFO [Controller id=1001 epoch=3] Sending UpdateMetadata request to brokers Set(1001) for 0 partitions (state.change.logger)[2022-05-31 05:51:24,249] INFO [ReplicaStateMachine controllerId=1001] Initializing replica state (kafka.controller.ZkReplicaStateMachine)[2022-05-31 05:51:24,252] INFO [ReplicaStateMachine controllerId=1001] Triggering online replica state changes (kafka.controller.ZkReplicaStateMachine)[2022-05-31 05:51:24,253] INFO [RequestSendThread controllerId=1001] Controller 1001 connected to kafka:9092 (id: 1001 rack: null) for sending state change requests (kafka.controller.RequestSendThread)[2022-05-31 05:51:24,258] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
目前卡在这个通讯中。问题可能与 docker 网络有关,但我不是这方面的专家。
检查这个答案:
从您的应用程序的角度来看,Kafka 正在侦听 localhost
(kafka
主机名如果它是 运行 在同一个 docker 网络中)
本质上,您需要添加一个协议映射器和一个侦听器来宣布可以在本地主机访问 kafka(请参阅前面的答案),将您的 kafka 容器绑定到 29092
的端口更改为 docker-compose 文件)并让您的应用程序连接到 localhost:29092