django channels asyncio 麻烦 运行 任务顺序

django channels asyncio trouble running task in order

我正在努力让一些代码使用 asyncio 来工作。我对它很陌生。太新了,我不知道如何正确地找出我做错了什么。

我正在使用 Django Channels 运行 一个 AyncJsonWebsocketConsumer,我通过客户端应用程序的 websockets 连接到它。我使用 websockets 因为我需要双向通信。我正在创建一个打印过程,我在其中开始一系列长 运行ning 操作,但我需要暂停、停止等的能力。当我使用 asyncio.sleep(x) 进行模拟时,我已经完成了所有这些工作我的长期 运行ning 任务(print_layer 方法)。当我尝试将我的 RPC 添加到 asyncio.sleep 的队列中时,它按预期停止工作。

class Command(Enum):
    START = 'start_print'
    PAUSE = 'pause_print'
    STOP = 'stop_print'
    CANCEL = 'cancel_print'
    RESET = 'reset_print'

class State(Enum):
    NOT_STARTED = 0
    STARTING = 1
    RUNNING = 2
    PAUSED = 3
    STOPPED = 4
    COMPLETE = 5
    ERROR = 500

class PrintConsumer(AsyncJsonWebsocketConsumer):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.status = State.NOT_STARTED
        self.publisher = Publisher()
        self.print_instruction = None
        self.current_task = None
        self.loop = asyncio.get_event_loop()
        self.loop.set_debug(enabled=True)

    @property
    def running_task(self):
        return self.current_task is not None and not self.current_task.done() and not self.current_task.cancelled()

    async def connect(self):
        await self.accept()

    async def disconnect(self, close_code):
        self.status = State.STOPPED

    async def receive(self, text_data):
        response = json.loads(text_data)
        event_cmd = response.get('event', None)
        
        if Command(event_cmd) == Command.START:
            if self.status == State.NOT_STARTED:
                sting_uuid = response.get('sting_uuid', None)
                try:
                    await asyncio.wait_for(self.initialize_print(sting_uuid), timeout=10)  # WARNING: this is blocking
                except asyncio.TimeoutError:
                    self.status = State.ERROR
            self.status = State.RUNNING
            if not self.running_task:
                # it would only have a running task in this situation already if someone starts/stops it quickly
                self.current_task = asyncio.create_task(self.resume_print())
        elif Command(event_cmd) == Command.PAUSE:
            self.status = State.PAUSED
        elif Command(event_cmd) == Command.STOP:
            if self.running_task:
                self.current_task.cancel()
        elif Command(event_cmd) == Command.RESET:
            self.status = State.NOT_STARTED
            self.print_instruction = None
            if self.running_task:
                self.current_task.cancel()
            self.current_task = None
            await self.send(json.dumps({ 'message': []}))
        
    async def initialize_print(self, uuid):
        stingfile = await get_file(uuid)
        # This is just an iterator that returns the next task to do
        # hence I use "next" in the resume_print method
        self.print_instruction = StingInstruction(stingfile)


    async def resume_print(self):
        try:
            while self.status == State.RUNNING:
                await self.send(json.dumps({ 'message': self.print_instruction.serialized_action_status}))
                await asyncio.sleep(.2) # It works with this here only
                try:
                    action = next(self.print_instruction)  # step through iterator
                except StopIteration:
                    self.status = State.COMPLETE
                    break;

                # can't seem to get this part to work
                await self.print_layer(action)
        except asyncio.CancelledError:
            # TODO: Add logic here that will send a command out to stop pumps and all motors.
            self.status = State.STOPPED
    
    async def print_layer(self, instruction):
        print_command = instruction['instruction']
        # this publishes using RPC, so it holds until I get a response.
        try:
            await asyncio.wait_for(self.publisher.publish('pumps', json.dumps(print_command)), timeout=10)
        except asyncio.TimeoutError:
            self.status = State.ERROR
        # when I used just this in the function, the code worked as expected
        # await asyncio.sleep(1)


在展示我的尝试时,我不知道从哪里开始...我的“最佳”尝试,如我所见,是将 print_layer 方法变成一个线程,以便它没有使用 asyncio.to_thread(print_layer) 阻止执行。但是在我尝试的许多事情中,它甚至不会执行。

