在 Docker 中连接到 Kafka 运行

Connect to Kafka running in Docker

我在我的本地机器上设置了一个单节点 Kafka Docker 容器,就像 the Confluent documentation(步骤 2-3)中描述的那样。

此外,我还公开了 Zookeeper 的端口 2181 和 Kafka 的端口 9092,这样我就可以从本地计算机上的客户端 运行 连接到它们:

$ docker run -d \
    -p 2181:2181 \
    --net=confluent \
    --name=zookeeper \
    -e ZOOKEEPER_CLIENT_PORT=2181 \
    confluentinc/cp-zookeeper:4.1.0

$ docker run -d \
    --net=confluent \
    --name=kafka \
    -p 9092:9092 \
    -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
    -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 \
    -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
    confluentinc/cp-kafka:4.1.0

问题: 当我尝试从主机连接到 Kafka 时,连接失败,因为它 can't resolve address: kafka:9092.

这是我的 Java 代码:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("client.id", "KafkaExampleProducer");
props.put("key.serializer", LongSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
KafkaProducer<Long, String> producer = new KafkaProducer<>(props);
ProducerRecord<Long, String> record = new ProducerRecord<>("foo", 1L, "Test 1");
producer.send(record).get();
producer.flush();

异常:

java.io.IOException: Can't resolve address: kafka:9092
    at org.apache.kafka.common.network.Selector.doConnect(Selector.java:235) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.common.network.Selector.connect(Selector.java:214) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:864) [kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:265) [kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:266) [kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238) [kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:176) [kafka-clients-2.0.0.jar:na]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]
Caused by: java.nio.channels.UnresolvedAddressException: null
    at sun.nio.ch.Net.checkAddress(Net.java:101) ~[na:1.8.0_144]
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622) ~[na:1.8.0_144]
    at org.apache.kafka.common.network.Selector.doConnect(Selector.java:233) ~[kafka-clients-2.0.0.jar:na]
    ... 7 common frames omitted

问题:如何在Docker中连接到Kafka运行?我的代码是来自主机的 运行,而不是 Docker。

注意:我知道理论上我可以尝试使用 DNS 设置 /etc/hosts 但这是一种解决方法 - 它不应该那样。

也有类似的问题,但它是基于ches/kafka图像。我使用基于 confluentinc 的图像,这不一样。

当你第一次连接到一个kafka节点时,它会给你所有的kafka节点和url连接的位置。然后您的应用程序将尝试直接连接到每个 kafka。

问题始终是卡夫卡会给你什么url?这就是为什么有 KAFKA_ADVERTISED_LISTENERS 将被 kafka 用来告诉世界如何访问它的原因。

现在对于您的用例,有多个小问题需要考虑:

假设您设置 plaintext://kafka:9092

  • 如果您的 docker compose 中有一个使用 kafka 的应用程序,这没问题。此应用程序将从 kafka 获得具有 kafka 的 URL,可通过 docker 网络解析。
  • 如果您尝试从您的主系统或不在同一 docker 网络中的另一个容器连接,这将失败,因为无法解析 kafka 名称。

==> 要解决此问题,您需要有一个特定的 DNS 服务器,例如服务发现服务器,但这对小东西来说是个大麻烦。或者您手动将 kafka 名称设置为每个 /etc/hosts

中的容器 ip

如果设置plaintext://localhost:9092

  • 如果您有端口映射(启动 kafka 时使用 -p 9092:9092),这在您的系统上就没问题
  • 如果您从容器上的应用程序进行测试(相同 docker 网络或不同)(localhost 是容器本身而不是 kafka),这将失败

==> 如果你有这个并且希望在另一个容器中使用 kafka 客户端,解决这个问题的一种方法是为两个容器共享网络(相同的 ip)

最后一个选项:在名称中设置一个 IP:plaintext://x.y.z.a:9092(kafka 广告url 不能是文档中所述的 0.0.0.0 https://kafka.apache.org/documentation/#brokerconfigs_advertised.listeners

这对每个人来说都可以...但是您如何获得 x.y.z.a 名称?

唯一的方法是在启动容器时硬编码此 ip:docker run .... --net confluent --ip 10.x.y.z ...。注意需要将ip适配为confluent子网中的一个有效ip。

免责声明

tl;dr - A simple port forward from the container to the host will not work, and no hosts files should be modified. What exact IP/hostname + port do you want to connect to? Make sure that value is set as advertised.listeners on the broker. Make sure that address and the servers listed as part of bootstrap.servers are actually resolvable (ping an IP/hostname, use netcat to check ports...)

Below gives answers for commonly used Kafka images, but it's all the same Apache Kafka running in a container.
You're just dependent on how it is configured. And which variables make it so.


