等待每个人连接到卡夫卡(node-rdkafka)中新消费者组的主题
Waiting for every to connect to a topic with new consumer group in kafka (node-rdkafka)
我正在构建一个 websocket 后端,它将连接到一个主题(只有一个分区)并从最早的位置使用数据并继续使用新数据直到 websocket 连接断开。在同一时间可以存在多个 websocket 连接。
为了确保从一开始就消耗所有数据,每次建立 websocket 连接时,我都会创建一个新的消费者组并订阅该主题
const Kafka = require('node-rdkafka')
const { v4: uuidv4 } = require('uuid')
const kafkaConfig = (uuid) => ({
'group.id': `my-topic-${uuid}`,
'metadata.broker.list': KAFKA_URL,
})
const topicName= 'test-topic'
const consumer = new Kafka.KafkaConsumer(kafkaConfig(uuidv4()), {
'auto.offset.reset': 'earliest',
})
console.log('attempting to connect to topic')
consumer.connect({ topic: topicName, timeout: 300 }, (err) => {
if (err) {
console.log('error connecting consumer to topic', topicName)
throw err
}
console.log(`consumer connected to topic ${topicName}`)
consumer.subscribe([topicName])
consumer.consume((_err, data) => {
// send data to websocket
})
})
这似乎符合预期。但是,当我尝试将 consumers/consumer 组的数量超过 4 个时,消费者连接似乎在无限期地等待。在上面的片段中,我会看到日志 'attempting to connect' 但后面什么也没有。
看了kafka的文档,好像是没有consumer group数量限制的。
我 运行 Kafka/Zookeper 在我本地主机上的 docker 容器中,我没有对主题设置任何限制。
我的docker文件
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:latest
labels:
- 'custom.project=faster-cms'
- 'custom.service=kafka'
depends_on:
- zookeeper
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_LOG4J_ROOT_LOGLEVEL: INFO
KAFKA_LOG4J_LOGGERS: 'kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO'
CONFLUENT_SUPPORT_METRICS_ENABLE: 'false'
我的问题是,为什么连接会无限期地等待,如何提高消费者限制或在无限期卡住时抛出错误。
显然这是 node-rdkafka
包中的限制。默认的 cunsumer/producer 组限制是 5。如果你想增加限制,在 .env
文件中设置环境变量 UV_THREADPOOL_SIZE
并且包会增加组的限制。
我正在构建一个 websocket 后端,它将连接到一个主题(只有一个分区)并从最早的位置使用数据并继续使用新数据直到 websocket 连接断开。在同一时间可以存在多个 websocket 连接。
为了确保从一开始就消耗所有数据,每次建立 websocket 连接时,我都会创建一个新的消费者组并订阅该主题
const Kafka = require('node-rdkafka')
const { v4: uuidv4 } = require('uuid')
const kafkaConfig = (uuid) => ({
'group.id': `my-topic-${uuid}`,
'metadata.broker.list': KAFKA_URL,
})
const topicName= 'test-topic'
const consumer = new Kafka.KafkaConsumer(kafkaConfig(uuidv4()), {
'auto.offset.reset': 'earliest',
})
console.log('attempting to connect to topic')
consumer.connect({ topic: topicName, timeout: 300 }, (err) => {
if (err) {
console.log('error connecting consumer to topic', topicName)
throw err
}
console.log(`consumer connected to topic ${topicName}`)
consumer.subscribe([topicName])
consumer.consume((_err, data) => {
// send data to websocket
})
})
这似乎符合预期。但是,当我尝试将 consumers/consumer 组的数量超过 4 个时,消费者连接似乎在无限期地等待。在上面的片段中,我会看到日志 'attempting to connect' 但后面什么也没有。
看了kafka的文档,好像是没有consumer group数量限制的。
我 运行 Kafka/Zookeper 在我本地主机上的 docker 容器中,我没有对主题设置任何限制。
我的docker文件
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:latest
labels:
- 'custom.project=faster-cms'
- 'custom.service=kafka'
depends_on:
- zookeeper
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_LOG4J_ROOT_LOGLEVEL: INFO
KAFKA_LOG4J_LOGGERS: 'kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO'
CONFLUENT_SUPPORT_METRICS_ENABLE: 'false'
我的问题是,为什么连接会无限期地等待,如何提高消费者限制或在无限期卡住时抛出错误。
显然这是 node-rdkafka
包中的限制。默认的 cunsumer/producer 组限制是 5。如果你想增加限制,在 .env
文件中设置环境变量 UV_THREADPOOL_SIZE
并且包会增加组的限制。