self.print_instruction.serialized_action_statusreturns每一步的状态。我的目标是让它在每个长 运行ning 任务之前发送它。这可能看起来像...

# sending status update for each step to client
# running print_layer for first action
# sending status update for each step to client
# running print_layer for second action
...
# sending final update

相反,我一次创建每一个任务,当我添加长 运行ning 任务或许多问题时,它会在最后发送所有更新。我可以按顺序(表面上)将长 运行ning 任务发送到 运行,但 send 实际上不会发送中间层打印。非常感谢您的帮助。提前谢谢您。

这是我的发布者的一些简化相关代码(不处理连接丢失等)...

class Publisher():
    def on_response(self, ch, method, props, body):
        """when job response set job to inactive"""

    async def publish(routing_key, msg):
        new_corr_id = str(uuid4())
        self.active_jobs[new_corr_id] = False
        self.channel.basic_publish(...)
        white not self.active_jobs[new_corr_id]:
            self._connection.process_data_events()
            sleep(.1)

我发现了一个部分有效的 hack.. 如果我添加 await asyncio.sleep(.1) 在我的发送命令之后(即像这样)

await self.send(json.dumps({ 'message': self.print_instruction.serialized_action_status}))
await asyncio.sleep(.2)

然后它似乎按照我想要的方式工作(减去中断能力),我仍然能够 pause/start 我的过程。显然,我宁愿在没有黑客的情况下这样做。为什么这段代码突然工作,状态更新在 .2 asyncio 睡眠之后按预期发送而不是没有?我也不能用我不明白的 STOP 命令中断。我本来希望 django 通道读取停止命令,然后取消 运行ning 的任务并强制 resume_print 方法中的 asyncio.CancelledError

我将从更改代码中的一件事开始:

    if Command(event_cmd) == Command.START:
        if self.status == State.NOT_STARTED:
            sting_uuid = response.get('sting_uuid', None)
            try:
                await asyncio.wait_for(self.initialize_print(sting_uuid), timeout=10)  # WARNING: this is blocking
            except asyncio.TimeoutError:
                self.status = State.ERROR
        self.status = State.RUNNING
        #--> This will always set it to running, as it is outside of the if statement. That will stop your stop problem
        #You might want to do this (see https://www.w3schools.com/python/python_try_except.asp) : 
            else:
                self.status = State.RUNNING
        if not self.running_task:
            # it would only have a running task in this situation already if someone starts/stops it quickly
            self.current_task = asyncio.create_task(self.resume_print())
    elif Command(event_cmd) == Command.PAUSE:
        self.status = State.PAUSED
    elif Command(event_cmd) == Command.STOP:
        if self.running_task:
            self.current_task.cancel()
    elif Command(event_cmd) == Command.RESET:
        self.status = State.NOT_STARTED
        self.print_instruction = None
        if self.running_task:
            self.current_task.cancel()
        self.current_task = None
        await self.send(json.dumps({ 'message': []}))
    

此外,你能用你正在做的进口来完成它吗?

你看了吗:

https://docs.python.org/3/library/asyncio-task.html

这可能会帮助您让发送选项随心所欲。

我能够弄清楚我做错了什么。

我一直在假设,因为我把一个函数做成了协程(在它前面放了async),偶数循环会在做io操作时自动等待。绝对不是这样的。就我而言,这是在这里:

class Publisher():
    def on_response(self, ch, method, props, body):
        """when job response set job to inactive"""

    async def publish(routing_key, msg):
        new_corr_id = str(uuid4())
        self.active_jobs[new_corr_id] = False
        self.channel.basic_publish(...)
        white not self.active_jobs[new_corr_id]:
            self._connection.process_data_events()
            sleep(.1)

publish 方法中。此示例使用 pika 并执行 basic_publish 直接交换。这不是异步的,并且阻塞了事件循环。我将我的发布者切换为使用 aio-pika,然后一旦我的 publish 命令变为异步,事件循环就不再被阻止,并且代码按预期工作。