在 Python 3 代码中使用 asyncio,我怎样才能 (re)start/stop 循环非阻塞 websocket IO?

With asyncio in Python 3 code, how can I (re)start/stop non-blocking websocket IO recurrently?

在我的 phone 语音识别项目中 Python 的 asynciowebsockets 模块基本上用于启用异步模式下客户端和服务器之间的数据交换。要识别的音频流通过本地 wav 文件从 PBX 通道(Asterisk PBX 适用)内部到达客户端,该文件累积了从接听电话到挂断事件的所有数据。在对话进行时,异步生产者将通话记录块(每块不超过 16 kB)推送到异步队列,以便消费者协程可以在发送到识别引擎服务器之前将数据写入缓冲区(我的选择是 Vosk instance with Kaldi engine 设计为使用 websocket 接口连接)。一旦缓冲区超过特定容量(例如可能为 288 kB),数据应由 send 函数刷新以识别并由 recv 返回(作为语音的转录本)。实时识别在这里很重要,因此我需要保证像 recv 这样的套接字操作不会在整个 websocket 会话中停止两个协程(它们应该能够保持基于队列的数据流直到挂断事件)。让我们看一下整个程序,首先有一个main,其中实例化了一个事件循环以及几个任务:

import logging
import asyncio
import time
from concurrent.futures._base import CancelledError  

from .transcription import Transcriber, get_record_size_info

logging.basicConfig(level=logging.DEBUG)
record_file_name = '/var/spool/asterisk/monitor/callrecord.wav'    

def main():
    transcriber = Transcriber()       

    logging.getLogger('asyncio').setLevel(logging.ERROR)
    logging.getLogger('asyncio.coroutines').setLevel(logging.ERROR)
    logging.getLogger('websockets.server').setLevel(logging.ERROR)
    logging.getLogger('websockets.protocol').setLevel(logging.ERROR)

    loop = asyncio.get_event_loop()    
    time.sleep(2)

    prod_task = loop.create_task(transcriber.run_producer(transcriber._queue))
    consum_task = loop.create_task(transcriber.run_consumer(transcriber._queue))
            
    tasks = [prod_task, consum_task]
            
    executed, remaining = loop.run_until_complete(asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)) 
    logging.debug('Tasks completed: %s', executed)
    logging.debug('Tasks in progress: %s', remaining)

    for task in remaining:
        logging.info('Dropping task %s: %s', task, task.cancel())
    try:
        loop.run_until_complete(asyncio.gather(*remaining))
    except CancelledError:
        for running_task in remaining:
        logging.debug('Task dropped %s: %s', running_task, running_task.cancelled())                                         
            
    loop.stop()    
    loop.close()         

if __name__ == '__main__':
    main()       

Producer/consumer 实施如下:

from queue import Queue
from concurrent.futures._base import CancelledError 
from pathlib import Path

import logging
import asyncio
import websockets
import json

ASR_WS_ADDRESS = 'ws://127.0.0.1:2700'

class Transcriber:

    def __init__(self):
        self._queue = asyncio.Queue()
        self._buffer = b''
        self._current_record_size = 0 # terminate reading from wav file if current size of record is equal to total payload 
        self._record_file_name = '/var/spool/asterisk/monitor/callrecord.wav'
        self._total_payload = 0 # total of bytes written to buffer since call recording started  
           
    async def run_producer(self, qu):
        with open(self._record_file_name, 'rb') as record:
            print('call record file size: ' + str(get_record_size_info(self._record_file_name)))
            self._current_record_size = get_record_size_info(self._record_file_name)
            while True:
                await asyncio.sleep(0.5)
                chunk = record.read(16000)           
                qu.put_nowait(chunk)            
                qsize = qu.qsize()

    async def run_consumer(self, qu):
        while True:            
            data = await qu.get()             
            await asyncio.sleep(1)
            self._buffer += data                      
            self._current_record_size = get_record_size_info(self._record_file_name)            
            print('now buffer contains : ' + str(len(self._buffer)) + ' bytes')
            print('current record size: ' + str(self._current_record_size) + ' bytes')
            print('current total payload: ' + str(self._total_payload) + ' bytes')           
           
            if len(self._buffer) >= 288000:                
                await self.do_recognition()
                self._total_payload += len(data)
                self._buffer = b''               
            elif len(data) == 0 and self._current_record_size == self._total_payload:
                print('looks like recording is complete...')
                await self.do_recognition()               
                self._queue._queue.clear() # remove items from queue before loop gets close      
                self._queue._finished.set()
                self._queue._unfinished_tasks = 0               
                raise Exception('cancel both tasks and close loop')
            else:
                self._total_payload += len(data)
                continue
    
    async def do_recognition(self):
        async with websockets.connect(ASR_WS_ADDRESS) as ws:        
            logging.debug('Sending %s to Vosk-hosted Kaldi engine', len(self._buffer))
            await ws.send(self._buffer)
                                     
            response = json.loads(await ws.recv())
            try:
                result = response['partial']
                if len(result) > 0:
                print('\n')
                print(result + '\n')
            except KeyError:
                result = response['text']
                if len(result) > 0:
                print('\n')
                print(result + '\n')    

def get_record_size_info(record_file_name):
    return Path(record_file_name).stat().st_size

这是我几天来一直在思考的一个问题:如何以非阻塞方式 运行 do_recognition 方法来避免 2-3 秒的停顿一次 recv 执行开始?与更长的通话对话相比,我需要触发更多的语音识别请求,即本质上阻塞程序对实时性能来说是灾难性的。由于在我的案例中经常执行 stop/resume,我在 SO 上看到的每个解决方案(具体来说,, , )都没有解决这个问题,所以我正在寻找任何处理这个问题的指示分别。请分享一些想法可以应用哪些解决方法来实现我想要的改进,我自己在 asyncio 方面的经验远远不足以有效地调整上述内容。

如果我对问题的理解正确,您可能想将 await self.do_recognition() 替换为 asyncio.create_task(self.do_recognition()) 以使 do_recognition 在后台执行。如果你需要支持 Python 3.6 和更早版本,你可以使用 loop.create_task(...)asyncio.ensure_future(...),在这种情况下它们都做同样的事情。

这样做时,您还需要提取 self._buffer 的值并将其作为参数传递给 do_recognition,以便它可以独立于到达的新数据发送缓冲区内容.

两个与问题无关的注释:

  • 该代码正在访问队列的内部实现属性,在生产代码中应避免这种情况,因为它可能随时停止工作,即使在 Python 的错误修复版本中也是如此。以 _ 开头的属性,如 _finished_unfinished_tasks 不在向后兼容性保证范围内,可以删除、重命名或更改含义,恕不另行通知。

  • 您可以从公开公开它的顶级 asyncio 包导入 CancelledError。您不需要引用内部 concurrent.futures._base 模块,它恰好是 class 由实现定义的地方。