使用 Pandas 和 APScheduler 每小时在后台处理 CSV

Process CSV in the background hourly with Pandas and APScheduler

我有一个 CSV 文件 (ZN_15M),我试图每小时使用 read_csv 函数。所以我安装了 APScheduler 并尝试使用它每小时读取一次 CSV 文件(还有一些其他未显示的内容,但如果我可以让 read_csv 内容运行,其他内容也将起作用):

import sys
from time import sleep
from apscheduler.schedulers.background import BackgroundScheduler


scheduler = BackgroundScheduler()
scheduler.start() 

def Run():
    f2 = open('C:\Users\cost9\OneDrive\Documents\PYTHON\Exported_Data\ZN_ES\ZN_15M.csv')
    ZN = pd.read_csv(f2)
    #Do stuff to the CSV File/DataFrame
    ZN.tocsv(path_or_buf = 'path')

def main():
    job = scheduler.add_interval_job(Run, minutes=60, args=())
    while True:
        sleep(60)
        sys.stdout.write('.'); sys.stdout.flush()

当我手动 运行 脚本时,我没有收到任何错误,但没有像我希望的那样每小时 运行ning 一次。不确定我在这里做错了什么...

更新:我收到以下错误:

def process_csv(path_to_csv):
    ZN_ES_comb = pd.read_csv(path_to_csv)
    # Insert your CSV processing here
    ZN_ES_comb = pd.DataFrame(ZN_ES_comb)
    ZN_ES_comb.to_csv(path_to_csv.replace('.csv', '_modified_{timestamp}.csv').format(
        timestamp=time.strftime("%Y%m%d-%H%M%S")), index=False)

if __name__ == '__main__':
    # Create CSV for demonstrating purposes
    path_to_csv = 'C:\Users\cost9\OneDrive\Documents\PYTHON\Daily Tasks\ZN_ES\ZN_ES_15M\CSV\ZN_ES_comb.csv'
    pd.DataFrame(ZN_ES_comb).to_csv(path_to_csv, index=False)
    # Start scheduler
    scheduler = BackgroundScheduler()
    scheduler.start()
    scheduler.add_job(func=process_csv,
                      args=[path_to_csv],
                      trigger=IntervalTrigger(seconds=2))
    # Wait for 7 seconds so that scheduler can call process_csv 3 times
    time.sleep(7)

错误是针对行 pd.DataFrame(ZN_ES_comb).to_csv(path_to_csv, index=False) - 它说:

NameError: name 'ZN_ES_comb' is not defined

您的代码中有两个问题:

  1. 应该是ZN.to_csv()而不是def Run()中的ZN.tocsv()
  2. time.sleep() 的参数值以秒为单位,而不是您显然认为的以分钟为单位。因此,在睡觉期间 Run() 根本不是 运行。

下面有一个适用于 Python 3.5 和 APScheduler 3.3.1 的工作解决方案。 IntervalTrigger() 也有 hours 参数,您可能想使用它来代替 seconds

import time

import pandas as pd
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger


def process_csv(path_to_csv):
    df = pd.read_csv(path_to_csv)
    # Insert your CSV processing here
    df.to_csv(path_to_csv.replace('.csv', '_modified_{timestamp}.csv').format(
        timestamp=time.strftime("%Y%m%d-%H%M%S")), index=False)

if __name__ == '__main__':
    # Create CSV for demonstrating purposes
    path_to_csv = 'made_up.csv'
    pd.DataFrame({'fruit': ['apple', 'banana'],
                  'number': [1, 2]}).to_csv(path_to_csv, index=False)
    # Start scheduler
    scheduler = BackgroundScheduler()
    scheduler.start()
    scheduler.add_job(func=process_csv,
                      args=[path_to_csv],
                      trigger=IntervalTrigger(seconds=2))
    # Wait for 7 seconds so that scheduler can call process_csv 3 times
    time.sleep(7)