如何使用 asyncio 更新具有两个不同时间间隔的 Dataframe
How to update a Dataframe with two different time intervals with asyncio
我想做的是在两个不同长度的时间间隔内更新数据帧。
更新 1 以 0 - 2 秒的间隔更改 df 的最后一行,在 Signal
class、
中生成
更新 2 在 change_hist_data()
中生成并以 5 秒的间隔更改行 0 - 4。
我的代码的问题是在Signal
class.
中pulse
的速度在整个df上发生了数据变化的区间
我想要的是第 0 - 4 行以 5 秒的间隔更新,只有最后一行的速度更快。
我究竟做错了什么?
顺便说一句,如果有人愿意给我额外的小费 - sys.stdout.write("3[F")
运行 会长期顺利(数周或数月)吗?我注意到有时列名会来回移动
import pandas as pd
import time
import random
import numpy as np
import sys
import asyncio
class Signal:
def __init__(self):
pass
def pulse(self):
while True:
x = random.uniform(0,2)
y = random.uniform(1,9)
time.sleep(x)
self.pulse = y
return self.pulse
async def change_hist_data():
while True:
task1 = asyncio.create_task(print_df())
await asyncio.sleep(5)
data = np.random.rand(6,1)
return data
async def print_df():
while True:
task2 = asyncio.create_task(change_hist_data())
df = pd.DataFrame(await task2, columns={'data'})
p = Signal()
p = p.pulse()
df.data.iloc[-1] = p
print(df)
sys.stdout.write("3[F")
sys.stdout.write("3[F")
sys.stdout.write("3[F")
sys.stdout.write("3[F")
sys.stdout.write("3[F")
sys.stdout.write("3[F")
sys.stdout.write("3[F")
asyncio.run(print_df())
更新:
我根据 Peter Cornelius 的意见和我最初的想法提出了这个解决方案。 df
现在在两个时间间隔内打印。其实方法名random_delay
已经不合适了,我把它设置为0.2秒的固定间隔。
import random
import asyncio
import pandas as pd
import numpy as np
import sys
class DataFrameWrapper:
def __init__(self):
self.data = self.update_rows0_to_4()
self.df = pd.DataFrame(self.data, columns={'data'})
def update_row5(self):
self.pulse = random.uniform(0,2)
return self.pulse
def update_rows0_to_4(self):
self.data = np.random.rand(6,1)
return self.data
async def random_delay(df_wrapper):
while True:
df_wrapper.df.data.iloc[-1] = df_wrapper.update_row5()
print(df_wrapper.df)
sys.stdout.write("3[F")
sys.stdout.write("3[F")
sys.stdout.write("3[F")
sys.stdout.write("3[F")
sys.stdout.write("3[F")
sys.stdout.write("3[F")
sys.stdout.write("3[F")
await asyncio.sleep(0.2)
async def delay_5s(df_wrapper):
while True:
df_wrapper.df.data = df_wrapper.update_rows0_to_4()
print(df_wrapper.df)
sys.stdout.write("3[F")
sys.stdout.write("3[F")
sys.stdout.write("3[F")
sys.stdout.write("3[F")
sys.stdout.write("3[F")
sys.stdout.write("3[F")
sys.stdout.write("3[F")
await asyncio.sleep(5.0)
df_wrapper = DataFrameWrapper()
async def main():
await asyncio.gather(random_delay(df_wrapper),delay_5s(df_wrapper) )
asyncio.run(main())
您对问题的描述很好,但是您的程序结构不正确。你需要这样的东西:
import random
import asyncio
class DataFrameWrapper:
def __init__(self, df):
self.df = df
def update_row5(self):
print("Updating row 5")
def update_rows0_to_4(self):
print("Updating row 0-4")
async def random_delay(df_wrapper):
while True:
x = random.uniform(0.0, 2.0)
await asyncio.sleep(x)
df_wrapper.update_row5()
async def delay_5s(df_wrapper):
asyncio.create_task(random_delay(df_wrapper))
while True:
await asyncio.sleep(5.0)
df_wrapper.update_rows0_to_4()
# df is some dataframe
df_wrapper = DataFrameWrapper(df)
asyncio.run(delay_5s(df_wrapper))
这里的周期性延时函数是两个asyncio任务。它们定期执行函数调用返回到数据框对象,您可以在其中操作数据。我没有尝试在其中包含任何细节 - 我只是删除了那些函数。
在您的第一个 while 循环中,您调用了 time.sleep,它会阻止 asyncio 事件循环。所以这不是一个任务,因此它不会与协程合作 change_hist_data.
你的前两个 while 循环都不是真正的循环,因为你总是 return 在循环的末尾。
我想做的是在两个不同长度的时间间隔内更新数据帧。
更新 1 以 0 - 2 秒的间隔更改 df 的最后一行,在 Signal
class、
中生成
更新 2 在 change_hist_data()
中生成并以 5 秒的间隔更改行 0 - 4。
我的代码的问题是在Signal
class.
中pulse
的速度在整个df上发生了数据变化的区间
我想要的是第 0 - 4 行以 5 秒的间隔更新,只有最后一行的速度更快。
我究竟做错了什么?
顺便说一句,如果有人愿意给我额外的小费 - sys.stdout.write("3[F")
运行 会长期顺利(数周或数月)吗?我注意到有时列名会来回移动
import pandas as pd
import time
import random
import numpy as np
import sys
import asyncio
class Signal:
def __init__(self):
pass
def pulse(self):
while True:
x = random.uniform(0,2)
y = random.uniform(1,9)
time.sleep(x)
self.pulse = y
return self.pulse
async def change_hist_data():
while True:
task1 = asyncio.create_task(print_df())
await asyncio.sleep(5)
data = np.random.rand(6,1)
return data
async def print_df():
while True:
task2 = asyncio.create_task(change_hist_data())
df = pd.DataFrame(await task2, columns={'data'})
p = Signal()
p = p.pulse()
df.data.iloc[-1] = p
print(df)
sys.stdout.write("3[F")
sys.stdout.write("3[F")
sys.stdout.write("3[F")
sys.stdout.write("3[F")
sys.stdout.write("3[F")
sys.stdout.write("3[F")
sys.stdout.write("3[F")
asyncio.run(print_df())
更新:
我根据 Peter Cornelius 的意见和我最初的想法提出了这个解决方案。 df
现在在两个时间间隔内打印。其实方法名random_delay
已经不合适了,我把它设置为0.2秒的固定间隔。
import random
import asyncio
import pandas as pd
import numpy as np
import sys
class DataFrameWrapper:
def __init__(self):
self.data = self.update_rows0_to_4()
self.df = pd.DataFrame(self.data, columns={'data'})
def update_row5(self):
self.pulse = random.uniform(0,2)
return self.pulse
def update_rows0_to_4(self):
self.data = np.random.rand(6,1)
return self.data
async def random_delay(df_wrapper):
while True:
df_wrapper.df.data.iloc[-1] = df_wrapper.update_row5()
print(df_wrapper.df)
sys.stdout.write("3[F")
sys.stdout.write("3[F")
sys.stdout.write("3[F")
sys.stdout.write("3[F")
sys.stdout.write("3[F")
sys.stdout.write("3[F")
sys.stdout.write("3[F")
await asyncio.sleep(0.2)
async def delay_5s(df_wrapper):
while True:
df_wrapper.df.data = df_wrapper.update_rows0_to_4()
print(df_wrapper.df)
sys.stdout.write("3[F")
sys.stdout.write("3[F")
sys.stdout.write("3[F")
sys.stdout.write("3[F")
sys.stdout.write("3[F")
sys.stdout.write("3[F")
sys.stdout.write("3[F")
await asyncio.sleep(5.0)
df_wrapper = DataFrameWrapper()
async def main():
await asyncio.gather(random_delay(df_wrapper),delay_5s(df_wrapper) )
asyncio.run(main())
您对问题的描述很好,但是您的程序结构不正确。你需要这样的东西:
import random
import asyncio
class DataFrameWrapper:
def __init__(self, df):
self.df = df
def update_row5(self):
print("Updating row 5")
def update_rows0_to_4(self):
print("Updating row 0-4")
async def random_delay(df_wrapper):
while True:
x = random.uniform(0.0, 2.0)
await asyncio.sleep(x)
df_wrapper.update_row5()
async def delay_5s(df_wrapper):
asyncio.create_task(random_delay(df_wrapper))
while True:
await asyncio.sleep(5.0)
df_wrapper.update_rows0_to_4()
# df is some dataframe
df_wrapper = DataFrameWrapper(df)
asyncio.run(delay_5s(df_wrapper))
这里的周期性延时函数是两个asyncio任务。它们定期执行函数调用返回到数据框对象,您可以在其中操作数据。我没有尝试在其中包含任何细节 - 我只是删除了那些函数。
在您的第一个 while 循环中,您调用了 time.sleep,它会阻止 asyncio 事件循环。所以这不是一个任务,因此它不会与协程合作 change_hist_data.
你的前两个 while 循环都不是真正的循环,因为你总是 return 在循环的末尾。