Обновление глобального дикта из нескольких потоков
У меня есть следующее приложение, которое запускает планировщик для периодического обновления состояния глобальной переменной (dict):
from sanic import Sanic
from sanic.response import text
from apscheduler.schedulers.background import BackgroundScheduler
import bumper
app = Sanic()
scheduler = BackgroundScheduler()
inventory = {1: 1, 2: 2}
@scheduler.scheduled_job('interval', seconds=5)
def bump():
bumper.bump()
@scheduler.scheduled_job('interval', seconds=10)
def manual_bump():
global inventory
inventory[2] += 1
@app.route("/")
async def test(request):
return text(inventory)
if __name__ == "__main__":
scheduler.start()
app.run(host="0.0.0.0", port=8000)
Функция, импортированная в 5-секундном интервальном задании, находится в другом файле в том же каталоге:
from app import inventory
def bump_inventory():
inventory[1] += 1
print('new', inventory)
Это, однако, не работает, как я надеялся. Импортированная функция обновляет инвентарь, но изменение никогда не распространяется на исходный словарь, поэтому либо bump_inventory
работает над копией inventory
или это никогда не обновляет это вне области функции. В двух разных терминалах:
]$ python app.py
2017-02-19 14:11:45,643: INFO: Goin' Fast @ http://0.0.0.0:8000
2017-02-19 14:11:45,644: INFO: Starting worker [26053]
new {1: 2, 2: 2}
new {1: 3, 2: 2}
]$ while true; do curl http://0.0.0.0:8000/; echo; sleep 1; done
{1: 1, 2: 2}
...
{1: 1, 2: 3}
...
Как правильно это сделать?
2 ответа
Догадаться. До сих пор не уверен, почему переменная общего доступа не обновляется (я думаю, что она все еще является копией), но передача ее в функцию в качестве аргумента работает просто отлично (поскольку мы передаем ссылки на объект, а не на реальный объект). Изменение 5-секундного интервала для этого работает:
@scheduler.scheduled_job('interval', seconds=5)
def bump():
global inventory
bumper.bump(inventory)
Это также удаляет циклический импорт (т.е. удаляет from app import inventory
) в другом файле.
1- Там нет необходимости использовать apscheduler
с asyncio. У вас есть все необходимые удобства, встроенные в asyncio, и он хорошо сочетается с Sanic.
2- Использование глобального состояния не рекомендуется, особенно в сценарии веб-приложения. Вы должны использовать базу данных или Redis. Но если вам по какой-то причине нужно состояние приложения, вы можете сохранить его прямо на app
объект.
Следующий релиз Sanic будет иметь add_task
способ добавить асинхронные задачи в ваше приложение. Вы можете установить главную ветку от Github, если хотите использовать это сейчас:
import asyncio
from sanic import Sanic
from sanic.response import text
app = Sanic()
app.inventory = {1:1, 2:2}
async def five_second_job(app):
while True:
app.inventory[1] += 1
await asyncio.sleep(5)
async def ten_second_job(app):
while True:
app.inventory[2] += 2
await asyncio.sleep(10)
@app.route("/")
async def test(request):
return text(app.inventory)
if __name__ == "__main__":
app.add_task(five_second_job(app))
app.add_task(ten_second_job(app))
app.run(host="0.0.0.0", port=9000)