Python Asyncio 脚本没有完成

Python asyncio script doesn't finish

我正在构建一个使用 asyncio 和 3 个队列的 Python 脚本。我分 4 个步骤处理来自不同来源的数据,我的想法是用队列保存每一步的结果,以便下一步能尽快使用。该脚本完成了它应该做的工作,但出于某种我无法弄清楚的原因,当所有数据都处理完毕后,脚本并没有结束。为了理解这个问题,我构建了一个简化版本的脚本,其中只进行简单的数学运算。

首先,我用 50 个介于 0 和 10 之间的随机数填充第一个队列(queue1)。接下来,我取出存储在 queue1 中的数字,将其平方并把结果放入 queue2。然后,我取出存储在 queue2 中的平方数,将其加倍并把结果存入 queue3。最后,我取出存储在 queue3 中的最终结果,将其追加到一个 DataFrame 中,并把结果保存到文件。

正如我所说,上述过程可以正常工作,但我原本预期在 queue3 中的所有元素都处理完毕后程序会结束,实际上它并没有结束。

这是我为演示我的问题而构建的玩具代码的第一个版本

import os
import warnings
import asyncio
import random
import pandas as pd
from datetime import datetime

# Request asyncio's debug mode through its environment variable and clear
# every installed warning filter.
os.environ.update(PYTHONASYNCIODEBUG='1')
warnings.resetwarnings()

class asyncio_toy():
    """Toy three-stage pipeline: random number -> square -> double -> save.

    NOTE: this is the version that reproduces the problem described in the
    question. The stage workers loop forever (``while True`` without any
    exit), and ``main`` awaits those worker tasks, so the script never
    finishes even after every item has been processed.
    """

    def __init__(self):
        # One row per fully processed item: its id and its final value.
        self.df = pd.DataFrame(columns=['id','final_value'])

    async def generate_random_number(self,i:int,queue):
        """Put 50 ``(k, r)`` tuples on ``queue``, with ``r`` random in 0..10.

        ``i`` is the producer index supplied by ``main``; it is not used.
        """
        for k in range(50):
            r=random.randint(0,10)
            #await asyncio.sleep(r)
            await queue.put((k,r))

    async def square_it(self,n,queue1,queue2):
        """Worker ``n``: move items from ``queue1`` to ``queue2``, squaring the value.

        The loop has no exit condition; the task can only end by being
        cancelled.
        """
        while True:
            print(f'{datetime.now()} - START SQUARE IT task {n} q1: {str(queue1.qsize()).zfill(2)} - q2:{str(queue2.qsize()).zfill(2)}')
            r=await queue1.get()
            await asyncio.sleep(5)  # simulated work
            await queue2.put((r[0],r[1]*r[1]))
            queue1.task_done()
            print(f'{datetime.now()} - END SQUARE IT   task {n} q1: {str(queue1.qsize()).zfill(2)} - q2:{str(queue2.qsize()).zfill(2)}')

    async def double_it(self,n,queue2,queue3):
        """Worker ``n``: move items from ``queue2`` to ``queue3``, doubling the value.

        The loop has no exit condition; the task can only end by being
        cancelled.
        """
        while True:
            print(f'{datetime.now()} - START DOUBLE IT task {n} q2: {str(queue2.qsize()).zfill(2)} - q3:{str(queue3.qsize()).zfill(2)}')
            r=await queue2.get()
            await asyncio.sleep(10)  # simulated work
            await queue3.put((r[0],2*r[1]))
            queue2.task_done()
            print(f'{datetime.now()} - END DOUBLE IT   task {n} q2: {str(queue2.qsize()).zfill(2)} - q3:{str(queue3.qsize()).zfill(2)}')

    async def save_it(self,n,queue3):
        """Worker ``n``: append each item of ``queue3`` to ``self.df`` and rewrite the CSV.

        The loop has no exit condition; the task can only end by being
        cancelled.
        """
        while True:
            print(f'{datetime.now()} - START SAVE IT   task {n} q3: {str(queue3.qsize()).zfill(2)}')
            r=await queue3.get()
            await asyncio.sleep(1)  # simulated work
            self.df.loc[len(self.df)]=[r[0],r[1]]
            self.df.to_csv('final_result.csv')
            queue3.task_done()
            print(f'{datetime.now()} - END SAVE IT     task {n} q3: {str(queue3.qsize()).zfill(2)}')

    async def main(self):
        """Start one producer and five workers per stage, then wait for them."""
        queue1 = asyncio.Queue() # stores the random number
        queue2 = asyncio.Queue() # stores the squared number
        queue3 = asyncio.Queue() # stores the final result

        rand_gen = [asyncio.create_task(self.generate_random_number(n,queue1)) for n in range(1)]
        square_scan = [asyncio.create_task(self.square_it(k,queue1,queue2)) for k in range(5)]
        double_scan = [asyncio.create_task(self.double_it(k,queue2,queue3)) for k in range(5)]
        save_scan = [asyncio.create_task(self.save_it(k,queue3)) for k in range(5)]

        await asyncio.gather(*rand_gen)
        # NOTE: execution blocks here forever. The worker coroutines never
        # return, so gathering them never completes and the join/cancel
        # code below is never reached.
        await asyncio.gather(*square_scan)
        await asyncio.gather(*double_scan)
        await asyncio.gather(*save_scan)

        await queue1.join()
        await queue2.join()
        await queue3.join()

        for a in square_scan:
            a.cancel()

        # Cancel the doubling workers (the original cancelled square_scan
        # a second time here and left double_scan untouched).
        for b in double_scan:
            b.cancel()

        for c in save_scan:
            c.cancel()

