从 concurrent.futures 获取结果,它在 python ThreadPoolExecutor 中运行一个 kafka 消费者

get result from concurrent.futures which runs a kafka consumer in a python ThreadPoolExecuter

我正在根据主题可能具有的分区数动态创建 kafka 消费者。 (使用 KafkaConsumer 形式 Kafka-Python )

创建消费者后,我会使用 ThreadPoolExecuter 启动线程,这些线程开始侦听这些消费者分区上的特定主题

注意:整个代码都属于 flask API 端点。目标是监听 REST 调用,在生产者上发送消息,然后在消费者上监听响应,然后 return 对 REST 调用的响应

# function that listens to consumer messages in each thread
def _get_me_response(consumer_id, consumer):
    for message in consumer:
        message = message.value
        break
    consumer.commit()
    return consumer_id, message

# ThreadPoolExecuter to start threads
  with ThreadPoolExecutor(max_workers=len(consumers)) as executor:
      futures = []
      for consumer_id, consumer in consumers.items():
          futures.append(executor.submit(_get_me_response,
                                         consumer=consumer,
                                         consumer_id=consumer_id
                                         )
                         )

现在一旦我提交了我的线程,我就可以成功地接收到每个线程上的消息。

我的问题在于收集期货的结果并使用该结果响应 REST 端点

大多数在线示例都展示了如何通过“打印”语句从 futures 中获取结果,无论获得什么结果。现在,这是我获取期货响应的代码:

 # code for gathering result from future in whichever order they arrive
 for future in as_completed(futures):
     resp_cid, response = future.result()
     print(json.dumps(response))
     if response['match_status'] == 1:
         break

 return response

我想做的是,如果我收到任何一个未来的回应,我想去 return 声明而不是等待其他期货完成(他们永远不会因为他们在消费者身上有一个永不结束的 for 循环)。打印语句在控制台中打印成功,但我无法退出循环


我试过使用一个全局变量,我

  1. 默认设置为 False
  2. 用于运行消费者函数内部的 while 循环
  3. 当我收到未来的回复时设置为 True

但这似乎没有按预期工作。

全局变量使用代码:

RETURN_CONSUMER_FLAG = False                          <---- Defining global variable

# function that listens to consumer messages in each thread with global var check
def _get_me_response(consumer_id, consumer):
    while not RETURN_CONSUMER_FLAG:                   <---- Looping on global variable
        for message in consumer:
            message = message.value
            consumer.commit()
            return consumer_id, message
...
...

# code for gathering result from future in whichever order they arrive
for future in as_completed(futures):
    resp_cid, response = future.result()
    print(json.dumps(response))
    if response['match_status'] == 1:
        global RETURN_CONSUMER_FLAG                   <---- Setting global variable to TRUE
        RETURN_CONSUMER_FLAG = True

return response

我检查了 as_completed 的代码,这似乎是因为它是一个收益生成器而不是 return 函数,所以它一直在等待所有 futures 完成工作才能退出循环。

知道如何解决这个问题吗?

由于@Louis Lac 的建议不适合我的问题,我最终像这样向 Kafka 消费者提供超时:

consumer = KafkaConsumer(CONSUMER_TOPIC, group_id='ME2',
                         bootstrap_servers=[f"{KAFKA_SERVER_HOST}:{KAFKA_SERVER_PORT}"],
                         value_deserializer=lambda x: json.loads(x.decode('utf-8')),
                         enable_auto_commit=True,
                         auto_offset_reset='latest',
                         max_poll_records=1,
                         max_poll_interval_ms=300000,
                         consumer_timeout_ms=300000)

这并不完全符合我的期望,但它确实有效。我的这部分代码对应用程序来说非常重要,所以我可以接受这种妥协。