等待每个人连接到卡夫卡(node-rdkafka)中新消费者组的主题

Waiting for every to connect to a topic with new consumer group in kafka (node-rdkafka)

我正在构建一个 websocket 后端,它将连接到一个主题(只有一个分区)并从最早的位置使用数据并继续使用新数据直到 websocket 连接断开。在同一时间可以存在多个 websocket 连接。

为了确保从一开始就消耗所有数据,每次建立 websocket 连接时,我都会创建一个新的消费者组并订阅该主题

const Kafka = require('node-rdkafka')
const { v4: uuidv4 } = require('uuid')

const kafkaConfig = (uuid) => ({
  'group.id': `my-topic-${uuid}`,
  'metadata.broker.list': KAFKA_URL,
})
const topicName= 'test-topic'
const consumer = new Kafka.KafkaConsumer(kafkaConfig(uuidv4()), {
  'auto.offset.reset': 'earliest',
})

console.log('attempting to connect to topic')
consumer.connect({ topic: topicName, timeout: 300 }, (err) => {
   if (err) {
      console.log('error connecting consumer to topic', topicName)
      throw err
   }
   console.log(`consumer connected to topic ${topicName}`)
   consumer.subscribe([topicName])
   consumer.consume((_err, data) => {
       // send data to websocket 
   })
})

这似乎符合预期。但是,当我尝试将 consumers/consumer 组的数量超过 4 个时,消费者连接似乎在无限期地等待。在上面的片段中,我会看到日志 'attempting to connect' 但后面什么也没有。

看了kafka的文档,好像是没有consumer group数量限制的。

我 运行 Kafka/Zookeper 在我本地主机上的 docker 容器中,我没有对主题设置任何限制。

我的docker文件

zookeeper:
  image: confluentinc/cp-zookeeper:latest
  environment:
    ZOOKEEPER_CLIENT_PORT: 2181
    ZOOKEEPER_TICK_TIME: 2000

kafka:
    image: confluentinc/cp-kafka:latest
    labels:
      - 'custom.project=faster-cms'
      - 'custom.service=kafka'
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_LOG4J_ROOT_LOGLEVEL: INFO
      KAFKA_LOG4J_LOGGERS: 'kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO'
      CONFLUENT_SUPPORT_METRICS_ENABLE: 'false'

我的问题是,为什么连接会无限期地等待,如何提高消费者限制或在无限期卡住时抛出错误。

显然这是 node-rdkafka 包中的限制。默认的 cunsumer/producer 组限制是 5。如果你想增加限制,在 .env 文件中设置环境变量 UV_THREADPOOL_SIZE 并且包会增加组的限制。