### testing
if __name__ == '__main__':
    # Script entry point: build the toy pipeline and drive its main
    # coroutine with asyncio.run.
    asyncio.run(asyncio_toy().main())

对这个问题进行一些研究后,我发现了另一个问题 [1],其中建议不要使用 queue.join,而是使用哨兵值(sentinel)来关闭(shutdown)各个任务。下面是按照该建议修改后的版本:

import os
import warnings
import asyncio
import random
import pandas as pd
from datetime import datetime

# Request asyncio's debug mode through its environment variable and clear
# every installed warning filter.
os.environ.update(PYTHONASYNCIODEBUG='1')
warnings.resetwarnings()

class asyncio_toy():
    """Three-stage pipeline that tries to shut down with a ``None`` sentinel.

    NOTE: this is the question's second attempt and it still does not
    finish. The producer emits a single ``None``, and each worker that
    receives it forwards a single ``None`` downstream, so only one of the
    five workers in each stage ever leaves its loop; the other four stay
    blocked in ``queue.get()`` and ``main`` waits for them forever.
    """

    def __init__(self):
        # One row per fully processed item: its id and its final value.
        self.df = pd.DataFrame(columns=['id','final_value'])

    async def generate_random_number(self,i:int,queue1):
        """Put 50 ``(k, r)`` tuples on ``queue1`` followed by one ``None`` sentinel.

        ``i`` is the producer index supplied by ``main``; it is not used.
        """
        for k in range(50):
            r=random.randint(0,10)
            queue1.put_nowait((k,r))
        queue1.put_nowait(None)

    async def square_it(self,n,queue1,queue2):
        """Worker ``n``: square values from ``queue1`` into ``queue2`` until ``None`` arrives.

        On ``None`` the worker forwards one ``None`` to ``queue2`` and stops.
        """
        while True:
            print(f'{datetime.now()} - START SQUARE IT task {n} q1: {str(queue1.qsize()).zfill(2)} - q2:{str(queue2.qsize()).zfill(2)}')
            r=await queue1.get()
            if r is None:
                await queue2.put(None)
                break

            await asyncio.sleep(5)  # simulated work
            await queue2.put((r[0],r[1]*r[1]))
            queue1.task_done()
            print(f'{datetime.now()} - END SQUARE IT   task {n} q1: {str(queue1.qsize()).zfill(2)} - q2:{str(queue2.qsize()).zfill(2)}')

    async def double_it(self,n,queue2,queue3):
        """Worker ``n``: double values from ``queue2`` into ``queue3`` until ``None`` arrives.

        On ``None`` the worker forwards one ``None`` to ``queue3`` and stops.
        """
        while True:
            print(f'{datetime.now()} - START DOUBLE IT task {n} q2: {str(queue2.qsize()).zfill(2)} - q3:{str(queue3.qsize()).zfill(2)}')
            r=await queue2.get()
            if r is None:
                await queue3.put(None)
                break

            await asyncio.sleep(10)  # simulated work
            await queue3.put((r[0],2*r[1]))
            queue2.task_done()
            print(f'{datetime.now()} - END DOUBLE IT   task {n} q2: {str(queue2.qsize()).zfill(2)} - q3:{str(queue3.qsize()).zfill(2)}')

    async def save_it(self,n,queue3):
        """Worker ``n``: store items of ``queue3`` in ``self.df`` and the CSV until ``None`` arrives."""
        while True:
            print(f'{datetime.now()} - START SAVE IT   task {n} q3: {str(queue3.qsize()).zfill(2)}')
            r=await queue3.get()
            if r is None:
                break

            await asyncio.sleep(1)  # simulated work
            self.df.loc[len(self.df)]=[r[0],r[1]]
            self.df.to_csv('final_result.csv')
            queue3.task_done()
            print(f'{datetime.now()} - END SAVE IT     task {n} q3: {str(queue3.qsize()).zfill(2)}')

    async def main(self):
        """Start one producer and five workers per stage, then wait for them."""
        queue1 = asyncio.Queue() # stores the random number
        queue2 = asyncio.Queue() # stores the squared number
        queue3 = asyncio.Queue() # stores the final result

        rand_gen = [asyncio.create_task(self.generate_random_number(n,queue1)) for n in range(1)]
        square_scan = [asyncio.create_task(self.square_it(k,queue1,queue2)) for k in range(5)]
        double_scan = [asyncio.create_task(self.double_it(k,queue2,queue3)) for k in range(5)]
        save_scan = [asyncio.create_task(self.save_it(k,queue3)) for k in range(5)]

        await asyncio.gather(*rand_gen)
        # NOTE: execution blocks on the next line. Only the worker that
        # received the single sentinel returns; the other four square_it
        # tasks keep waiting in queue1.get(), so this gather never completes.
        await asyncio.gather(*square_scan)
        await asyncio.gather(*double_scan)
        await asyncio.gather(*save_scan)

        for a in square_scan:
            a.cancel()

        # Cancel the doubling workers (the original cancelled square_scan
        # a second time here and left double_scan untouched).
        for b in double_scan:
            b.cancel()

        for c in save_scan:
            c.cancel()

