如何使用 asyncio 更新具有两个不同时间间隔的 Dataframe

How to update a Dataframe with two different time intervals with asyncio

我想做的是在两个不同长度的时间间隔内更新数据帧。
更新 1 以 0 - 2 秒的间隔更改 df 的最后一行,在 Signal class、
中生成 更新 2 在 change_hist_data() 中生成并以 5 秒的间隔更改行 0 - 4。
我的代码的问题是在Signalclass.
pulse的速度在整个df上发生了数据变化的区间 我想要的是第 0 - 4 行以 5 秒的间隔更新,只有最后一行的速度更快。 我究竟做错了什么? 顺便说一句,如果有人愿意给我额外的小费 - sys.stdout.write("3[F") 运行 会长期顺利(数周或数月)吗?我注意到有时列名会来回移动

import pandas as pd
import time
import random
import numpy as np
import sys
import asyncio



class Signal:

    def __init__(self):
        pass

    def pulse(self):
        
        while True:
            x = random.uniform(0,2)
            y = random.uniform(1,9)
            time.sleep(x)
            self.pulse = y
            return self.pulse

async def change_hist_data():
    
    while True:
        
        task1 = asyncio.create_task(print_df())
        await asyncio.sleep(5)
        data = np.random.rand(6,1)
        
        return data

async def print_df():
   
    while True:

        task2 = asyncio.create_task(change_hist_data())
        df = pd.DataFrame(await task2, columns={'data'})
        p = Signal()
        p = p.pulse()
        df.data.iloc[-1] = p
        print(df)
        sys.stdout.write("3[F")
        sys.stdout.write("3[F")
        sys.stdout.write("3[F")
        sys.stdout.write("3[F")
        sys.stdout.write("3[F")
        sys.stdout.write("3[F")
        sys.stdout.write("3[F")
        


asyncio.run(print_df())

更新:

我根据 Peter Cornelius 的意见和我最初的想法提出了这个解决方案。 df 现在在两个时间间隔内打印。其实方法名random_delay已经不合适了,我把它设置为0.2秒的固定间隔。

import random
import asyncio
import pandas as pd
import numpy as np
import sys


class DataFrameWrapper:
    
    def __init__(self):
        
        self.data = self.update_rows0_to_4() 
        self.df = pd.DataFrame(self.data, columns={'data'})

    def update_row5(self):
        
        self.pulse = random.uniform(0,2) 
        return self.pulse
        
    def update_rows0_to_4(self):
        self.data = np.random.rand(6,1)
        return self.data
        
async def random_delay(df_wrapper):
    
    while True:

        df_wrapper.df.data.iloc[-1] = df_wrapper.update_row5()
        print(df_wrapper.df)
        sys.stdout.write("3[F")
        sys.stdout.write("3[F")
        sys.stdout.write("3[F")
        sys.stdout.write("3[F")
        sys.stdout.write("3[F")
        sys.stdout.write("3[F")
        sys.stdout.write("3[F")
        await asyncio.sleep(0.2)
       
async def delay_5s(df_wrapper):

    while True:

        df_wrapper.df.data = df_wrapper.update_rows0_to_4()
        print(df_wrapper.df)
        sys.stdout.write("3[F")
        sys.stdout.write("3[F")
        sys.stdout.write("3[F")
        sys.stdout.write("3[F")
        sys.stdout.write("3[F")
        sys.stdout.write("3[F")
        sys.stdout.write("3[F")
        
        await asyncio.sleep(5.0)
    

df_wrapper = DataFrameWrapper()

async def main():
    await asyncio.gather(random_delay(df_wrapper),delay_5s(df_wrapper) )

asyncio.run(main())

您对问题的描述很好,但是您的程序结构不正确。你需要这样的东西:

import random
import asyncio

class DataFrameWrapper:
    def __init__(self, df):
        self.df = df

    def update_row5(self):
        print("Updating row 5")
        
    def update_rows0_to_4(self):
        print("Updating row 0-4")
        
async def random_delay(df_wrapper):
    while True:
        x = random.uniform(0.0, 2.0)
        await asyncio.sleep(x)
        df_wrapper.update_row5()
        
async def delay_5s(df_wrapper):
    asyncio.create_task(random_delay(df_wrapper))
    while True:
        await asyncio.sleep(5.0)
        df_wrapper.update_rows0_to_4()
        
# df is some dataframe
df_wrapper = DataFrameWrapper(df)
asyncio.run(delay_5s(df_wrapper))
    

这里的周期性延时函数是两个asyncio任务。它们定期执行函数调用返回到数据框对象,您可以在其中操作数据。我没有尝试在其中包含任何细节 - 我只是删除了那些函数。

在您的第一个 while 循环中,您调用了 time.sleep,它会阻止 asyncio 事件循环。所以这不是一个任务,因此它不会与协程合作 change_hist_data.

你的前两个 while 循环都不是真正的循环,因为你总是 return 在循环的末尾。