使用调度程序,其中时间存储在列表中,功能输入也存储在列表中

Using scheduler where time is stored in list and input for function is also stored in a list

我有以下两个列表:

marketId_list = ['1.170349308', '1.170349312', '1.170349315']

execution_time_list = ['16:12', '16:13', '16:14']

我想在 execution_time_list 中指定的时间将 marketId_list 中的值传递给函数。

对于这个问题,我指定了函数,例如:

def test(market_identification):
    print(market_identification)

所以在16:12会打印1.170349308,在16:13会打印1.170349312,在16:14会打印1.170349315 .

为此,我编写了以下代码:

import schedule
import time

for time_of_execution in execution_time_list:

    for market_identification in marketId_list: 

        schedule.every().tuesday.at(time_of_execution).do(test)
        while True:

            schedule.run_pending()
            time.sleep(1)

虽然这给了我以下错误:

TypeError: test() missing 1 required positional argument: 'market_identification'

您需要提供缺少的参数。一种方法是使用 closure,由嵌套函数实现。

在这里,调用 test_wrapper(market_identification) 创建了一个新函数,其中已经包含了所需的参数。调度框架随后将调用此生成的函数。


def test(market_identification):
    print(f"test {market_identification}")


def test_wrapper(mi):
    def inner():
        return test(mi)
    return inner


for time_of_execution, market_identification in zip(execution_time_list, marketId_list):
    schedule.every().tuesday.at(time_of_execution).do(test_wrapper(market_identification))
    print(f"scheduled {market_identification} at {time_of_execution}")


while True:
    schedule.run_pending()
    time.sleep(1)

简单演示一下,直接调用生成的函数即可: test_wrapper("hello")() 结果输出

test hello

因为这种嵌套函数的用例非常常见,所以使用 functools.partial 方法 standard library 为您提供了一个小帮手:

from functools import partial

for time_of_execution, market_identification in zip(execution_time_list, marketId_list):
    schedule.every().tuesday.at(time_of_execution).do(partial(test, market_identification))
    print(f"scheduled {market_identification} at {time_of_execution}")

请注意,while 循环不得位于内部循环中。它必须在所有调度器都设置好后执行。我假设缩进在原始代码中被弄乱了。已在我上面的回答中修复。

更新: 我首先忽略了您解释每次只能使用一个市场 ID 的部分。在这种情况下,您不需要两个嵌套循环,而是一个 zipexecution_time_listmarketId_list 的循环。我相应地更新了我的答案。