### testing
if __name__ == '__main__':
    # Script entry point: build the toy pipeline and drive its main
    # coroutine with asyncio.run.
    asyncio.run(asyncio_toy().main())

但是这并没有解决问题。我也曾尝试把这些函数从 class 定义中移出来,但同样没有效果。

我刚开始使用 asyncio 模块,我想我犯了某个自己没有看出来的基本错误。欢迎任何提示。

更新

我进一步简化了问题,并得到了一个可能有助于找到答案的有趣结果。我创建了另一段玩具代码,它只使用一个队列来存储初始随机数。该代码从队列中取出数字,计算其平方并在终端中打印。这段代码可以正常结束。所以我认为这个问题在某种程度上可能与我使用了多个队列有关。

import asyncio
import random

class asyncio_toy():
    """Single-queue toy: producers enqueue random numbers, consumers print their squares."""

    def __init__(self):
        pass

    async def generate_random_number(self,i:int,queue):
        """Enqueue ``i`` tuples ``(i, r)``, sleeping ``r`` seconds before each put.

        ``r`` is a fresh random integer in 0..5 for every tuple; ``i`` is
        both the number of tuples produced and the tag stored in them.
        """
        remaining = i
        while remaining > 0:
            delay = random.randint(0,5)
            await asyncio.sleep(delay)
            await queue.put((i,delay))
            remaining -= 1

    async def square_scan(self,k,queue):
        """Consumer ``k``: print each queued value with its square, forever.

        Every item is acknowledged with ``task_done()`` so that
        ``queue.join()`` can tell when the queue has been drained.
        """
        while True:
            item = await queue.get()
            producer, value = item
            print(f'prod {producer} - cons {k} - {value} - {value*value}')
            queue.task_done()

    async def main(self):
        """Run five producers and four consumers until all produced items are handled."""
        queue = asyncio.Queue()

        producers = []
        for n in range(5):
            producers.append(asyncio.create_task(self.generate_random_number(n,queue)))

        consumers = []
        for k in range(4):
            consumers.append(asyncio.create_task(self.square_scan(k,queue)))

        # Wait for the producers only, then for the queue to drain; the
        # consumers never return on their own and are cancelled afterwards.
        await asyncio.gather(*producers)
        await queue.join()

        for task in consumers:
            task.cancel()

