How to fix 'ERROR:asyncio:Task was destroyed but it is pending! error in Python

How to fix 'ERROR:asyncio:Task was destroyed but it is pending! error in Python

我有一个简单的 websocket 服务器,它应该将相机帧提供给电子应用程序。代码主要取自here。这个想法是,当我打开 websocket 时,python 将启动一个 while 循环,不断发送将由 js 'client' 捕获的帧。但是,为了能够停止这个 while 循环,我需要在执行程序上 运行 这个方法,否则它会卡在里面。只要未设置事件并且该事件是在 websocket 的 on_close 方法上设置的,循环就应该是 运行。

我做了一个模拟相机的单独方法,在实际的应用程序中,帧来自另一个进程。

问题是,在 运行 运行程序一段时间后,我的日志被淹没了:ERROR:asyncio:Task 已被破坏,但它正在等待处理!

我的假设是主 ioloop 和执行程序循环之间存在一些同步问题。我还尝试了 starting/stopping 一个在 opening/closing websocket 时发送帧的线程,但遇到了同样的问题...

这是我的代码:

ws_server.py

import cv2
import numpy
import asyncio

import traceback

from concurrent.futures import ThreadPoolExecutor

from tornado.websocket import WebSocketHandler
from tornado.concurrent import run_on_executor
from tornado.ioloop import IOLoop
from tornado.web import Application

from threading import Event


class StreamHandler(WebSocketHandler):

    executor = ThreadPoolExecutor()

    def check_origin(self, origin):
        return True

    def initialize(self):
        self.stop_event = Event()
        self.camera = self.frame_generator()

    def frame_generator(self):

        while not self.stop_event.is_set():
            frame = numpy.random.randint(0, 255, (800, 800))
            _, frame = cv2.imencode('.jpg', frame)
            frame = frame.tostring()

            yield frame

    def open(self):

        print("Connection opened.")
        self.send_frames()

    @run_on_executor
    def send_frames(self):
        asyncio.set_event_loop(asyncio.new_event_loop())
        while not self.stop_event.is_set():

            img = next(self.camera)

            if img is not None:
                try:
                    self.write_message(img, binary=True)
                except:
                    print(traceback.format_exc())

    def on_close(self):
        self.stop_event.set()
        print("Connection closed.")


def main():

    print("starting server.")
    app = Application([
        (r"/camera", StreamHandler),
    ])
    app.listen(8083)
    IOLoop.instance().start()


if __name__ == '__main__':
    main()

client.js

var img = document.getElementById("liveImg");
var arrayBuffer;

var ws = new WebSocket("ws://localhost:8083/camera");
ws.binaryType = 'arraybuffer';

ws.onopen = function(){
    console.log("connection was established");
};
ws.onmessage = function(evt){
arrayBuffer = evt.data;
img.src = "data:image/jpeg;base64," + encode(new Uint8Array(arrayBuffer));
};

function encode (input) {
    var keyStr = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=";
    var output = "";
    var chr1, chr2, chr3, enc1, enc2, enc3, enc4;
    var i = 0;

    while (i < input.length) {
        chr1 = input[i++];
        chr2 = i < input.length ? input[i++] : Number.NaN; // Not sure if the index
        chr3 = i < input.length ? input[i++] : Number.NaN; // checks are needed here

        enc1 = chr1 >> 2;
        enc2 = ((chr1 & 3) << 4) | (chr2 >> 4);
        enc3 = ((chr2 & 15) << 2) | (chr3 >> 6);
        enc4 = chr3 & 63;

        if (isNaN(chr2)) {
            enc3 = enc4 = 64;
        } else if (isNaN(chr3)) {
            enc4 = 64;
        }
        output += keyStr.charAt(enc1) + keyStr.charAt(enc2) +
                  keyStr.charAt(enc3) + keyStr.charAt(enc4);
    }
    return output;
}

index.html

<html>
 <head>
  <title>livecamera</title>
  <img id="liveImg" width="480" height="360"></canvas>
  <script type="text/javascript" src="./client.js"></script>
 </head>
</html>

下面是我的日志中出现的示例:

ERROR:asyncio:Task已被销毁,但正在等待中! 任务:.wrapper() 运行ning 在 /home/vladinny/Projects/photo-booth-cv/venv/lib/python3.6/site-packages/tornado/websocket.py:1102>> ERROR:asyncio:Task 已被销毁但正在等待中! 任务:.wrapper() 运行ning 在 /home/vladinny/Projects/photo-booth-cv/venv/lib/python3.6/site-packages/tornado/websocket.py:1102>> ERROR:asyncio:Task 已被销毁但正在等待中! 任务:.wrapper() 运行ning 在 /home/vladinny/Projects/photo-booth-cv/venv/lib/python3.6/site-packages/tornado/websocket.py:1102>> ERROR:asyncio:Task 已被销毁但正在等待中! 任务:.wrapper() 运行ning 在 /home/vladinny/Projects/photo-booth-cv/venv/lib/python3.6/site-packages/tornado/websocket.py:1102>> ERROR:asyncio:Task 已被销毁但正在等待中! 任务:.wrapper() 运行ning 在 /home/vladinny/Projects/photo-booth-cv/venv/lib/python3.6/site-packages/tornado/websocket.py:1102>> ERROR:asyncio:Task 已被销毁但正在等待中! 任务:.wrapper() 运行ning 在 /home/vladinny/Projects/photo-booth-cv/venv/lib/python3.6/site-packages/tornado/websocket.py:1102>> ERROR:asyncio:Task 已被销毁但正在等待中! 任务:.wrapper() 运行 宁在 /home/vladinny/Projects/photo-booth-cv/venv/lib/python3.6/site-packages/tornado/websocket.py:1102>>

不要在执行线程中创建新的事件循环。

write_message 必须从处理连接的同一个事件循环线程调用。这意味着如果您使用的是执行器,则必须在执行器和事件循环之间来回传递消息,以便您可以在执行器上执行阻塞任务并在事件循环上编写 websocket 消息。在执行器中创建一个新的事件循环将绕过关于线程中没有事件循环的错误,但这是不正确的——这些警告的重点是你必须使用 same 事件循环,而不是创建一个新的。

在实践中,我建议尽可能多地在事件循环线程上做,并且只将特定的阻塞工作传递给执行程序:

async def send_frames(self):
    while not self.stop_event.is_set():

        img = await IOLoop.current().run_in_executor(self.executor, next, self.camera)

        if img is not None:
            try:
                await self.write_message(img, binary=True)
            except:
                print(traceback.format_exc())