Confluent Platform 无法正确 send/consume 消息进入 Kafka 主题

Confluent Platform unable to properly send/consume message into the Kafka topic

目前,我使用带有 io.fabric8 docker-maven-plugin 的 Maven 来自动启动 Kafka 和 ZooKeeper。这是我当前的配置,运行良好:

<properties>
    <zookeeper.port>2181</zookeeper.port>

    <kafka.host>127.0.0.1</kafka.host>
    <kafka.port>9092</kafka.port>
</properties>

<plugin>
    <groupId>io.fabric8</groupId>
    <artifactId>docker-maven-plugin</artifactId>
    <version>${docker-maven-plugin.version}</version>

    <configuration>
        <showLogs>true</showLogs>
        <images>

            <image>
                <name>wurstmeister/zookeeper:${zookeeper.version}</name>
                <alias>zookeeper</alias>
                <run>
                    <ports>
                        <port>${zookeeper.port}:2181</port>
                    </ports>
                </run>
            </image>

            <image>
                <name>wurstmeister/kafka:${kafka.version}</name>
                <alias>kafka</alias>
                <run>
                    <ports>
                        <port>${kafka.port}:9092</port>
                    </ports>
                    <links>
                        <link>zookeeper:zookeeper</link>
                    </links>
                    <env>
                        <KAFKA_ADVERTISED_HOST_NAME>${local.ip}
                        </KAFKA_ADVERTISED_HOST_NAME>
                        <KAFKA_ADVERTISED_PORT>${kafka.port}</KAFKA_ADVERTISED_PORT>
                        <KAFKA_ZOOKEEPER_CONNECT>zookeeper:2181</KAFKA_ZOOKEEPER_CONNECT>
                        <KAFKA_MESSAGE_MAX_BYTES>15000000</KAFKA_MESSAGE_MAX_BYTES>
                    </env>
                </run>
            </image>

            <image>
                <name>confluentinc/cp-ksql-server:5.0.0</name>
                <alias>cp-ksql-server</alias>
                <run>
                    <ports>
                        <port>8088:8088</port>
                    </ports>
                    <links>
                        <link>kafka:kafka</link>
                    </links>
                    <env>
                        <KSQL_BOOTSTRAP_SERVERS>${local.ip}:9092</KSQL_BOOTSTRAP_SERVERS>
                        <KSQL_LISTENERS>http://0.0.0.0:8088/</KSQL_LISTENERS>
                        <KSQL_KSQL_SERVICE_ID>confluent_test_2</KSQL_KSQL_SERVICE_ID>
                    </env>
                </run>
            </image>

        </images>

    </configuration>
</plugin>

现在我需要更多的灵活性和监控工具,以便能够正确分析我的本地 Kafka 实例,这就是为什么我决定开始使用 All-In-One Confluent Platform https://docs.confluent.io/current/quickstart/ce-docker-quickstart.html

我执行了上面文档中描述的步骤,并且能够通过 docker-compose up -d 命令对以下 docker-compose.yml https://github.com/confluentinc/cp-docker-images/tree/master/examples/cp-all-in-one. After that I was able to access Confluent Control Center on http://localhost:9021

运行 Confluent Platform

我的应用程序也启动了,但消息没有正确发送或使用。这是日志:

2018-08-25 20:54:33.786  INFO 11304 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.1.0
2018-08-25 20:54:33.786  INFO 11304 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : fdcf75ea326b8e07
2018-08-25 20:54:33.794  INFO 11304 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : Cluster ID: NCbZ7abTTdu6SQHhthMbIw
2018-08-25 20:54:33.827  INFO 11304 --- [nio-8080-exec-1] c.p.domain.service.post.PostServiceImpl  : Message sent to the post creation queue: Post [id=5b8197d9e67db62c28ebe8d5, status=PENDING, externalPostId=2bdbd597-cc30-4478-b946-e873c8d53eec, chatName= ....]

2018-08-25 20:55:03.875 ERROR 11304 --- [ad | producer-1] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='Post [id=5b8197d9e67db62c28ebe8d5, status=PENDING, externalPostId=2bdbd597-cc30-4478-b946-e873c8d53e...' to topic post.send:

org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for post.send-0: 30047 ms has passed since batch creation plus linger time

我可以在那里看到我的主题:

那里可能有什么问题以及如何解决?

已更新

我在我的应用程序中启用了 Kafka 的 DEBUG 日志,现在可以看到以下错误:

java.io.IOException: Can't resolve address: broker:9092
    at org.apache.kafka.common.network.Selector.doConnect(Selector.java:235) ~[kafka-clients-1.1.0.jar:na]
    at org.apache.kafka.common.network.Selector.connect(Selector.java:214) ~[kafka-clients-1.1.0.jar:na]
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:793) [kafka-clients-1.1.0.jar:na]
    at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:230) [kafka-clients-1.1.0.jar:na]
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.sendEligibleCalls(KafkaAdminClient.java:792) [kafka-clients-1.1.0.jar:na]
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1002) [kafka-clients-1.1.0.jar:na]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]
Caused by: java.nio.channels.UnresolvedAddressException: null
    at sun.nio.ch.Net.checkAddress(Net.java:101) ~[na:1.8.0_171]
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622) ~[na:1.8.0_171]
    at org.apache.kafka.common.network.Selector.doConnect(Selector.java:233) ~[kafka-clients-1.1.0.jar:na]
    ... 6 common frames omitted

Can't resolve address: broker:9092

Kafka 将它所知道的侦听器返回给客户端

KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092

基本上,bootstrap.servers只需要至少一个地址,但其余的Kafka集群地址将返回给客户端,以便与各个代理建立连接。

当您在 Docker 网络外部连接到 localhost:9092 时,Docker 端口转发将工作,但随后 Kafka returns 返回您的客户端 broker:9092,并且客户端断开连接,因为您没有在主机和 Docker 网络之间设置 DNS(除非使用 Docker Swarm,否则您不需要设置)。


您的解决方案有两种选择

  1. 在您的代码中使用 localhost:29092,事情可能会开始变得更好
  2. 运行 您的 Java 代码位于同一网络上的 Docker 容器中,由 Compose 文件设置并保留 kafka:9092 引用(这类似于您的KSQL 和控制中心容器可以正常工作)