### testing
if __name__ == '__main__':
    # Script entry point: build the toy and drive its main coroutine
    # with asyncio.run.
    asyncio.run(asyncio_toy().main())

如果我发送 None 五次,代码就能正常结束:因为有五个任务在使用同一个 queue,而每个任务都需要收到一个 None 才能退出自己的 while 循环。

 for x in range(5):
     # asyncio.Queue.put() is a coroutine: calling it without ``await``
     # enqueues nothing. put_nowait() enqueues immediately; inside a
     # coroutine ``await queue.put(None)`` is equivalent.
     queue.put_nowait(None)

import os
import warnings
import asyncio
import random
import pandas as pd
from datetime import datetime

# Request asyncio's debug mode through its environment variable and clear
# every installed warning filter.
os.environ.update(PYTHONASYNCIODEBUG='1')
warnings.resetwarnings()

class asyncio_toy():
    """Three-stage asyncio pipeline that shuts down with ``None`` sentinels.

    Stages: random number -> square -> double -> save. Every worker leaves
    its loop when it receives ``None`` and forwards one ``None`` to the
    next stage, so each stage needs as many sentinels as it has workers.
    """

    # Number of worker tasks per stage, and therefore the number of
    # sentinels the producer must emit. Kept in one place so the two
    # values cannot drift apart.
    NUM_WORKERS = 5

    def __init__(self):
        # One row per fully processed item: its id and its final value.
        self.df = pd.DataFrame(columns=['id','final_value'])

    async def generate_random_number(self, i:int, queue):
        """Put 10 ``(k, r)`` tuples on ``queue``, then one ``None`` per worker.

        ``r`` is a random integer in 0..10. ``i`` is the producer index
        supplied by ``main``; it is not used.
        """
        for k in range(10):
            r = random.randint(0, 10)
            await queue.put((k,r))

        for _ in range(self.NUM_WORKERS):
            await queue.put(None)

    async def square_it(self,n,queue1,queue2):
        """Worker ``n``: square values from ``queue1`` into ``queue2`` until ``None`` arrives."""
        while True:
            r = await queue1.get()

            if r is None:
                print('exit: SQUARE IT', n)
                await queue2.put(None)
                # Acknowledge the sentinel too, otherwise queue1.join()
                # would wait forever for it.
                queue1.task_done()
                break

            k, r = r
            await asyncio.sleep(1)  # simulated work
            await queue2.put((k, r*r))
            queue1.task_done()

    async def double_it(self,n,queue2,queue3):
        """Worker ``n``: double values from ``queue2`` into ``queue3`` until ``None`` arrives."""
        while True:
            r = await queue2.get()

            if r is None:
                print('exit: DOUBLE IT', n)
                await queue3.put(None)
                # Acknowledge the sentinel so queue2.join() can complete.
                queue2.task_done()
                break

            k, r = r
            await asyncio.sleep(1)  # simulated work
            await queue3.put((k, 2*r))
            queue2.task_done()

    async def save_it(self,n,queue3):
        """Worker ``n``: store items of ``queue3`` in ``self.df`` and the CSV until ``None`` arrives."""
        while True:
            r = await queue3.get()

            if r is None:
                print('exit: SAVE IT', n)
                # Acknowledge the sentinel so queue3.join() can complete.
                queue3.task_done()
                break

            k, r = r
            await asyncio.sleep(1)  # simulated work
            self.df.loc[len(self.df)]=[k, r]
            self.df.to_csv('final_result.csv')
            queue3.task_done()

    async def main(self):
        """Run one producer and ``NUM_WORKERS`` workers per stage to completion."""
        queue1 = asyncio.Queue() # stores the random number
        queue2 = asyncio.Queue() # stores the squared number
        queue3 = asyncio.Queue() # stores the final result

        rand_gen = [asyncio.create_task(self.generate_random_number(n,queue1)) for n in range(1)]
        square_scan = [asyncio.create_task(self.square_it(k,queue1,queue2)) for k in range(self.NUM_WORKERS)]
        double_scan = [asyncio.create_task(self.double_it(k,queue2,queue3)) for k in range(self.NUM_WORKERS)]
        save_scan = [asyncio.create_task(self.save_it(k,queue3)) for k in range(self.NUM_WORKERS)]

        # Every worker returns once it has seen its sentinel, so these
        # gathers complete.
        await asyncio.gather(*rand_gen)
        await asyncio.gather(*square_scan)
        await asyncio.gather(*double_scan)
        await asyncio.gather(*save_scan)

        print('join')

        # Safe now that sentinels are acknowledged with task_done(): all
        # three joins return immediately once the workers have finished.
        await queue1.join()
        await queue2.join()
        await queue3.join()

        print('cancel')
        # All tasks have already finished at this point, so cancel() has no
        # effect; kept only as a safety net.
        for task in (*square_scan, *double_scan, *save_scan):
            task.cancel()
    
