Python 3 - Global Variables with AsyncIO/APScheduler

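I've been struggling with this for a while.

Based on this thread: Using global variables in a function other than the one that created them

I should be able to update the variable used by thread_2 by scheduling a task at a specific time.

The code: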

import asyncio
from concurrent.futures import ProcessPoolExecutor
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from datetime import datetime
import time


def day_limits():
    global variable
    variable = 90
    print('Day Variable: ', variable)


def night_limits():
    global variable
    variable = 65
    print('Night Variable: ', variable)


def thread_2():


    while True:

        c_hour = int(datetime.now().strftime("%H"))
        c_min = int(datetime.now().strftime("%M"))
        c_sec = int(datetime.now().strftime("%S"))

        print ('%02d:%02d:%02d - Variable: %d ' % (c_hour,c_min,c_sec,variable))

        time.sleep(2)


if __name__ == "__main__":

    variable = 60

    scheduler = AsyncIOScheduler()
    scheduler.add_job(day_limits, 'cron', hour=7,misfire_grace_time=3600,timezone='GB')
    scheduler.add_job(night_limits, 'cron', hour=19, minute=32,misfire_grace_time=3600,timezone='GB')
    scheduler.start()

    scheduler.print_jobs()

    executor = ProcessPoolExecutor(1)
    loop = asyncio.get_event_loop()
    # Run thread_2 in the worker process (ensure_future replaces the old asyncio.async name)
    baa = asyncio.ensure_future(loop.run_in_executor(executor, thread_2))


    try:
        loop.run_forever()

    except (KeyboardInterrupt, Exception):
        loop.stop()
        scheduler.shutdown()

Result:

19:31:54 - Variable: 60 
19:31:56 - Variable: 60 
19:31:58 - Variable: 60    
Night Variable:  65
19:32:00 - Variable: 60 
19:32:02 - Variable: 60 

I'm missing something, but I can't see what!

Any ideas?

Thanks!!!

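The problem is simple: you're using ProcessPoolExecutor, so thread_2 runs in a separate process, which has its own memory space. The scheduled job does set variable correctly in the parent process, but the worker process has its own copy with a different value (60), and that copy never changes.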

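Because you're using ProcessPoolExecutor, you need a process-safe object in place of an ordinary integer. If you only need to support Linux (and can therefore rely on fork()), you can just use an ordinary global multiprocessing.Value: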

import asyncio
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from datetime import datetime
import time


def day_limits():
    variable.value = 90
    print('Day Variable: ', variable.value)


def night_limits():
    variable.value = 65
    print('Night Variable: ', variable.value)


def thread_2():
    while True:
        c_hour = int(datetime.now().strftime("%H"))
        c_min = int(datetime.now().strftime("%M"))
        c_sec = int(datetime.now().strftime("%S"))

        print ('%02d:%02d:%02d - Variable: %d ' % (c_hour,c_min,c_sec,variable.value))

        time.sleep(2)


if __name__ == "__main__":
    variable = multiprocessing.Value('i', 60)

    scheduler = AsyncIOScheduler()
    scheduler.add_job(day_limits, 'cron', hour=7,misfire_grace_time=3600,timezone='GB')
    scheduler.add_job(night_limits, 'cron', hour=19, minute=32,misfire_grace_time=3600,timezone='GB')
    scheduler.start()

    scheduler.print_jobs()

    executor = ProcessPoolExecutor(1)
    loop = asyncio.get_event_loop()
    # Run thread_2 in the worker process (ensure_future replaces the old asyncio.async name)
    baa = asyncio.ensure_future(loop.run_in_executor(executor, thread_2))


    try:
        loop.run_forever()

    except (KeyboardInterrupt, Exception):
        loop.stop()
        scheduler.shutdown()
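Stripped of the scheduler and the event loop, the inheritance-based approach can be sketched as follows. This assumes a POSIX system where the fork start method is available; limit, report_limit and child_sees are hypothetical names, and a plain Process stands in for the executor worker:

```python
import multiprocessing

# Assumption: POSIX only. The fork start method lets the child inherit the global.
ctx = multiprocessing.get_context("fork")
limit = ctx.Value("i", 60)  # shared-memory integer instead of a plain int


def report_limit(ready, results):
    """Runs in the child: wait for the parent's update, then read the shared value."""
    ready.wait()
    results.put(limit.value)


def child_sees():
    """Start a child, update the shared value in the parent, return what the child read."""
    limit.value = 60
    ready, results = ctx.Event(), ctx.Queue()
    child = ctx.Process(target=report_limit, args=(ready, results))
    child.start()
    limit.value = 65  # written in the parent after the child already exists
    ready.set()
    seen = results.get(timeout=10)
    child.join()
    return seen


if __name__ == "__main__":
    print("child saw:", child_sees())
```

Because both processes map the same shared memory, the child now reads the value written by the parent after the fork.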

If you need to support both Windows and Linux, you'll need to use multiprocessing.Manager to create the Value object, and explicitly pass that object to the function you're running in the Executor:

import asyncio
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from datetime import datetime
import time


def day_limits():
    variable.value = 90
    print('Day Variable: ', variable.value)


def night_limits():
    variable.value = 65
    print('Night Variable: ', variable.value)


def thread_2(variable):
    while True:
        c_hour = int(datetime.now().strftime("%H"))
        c_min = int(datetime.now().strftime("%M"))
        c_sec = int(datetime.now().strftime("%S"))

        print ('%02d:%02d:%02d - Variable: %d ' % (c_hour,c_min,c_sec,variable.value))

        time.sleep(2)


if __name__ == "__main__":

    m = multiprocessing.Manager()
    variable = m.Value('i', 60)

    scheduler = AsyncIOScheduler()
    scheduler.add_job(day_limits, 'cron', hour=7,misfire_grace_time=3600,timezone='GB')
    scheduler.add_job(night_limits, 'cron', hour=19, minute=32,misfire_grace_time=3600,timezone='GB')
    scheduler.start()

    scheduler.print_jobs()

    executor = ProcessPoolExecutor(1)
    loop = asyncio.get_event_loop()
    # ensure_future replaces the old asyncio.async name; variable must be passed explicitly
    baa = asyncio.ensure_future(loop.run_in_executor(executor, thread_2, variable))

    try:
        loop.run_forever()

    except (KeyboardInterrupt, Exception):
        loop.stop()
        scheduler.shutdown()

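Because Windows lacks fork support, you need to explicitly pass the Value to the function you're running in the Executor. If you don't, the child process will complain that the variable doesn't exist. However, because you're explicitly passing the Value to the run_in_executor method, you can't use an ordinary multiprocessing.Value; you'd get a RuntimeError saying "Synchronized objects should only be shared between processes through inheritance".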
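The error comes from pickling: arguments handed to a process pool have to be serialized before they reach the worker, and an ordinary multiprocessing.Value refuses to be pickled outside of process start-up. A small sketch that triggers the same error directly (pickle_error is a hypothetical helper, not part of any library):

```python
import multiprocessing
import pickle


def pickle_error(obj):
    """Return the RuntimeError message raised while pickling obj, or None if it pickles."""
    try:
        pickle.dumps(obj)
    except RuntimeError as exc:
        return str(exc)
    return None


if __name__ == "__main__":
    print(pickle_error(multiprocessing.Value("i", 60)))  # message about inheritance
    print(pickle_error(60))                              # a plain int pickles fine
```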

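Using multiprocessing.Manager works around this. A multiprocessing.Manager starts a process that can create and manage process-shared objects. Calling m.Value() returns a Proxy to the shared Value, and that Proxy can be passed to run_in_executor without raising an exception.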
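As a rough illustration of why the Proxy can travel where the plain Value cannot, the sketch below pickles and unpickles a Proxy by hand, which approximates what happens to an argument on its way to a worker. It stays inside one process for simplicity, and proxy_roundtrip is a made-up name:

```python
import multiprocessing
import pickle


def proxy_roundtrip():
    """Pickle a Manager proxy, update the original, and read through the copy."""
    with multiprocessing.Manager() as manager:
        limit = manager.Value("i", 60)
        # Serializing the proxy succeeds; the copy points at the same managed object.
        clone = pickle.loads(pickle.dumps(limit))
        limit.value = 65
        seen = clone.value
        del clone  # drop the extra proxy before the manager shuts down
        return seen


if __name__ == "__main__":
    print("read through the unpickled proxy:", proxy_roundtrip())
```

The copy reads the updated value because both proxies talk to the same object held by the manager process, at the cost of a round trip to that process on every access.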