使用多处理连续使用 Kafka 并以特定时间间隔更新队列

Consume Kafka continuously and update queue at specific intervals using multiprocessing

我正在尝试持续使用来自 kafka 的事件。同一个应用程序还使用这些消耗的数据来执行一些分析并以 n 秒为间隔更新数据库(假设 n = 60 秒)。

在同一个应用中,如果process1 = Kafka Consumer , process2= Data Analysis and database update logic.

process1 is to be run continuously
process2 is to be executed once every n=60 seconds 

process2 与计算和数据库更新有关,因此需要 5-10 秒来执行。我不希望 process1process2 执行期间停止。因此,我正在使用 multiprocessing module(如果我在 python 中使用 Threading 模块,process1,process2 将是 thread1,thread2,但由于我阅读了有关 GIL 的内容Threading 模块无法利用多核架构,我决定使用 multiprocessing 模块。)在这种情况下实现并发。 (如果我对上述 GILThreading 模块限制的理解不正确,我深表歉意,请随时纠正我)。

我的应用程序在两个进程之间有一个相当简单的交互,其中 process1 只是用它在 60 秒内收到的所有消息填充队列,并在 60 秒结束时,只是将所有消息传输到process2

我在使用此传输逻辑时遇到问题。如何将队列的内容从 process1 传输到 process2 (我猜那是主进程还是另一个进程?这是我的另一个问题,除了主进程之外我应该实例化 2 个进程吗进程?)在 60 秒结束时清除队列内容,以便它在另一次迭代中再次开始。

到目前为止我有以下内容:

import sys
from kafka.client import KafkaClient
from kafka import SimpleConsumer
import time
from multiprocessing import Process,Queue

def kafka_init():
    client=KafkaClient('kafka1.wpit.nile.works')
    consumer=SimpleConsumer(client, "druidkafkaconsumer", "personalization.targeting.clickstream.prod")
    return consumer

def consumeMessages(q):
    print "thread started"
    while not q.empty():
        try:
            print q.get(True,1)
        Queue.Empty:
            break
    print "thread ended"
if __name__=="__main__":
    starttime=time.time()
    timeout=starttime+ 10 #timeout of read in seconds
    consumer=kafka_init()
    q=Queue()
    p=Process(target=consumeMessages,args=q)
    while(True):
        q.put(consumer.get_message())
        if time.time()>timeout:
            #transfer logic from process1 to main process here.
            print "Start time",starttime
            print "End time",time.time()
            p.start()
            p.join()
            break

如有任何帮助,我们将不胜感激。

您正在处理的问题不是特定于 kafka 的,因此我将使用泛型 "messages",它们只是整数。

在我看来,主要问题是,一方面你想处理 消息一旦产生,另一方面只想更新 每 60 秒更新一次数据库。

如果您使用 q.get(),默认情况下此方法调用将阻塞,直到队列中有可用消息为止。这可能需要 60 秒以上的时间,这会使数据库更新延迟太久。所以我们不能使用阻塞 q.get。我们需要使用带有超时的 q.get 以便调用是非阻塞的:

import time
import multiprocessing as mp
import random
import Queue

def process_messages(q):
    messages = []
    start = time.time()
    while True:
        try:
            message = q.get(timeout=1)
        except Queue.Empty:
            pass
        else:
            messages.append(message)
            print('Doing data analysis on {}'.format(message))
        end = time.time()
        if end-start > 60:
            print('Updating database: {}'.format(messages))
            start = end
            messages = []

def get_messages(q):
    while True:
        time.sleep(random.uniform(0,5))
        message = random.randrange(100)
        q.put(message)

if __name__ == "__main__":
    q = mp.Queue()

    proc1 = mp.Process(target=get_messages, args=[q])
    proc1.start()

    proc2 = mp.Process(target=process_messages, args=[q])
    proc2.start()

    proc1.join()
    proc2.join()

产生如下输出:

Doing data analysis on 38
Doing data analysis on 8
Doing data analysis on 8
Doing data analysis on 66
Doing data analysis on 37
Updating database: [38, 8, 8, 66, 37]
Doing data analysis on 27
Doing data analysis on 47
Doing data analysis on 57
Updating database: [27, 47, 57]
Doing data analysis on 85
Doing data analysis on 90
Doing data analysis on 86
Doing data analysis on 22
Updating database: [85, 90, 86, 22]
Doing data analysis on 8
Doing data analysis on 92
Doing data analysis on 59
Doing data analysis on 40
Updating database: [8, 92, 59, 40]