异步线程 Python OCPP

Asyncronous Threading Python OCPP

我正在尝试在 python 中实现 ocpp 库。有两个函数,运行 在 while 循环中连续,cp.start() 用于日志记录,cp.heartbeat 作为协议实习生心跳。当我想在我的例程中正常实现它们时,while 循环将阻塞事件循环,所以我希望它们作为线程。但是图书馆似乎有问题。

async def main():
    async with websockets.connect(
        'ws://localhost:9000/CP_3',
        subprotocols=['ocpp1.6']
    ) as ws:

        cp = ChargePoint('CP_3', ws)

        def start_logging(loop):
            asyncio.set_event_loop(loop)
            loop.create_task(cp.start())
            loop.run_forever()

        loop = asyncio.get_event_loop()
        t = threading.Thread(target=start_logging, args=(loop,))
        t.start()
   
        await asyncio.gather(cp.send_heartbeat())


if __name__ == '__main__':
    asyncio.run(main())

错误:

ConnectionResetError: [WinError 995] Der E/A-Vorgang wurde wegen eines Threadendes oder einer Anwendungsanforderung abgebrochen

AssertionError
ERROR:asyncio:Error on reading from the event loop self pipe
loop: <ProactorEventLoop running=True closed=False debug=False>

AssertionError
ERROR:asyncio:Task exception was never retrieved
future: <Task finished name='Task-5' coro=<ChargePoint.start() done, defined at C:\Users\sasko\AppData\Local\Programs\Python\Python39\lib\site-packages\ocpp\charge_point.py:121> exception=ConnectionClosedOK('code = 1000 (OK), no reason')>

即使我将线程设置为守护进程,心跳也会起作用,但我无法再关闭程序。 最终的目标是让cp.start()和心跳运行在一个线程中,这样我就可以在另一个逻辑中控制电动车的充电过程。

查看github上的代码库,所有你想调用的函数都是协程。它们可能包含无限循环,但它们中有 await 语句,这使得它们将控制权交还给事件循环。因此,据我所知,没有必要为任何事情使用线程。摘自版本 1.6 的示例:

async def main():
    async with websockets.connect(
        'ws://localhost:9000/CP_1',
        subprotocols=['ocpp1.6']
    ) as ws:

        cp = ChargePoint('CP_1', ws)

        await asyncio.gather(cp.start(), cp.send_boot_notification())

我想这应该可以帮助您入门。

编辑:

好的,上面的还是成立的。我回答了你的问题,但你真正需要的是了解 API 应该如何工作。我告诉你他们的例子有点令人困惑,我认为你不会绕过阅读他们的文档。但是我从代码中理解的要点是你需要 subclass 中央 class ChargePoint,从这个例子中并不清楚,因为他们命名了他们的 subclass 和他们的baseclass一样。我会尽量让他们的例子更清楚。我希望我理解正确...:[=​​14=]

# simplified and commented version of the v1.6 example
import asyncio
import logging
import websockets

from ocpp.routing import on
from ocpp.v16 import call
from ocpp.v16 import ChargePoint as cp # this is the baseclass renamed to cp
from ocpp.v16.enums import Action, RegistrationStatus

logging.basicConfig(level=logging.INFO)


class ChargePoint(cp): # inheriting from cp, now called ChargePoint (again)

    @on(Action.SomeMessage) # this decorator adds your function to a mapping of hooks for that message/event
    def on_some_message(*args, **kwargs):
        pass # do something which probably got something to do with charging something
        asyncio.create_task(self.some_coro()) # create async task from sync code

    # add more decorated functions to implement your logic

    async def some_coro(self):
        pass # do something with I/O

    async def send_boot_notification(self):
        request = call.BootNotificationPayload(
            charge_point_model="Optimus",
            charge_point_vendor="The Mobility House"
        )

        response = await self.call(request)

        if response.status == RegistrationStatus.accepted:
            print("Connected to central system.")


async def main():
    async with websockets.connect(
        'ws://localhost:9000/CP_1',
        subprotocols=['ocpp1.6']
    ) as ws:

        cp = ChargePoint('CP_1', ws) # going full circle, naming the instance the same as the rebound baseclass :-/

        # this seems initializing, maybe not do it concurrently
        await cp.send_boot_notification()

        # this starts the infinite loop which receives and relays
        # messages to their respective hooks
        # (you get the concurrency you wanted out of threads by registering
        # your own hooks (pieces of code)
        await cp.start() # main() stays here until you somehow shut it down

if __name__ == '__main__':
    asyncio.run(main())

所以显然我无法对此进行测试,也不能向你保证这就是他们的意图,但我希望它能有所帮助。