asyncio.sleep() - 如何使用它?
asyncio.sleep() - how to use it?
谁能帮帮我,我正在尝试让程序在满足条件时暂停。但是现在,它根本就没有睡觉。而且我无法理解为什么。我对 asyncio
完全陌生
time.sleep() 也不起作用,所以我更愿意使用 asyncio。非常感谢!
from python_graphql_client import GraphqlClient
import asyncio
import os
import requests
loop = asyncio.get_event_loop()
def print_handle(data):
print(data["data"]["liveMeasurement"]["timestamp"]+" "+str(data["data"]["liveMeasurement"]["power"]))
tall = (data["data"]["liveMeasurement"]["power"])
if tall >= 1000:
print("OK")
# schedule async task from sync code
asyncio.create_task(send_push_notification(data))
print("msg sent")
asyncio.create_task(sleep())
client = GraphqlClient(endpoint="wss://api.tibber.com/v1-beta/gql/subscriptions")
query = """
subscription{
liveMeasurement(homeId:"fd73a8a6ca"){
timestamp
power
}
}
"""
query2 = """
mutation{
sendPushNotification(input: {
title: "Advarsel! Høyt forbruk",
message: "Du bruker 8kw eller mer",
screenToOpen: CONSUMPTION
}){
successful
pushedToNumberOfDevices
}
}
"""
async def sleep():
await asyncio.sleep(10)
async def send_push_notification(data):
#maybe update your query with the received data here
await client.execute_async(query=query2,headers={'Authorization': "2bTCaFx74"})
async def main():
await client.subscribe(query=query, headers={'Authorization': "2bTCaFxDiYdHlxBSt074"}, handle=print_handle)
asyncio.run(main())
我认为减少发送消息数量的最简单方法是定义一个最小时间间隔,在该时间间隔内,值仍超过阈值时不发送通知。
import time
last_notification_timestamp = 0
NOTIFICATION_INTERVAL = 5 * 60 # 5 min
def print_handle(data):
global last_notification_timestamp
print(
data["data"]["liveMeasurement"]["timestamp"]
+ " "
+ str(data["data"]["liveMeasurement"]["power"])
)
tall = data["data"]["liveMeasurement"]["power"]
current_time = time.time()
if (
tall >= 1000
and current_time - NOTIFICATION_INTERVAL > last_notification_timestamp
):
print("OK")
# schedule async task from sync code
asyncio.create_task(send_push_notification(data))
last_notification_timestamp = current_time
print("msg sent")
最后发送的消息的时间戳需要存储在某个地方,所以我们将在全局范围内定义一个变量来保存它,并使用 print_handle()
中的 global
关键字来能够从函数内部写入它。在函数中,我们将检查该值是否高于阈值以及在最后一条消息之后是否经过了足够的时间。这样您仍然可以保持订阅有效并限制您收到的通知数量。这很简单,但您可能很快就会想要扩展您想要对接收到的数据执行的操作范围。请记住,print_handle()
是同步回调,应尽可能短。
如果我没理解错的话,你想观察一些数据的广播,并对这些广播做出反应,保留暂停这些反应的权利。类似于:
async def monitor(read_broadcast):
while True:
data = await read_broadcast()
print(data["data"]["liveMeasurement"]["timestamp"]+" "+str(data["data"]["liveMeasurement"]["power"]))
tall = (data["data"]["liveMeasurement"]["power"])
if tall >= 1000:
print("OK")
await send_push_notification(data)
print("msg sent")
# sleep for a while before sending another one
await asyncio.sleep(10)
要实现read_broadcast
,我们可以使用“未来”:
# client, query, query2, send_push_notification defined as before
async def main():
broadcast_fut = None
def update_broadcast_fut(_fut=None):
nonlocal broadcast_fut
broadcast_fut = asyncio.get_event_loop().create_future()
broadcast_fut.add_done_callback(update_broadcast_fut)
update_broadcast_fut()
def read_broadcast():
return broadcast_fut
asyncio.create_task(monitor(read_broadcast))
await client.subscribe(
query=query, headers={'Authorization': "2bTCaFxDiYdHlxBSt074"},
handle=lambda data: broadcast_fut.set_result(data),
)
asyncio.run(main())
请注意,我还没有测试上面的代码,所以可能会有错别字。
谁能帮帮我,我正在尝试让程序在满足条件时暂停。但是现在,它根本就没有睡觉。而且我无法理解为什么。我对 asyncio
完全陌生time.sleep() 也不起作用,所以我更愿意使用 asyncio。非常感谢!
from python_graphql_client import GraphqlClient
import asyncio
import os
import requests
loop = asyncio.get_event_loop()
def print_handle(data):
print(data["data"]["liveMeasurement"]["timestamp"]+" "+str(data["data"]["liveMeasurement"]["power"]))
tall = (data["data"]["liveMeasurement"]["power"])
if tall >= 1000:
print("OK")
# schedule async task from sync code
asyncio.create_task(send_push_notification(data))
print("msg sent")
asyncio.create_task(sleep())
client = GraphqlClient(endpoint="wss://api.tibber.com/v1-beta/gql/subscriptions")
query = """
subscription{
liveMeasurement(homeId:"fd73a8a6ca"){
timestamp
power
}
}
"""
query2 = """
mutation{
sendPushNotification(input: {
title: "Advarsel! Høyt forbruk",
message: "Du bruker 8kw eller mer",
screenToOpen: CONSUMPTION
}){
successful
pushedToNumberOfDevices
}
}
"""
async def sleep():
await asyncio.sleep(10)
async def send_push_notification(data):
#maybe update your query with the received data here
await client.execute_async(query=query2,headers={'Authorization': "2bTCaFx74"})
async def main():
await client.subscribe(query=query, headers={'Authorization': "2bTCaFxDiYdHlxBSt074"}, handle=print_handle)
asyncio.run(main())
我认为减少发送消息数量的最简单方法是定义一个最小时间间隔,在该时间间隔内,值仍超过阈值时不发送通知。
import time
last_notification_timestamp = 0
NOTIFICATION_INTERVAL = 5 * 60 # 5 min
def print_handle(data):
global last_notification_timestamp
print(
data["data"]["liveMeasurement"]["timestamp"]
+ " "
+ str(data["data"]["liveMeasurement"]["power"])
)
tall = data["data"]["liveMeasurement"]["power"]
current_time = time.time()
if (
tall >= 1000
and current_time - NOTIFICATION_INTERVAL > last_notification_timestamp
):
print("OK")
# schedule async task from sync code
asyncio.create_task(send_push_notification(data))
last_notification_timestamp = current_time
print("msg sent")
最后发送的消息的时间戳需要存储在某个地方,所以我们将在全局范围内定义一个变量来保存它,并使用 print_handle()
中的 global
关键字来能够从函数内部写入它。在函数中,我们将检查该值是否高于阈值以及在最后一条消息之后是否经过了足够的时间。这样您仍然可以保持订阅有效并限制您收到的通知数量。这很简单,但您可能很快就会想要扩展您想要对接收到的数据执行的操作范围。请记住,print_handle()
是同步回调,应尽可能短。
如果我没理解错的话,你想观察一些数据的广播,并对这些广播做出反应,保留暂停这些反应的权利。类似于:
async def monitor(read_broadcast):
while True:
data = await read_broadcast()
print(data["data"]["liveMeasurement"]["timestamp"]+" "+str(data["data"]["liveMeasurement"]["power"]))
tall = (data["data"]["liveMeasurement"]["power"])
if tall >= 1000:
print("OK")
await send_push_notification(data)
print("msg sent")
# sleep for a while before sending another one
await asyncio.sleep(10)
要实现read_broadcast
,我们可以使用“未来”:
# client, query, query2, send_push_notification defined as before
async def main():
broadcast_fut = None
def update_broadcast_fut(_fut=None):
nonlocal broadcast_fut
broadcast_fut = asyncio.get_event_loop().create_future()
broadcast_fut.add_done_callback(update_broadcast_fut)
update_broadcast_fut()
def read_broadcast():
return broadcast_fut
asyncio.create_task(monitor(read_broadcast))
await client.subscribe(
query=query, headers={'Authorization': "2bTCaFxDiYdHlxBSt074"},
handle=lambda data: broadcast_fut.set_result(data),
)
asyncio.run(main())
请注意,我还没有测试上面的代码,所以可能会有错别字。