从 APScheduler 收集期货
Collecting futures from APScheduler
我有以下代码:
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
class FutureScheduler(object):
def __init__():
self.futures = []
self.scheduler = BlockingScheduler()
self.pool = ThreadPoolExecutor(5)
self.full_frame = pd.DataFrame()
def start(self):
job = self.scheduler.add_job(self.add_future, 'cron', day_of_week='mon-fri', hour='8-15', minute='*')
self.scheduler.start()
self.flush_csvs()
def add_future(self):
self.futures.append(self.pool.submit(self.long_running_task))
def flush_csvs(self):
for future in as_completed(self.futures):
results = future.result()
self.full_frame = pd.concat((self.full_frame, results))
self.full_frame.to_csv('results.csv')
print "flushed... Queue size: %s" % len(self.futures)
def long_running_task(self):
#takes a while may or may not return before the next one is kicked off
所以我遇到的问题是 flush_csvs
循环中的代码永远不会是 运行。在调用 as_completed
之前,我是否必须将所有期货添加到列表中?有没有办法让 BlockingScheduler
return 有未来?我看到它 returns a Job
但在这种情况下,我希望它更像未来。
这不起作用,因为调度程序阻止主线程继续。这可以防止 flush_csvs 被执行。
self.scheduler.start()
self.flush_csvs()
但是,这可能不是您想要的。 APScheduler 在内部使用线程池,因此回调 (self.long_running_task) 已经在单独的线程中执行。
你可以通过APScheduler改变这个线程池的配置,这取决于你需要的worker数量,如果你需要多个核心(使用ProcessPoolExecutor而不是ThreadPoolExecutor)等。你也可以配置每个作业做你想做的事。例如,为每分钟 运行 合并一次(仅 运行 一个)的作业配置策略,而不是 运行 背靠背多次合并以防出现延迟。
我有以下代码:
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
class FutureScheduler(object):
def __init__():
self.futures = []
self.scheduler = BlockingScheduler()
self.pool = ThreadPoolExecutor(5)
self.full_frame = pd.DataFrame()
def start(self):
job = self.scheduler.add_job(self.add_future, 'cron', day_of_week='mon-fri', hour='8-15', minute='*')
self.scheduler.start()
self.flush_csvs()
def add_future(self):
self.futures.append(self.pool.submit(self.long_running_task))
def flush_csvs(self):
for future in as_completed(self.futures):
results = future.result()
self.full_frame = pd.concat((self.full_frame, results))
self.full_frame.to_csv('results.csv')
print "flushed... Queue size: %s" % len(self.futures)
def long_running_task(self):
#takes a while may or may not return before the next one is kicked off
所以我遇到的问题是 flush_csvs
循环中的代码永远不会是 运行。在调用 as_completed
之前,我是否必须将所有期货添加到列表中?有没有办法让 BlockingScheduler
return 有未来?我看到它 returns a Job
但在这种情况下,我希望它更像未来。
这不起作用,因为调度程序阻止主线程继续。这可以防止 flush_csvs 被执行。
self.scheduler.start()
self.flush_csvs()
但是,这可能不是您想要的。 APScheduler 在内部使用线程池,因此回调 (self.long_running_task) 已经在单独的线程中执行。
你可以通过APScheduler改变这个线程池的配置,这取决于你需要的worker数量,如果你需要多个核心(使用ProcessPoolExecutor而不是ThreadPoolExecutor)等。你也可以配置每个作业做你想做的事。例如,为每分钟 运行 合并一次(仅 运行 一个)的作业配置策略,而不是 运行 背靠背多次合并以防出现延迟。