Обновление глобального дикта из нескольких потоков

У меня есть следующее приложение, которое запускает планировщик для периодического обновления состояния глобальной переменной (dict):

from sanic import Sanic
from sanic.response import text
from apscheduler.schedulers.background import BackgroundScheduler
import bumper

app = Sanic()
scheduler = BackgroundScheduler()

inventory = {1: 1, 2: 2}

@scheduler.scheduled_job('interval', seconds=5)
def bump():
    bumper.bump()


@scheduler.scheduled_job('interval', seconds=10)
def manual_bump():
    global inventory
    inventory[2] += 1


@app.route("/")
async def test(request):
    return text(inventory)

if __name__ == "__main__":

    scheduler.start()
    app.run(host="0.0.0.0", port=8000)

Функция, импортированная в 5-секундном интервальном задании, находится в другом файле в том же каталоге:

from app import inventory

def bump_inventory():
    inventory[1] += 1
    print('new', inventory)

Это, однако, не работает, как я надеялся. Импортированная функция обновляет инвентарь, но изменение никогда не распространяется на исходный словарь, поэтому либо bump_inventory работает над копией inventory или это никогда не обновляет это вне области функции. В двух разных терминалах:

]$ python app.py
2017-02-19 14:11:45,643: INFO: Goin' Fast @ http://0.0.0.0:8000
2017-02-19 14:11:45,644: INFO: Starting worker [26053]
new {1: 2, 2: 2}
new {1: 3, 2: 2}

]$ while true; do curl http://0.0.0.0:8000/; echo; sleep 1; done
{1: 1, 2: 2}
...
{1: 1, 2: 3}
...

Как правильно это сделать?

2 ответа

Решение

Догадаться. До сих пор не уверен, почему переменная общего доступа не обновляется (я думаю, что она все еще является копией), но передача ее в функцию в качестве аргумента работает просто отлично (поскольку мы передаем ссылки на объект, а не на реальный объект). Изменение 5-секундного интервала для этого работает:

@scheduler.scheduled_job('interval', seconds=5)
def bump():
    global inventory
    bumper.bump(inventory)

Это также удаляет циклический импорт (т.е. удаляет from app import inventory) в другом файле.

1- Там нет необходимости использовать apscheduler с asyncio. У вас есть все необходимые удобства, встроенные в asyncio, и он хорошо сочетается с Sanic.

2- Использование глобального состояния не рекомендуется, особенно в сценарии веб-приложения. Вы должны использовать базу данных или Redis. Но если вам по какой-то причине нужно состояние приложения, вы можете сохранить его прямо на app объект.

Следующий релиз Sanic будет иметь add_task способ добавить асинхронные задачи в ваше приложение. Вы можете установить главную ветку от Github, если хотите использовать это сейчас:

import asyncio
from sanic import Sanic
from sanic.response import text

app = Sanic()
app.inventory = {1:1, 2:2}


async def five_second_job(app):
    while True:
        app.inventory[1] += 1
        await asyncio.sleep(5)


async def ten_second_job(app):
    while True:
        app.inventory[2] += 2
        await asyncio.sleep(10)


@app.route("/")
async def test(request):
    return text(app.inventory)

if __name__ == "__main__":
    app.add_task(five_second_job(app))
    app.add_task(ten_second_job(app))
    app.run(host="0.0.0.0", port=9000)
Другие вопросы по тегам