Asyncio with multiprocessing: Producers-Consumers model

I'm trying to retrieve stock prices and process the prices as they come in. I'm a beginner with concurrency, but this setup seems suited to an asyncio producers-consumers model in which each producer retrieves a stock price and passes it to the consumers via a queue. Since the processing work is CPU-bound, the consumers can process the prices in parallel (multiprocessing). That way, several consumers would already be working while not all producers have finished retrieving data. In addition, I'd like to implement a step where, if the consumer finds that the price it's processing is invalid, we spawn a new job for that ticker to retrieve the price again.

So far, I have the following toy code that gets me part of the way there, but I'm having problems with my process_data function (the consumer).

from concurrent.futures import ProcessPoolExecutor
import asyncio
import random
import time
random.seed(444)

#producers
async def retrieve_data(ticker, q):
    '''
    Pretend we're using aiohttp to retrieve stock prices from a URL
    Place a tuple of stock ticker and price into asyn queue as it becomes available
    '''
    start = time.perf_counter() # start timer
    await asyncio.sleep(random.randint(4, 8)) # pretend we're calling some URL
    price = random.randint(1, 100) # pretend this is the price we retrieved
    print(f'{ticker} : {price} retrieved in {time.perf_counter() - start:0.1f} seconds') 
    await q.put((ticker, price)) # place the price into the asyncio queue
    

#consumers
async def process_data(q):
    while True:
        data = await q.get()
        print(f"processing: {data}")
        with ProcessPoolExecutor() as executor:
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(executor, data_processor, data)
            #if output of data_processing failed, send ticker back to queue to retrieve data again
            if not result[2]: 
                print(f'{result[0]} data invalid. Retrieving again...')
                await retrieve_data(result[0], q) # add a new task
                q.task_done() # end this task
            else:
                q.task_done() # so that q.join() knows when the task is done
            
async def main(tickers):       
    q = asyncio.Queue()
    producers = [asyncio.create_task(retrieve_data(ticker, q)) for ticker in tickers]
    consumers = [asyncio.create_task(process_data(q))]
    await asyncio.gather(*producers)
    await q.join()  # Implicitly awaits consumers, too. blocks until all items in the queue have been received and processed
    for c in consumers:
        c.cancel() #cancel the consumer tasks, which would otherwise hang up and wait endlessly for additional queue items to appear
    

    
'''
RUN IN JUPYTER NOTEBOOK
'''
start = time.perf_counter()
tickers = ['AAPL', 'AMZN', 'TSLA', 'C', 'F']
await main(tickers)
print(f'total elapsed time: {time.perf_counter() - start:0.2f}')

'''
RUN IN TERMINAL
'''
# if __name__ == "__main__":
#     start = time.perf_counter()
#     tickers = ['AAPL', 'AMZN', 'TSLA', 'C', 'F']
#     asyncio.run(main(tickers))
#     print(f'total elapsed time: {time.perf_counter() - start:0.2f}')

The data_processor() function below, called by process_data() above, needs to be in a different cell in the Jupyter notebook, or in a separate module (as far as I understand, to avoid pickling errors):

import random
import time
from multiprocessing import current_process

def data_processor(data):
    ticker = data[0]
    price = data[1]
    
    print(f'Started {ticker} - {current_process().name}')
    start = time.perf_counter() # start time counter
    time.sleep(random.randint(4, 5)) # mimic some random processing time
    
    # pretend we're processing the price. Let the processing outcome be invalid if the price is an odd number
    if price % 2==0:
        is_valid = True
    else:
        is_valid = False
    
    print(f"{ticker}'s price {price} validity: --{is_valid}--"
          f' Elapsed time: {time.perf_counter() - start:0.2f} seconds')
    return (ticker, price, is_valid)

Questions

  1. Instead of Python's multiprocessing module, I used concurrent.futures' ProcessPoolExecutor, which I've read is compatible with asyncio (What kind of problems (if any) would there be combining asyncio with multiprocessing?). But it seems that I have to choose between retrieving the output (result) of the function called by the executor and being able to run several subprocesses in parallel. With the construct below, the subprocesses run sequentially, not in parallel.

    with ProcessPoolExecutor() as executor:
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(executor, data_processor, data)
    

Removing result = await in front of loop.run_in_executor(executor, data_processor, data) allows several consumers to run in parallel, but then I can't collect their results from the parent process. For that I need the await. And then the rest of the code block of course fails.

How can I have these subprocesses run in parallel and provide the output? Perhaps it needs a different construct than the producers-consumers model, or something else entirely.

  2. The part of the code that requests an invalid stock price again works (provided I can get the result from above), but it runs inside the consumer that calls it and blocks new consumers from being created until the request is fulfilled. Is there a way around this?

    #if output of data_processing failed, send ticker back to queue to retrieve data again
    if not result[2]:
        print(f'{result[0]} data invalid. Retrieving again...')
        await retrieve_data(result[0], q) # add a new task
        q.task_done() # end this task
    else:
        q.task_done() # so that q.join() knows when the task is done
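As an aside on the blocking retry, one option is to schedule the retry as a separate task instead of awaiting it, and to defer q.task_done() for the failed item into that task, so the consumer is freed immediately while q.join() still cannot return before the replacement item has been queued. A minimal self-contained sketch of that idea (the helpers here are hypothetical stand-ins: fetches are simulated, and the first fetch per ticker deliberately returns an invalid odd price):

```python
import asyncio

fetch_count = {}  # how many times each ticker has been fetched

async def retrieve_data(ticker, q):
    '''Simulated producer: first fetch yields an invalid (odd) price.'''
    await asyncio.sleep(0)  # pretend network I/O
    fetch_count[ticker] = fetch_count.get(ticker, 0) + 1
    price = 1 if fetch_count[ticker] == 1 else 2  # odd = invalid, even = valid
    await q.put((ticker, price))

async def retry(ticker, q):
    await retrieve_data(ticker, q)  # enqueue a fresh item first...
    q.task_done()                   # ...then retire the failed one, so q.join()
                                    # can't finish before the retry is queued

async def process_data(q, results):
    while True:
        ticker, price = await q.get()
        if price % 2:                              # invalid price
            asyncio.create_task(retry(ticker, q))  # no await: consumer stays free
        else:
            results.append((ticker, price))
            q.task_done()

async def main(tickers):
    q = asyncio.Queue()
    results = []
    consumers = [asyncio.create_task(process_data(q, results)) for _ in range(2)]
    await asyncio.gather(*(retrieve_data(t, q) for t in tickers))
    await q.join()
    for c in consumers:
        c.cancel()
    return results

print(asyncio.run(main(['AAPL', 'AMZN'])))
```

Each ticker ends up fetched twice (the invalid first attempt plus the retry), and the consumer loop keeps serving other items in the meantime.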
    

But it seems that I have to choose between retrieving the output (result) of the function called by the executor and being able to run several subprocesses in parallel.

Fortunately that's not the case, and you can also use asyncio.gather() to await multiple items at once. But you are fetching data items from the queue one by one, so you don't have a batch of items to process. The simplest solution is to start multiple consumers. Replace

# the single-element list looks suspicious anyway
consumers = [asyncio.create_task(process_data(q))]

with:

# now we have an actual list
consumers = [asyncio.create_task(process_data(q)) for _ in range(16)]

Each consumer will await an individual task to finish, but that's okay because you'll have a whole pool of them working in parallel, which is exactly what you wanted.

此外,您可能希望将 executor 设置为全局变量并且 使用 with,以便进程池由所有消费者共享并且持续时间与程序一样长。这样,消费者将重用已经生成的工作进程,而不必为从队列中接收到的每个作业生成一个新进程。 (这就是拥有进程“池”的全部意义所在。)在这种情况下,您可能希望在程序中不再需要执行程序的地方添加 executor.shutdown()