Python - Kafka: consumer failing
I have a simple producer-consumer setup: 1 producer (running as a thread) and 2 consumers (running as 2 processes).
The producer's run method:
import time
from kafka import KafkaProducer

def run(self):
    producer = KafkaProducer(bootstrap_servers=self.bootstrap_servers,
                             api_version=(0, 10))
    # Poll for changes until the stop event is set.
    while not self.stop_event.is_set():
        self.logger.info("Checking for new changes")
        self.check_for_new_changes(producer)
        self.logger.info("Sleeping for {minutes} minutes...".format(
            minutes=self.time_to_sleep / 60))
        time.sleep(self.time_to_sleep)
    producer.close()
Basically, it checks for changes, sends a message if it finds a new one, and then sleeps for 5 minutes.
The consumer's run method:
import json
from kafka import KafkaConsumer

def run(self):
    # Join a consumer group only when a group id is configured.
    if self.group_id:
        consumer = KafkaConsumer(bootstrap_servers=self.bootstrap_servers,
                                 consumer_timeout_ms=1000,
                                 api_version=(0, 10),
                                 group_id=self.group_id)
    else:
        consumer = KafkaConsumer(bootstrap_servers=self.bootstrap_servers,
                                 consumer_timeout_ms=1000,
                                 api_version=(0, 10))
    consumer.subscribe(['new_change'])
    while not self.stop_event.is_set():
        # Iteration ends after consumer_timeout_ms without a message,
        # so the stop event is rechecked about once per second.
        for msg in consumer:
            self.logger.info("New message:\n{msg}".format(msg=msg))
            self.process_new_change(json.loads(msg.value))
            if self.stop_event.is_set():
                consumer.close()
                return
    consumer.close()
It seems to work fine, but after running for a while I get these messages in the coordinator log:
[2017-12-17 02:06:40,639] INFO [GroupCoordinator 0]: Member kafka-python-1.3.5-f5cdcad3-bc1a-4623-a42b-f5de5e8bded1 in group meta_data_consumer has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2017-12-17 02:06:40,659] INFO [GroupCoordinator 0]: Preparing to rebalance group meta_data_consumer with old generation 15 (__consumer_offsets-6) (kafka.coordinator.group.GroupCoordinator)
[2017-12-17 02:06:40,659] INFO [GroupCoordinator 0]: Group meta_data_consumer with generation 16 is now empty (__consumer_offsets-6) (kafka.coordinator.group.GroupCoordinator)
[2017-12-17 02:06:41,784] INFO [GroupCoordinator 0]: Member kafka-python-1.3.5-bdea8ce3-922f-4ee1-9959-13341e1730f5 in group failures_consumer has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2017-12-17 02:06:41,785] INFO [GroupCoordinator 0]: Preparing to rebalance group failures_consumer with old generation 9 (__consumer_offsets-35) (kafka.coordinator.group.GroupCoordinator)
[2017-12-17 02:06:41,785] INFO [GroupCoordinator 0]: Group failures_consumer with generation 10 is now empty (__consumer_offsets-35) (kafka.coordinator.group.GroupCoordinator)
This kills my consumers, and they stop running.
I don't see any exceptions or errors in the consumer log.
What could be causing them to fail?
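I think your zookeeper.session.timeout.ms setting is lower than 5 minutes. Adjust the timeout in the ZooKeeper settings and see whether it still fails. If it does, you should adjust the timeouts in the Kafka configuration: group.max.session.timeout.ms, rebalance.timeout.ms, and heartbeat.interval.ms should be adjusted accordingly. Your client sleeps for 5 minutes; during that time one of these timeouts is exceeded, and the group coordinator tries to rebalance the consumers, assuming they have failed.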
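On the client side, the matching settings can be passed to kafka-python's KafkaConsumer as session_timeout_ms and heartbeat_interval_ms. Below is a minimal sketch of how that might look. The helper names consumer_timeout_kwargs and build_consumer are hypothetical (they are not part of kafka-python), the one-third heartbeat ratio follows the usual Kafka guidance, and the 60-second value in the usage note is only an example, not a recommendation for your cluster.

```python
def consumer_timeout_kwargs(session_timeout_ms=30000, heartbeat_interval_ms=None):
    """Build timeout keyword arguments for kafka-python's KafkaConsumer.

    Hypothetical helper for illustration; it is not part of kafka-python.
    """
    if heartbeat_interval_ms is None:
        # Usual guidance: heartbeat no more often than needed, but at most
        # one third of the session timeout.
        heartbeat_interval_ms = session_timeout_ms // 3
    if heartbeat_interval_ms >= session_timeout_ms:
        raise ValueError(
            "heartbeat_interval_ms must be lower than session_timeout_ms")
    return {
        "session_timeout_ms": session_timeout_ms,
        "heartbeat_interval_ms": heartbeat_interval_ms,
    }


def build_consumer(bootstrap_servers, group_id, **timeouts):
    """Create a consumer with explicit group timeouts (hypothetical helper)."""
    # Imported lazily so the helper above works without kafka-python installed.
    from kafka import KafkaConsumer
    return KafkaConsumer(bootstrap_servers=bootstrap_servers,
                         group_id=group_id,
                         consumer_timeout_ms=1000,
                         api_version=(0, 10),
                         **consumer_timeout_kwargs(**timeouts))
```

For example, build_consumer(self.bootstrap_servers, self.group_id, session_timeout_ms=60000) would request a 60-second session timeout with a 20-second heartbeat interval. The broker only accepts session timeouts between group.min.session.timeout.ms and group.max.session.timeout.ms, so a larger client value may also require raising the broker-side maximum.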