异步线程 Python OCPP
Asyncronous Threading Python OCPP
我正在尝试在 python 中实现 ocpp 库。有两个函数,运行 在 while 循环中连续,cp.start() 用于日志记录,cp.heartbeat 作为协议实习生心跳。当我想在我的例程中正常实现它们时,while 循环将阻塞事件循环,所以我希望它们作为线程。但是图书馆似乎有问题。
async def main():
async with websockets.connect(
'ws://localhost:9000/CP_3',
subprotocols=['ocpp1.6']
) as ws:
cp = ChargePoint('CP_3', ws)
def start_logging(loop):
asyncio.set_event_loop(loop)
loop.create_task(cp.start())
loop.run_forever()
loop = asyncio.get_event_loop()
t = threading.Thread(target=start_logging, args=(loop,))
t.start()
await asyncio.gather(cp.send_heartbeat())
if __name__ == '__main__':
asyncio.run(main())
错误:
ConnectionResetError: [WinError 995] Der E/A-Vorgang wurde wegen eines Threadendes oder einer Anwendungsanforderung abgebrochen
AssertionError
ERROR:asyncio:Error on reading from the event loop self pipe
loop: <ProactorEventLoop running=True closed=False debug=False>
AssertionError
ERROR:asyncio:Task exception was never retrieved
future: <Task finished name='Task-5' coro=<ChargePoint.start() done, defined at C:\Users\sasko\AppData\Local\Programs\Python\Python39\lib\site-packages\ocpp\charge_point.py:121> exception=ConnectionClosedOK('code = 1000 (OK), no reason')>
即使我将线程设置为守护进程,心跳也会起作用,但我无法再关闭程序。
最终的目标是让cp.start()和心跳运行在一个线程中,这样我就可以在另一个逻辑中控制电动车的充电过程。
查看github上的代码库,所有你想调用的函数都是协程。它们可能包含无限循环,但它们中有 await
语句,这使得它们将控制权交还给事件循环。因此,据我所知,没有必要为任何事情使用线程。摘自版本 1.6 的示例:
async def main():
async with websockets.connect(
'ws://localhost:9000/CP_1',
subprotocols=['ocpp1.6']
) as ws:
cp = ChargePoint('CP_1', ws)
await asyncio.gather(cp.start(), cp.send_boot_notification())
我想这应该可以帮助您入门。
编辑:
好的,上面的还是成立的。我回答了你的问题,但你真正需要的是了解 API 应该如何工作。我告诉你他们的例子有点令人困惑,我认为你不会绕过阅读他们的文档。但是我从代码中理解的要点是你需要 subclass 中央 class ChargePoint
,从这个例子中并不清楚,因为他们命名了他们的 subclass 和他们的baseclass一样。我会尽量让他们的例子更清楚。我希望我理解正确...:[=14=]
# simplified and commented version of the v1.6 example
import asyncio
import logging
import websockets
from ocpp.routing import on
from ocpp.v16 import call
from ocpp.v16 import ChargePoint as cp # this is the baseclass renamed to cp
from ocpp.v16.enums import Action, RegistrationStatus
logging.basicConfig(level=logging.INFO)
class ChargePoint(cp): # inheriting from cp, now called ChargePoint (again)
@on(Action.SomeMessage) # this decorator adds your function to a mapping of hooks for that message/event
def on_some_message(*args, **kwargs):
pass # do something which probably got something to do with charging something
asyncio.create_task(self.some_coro()) # create async task from sync code
# add more decorated functions to implement your logic
async def some_coro(self):
pass # do something with I/O
async def send_boot_notification(self):
request = call.BootNotificationPayload(
charge_point_model="Optimus",
charge_point_vendor="The Mobility House"
)
response = await self.call(request)
if response.status == RegistrationStatus.accepted:
print("Connected to central system.")
async def main():
async with websockets.connect(
'ws://localhost:9000/CP_1',
subprotocols=['ocpp1.6']
) as ws:
cp = ChargePoint('CP_1', ws) # going full circle, naming the instance the same as the rebound baseclass :-/
# this seems initializing, maybe not do it concurrently
await cp.send_boot_notification()
# this starts the infinite loop which receives and relays
# messages to their respective hooks
# (you get the concurrency you wanted out of threads by registering
# your own hooks (pieces of code)
await cp.start() # main() stays here until you somehow shut it down
if __name__ == '__main__':
asyncio.run(main())
所以显然我无法对此进行测试,也不能向你保证这就是他们的意图,但我希望它能有所帮助。
我正在尝试在 python 中实现 ocpp 库。有两个函数,运行 在 while 循环中连续,cp.start() 用于日志记录,cp.heartbeat 作为协议实习生心跳。当我想在我的例程中正常实现它们时,while 循环将阻塞事件循环,所以我希望它们作为线程。但是图书馆似乎有问题。
async def main():
async with websockets.connect(
'ws://localhost:9000/CP_3',
subprotocols=['ocpp1.6']
) as ws:
cp = ChargePoint('CP_3', ws)
def start_logging(loop):
asyncio.set_event_loop(loop)
loop.create_task(cp.start())
loop.run_forever()
loop = asyncio.get_event_loop()
t = threading.Thread(target=start_logging, args=(loop,))
t.start()
await asyncio.gather(cp.send_heartbeat())
if __name__ == '__main__':
asyncio.run(main())
错误:
ConnectionResetError: [WinError 995] Der E/A-Vorgang wurde wegen eines Threadendes oder einer Anwendungsanforderung abgebrochen
AssertionError
ERROR:asyncio:Error on reading from the event loop self pipe
loop: <ProactorEventLoop running=True closed=False debug=False>
AssertionError
ERROR:asyncio:Task exception was never retrieved
future: <Task finished name='Task-5' coro=<ChargePoint.start() done, defined at C:\Users\sasko\AppData\Local\Programs\Python\Python39\lib\site-packages\ocpp\charge_point.py:121> exception=ConnectionClosedOK('code = 1000 (OK), no reason')>
即使我将线程设置为守护进程,心跳也会起作用,但我无法再关闭程序。 最终的目标是让cp.start()和心跳运行在一个线程中,这样我就可以在另一个逻辑中控制电动车的充电过程。
查看github上的代码库,所有你想调用的函数都是协程。它们可能包含无限循环,但它们中有 await
语句,这使得它们将控制权交还给事件循环。因此,据我所知,没有必要为任何事情使用线程。摘自版本 1.6 的示例:
async def main():
async with websockets.connect(
'ws://localhost:9000/CP_1',
subprotocols=['ocpp1.6']
) as ws:
cp = ChargePoint('CP_1', ws)
await asyncio.gather(cp.start(), cp.send_boot_notification())
我想这应该可以帮助您入门。
编辑:
好的,上面的还是成立的。我回答了你的问题,但你真正需要的是了解 API 应该如何工作。我告诉你他们的例子有点令人困惑,我认为你不会绕过阅读他们的文档。但是我从代码中理解的要点是你需要 subclass 中央 class ChargePoint
,从这个例子中并不清楚,因为他们命名了他们的 subclass 和他们的baseclass一样。我会尽量让他们的例子更清楚。我希望我理解正确...:[=14=]
# simplified and commented version of the v1.6 example
import asyncio
import logging
import websockets
from ocpp.routing import on
from ocpp.v16 import call
from ocpp.v16 import ChargePoint as cp # this is the baseclass renamed to cp
from ocpp.v16.enums import Action, RegistrationStatus
logging.basicConfig(level=logging.INFO)
class ChargePoint(cp): # inheriting from cp, now called ChargePoint (again)
@on(Action.SomeMessage) # this decorator adds your function to a mapping of hooks for that message/event
def on_some_message(*args, **kwargs):
pass # do something which probably got something to do with charging something
asyncio.create_task(self.some_coro()) # create async task from sync code
# add more decorated functions to implement your logic
async def some_coro(self):
pass # do something with I/O
async def send_boot_notification(self):
request = call.BootNotificationPayload(
charge_point_model="Optimus",
charge_point_vendor="The Mobility House"
)
response = await self.call(request)
if response.status == RegistrationStatus.accepted:
print("Connected to central system.")
async def main():
async with websockets.connect(
'ws://localhost:9000/CP_1',
subprotocols=['ocpp1.6']
) as ws:
cp = ChargePoint('CP_1', ws) # going full circle, naming the instance the same as the rebound baseclass :-/
# this seems initializing, maybe not do it concurrently
await cp.send_boot_notification()
# this starts the infinite loop which receives and relays
# messages to their respective hooks
# (you get the concurrency you wanted out of threads by registering
# your own hooks (pieces of code)
await cp.start() # main() stays here until you somehow shut it down
if __name__ == '__main__':
asyncio.run(main())
所以显然我无法对此进行测试,也不能向你保证这就是他们的意图,但我希望它能有所帮助。