Kafka Consumers on different partitions under same group are still consuming same messages intermittently
I have 1 consumer group and 5 consumers. There are also 5 partitions, so each consumer should get 1 partition.
The CLI also shows:
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic Topic-1
Topic: Topic-1 TopicId: kJqfk1FoRSWtkkjfsgw9FSg PartitionCount: 5 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: Topic-1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: Topic-1 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: Topic-1 Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Topic: Topic-1 Partition: 3 Leader: 0 Replicas: 0 Isr: 0
Topic: Topic-1 Partition: 4 Leader: 0 Replicas: 0 Isr: 0
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Topic-1 --from-beginning --partition {n} correctly shows different messages for each partition.
However, I often see 2 or more consumers processing the same message, and being new to Kafka, I really can't figure out the issue.
I am using pykafka to consume messages:
import json
import pprint
import traceback

from pykafka import KafkaClient

class CB_Kafka_Consumer:
    def __init__(self):
        self._connect_kafka_consumer()
        module_logger.info(f"Kafka Consumer at {kafka_broker_url}:{kafka_broker_port}")
        ''' Get DB session object '''
        self.session = get_db_session(SQL_USERNAME, SQL_PASSWORD, SQL_SERVER_IP, SQL_DATABASE)
        module_logger.info(f"Connected to MySQL at {SQL_SERVER_IP}")

    def _connect_kafka_consumer(self):
        self._consumer = None
        try:
            self._client = KafkaClient(f"{kafka_broker_url}:{kafka_broker_port}")
            topic = self._client.topics[kafka_topic]
            self._consumer = topic.get_simple_consumer(consumer_group=CONSUMER_GROUP_NAME)
            module_logger.info("Created a Kafka Consumer")
        except Exception:
            module_logger.error('Exception while connecting Kafka')
            traceback.print_exc()

    def start_consuming(self):
        module_logger.info("*" * 10 + " Starting to Consume Messages " + "*" * 10)
        while True:
            for msg in self._consumer:
                self._consumer.commit_offsets()
                message = json.loads(msg.value.decode('utf-8'))
                module_logger.debug("\n----- RECEIVED MESSAGE ----------")
                module_logger.debug(pprint.pformat(message))
                self.process_message(message)  # Logic for processing messages (takes anywhere between 10 min and 4 hours per task)
        self._consumer.close()
Print the partition and offset of the messages. You should see that they are in fact unique events that you are processing.
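A minimal sketch of that check: pykafka's consumed messages expose `partition_id` and `offset` attributes, so you can log both alongside the payload. The helper below is hypothetical, just to keep the log format in one place.

```python
def describe_message(partition_id, offset):
    """Build a log-friendly identifier for a consumed message."""
    return f"partition={partition_id} offset={offset}"

# Inside start_consuming's loop, assuming the consumer from the question:
#     for msg in self._consumer:
#         module_logger.debug(describe_message(msg.partition_id, msg.offset))
```

If two consumers log the same (partition, offset) pair, they really did receive the same record; if the pairs differ, the events only look alike at the payload level.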
If they are the same, the "10 min to 4 hours" processing is most likely causing consumer group rebalances (by default, Kafka requires you to poll for records every few milliseconds), and you are getting at-least-once processing semantics, so you need to handle the duplicates yourself.
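Handling duplicates under at-least-once delivery usually means keeping an idempotency key per record; (partition, offset) works well for a single topic. A minimal in-memory sketch is below; in practice you would persist the processed keys (e.g. in the MySQL session the question's code already opens) so the guard survives restarts.

```python
class DedupGuard:
    """Track (partition, offset) pairs that have already been processed.

    In-memory only; for illustration of the at-least-once dedup idea.
    """

    def __init__(self):
        self._seen = set()

    def should_process(self, partition_id, offset):
        """Return True exactly once per (partition, offset) pair."""
        key = (partition_id, offset)
        if key in self._seen:
            return False
        self._seen.add(key)
        return True
```

In the consume loop you would call `guard.should_process(msg.partition_id, msg.offset)` before `process_message`, and skip the message when it returns False.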
I see you are using some database client in your code, so I'd suggest looking at the Kafka Connect framework rather than writing your own Consumer.
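For reference, a sink from a topic into MySQL with Connect is mostly configuration. This is a hedged sketch using the Confluent JDBC sink connector; the placeholder host, database, and credential values are assumptions you would replace with your own.

```json
{
  "name": "topic1-mysql-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "Topic-1",
    "connection.url": "jdbc:mysql://<SQL_SERVER_IP>:3306/<SQL_DATABASE>",
    "connection.user": "<SQL_USERNAME>",
    "connection.password": "<SQL_PASSWORD>",
    "insert.mode": "insert",
    "auto.create": "true"
  }
}
```

Connect manages the consumer group, offset commits, and rebalancing for you, which sidesteps the long-running-processing problem entirely when the work is just "write the record to a database".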