使用调度程序,其中时间存储在列表中,功能输入也存储在列表中
Using scheduler where time is stored in list and input for function is also stored in a list
我有以下两个列表:
marketId_list = ['1.170349308', '1.170349312', '1.170349315']
execution_time_list = ['16:12', '16:13', '16:14']
我想在 execution_time_list
中指定的时间将 marketId_list
中的值传递给函数。
对于这个问题,我指定了函数,例如:
def test(market_identification):
print(market_identification)
所以在16:12
会打印1.170349308
,在16:13
会打印1.170349312
,在16:14
会打印1.170349315
.
为此,我编写了以下代码:
import schedule
import time
for time_of_execution in execution_time_list:
for market_identification in marketId_list:
schedule.every().tuesday.at(time_of_execution).do(test)
while True:
schedule.run_pending()
time.sleep(1)
虽然这给了我以下错误:
TypeError: test() missing 1 required positional argument: 'market_identification'
您需要提供缺少的参数。一种方法是使用 closure,由嵌套函数实现。
在这里,调用 test_wrapper(market_identification)
创建了一个新函数,其中已经包含了所需的参数。调度框架随后将调用此生成的函数。
def test(market_identification):
print(f"test {market_identification}")
def test_wrapper(mi):
def inner():
return test(mi)
return inner
for time_of_execution, market_identification in zip(execution_time_list, marketId_list):
schedule.every().tuesday.at(time_of_execution).do(test_wrapper(market_identification))
print(f"scheduled {market_identification} at {time_of_execution}")
while True:
schedule.run_pending()
time.sleep(1)
简单演示一下,直接调用生成的函数即可:
test_wrapper("hello")()
结果输出
test hello
因为这种嵌套函数的用例非常常见,所以使用 functools.partial
方法 standard library 为您提供了一个小帮手:
from functools import partial
for time_of_execution, market_identification in zip(execution_time_list, marketId_list):
schedule.every().tuesday.at(time_of_execution).do(partial(test, market_identification))
print(f"scheduled {market_identification} at {time_of_execution}")
请注意,while
循环不得位于内部循环中。它必须在所有调度器都设置好后执行。我假设缩进在原始代码中被弄乱了。已在我上面的回答中修复。
更新:
我首先忽略了您解释每次只能使用一个市场 ID 的部分。在这种情况下,您不需要两个嵌套循环,而是一个 zip
是 execution_time_list
和 marketId_list
的循环。我相应地更新了我的答案。
我有以下两个列表:
marketId_list = ['1.170349308', '1.170349312', '1.170349315']
execution_time_list = ['16:12', '16:13', '16:14']
我想在 execution_time_list
中指定的时间将 marketId_list
中的值传递给函数。
对于这个问题,我指定了函数,例如:
def test(market_identification):
print(market_identification)
所以在16:12
会打印1.170349308
,在16:13
会打印1.170349312
,在16:14
会打印1.170349315
.
为此,我编写了以下代码:
import schedule
import time
for time_of_execution in execution_time_list:
for market_identification in marketId_list:
schedule.every().tuesday.at(time_of_execution).do(test)
while True:
schedule.run_pending()
time.sleep(1)
虽然这给了我以下错误:
TypeError: test() missing 1 required positional argument: 'market_identification'
您需要提供缺少的参数。一种方法是使用 closure,由嵌套函数实现。
在这里,调用 test_wrapper(market_identification)
创建了一个新函数,其中已经包含了所需的参数。调度框架随后将调用此生成的函数。
def test(market_identification):
print(f"test {market_identification}")
def test_wrapper(mi):
def inner():
return test(mi)
return inner
for time_of_execution, market_identification in zip(execution_time_list, marketId_list):
schedule.every().tuesday.at(time_of_execution).do(test_wrapper(market_identification))
print(f"scheduled {market_identification} at {time_of_execution}")
while True:
schedule.run_pending()
time.sleep(1)
简单演示一下,直接调用生成的函数即可:
test_wrapper("hello")()
结果输出
test hello
因为这种嵌套函数的用例非常常见,所以使用 functools.partial
方法 standard library 为您提供了一个小帮手:
from functools import partial
for time_of_execution, market_identification in zip(execution_time_list, marketId_list):
schedule.every().tuesday.at(time_of_execution).do(partial(test, market_identification))
print(f"scheduled {market_identification} at {time_of_execution}")
请注意,while
循环不得位于内部循环中。它必须在所有调度器都设置好后执行。我假设缩进在原始代码中被弄乱了。已在我上面的回答中修复。
更新:
我首先忽略了您解释每次只能使用一个市场 ID 的部分。在这种情况下,您不需要两个嵌套循环,而是一个 zip
是 execution_time_list
和 marketId_list
的循环。我相应地更新了我的答案。