The below answer uses confluentinc docker images to address the question that was asked, not wurstmeister/kafka. More specifically, the latter images are not well-maintained despite being the one of the most popular Kafka docker image.

以下部分尝试汇总使用另一张图片所需的所有详细信息。

wurstmeister/kafka

Refer their README section on listener configuration, Also read their Connectivity wiki.

bitnami/kafka

If you want a small container, try these. The images are much smaller than the Confluent ones and are much more well maintained than wurstmeister. Refer their README for listener configuration.

debezium/kafka

Docs on it are mentioned here.

Note: advertised host and port settings are deprecated. Advertised listeners covers both. Similar to the Confluent containers, Debezium can use KAFKA_ prefixed broker settings to update its properties.

Others

spotify/kafka is deprecated and outdated.
fast-data-dev or lensesio/box are great for an all in one solution, but are bloated if you only want Kafka
Your own Dockerfile - Why? Is something incomplete with these others? Start with a pull request, not starting from scratch.

补充阅读,全功能 docker-compose,和网络图,见this blog by @rmoff

回答

The Confluent quickstart (Docker) document 假定所有生产和消费请求都在 Docker 网络内。

您可以通过 运行 您的 Kafka 客户端代码在其自己的容器中解决连接到 kafka:9092 的问题,因为它使用 Docker 网桥,但否则您将需要添加更多环境变量以在外部公开容器,同时仍然让它在 Docker 网络中工作。

首先添加PLAINTEXT_HOST:PLAINTEXT的协议映射,将监听器协议映射到Kafka协议

键:KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
值:PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT

然后在不同的端口上设置两个通告的侦听器。 (此处的 kafka 指的是 docker 容器名称;它也可能被命名为 broker,因此请仔细检查您的服务 + 主机名)。注意协议匹配上面映射的右侧值

密钥:KAFKA_ADVERTISED_LISTENERS
值:PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092

当运行容器时,为主机端口映射添加-p 29092:29092


tl;博士

(采用以上设置)

如果仍然无法正常工作,可以将 KAFKA_LISTENERS 设置为包括 <PROTOCOL>://0.0.0.0:<PORT>,其中两个选项都与广告设置和 Docker-转发端口

相匹配

同一台机器上的客户端,不在容器中

通告 localhost 和关联的端口将允许您在容器外部进行连接,正如您所期望的那样。

换句话说,当 运行 任何 Kafka 客户端 Docker 网络之外(包括您可能已在本地安装的 CLI 工具)时,使用 localhost:29092 用于 bootstrap 服务器和 localhost:2181 用于 Zookeeper(需要 Docker 端口转发)

另一台机器上的客户端

如果尝试从外部服务器连接,您需要公布主机 的外部 hostname/ip 以及本地主机的 as/in 位置.
简单地通过端口转发通告 localhost 是行不通的,因为 Kafka 协议仍会继续通告您配置的侦听器。

此设置需要 Docker 端口转发 路由器端口转发(和防火墙/安全组更改)如果不在同一本地网络中,例如,您的容器运行 在云中并且您想从本地计算机与其交互。

容器中的客户端,在同一台主机上

这是最不容易出错的配置;您可以直接使用 DNS 服务名称。

当 运行 一个应用 在 Docker 网络 中时,使用 kafka:9092(参见上面公布的 PLAINTEXT 侦听器配置)对于 bootstrap 服务器和 zookeeper:2181 对于 Zookeeper,就像任何其他 Docker 服务通信一样(不需要任何端口转发)

如果您使用单独的 docker run 命令,或 Compose 文件,您需要手动定义共享 network

See the example Compose file for the full Confluent stack or more minimal one 对于单个经纪人。

相关问题

附录

任何对 Kubernetes 部署感兴趣的人:https://operatorhub.io/?keyword=Kafka

在动物园管理员之前

  1. docker 容器 运行 --name zookeeper -p 2181:2181 zookeeper

卡夫卡之后

  1. docker 容器 运行 --name kafka -p 9092:9092 -e KAFKA_ZOOKEEPER_CONNECT=192.168.8.128:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT: //ip_address_of_your_computer_but_not_localhost!!!:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 confluentinc/cp-kafka

在kafka消费者和生产者配置中

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.8.128:9092");
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.8.128:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

我运行我的项目符合这些规定。祝你好运。

我在 docker 上与 Kafka/Zookeeper/Schema-Registry 和 Mac M1 上的客户分享我的 Mac M1 经验。此设置要求端口 9092 必须从 Mac 使用,而不是从 docker 使用,因此应交换端口

这样

Key: KAFKA_ADVERTISED_LISTENERS
Value: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092

加上端口转发:

ports
   - "9092:9092"

最后,对于我的设置,我必须以这种方式设置监听器密钥

Key: KAFKA_LISTENERS
Value: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092