我可以在生产一批消息后从 Kafka 产品中调用一次 get_delivery_report 吗?
Can I call get_delivery_report from a Kafka produce once after a batch of messages is produced?
我正在使用 pykafka 并且有一个异步生产并使用 delivery_reports 的生产者。我知道必须使用 "get_delivery_report" 方法读取传递报告,并且我知道必须在与生成的消息相同的线程中调用它。但是, get_delievery_report 是否必须在每次调用 produce 之后调用,还是可以调用一次?如果发生不止一次,将 get_delivery_report return 所有失败的发送。例如,假设我异步发送 100 条消息:
for x in xrange(100):
with topic.get_producer(delivery_reports=Try, sync=False) as producer:
producer.produce("Test Message")
msg, exc = producer.get_delivery_report()
还是必须是:
for x in xrange(100):
with topic.get_producer(delivery_reports=Try, sync=False) as producer:
producer.produce("Test Message")
msg, exc = producer.get_delivery_report()
第一个似乎 运行 比第二个快得多。
除了此代码过度使用 topic.get_producer
的问题(在我的回答中解决 here), the reason that the first example runs much faster than the second is that the second is effectively running in synchronous mode. That is, every single message produced results in a wait for delivery confirmation before the next message can be produced. If you're interested in writing an application that produces asynchronously, you're probably more interested in something closer to the first example. The correct way to do this is laid out in the pykafka readme:
with topic.get_producer(delivery_reports=True) as producer:
count = 0
while True:
count += 1
producer.produce('test msg', partition_key='{}'.format(count))
if count % 10 ** 5 == 0: # adjust this or bring lots of RAM ;)
while True:
try:
msg, exc = producer.get_delivery_report(block=False)
if exc is not None:
print 'Failed to deliver msg {}: {}'.format(
msg.partition_key, repr(exc))
else:
print 'Successfully delivered msg {}'.format(
msg.partition_key)
except Queue.Empty:
break
此代码异步生成 10 ** 5
消息,然后停止生成以使用传递报告队列,其中每条生成的消息包含一个报告。它打印任何报告的交付错误,并允许在整个队列被消耗后恢复生产。 10 ** 5
数字可以根据您的内存限制进行调整 - 它有效地限制了传递报告队列的大小。
我正在使用 pykafka 并且有一个异步生产并使用 delivery_reports 的生产者。我知道必须使用 "get_delivery_report" 方法读取传递报告,并且我知道必须在与生成的消息相同的线程中调用它。但是, get_delievery_report 是否必须在每次调用 produce 之后调用,还是可以调用一次?如果发生不止一次,将 get_delivery_report return 所有失败的发送。例如,假设我异步发送 100 条消息:
for x in xrange(100):
with topic.get_producer(delivery_reports=Try, sync=False) as producer:
producer.produce("Test Message")
msg, exc = producer.get_delivery_report()
还是必须是:
for x in xrange(100):
with topic.get_producer(delivery_reports=Try, sync=False) as producer:
producer.produce("Test Message")
msg, exc = producer.get_delivery_report()
第一个似乎 运行 比第二个快得多。
除了此代码过度使用 topic.get_producer
的问题(在我的回答中解决 here), the reason that the first example runs much faster than the second is that the second is effectively running in synchronous mode. That is, every single message produced results in a wait for delivery confirmation before the next message can be produced. If you're interested in writing an application that produces asynchronously, you're probably more interested in something closer to the first example. The correct way to do this is laid out in the pykafka readme:
with topic.get_producer(delivery_reports=True) as producer:
count = 0
while True:
count += 1
producer.produce('test msg', partition_key='{}'.format(count))
if count % 10 ** 5 == 0: # adjust this or bring lots of RAM ;)
while True:
try:
msg, exc = producer.get_delivery_report(block=False)
if exc is not None:
print 'Failed to deliver msg {}: {}'.format(
msg.partition_key, repr(exc))
else:
print 'Successfully delivered msg {}'.format(
msg.partition_key)
except Queue.Empty:
break
此代码异步生成 10 ** 5
消息,然后停止生成以使用传递报告队列,其中每条生成的消息包含一个报告。它打印任何报告的交付错误,并允许在整个队列被消耗后恢复生产。 10 ** 5
数字可以根据您的内存限制进行调整 - 它有效地限制了传递报告队列的大小。