### testing
if __name__ == '__main__':
    # Script entry point: build the toy pipeline and drive its main
    # coroutine with asyncio.run.
    asyncio.run(asyncio_toy().main())

编辑:

使用 `while running` 循环、并通过设置 `running = False` 来停止所有任务的版本。

import os
import warnings
import asyncio
import random
import pandas as pd
from datetime import datetime

# Request asyncio's debug mode through its environment variable and clear
# every installed warning filter.
os.environ.update(PYTHONASYNCIODEBUG='1')
warnings.resetwarnings()

class asyncio_toy():
    """Pipeline variant whose workers poll their queue while a stage flag is set.

    Each stage has one boolean flag shared by all of its workers. The
    worker that receives the ``None`` sentinel forwards it (except in the
    last stage) and clears the flag, which makes every worker of that
    stage leave its loop at its next check.
    """

    def __init__(self):
        self.df = pd.DataFrame(columns=['id','final_value'])
        # One "keep running" flag per stage, shared by that stage's workers.
        self.running_square_it = self.running_double_it = self.running_save_it = True

    async def generate_random_number(self, i, queue):
        """Put 20 ``(k, r)`` tuples (``r`` random in 0..10) and a final ``None`` on ``queue``.

        ``i`` is the producer index supplied by ``main``; it is not used.
        """
        k = 0
        while k < 20:
            value = random.randint(0, 10)
            await queue.put((k, value))
            k += 1

        await queue.put(None)

    async def square_it(self, n, queue_input, queue_output):
        """Worker ``n``: square queued values until the square stage is stopped."""
        print(f'{datetime.now()} - START SQUARE IT task {n} q1: {str(queue_input.qsize()).zfill(2)} - q2:{str(queue_output.qsize()).zfill(2)}')

        while self.running_square_it:
            if not queue_input.empty():
                item = await queue_input.get()
                if item is not None:
                    key, value = item
                    await queue_output.put((key, value*value))
                else:
                    print('exit: SQUARE IT', n)
                    await queue_output.put(None)
                    self.running_square_it = False

            # Yield to the event loop so the other tasks get to run.
            await asyncio.sleep(0.1)

        print(f'{datetime.now()} - END SQUARE IT   task {n} q1: {str(queue_input.qsize()).zfill(2)} - q2:{str(queue_output.qsize()).zfill(2)}')

    async def double_it(self, n, queue_input, queue_output):
        """Worker ``n``: double queued values until the double stage is stopped."""
        print(f'{datetime.now()} - START DOUBLE IT task {n} q2: {str(queue_input.qsize()).zfill(2)} - q3:{str(queue_output.qsize()).zfill(2)}')

        while self.running_double_it:
            if not queue_input.empty():
                item = await queue_input.get()
                if item is not None:
                    key, value = item
                    await queue_output.put((key, 2*value))
                else:
                    print('exit: DOUBLE IT', n)
                    await queue_output.put(None)
                    self.running_double_it = False

            # Yield to the event loop so the other tasks get to run.
            await asyncio.sleep(0.1)

        print(f'{datetime.now()} - END DOUBLE IT   task {n} q2: {str(queue_input.qsize()).zfill(2)} - q3:{str(queue_output.qsize()).zfill(2)}')

    async def save_it(self, n, queue_input):
        """Worker ``n``: append queued results to ``self.df`` and rewrite the CSV until stopped."""
        print(f'{datetime.now()} - START SAVE IT   task {n} q3: {str(queue_input.qsize()).zfill(2)}')

        while self.running_save_it:
            if not queue_input.empty():
                item = await queue_input.get()
                if item is not None:
                    key, value = item
                    self.df.loc[len(self.df)] = [key, value]
                    self.df.to_csv('final_result.csv')
                else:
                    print('exit: SAVE IT', n)
                    self.running_save_it = False

            # Yield to the event loop so the other tasks get to run.
            await asyncio.sleep(0.1)

        print(f'{datetime.now()} - END SAVE IT     task {n} q3: {str(queue_input.qsize()).zfill(2)}')

    async def main(self):
        """Start 1 producer, 5 square, 10 double and 5 save workers and wait for all of them."""
        queue1 = asyncio.Queue()  # stores the random number
        queue2 = asyncio.Queue()  # stores the squared number
        queue3 = asyncio.Queue()  # stores the final result

        spawn = asyncio.create_task
        rand_gen = [spawn(self.generate_random_number(n, queue1)) for n in range(1)]
        square_scan = [spawn(self.square_it(k, queue1, queue2)) for k in range(5)]
        double_scan = [spawn(self.double_it(k, queue2, queue3)) for k in range(10)]
        save_scan = [spawn(self.save_it(k, queue3)) for k in range(5)]

        # Wait for the stages one after another, in pipeline order.
        for stage in (rand_gen, square_scan, double_scan, save_scan):
            await asyncio.gather(*stage)

        print('join')

        print('cancel')
    
