使用多处理连续使用 Kafka 并以特定时间间隔更新队列
Consume Kafka continuously and update queue at specific intervals using multiprocessing
我正在尝试持续使用来自 kafka 的事件。同一个应用程序还使用这些消耗的数据来执行一些分析并以 n 秒为间隔更新数据库(假设 n = 60 秒)。
在同一个应用中,如果process1 = Kafka Consumer , process2= Data Analysis and database update logic.
process1 is to be run continuously
process2 is to be executed once every n=60 seconds
process2
与计算和数据库更新有关,因此需要 5-10 秒来执行。我不希望 process1
在 process2
执行期间停止。因此,我正在使用 multiprocessing module
(如果我在 python 中使用 Threading
模块,process1,process2
将是 thread1,thread2
,但由于我阅读了有关 GIL 的内容Threading
模块无法利用多核架构,我决定使用 multiprocessing
模块。)在这种情况下实现并发。 (如果我对上述 GIL
或 Threading
模块限制的理解不正确,我深表歉意,请随时纠正我)。
我的应用程序在两个进程之间有一个相当简单的交互,其中 process1
只是用它在 60 秒内收到的所有消息填充队列,并在 60 秒结束时,只是将所有消息传输到process2
。
我在使用此传输逻辑时遇到问题。如何将队列的内容从 process1
传输到 process2
(我猜那是主进程还是另一个进程?这是我的另一个问题,除了主进程之外我应该实例化 2 个进程吗进程?)在 60 秒结束时清除队列内容,以便它在另一次迭代中再次开始。
到目前为止我有以下内容:
import sys
from kafka.client import KafkaClient
from kafka import SimpleConsumer
import time
from multiprocessing import Process,Queue
def kafka_init():
client=KafkaClient('kafka1.wpit.nile.works')
consumer=SimpleConsumer(client, "druidkafkaconsumer", "personalization.targeting.clickstream.prod")
return consumer
def consumeMessages(q):
print "thread started"
while not q.empty():
try:
print q.get(True,1)
Queue.Empty:
break
print "thread ended"
if __name__=="__main__":
starttime=time.time()
timeout=starttime+ 10 #timeout of read in seconds
consumer=kafka_init()
q=Queue()
p=Process(target=consumeMessages,args=q)
while(True):
q.put(consumer.get_message())
if time.time()>timeout:
#transfer logic from process1 to main process here.
print "Start time",starttime
print "End time",time.time()
p.start()
p.join()
break
如有任何帮助,我们将不胜感激。
您正在处理的问题不是特定于 kafka 的,因此我将使用泛型 "messages",它们只是整数。
在我看来,主要问题是,一方面你想处理
消息一旦产生,另一方面只想更新
每 60 秒更新一次数据库。
如果您使用 q.get()
,默认情况下此方法调用将阻塞,直到队列中有可用消息为止。这可能需要 60 秒以上的时间,这会使数据库更新延迟太久。所以我们不能使用阻塞 q.get
。我们需要使用带有超时的 q.get
以便调用是非阻塞的:
import time
import multiprocessing as mp
import random
import Queue
def process_messages(q):
messages = []
start = time.time()
while True:
try:
message = q.get(timeout=1)
except Queue.Empty:
pass
else:
messages.append(message)
print('Doing data analysis on {}'.format(message))
end = time.time()
if end-start > 60:
print('Updating database: {}'.format(messages))
start = end
messages = []
def get_messages(q):
while True:
time.sleep(random.uniform(0,5))
message = random.randrange(100)
q.put(message)
if __name__ == "__main__":
q = mp.Queue()
proc1 = mp.Process(target=get_messages, args=[q])
proc1.start()
proc2 = mp.Process(target=process_messages, args=[q])
proc2.start()
proc1.join()
proc2.join()
产生如下输出:
Doing data analysis on 38
Doing data analysis on 8
Doing data analysis on 8
Doing data analysis on 66
Doing data analysis on 37
Updating database: [38, 8, 8, 66, 37]
Doing data analysis on 27
Doing data analysis on 47
Doing data analysis on 57
Updating database: [27, 47, 57]
Doing data analysis on 85
Doing data analysis on 90
Doing data analysis on 86
Doing data analysis on 22
Updating database: [85, 90, 86, 22]
Doing data analysis on 8
Doing data analysis on 92
Doing data analysis on 59
Doing data analysis on 40
Updating database: [8, 92, 59, 40]
我正在尝试持续使用来自 kafka 的事件。同一个应用程序还使用这些消耗的数据来执行一些分析并以 n 秒为间隔更新数据库(假设 n = 60 秒)。
在同一个应用中,如果process1 = Kafka Consumer , process2= Data Analysis and database update logic.
process1 is to be run continuously
process2 is to be executed once every n=60 seconds
process2
与计算和数据库更新有关,因此需要 5-10 秒来执行。我不希望 process1
在 process2
执行期间停止。因此,我正在使用 multiprocessing module
(如果我在 python 中使用 Threading
模块,process1,process2
将是 thread1,thread2
,但由于我阅读了有关 GIL 的内容Threading
模块无法利用多核架构,我决定使用 multiprocessing
模块。)在这种情况下实现并发。 (如果我对上述 GIL
或 Threading
模块限制的理解不正确,我深表歉意,请随时纠正我)。
我的应用程序在两个进程之间有一个相当简单的交互,其中 process1
只是用它在 60 秒内收到的所有消息填充队列,并在 60 秒结束时,只是将所有消息传输到process2
。
我在使用此传输逻辑时遇到问题。如何将队列的内容从 process1
传输到 process2
(我猜那是主进程还是另一个进程?这是我的另一个问题,除了主进程之外我应该实例化 2 个进程吗进程?)在 60 秒结束时清除队列内容,以便它在另一次迭代中再次开始。
到目前为止我有以下内容:
import sys
from kafka.client import KafkaClient
from kafka import SimpleConsumer
import time
from multiprocessing import Process,Queue
def kafka_init():
client=KafkaClient('kafka1.wpit.nile.works')
consumer=SimpleConsumer(client, "druidkafkaconsumer", "personalization.targeting.clickstream.prod")
return consumer
def consumeMessages(q):
print "thread started"
while not q.empty():
try:
print q.get(True,1)
Queue.Empty:
break
print "thread ended"
if __name__=="__main__":
starttime=time.time()
timeout=starttime+ 10 #timeout of read in seconds
consumer=kafka_init()
q=Queue()
p=Process(target=consumeMessages,args=q)
while(True):
q.put(consumer.get_message())
if time.time()>timeout:
#transfer logic from process1 to main process here.
print "Start time",starttime
print "End time",time.time()
p.start()
p.join()
break
如有任何帮助,我们将不胜感激。
您正在处理的问题不是特定于 kafka 的,因此我将使用泛型 "messages",它们只是整数。
在我看来,主要问题是,一方面你想处理 消息一旦产生,另一方面只想更新 每 60 秒更新一次数据库。
如果您使用 q.get()
,默认情况下此方法调用将阻塞,直到队列中有可用消息为止。这可能需要 60 秒以上的时间,这会使数据库更新延迟太久。所以我们不能使用阻塞 q.get
。我们需要使用带有超时的 q.get
以便调用是非阻塞的:
import time
import multiprocessing as mp
import random
import Queue
def process_messages(q):
messages = []
start = time.time()
while True:
try:
message = q.get(timeout=1)
except Queue.Empty:
pass
else:
messages.append(message)
print('Doing data analysis on {}'.format(message))
end = time.time()
if end-start > 60:
print('Updating database: {}'.format(messages))
start = end
messages = []
def get_messages(q):
while True:
time.sleep(random.uniform(0,5))
message = random.randrange(100)
q.put(message)
if __name__ == "__main__":
q = mp.Queue()
proc1 = mp.Process(target=get_messages, args=[q])
proc1.start()
proc2 = mp.Process(target=process_messages, args=[q])
proc2.start()
proc1.join()
proc2.join()
产生如下输出:
Doing data analysis on 38
Doing data analysis on 8
Doing data analysis on 8
Doing data analysis on 66
Doing data analysis on 37
Updating database: [38, 8, 8, 66, 37]
Doing data analysis on 27
Doing data analysis on 47
Doing data analysis on 57
Updating database: [27, 47, 57]
Doing data analysis on 85
Doing data analysis on 90
Doing data analysis on 86
Doing data analysis on 22
Updating database: [85, 90, 86, 22]
Doing data analysis on 8
Doing data analysis on 92
Doing data analysis on 59
Doing data analysis on 40
Updating database: [8, 92, 59, 40]