docker 容器中的 NiFi 无法与 kafka 通信:TimoutException,kafkacat ist 工作正常

NiFi in docker container fails to talk to kafka: TimoutException, kafkacat ist working just fine

我已经通过 docker 设置了 NiFi (1.11.4) 和 Kafka(2.5)(docker-下面的撰写文件,实际的 NiFi 流程定义 https://github.com/geoHeil/streaming-reference)。 当尝试跟进结合处理器的基本入门教程(例如 https://towardsdatascience.com/big-data-managing-the-flow-of-data-with-apache-nifi-and-apache-kafka-af674cd8f926)时,例如:

我运行进入超时异常问题:

nifi_1            | 2020-06-10 11:15:47,311 ERROR [kafka-producer-network-thread | producer-2] o.a.n.p.k.pubsub.PublishKafkaRecord_2_0 PublishKafkaRecord_2_0[id=959f0e64-0172-1000-0000-0000650181a4] Failed to send StandardFlowFileRecord[uuid=944464e4-94ea-48dc-89fa-d19c34f163e7,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1591771086044-1, container=default, section=1], offset=227962, length=422],offset=0,name=c8dd1dd2-0ffe-4875-9d45-902ea331c210,size=422] to Kafka: org.apache.kafka.common.errors.TimeoutException: Expiring 64 record(s) for test-0: 30029 ms has passed since batch creation plus linger time
nifi_1            | org.apache.kafka.common.errors.TimeoutException: Expiring 64 record(s) for test-0: 30029 ms has passed since batch creation plus linger time
nifi_1            | 2020-06-10 11:15:47,311 ERROR [kafka-producer-network-thread | producer-2] o.a.n.p.k.pubsub.PublishKafkaRecord_2_0 PublishKafkaRecord_2_0[id=959f0e64-0172-1000-0000-0000650181a4] Failed to send StandardFlowFileRecord[uuid=944464e4-94ea-48dc-89fa-d19c34f163e7,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1591771086044-1, container=default, section=1], offset=227962, length=422],offset=0,name=c8dd1dd2-0ffe-4875-9d45-902ea331c210,size=422] to Kafka: org.apache.kafka.common.errors.TimeoutException: Expiring 64 record(s) for test-0: 30029 ms has passed since batch creation plus linger time
nifi_1            | org.apache.kafka.common.errors.TimeoutException: Expiring 64 record(s) for test-0: 30029 ms has passed since batch creation plus linger time

但是,一个:

kafkacat -C -b localhost:9092 -t test #starts listener
kafkacat -P -b localhost:9092 -t test #starts producer

通过 kafka 实例很好地传输事件。

docker-compose 文件如下所示:

version: "3"
services:
  nifi:
    image: apache/nifi:1.11.4
    ports:
      - 8080:8080 # Unsecured HTTP Web Port
    environment:
      - NIFI_WEB_HTTP_PORT=8080
      - NIFI_CLUSTER_IS_NODE=true
      - NIFI_CLUSTER_NODE_PROTOCOL_PORT=8082
      - NIFI_ZK_CONNECT_STRING=zookeeper:2181
      - NIFI_ELECTION_MAX_WAIT=1 min
    links:
      - broker
      - zookeeper
    volumes:
      - ./for_nifi/conf:/opt/nifi/nifi-current/conf
  zookeeper: #https://github.com/confluentinc/cp-all-in-one/blob/5.5.0-post/cp-all-in-one/docker-compose.yml#L5
    image: confluentinc/cp-zookeeper:5.5.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-kafka:5.5.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

您使用错误的端口连接到代理。通过连接到 9092,您连接到向客户端通告 localhost:9092 以进行后续连接的侦听器。这就是当您从本地计算机使用 kafkacat 时它起作用的原因(因为 9092 暴露给您的本地计算机)

如果您使用 broker:29092,则代理将为客户端提供正确的连接地址(即 broker:29092)。

要了解有关广告听众的更多信息,请参阅 this blog