### testing
if __name__ == '__main__':
    # Script entry point: build the toy pipeline and drive its main
    # coroutine with asyncio.run.
    asyncio.run(asyncio_toy().main())

furas 提出的想法很有趣,但稍微试验了一下之后,我发现了一个问题。如果在执行 square 和 double 操作之前加入一个随机延迟(以模拟实际操作),就可能出现某个队列已经为空、但仍有数据正在处理中的情况,于是脚本会在所有数据处理完之前就被中断。为了解决这个问题,我想到了为每个任务使用一个令牌(一个简单的布尔值列表)的办法:除了检查队列的大小(或者它是否为空)之外,我还检查是否所有令牌都已释放(即为 False),以及上一步是否已经完成。此解决方案适用于每个步骤中任意数量的任务,也适用于每个任务任意长的执行时间。代码如下:

import os
import warnings
import asyncio
import random
import pandas as pd
from datetime import datetime

# Request asyncio's debug mode through its environment variable and clear
# every installed warning filter.
os.environ.update(PYTHONASYNCIODEBUG='1')
warnings.resetwarnings()

# ANSI colors
c = (
    # Each entry is a complete ANSI escape sequence and must begin with
    # the ESC character (\033); without it the text is printed literally
    # instead of switching the terminal colour.
    "\033[0m",   # End of color
    "\033[36m",  # Cyan
    "\033[91m",  # Red
    "\033[35m",  # Magenta
)

