Python-Kafka: Keep polling topic infinitely

I am using python-kafka to listen to a Kafka topic and consume its records. I want it to keep polling indefinitely without exiting. Here is my code:

def test():
    consumer = KafkaConsumer('abc', 'localhost:9092', auto_offset_reset='earliest')
    for msg in consumer:
        print(msg.value)

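This code just reads the data and exits right away. Is there a way to keep listening to the topic even when no messages are being pushed?

Any relevant example of continuously monitoring a topic would also be great.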

Using confluent_kafka

from confluent_kafka import Consumer


consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-consumer-1',
    'auto.offset.reset': 'earliest'
})
consumer.subscribe(['topicName'])

try:
    while True:
        # poll() blocks for up to 10 seconds and returns None if nothing arrived
        message = consumer.poll(10.0)

        if message is None:
            continue  # No message yet, keep polling

        if message.error():
            print(f"Consumer error: {message.error()}")
            continue

        print(f"Received message: {message.value().decode('utf-8')}")
except KeyboardInterrupt:
    # Ctrl+C ends the loop
    pass
finally:
    # Close once, after the loop has ended, not on every iteration
    consumer.close()
    print("Goodbye")
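
The part that keeps the consumer alive is the loop structure rather than the client library: skip empty polls instead of treating them as an error, and close the consumer only once, after the loop has ended. Below is a minimal sketch of that structure that runs without a broker. `FakeConsumer`, `run_poll_loop` and `handle` are hypothetical names made up for illustration and are not part of confluent_kafka or kafka-python; a `threading.Event` is assumed as the stop signal.

```python
import threading


class FakeConsumer:
    """Hypothetical stand-in for a Kafka consumer, for illustration only."""

    def __init__(self, messages):
        self._messages = list(messages)
        self.closed = False

    def poll(self, timeout):
        # Return the next queued item, or None to mimic "nothing arrived in time".
        return self._messages.pop(0) if self._messages else None

    def close(self):
        self.closed = True


def run_poll_loop(consumer, handle, stop_event, timeout=1.0):
    """Poll until stop_event is set, then close the consumer exactly once."""
    try:
        while not stop_event.is_set():
            message = consumer.poll(timeout)
            if message is None:
                continue  # nothing arrived; poll again instead of exiting
            handle(message)
    finally:
        consumer.close()


stop = threading.Event()
received = []


def handle(message):
    received.append(message)
    if message == "stop":
        stop.set()  # ask the loop to finish after this message


# The None in the middle simulates a poll that timed out with no data.
fake = FakeConsumer(["a", None, "b", "stop"])
run_poll_loop(fake, handle, stop)
```

With a real consumer, the event could be set from a signal handler or another thread, so the loop exits cleanly and `close()` still runs.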

Using kafka-python

from kafka import KafkaConsumer

consumer = KafkaConsumer(
     bootstrap_servers=['localhost:9092'],
     auto_offset_reset='earliest',
     group_id='my-consumer-1',
)
consumer.subscribe(['topicName'])

try:
    while True:
        # poll() takes its timeout in milliseconds and returns a dict that maps
        # each TopicPartition to a list of records (empty if nothing arrived)
        batches = consumer.poll(timeout_ms=10000)

        for records in batches.values():
            for record in records:
                print(f"Received message: {record.value.decode('utf-8')}")
except KeyboardInterrupt:
    # Ctrl+C ends the loop
    pass
finally:
    # Close once, after the loop has ended, not on every iteration
    consumer.close()
    print("Goodbye")
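
One difference from confluent_kafka worth spelling out: in kafka-python, `poll()` returns a dictionary that maps each `TopicPartition` to a list of records, not a single message object, and each record exposes `value` as an attribute. A rough sketch of walking that structure follows. The `TopicPartition` and `Record` namedtuples here are hypothetical stand-ins that only mirror the attributes used, so the snippet runs without a broker, and `iter_values` is a made-up helper name.

```python
from collections import namedtuple

# Hypothetical stand-ins that mirror only the attributes used below.
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])
Record = namedtuple("Record", ["offset", "value"])


def iter_values(batches):
    """Yield (topic, decoded value) pairs from a {partition: [records]} mapping."""
    for partition, records in batches.items():
        for record in records:
            if record.value is None:
                continue  # skip records without a payload
            yield partition.topic, record.value.decode("utf-8")


batches = {
    TopicPartition("topicName", 0): [Record(0, b"first"), Record(1, None)],
    TopicPartition("topicName", 1): [Record(0, b"second")],
}
decoded = list(iter_values(batches))
empty = list(iter_values({}))  # an empty poll result yields nothing
```

An empty dictionary simply means nothing arrived within the timeout, so the surrounding `while True` loop can poll again. kafka-python's consumer can also be iterated directly with `for msg in consumer`, which by default blocks waiting for new messages unless `consumer_timeout_ms` is set.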