从多个线程更新全局字典

Updating a global dict from multiple threads

我有以下运行调度程序以定期更新全局变量 (dict) 状态的应用程序:

from sanic import Sanic
from sanic.response import text
from apscheduler.schedulers.background import BackgroundScheduler
import bumper

app = Sanic()
scheduler = BackgroundScheduler()

inventory = {1: 1, 2: 2}

@scheduler.scheduled_job('interval', seconds=5)
def bump():
    bumper.bump()


@scheduler.scheduled_job('interval', seconds=10)
def manual_bump():
    global inventory
    inventory[2] += 1


@app.route("/")
async def test(request):
    return text(inventory)

if __name__ == "__main__":

    scheduler.start()
    app.run(host="0.0.0.0", port=8000)

5秒间隔作业中导入的函数在同一目录下的不同文件中:

from app import inventory

def bump_inventory():
    inventory[1] += 1
    print('new', inventory)

然而,这并没有像我希望的那样起作用。导入的函数更新了清单,但更改永远不会传播到原始字典,因此 bump_inventory 正在处理 inventory 的副本,或者它永远不会在函数范围之外更新它。在两个不同的终端:

]$ python app.py
2017-02-19 14:11:45,643: INFO: Goin' Fast @ http://0.0.0.0:8000
2017-02-19 14:11:45,644: INFO: Starting worker [26053]
new {1: 2, 2: 2}
new {1: 3, 2: 2}

]$ while true; do curl http://0.0.0.0:8000/; echo; sleep 1; done
{1: 1, 2: 2}
...
{1: 1, 2: 3}
...

这样做的正确方法是什么?

想通了。仍然不确定为什么共享变量没有更新(我的猜测仍然是它的副本)但是将它作为参数传递给函数工作得很好(因为我们传递的是对对象的引用,而不是实际对象)。将 5 秒间隔修改为此有效:

@scheduler.scheduled_job('interval', seconds=5)
def bump():
    global inventory
    bumper.bump(inventory)

这也删除了另一个文件中的循环导入(即删除 from app import inventory)。

1- 无需将 apscheduler 与 asyncio 一起使用。您已将所需的所有功能内置到 asyncio 中,并且它与 Sanic 配合得很好。

2-不建议使用全局状态,尤其是在Web应用程序场景中。您应该使用数据库或 Redis。但是如果你出于某种原因需要应用程序状态,你可以将它直接存储在 app 对象上。

下一个版本的 Sanic 将有一个 add_task 方法供您将 asyncio 任务添加到您的应用程序。如果你现在想使用它,你可以从 Github 安装 master 分支:

import asyncio
from sanic import Sanic
from sanic.response import text

app = Sanic()
app.inventory = {1:1, 2:2}


async def five_second_job(app):
    while True:
        app.inventory[1] += 1
        await asyncio.sleep(5)


async def ten_second_job(app):
    while True:
        app.inventory[2] += 2
        await asyncio.sleep(10)


@app.route("/")
async def test(request):
    return text(app.inventory)

if __name__ == "__main__":
    app.add_task(five_second_job(app))
    app.add_task(ten_second_job(app))
    app.run(host="0.0.0.0", port=9000)