class asyncio_toy():
    """Three-stage pipeline whose workers stop via per-task tokens and stage flags.

    Every worker owns one slot in its stage's token list and sets it to
    ``True`` while it is processing an item. A stage is declared finished
    only when its input queue is empty, none of its tokens is held and
    (for the second and third stage) the previous stage has finished.
    """

    def __init__(self,num_of_tasks):
        """Create the state for ``num_of_tasks = [n_square, n_double, n_save]`` workers."""
        self.df = pd.DataFrame(columns=['id','final_value'])

        # One token per worker; True means "this worker is busy with an item".
        self.square_it_tolken=[False]*num_of_tasks[0]
        self.double_it_tolken=[False]*num_of_tasks[1]
        self.save_it_tolken=[False]*num_of_tasks[2]

        # One flag per stage, shared by all workers of that stage.
        self.running_square = True
        self.running_double = True
        self.running_saveit = True

        self.num_of_tasks=num_of_tasks

    async def generate_random_number(self,i:int,queue1):
        """Put 50 ``(k, r)`` tuples on ``queue1``, with ``r`` random in 0..10.

        ``i`` is the producer index supplied by ``main``; it is not used.
        """
        for k in range(50):
            r=random.randint(0,10)
            queue1.put_nowait((k,r))

    async def square_it(self,n,queue1,queue2):
        """Worker ``n``: square values from ``queue1`` into ``queue2`` until the stage is done."""
        while self.running_square:

            if not queue1.empty():

                print(c[1]+ f'{datetime.now()} - START SQUARE IT task {n} q1: {str(queue1.qsize()).zfill(2)} - q2: {str(queue2.qsize()).zfill(2)}')
                self.square_it_tolken[n]=True # take a token
                print(self.square_it_tolken)

                r=await queue1.get()
                await asyncio.sleep(random.randint(15,30))  # simulated work
                await queue2.put((r[0],r[1]*r[1]))

                print(c[1]+ f'{datetime.now()} - END SQUARE IT   task {n} q1: {str(queue1.qsize()).zfill(2)} - q2: {str(queue2.qsize()).zfill(2)}')
                self.square_it_tolken[n]=False # return a token
                print(self.square_it_tolken)

            # Finished only when nothing is queued AND no worker of this
            # stage is still in the middle of an item.
            if queue1.empty() and all(not token for token in self.square_it_tolken):
                print('exit: SQUARE IT', n)
                self.running_square = False

            await asyncio.sleep(0.1) # need it to run other loops

    async def double_it(self,n,queue2,queue3):
        """Worker ``n``: double values from ``queue2`` into ``queue3`` until the stage is done."""
        while self.running_double:

            if not queue2.empty():

                print(c[2]+ f'{datetime.now()} - START DOUBLE IT task {n} q2: {str(queue2.qsize()).zfill(2)} - q3: {str(queue3.qsize()).zfill(2)}')
                self.double_it_tolken[n]=True # take a token
                print(self.double_it_tolken)

                r=await queue2.get()
                await asyncio.sleep(random.randint(5,15))  # simulated work
                await queue3.put((r[0],2*r[1]))

                print(c[2]+f'{datetime.now()} - END DOUBLE IT   task {n} q2: {str(queue2.qsize()).zfill(2)} - q3: {str(queue3.qsize()).zfill(2)}')
                self.double_it_tolken[n]=False # release a token
                print(self.double_it_tolken)

            # Also requires the square stage to have finished, because an
            # empty queue2 may still be refilled by it.
            if queue2.empty() and all(not token for token in self.double_it_tolken) and not self.running_square:
                print('exit: DOUBLE IT', n)
                self.running_double = False

            await asyncio.sleep(0.1) # need it to run other loops

    async def save_it(self,n,queue3):
        """Worker ``n``: append items of ``queue3`` to ``self.df`` and rewrite the CSV until done."""
        while self.running_saveit:

            if not queue3.empty():

                print(c[3]+f'{datetime.now()} - START SAVE IT   task {n} q3: {str(queue3.qsize()).zfill(2)}')
                self.save_it_tolken[n]=True # take a token
                print(self.save_it_tolken)

                r=await queue3.get()
                await asyncio.sleep(random.randint(1,5))  # simulated work
                self.df.loc[len(self.df)]=[r[0],r[1]]
                self.df.to_csv('final_result.csv')

                print(c[3]+f'{datetime.now()} - END SAVE IT     task {n} q3: {str(queue3.qsize()).zfill(2)}')
                self.save_it_tolken[n]=False # release a token
                print(self.save_it_tolken)

            # Also requires the double stage to have finished, because an
            # empty queue3 may still be refilled by it.
            if queue3.empty() and all(not token for token in self.save_it_tolken) and not self.running_double:
                print('exit: SAVE IT', n)
                self.running_saveit = False

            await asyncio.sleep(0.1) # need it to run other loops

    async def main(self):
        """Start the producer and the configured number of workers per stage, then wait for all."""
        queue1 = asyncio.Queue() # stores the random number
        queue2 = asyncio.Queue() # stores the squared number
        queue3 = asyncio.Queue() # stores the final result

        rand_gen = [asyncio.create_task(self.generate_random_number(n,queue1)) for n in range(1)]
        square_scan = [asyncio.create_task(self.square_it(k,queue1,queue2)) for k in range(self.num_of_tasks[0])]
        double_scan = [asyncio.create_task(self.double_it(k,queue2,queue3)) for k in range(self.num_of_tasks[1])]
        save_scan = [asyncio.create_task(self.save_it(k,queue3)) for k in range(self.num_of_tasks[2])]

        await asyncio.gather(*rand_gen)
        await asyncio.gather(*square_scan)
        await asyncio.gather(*double_scan)
        await asyncio.gather(*save_scan)
    

### testing
if __name__ == '__main__':
    # Script entry point: 5 square, 10 double and 5 save workers, driven
    # with asyncio.run.
    asyncio.run(asyncio_toy([5,10,5]).main())