Python 中基于 confluent-kafka 的消费者不起作用
confluent-kafka based consumer in Python does not work
对 kafka 和 Avro 非常陌生。我遇到了一个问题,似乎无法弄清楚这里出了什么问题。我写了一个使用 Avro 作为序列化格式的 kafka 的生产者和消费者。生产者代码工作正常。在 运行 之后,当我 运行 kafka-avro-console-consumer
时,它给我的代码如下 -
bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic test --property schema.registry.url=http://127.0.0.1:8081 --from-beginning
{"name":{"string":"Hello World!"}}
{"name":{"string":"Hello World!"}}
{"name":{"string":"Hello World!"}}
但是,当我尝试使用 python 执行相同操作时(在最基本的 example 之后),我编写了以下代码 -
from confluent_kafka import KafkaError
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError
class AvroConsumerAdapter(object):
def __init__(self, topic='test'):
self.topic = topic
self.consumer = AvroConsumer({'bootstrap.servers': 'localhost:9092',
'schema.registry.url': 'http://127.0.0.1:8081',
'group.id': 'mygroup'})
self.consumer.subscribe([topic])
def start_consuming(self):
running = True
while running:
try:
msg = self.consumer.poll(10)
if msg:
print(msg.value())
if not msg.error():
print("Here - 1")
print(msg.value())
elif msg.error().code() != KafkaError._PARTITION_EOF:
print("here-2")
print(msg.error())
running = False
else:
print('Here-3')
print(msg.error())
except SerializerError as e:
print("Message deserialization failed for %s: %s" % (msg, e))
running = False
except Exception as ex:
print(ex)
running = False
self.consumer.close()
这个客户端永远呆在那里,从不打印任何东西。我不确定这里出了什么问题。谁能帮我解决这个问题。
查看 topic config options -- 如果要处理主题中当前的所有数据,则需要设置 auto.offset.reset': 'smallest'
。默认情况下它是 largest
这意味着它只会显示生成的新数据行。您可以通过留下当前的 Python 代码 运行 并向该主题生成新消息来验证这一点 - 您应该会看到 Python 代码接收它们。
对 kafka 和 Avro 非常陌生。我遇到了一个问题,似乎无法弄清楚这里出了什么问题。我写了一个使用 Avro 作为序列化格式的 kafka 的生产者和消费者。生产者代码工作正常。在 运行 之后,当我 运行 kafka-avro-console-consumer
时,它给我的代码如下 -
bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic test --property schema.registry.url=http://127.0.0.1:8081 --from-beginning
{"name":{"string":"Hello World!"}}
{"name":{"string":"Hello World!"}}
{"name":{"string":"Hello World!"}}
但是,当我尝试使用 python 执行相同操作时(在最基本的 example 之后),我编写了以下代码 -
from confluent_kafka import KafkaError
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError
class AvroConsumerAdapter(object):
def __init__(self, topic='test'):
self.topic = topic
self.consumer = AvroConsumer({'bootstrap.servers': 'localhost:9092',
'schema.registry.url': 'http://127.0.0.1:8081',
'group.id': 'mygroup'})
self.consumer.subscribe([topic])
def start_consuming(self):
running = True
while running:
try:
msg = self.consumer.poll(10)
if msg:
print(msg.value())
if not msg.error():
print("Here - 1")
print(msg.value())
elif msg.error().code() != KafkaError._PARTITION_EOF:
print("here-2")
print(msg.error())
running = False
else:
print('Here-3')
print(msg.error())
except SerializerError as e:
print("Message deserialization failed for %s: %s" % (msg, e))
running = False
except Exception as ex:
print(ex)
running = False
self.consumer.close()
这个客户端永远呆在那里,从不打印任何东西。我不确定这里出了什么问题。谁能帮我解决这个问题。
查看 topic config options -- 如果要处理主题中当前的所有数据,则需要设置 auto.offset.reset': 'smallest'
。默认情况下它是 largest
这意味着它只会显示生成的新数据行。您可以通过留下当前的 Python 代码 运行 并向该主题生成新消息来验证这一点 - 您应该会看到 Python 代码接收它们。