从 concurrent.futures 获取结果,它在 python ThreadPoolExecutor 中运行一个 kafka 消费者
get result from concurrent.futures which runs a kafka consumer in a python ThreadPoolExecuter
我正在根据主题可能具有的分区数动态创建 kafka 消费者。 (使用 KafkaConsumer 形式 Kafka-Python )
创建消费者后,我会使用 ThreadPoolExecuter 启动线程,这些线程开始侦听这些消费者分区上的特定主题
注意:整个代码都属于 flask API 端点。目标是监听 REST 调用,在生产者上发送消息,然后在消费者上监听响应,然后 return 对 REST 调用的响应
# function that listens to consumer messages in each thread
def _get_me_response(consumer_id, consumer):
for message in consumer:
message = message.value
break
consumer.commit()
return consumer_id, message
# ThreadPoolExecuter to start threads
with ThreadPoolExecutor(max_workers=len(consumers)) as executor:
futures = []
for consumer_id, consumer in consumers.items():
futures.append(executor.submit(_get_me_response,
consumer=consumer,
consumer_id=consumer_id
)
)
现在一旦我提交了我的线程,我就可以成功地接收到每个线程上的消息。
我的问题在于收集期货的结果并使用该结果响应 REST 端点
大多数在线示例都展示了如何通过“打印”语句从 futures 中获取结果,无论获得什么结果。现在,这是我获取期货响应的代码:
# code for gathering result from future in whichever order they arrive
for future in as_completed(futures):
resp_cid, response = future.result()
print(json.dumps(response))
if response['match_status'] == 1:
break
return response
我想做的是,如果我收到任何一个未来的回应,我想去 return 声明而不是等待其他期货完成(他们永远不会因为他们在消费者身上有一个永不结束的 for 循环)。打印语句在控制台中打印成功,但我无法退出循环
我试过使用一个全局变量,我
- 默认设置为 False
- 用于运行消费者函数内部的 while 循环
- 当我收到未来的回复时设置为 True
但这似乎没有按预期工作。
全局变量使用代码:
RETURN_CONSUMER_FLAG = False <---- Defining global variable
# function that listens to consumer messages in each thread with global var check
def _get_me_response(consumer_id, consumer):
while not RETURN_CONSUMER_FLAG: <---- Looping on global variable
for message in consumer:
message = message.value
consumer.commit()
return consumer_id, message
...
...
# code for gathering result from future in whichever order they arrive
for future in as_completed(futures):
resp_cid, response = future.result()
print(json.dumps(response))
if response['match_status'] == 1:
global RETURN_CONSUMER_FLAG <---- Setting global variable to TRUE
RETURN_CONSUMER_FLAG = True
return response
我检查了 as_completed 的代码,这似乎是因为它是一个收益生成器而不是 return 函数,所以它一直在等待所有 futures 完成工作才能退出循环。
知道如何解决这个问题吗?
由于@Louis Lac 的建议不适合我的问题,我最终像这样向 Kafka 消费者提供超时:
consumer = KafkaConsumer(CONSUMER_TOPIC, group_id='ME2',
bootstrap_servers=[f"{KAFKA_SERVER_HOST}:{KAFKA_SERVER_PORT}"],
value_deserializer=lambda x: json.loads(x.decode('utf-8')),
enable_auto_commit=True,
auto_offset_reset='latest',
max_poll_records=1,
max_poll_interval_ms=300000,
consumer_timeout_ms=300000)
这并不完全符合我的期望,但它确实有效。我的这部分代码对应用程序来说非常重要,所以我可以接受这种妥协。
我正在根据主题可能具有的分区数动态创建 kafka 消费者。 (使用 KafkaConsumer 形式 Kafka-Python )
创建消费者后,我会使用 ThreadPoolExecuter 启动线程,这些线程开始侦听这些消费者分区上的特定主题
注意:整个代码都属于 flask API 端点。目标是监听 REST 调用,在生产者上发送消息,然后在消费者上监听响应,然后 return 对 REST 调用的响应
# function that listens to consumer messages in each thread
def _get_me_response(consumer_id, consumer):
for message in consumer:
message = message.value
break
consumer.commit()
return consumer_id, message
# ThreadPoolExecuter to start threads
with ThreadPoolExecutor(max_workers=len(consumers)) as executor:
futures = []
for consumer_id, consumer in consumers.items():
futures.append(executor.submit(_get_me_response,
consumer=consumer,
consumer_id=consumer_id
)
)
现在一旦我提交了我的线程,我就可以成功地接收到每个线程上的消息。
我的问题在于收集期货的结果并使用该结果响应 REST 端点
大多数在线示例都展示了如何通过“打印”语句从 futures 中获取结果,无论获得什么结果。现在,这是我获取期货响应的代码:
# code for gathering result from future in whichever order they arrive
for future in as_completed(futures):
resp_cid, response = future.result()
print(json.dumps(response))
if response['match_status'] == 1:
break
return response
我想做的是,如果我收到任何一个未来的回应,我想去 return 声明而不是等待其他期货完成(他们永远不会因为他们在消费者身上有一个永不结束的 for 循环)。打印语句在控制台中打印成功,但我无法退出循环
我试过使用一个全局变量,我
- 默认设置为 False
- 用于运行消费者函数内部的 while 循环
- 当我收到未来的回复时设置为 True
但这似乎没有按预期工作。
全局变量使用代码:
RETURN_CONSUMER_FLAG = False <---- Defining global variable
# function that listens to consumer messages in each thread with global var check
def _get_me_response(consumer_id, consumer):
while not RETURN_CONSUMER_FLAG: <---- Looping on global variable
for message in consumer:
message = message.value
consumer.commit()
return consumer_id, message
...
...
# code for gathering result from future in whichever order they arrive
for future in as_completed(futures):
resp_cid, response = future.result()
print(json.dumps(response))
if response['match_status'] == 1:
global RETURN_CONSUMER_FLAG <---- Setting global variable to TRUE
RETURN_CONSUMER_FLAG = True
return response
我检查了 as_completed 的代码,这似乎是因为它是一个收益生成器而不是 return 函数,所以它一直在等待所有 futures 完成工作才能退出循环。
知道如何解决这个问题吗?
由于@Louis Lac 的建议不适合我的问题,我最终像这样向 Kafka 消费者提供超时:
consumer = KafkaConsumer(CONSUMER_TOPIC, group_id='ME2',
bootstrap_servers=[f"{KAFKA_SERVER_HOST}:{KAFKA_SERVER_PORT}"],
value_deserializer=lambda x: json.loads(x.decode('utf-8')),
enable_auto_commit=True,
auto_offset_reset='latest',
max_poll_records=1,
max_poll_interval_ms=300000,
consumer_timeout_ms=300000)
这并不完全符合我的期望,但它确实有效。我的这部分代码对应用程序来说非常重要,所以我可以接受这种妥协。