gunicorn с работниками Gevent: использование общего глобального списка
Я пытаюсь реализовать отправленные сервером события в моем приложении Flask, следуя этому простому рецепту: http://flask.pocoo.org/snippets/116/
Для обслуживания приложения я использую Gunicorn с работниками Gevent.
Минимальная версия моего кода выглядит так:
import multiprocessing
from gevent.queue import Queue
from gunicorn.app.base import BaseApplication
from flask import Flask, Response
app = Flask('minimal')
# NOTE: This is the global list of subscribers
subscriptions = []
class ServerSentEvent(object):
def __init__(self, data):
self.data = data
self.event = None
self.id = None
self.desc_map = {
self.data: "data",
self.event: "event",
self.id: "id"
}
def encode(self):
if not self.data:
return ""
lines = ["%s: %s" % (v, k)
for k, v in self.desc_map.iteritems() if k]
return "%s\n\n" % "\n".join(lines)
@app.route('/api/events')
def subscribe_events():
def gen():
q = Queue()
print "New subscription!"
subscriptions.append(q)
print len(subscriptions)
print id(subscriptions)
try:
while True:
print "Waiting for data"
result = q.get()
print "Got data: " + result
ev = ServerSentEvent(unicode(result))
yield ev.encode()
except GeneratorExit:
print "Removing subscription"
subscriptions.remove(q)
return Response(gen(), mimetype="text/event-stream")
@app.route('/api/test')
def push_event():
print len(subscriptions)
print id(subscriptions)
for sub in subscriptions:
sub.put("test")
return "OK"
class GunicornApplication(BaseApplication):
def __init__(self, wsgi_app, port=5000):
self.options = {
'bind': "0.0.0.0:{port}".format(port=port),
'workers': multiprocessing.cpu_count() + 1,
'worker_class': 'gevent',
'preload_app': True,
}
self.application = wsgi_app
super(GunicornApplication, self).__init__()
def load_config(self):
config = dict([(key, value) for key, value in self.options.iteritems()
if key in self.cfg.settings and value is not None])
for key, value in config.iteritems():
self.cfg.set(key.lower(), value)
def load(self):
return self.application
if __name__ == '__main__':
gapp = GunicornApplication(app)
gapp.run()
Проблема в том, что список подписчиков, кажется, различен для каждого работника. Это означает, что если работник № 1 обрабатывает /api/events
конечная точка и добавляет нового подписчика в список, клиент будет получать только те события, которые добавляются, когда работник #1 также обрабатывает /api/test
конечная точка.
Любопытно, что фактический объект списка кажется одинаковым для каждого работника, так как id(subscriptions)
возвращает одинаковое значение в каждом работнике.
Это можно обойти? Я знаю, что мог бы просто использовать Redis, но приложение должно быть максимально автономным, поэтому я стараюсь избегать каких-либо внешних сервисов.
Обновление: причина проблемы, кажется, в моем встраивании gunicorn.app.base.BaseApplication
(что является новой функцией в v0.19). При запуске приложения из командной строки с gunicorn -k gevent minimal:app
все работает как положено
Обновление 2: предыдущее подозрение оказалось неверным, единственная причина, по которой оно сработало, заключалось в том, что число рабочих процессов по умолчанию от gunicorn 1
при настройке номера в соответствии с кодом через -w
параметр, он демонстрирует такое же поведение.
1 ответ
Ты говоришь:
фактический объект списка кажется одинаковым для каждого работника, поскольку id(подписки) возвращает одинаковое значение для каждого работника.
но я думаю, что это не так, subscriptions
на каждого работника не один и тот же объект. Каждый работник представляет собой отдельный процесс, имеет свое пространство памяти.
Для автономной системы вы можете разработать крошечную систему, функционирующую как простая версия Redis
, Например, используя SQLite или ZeroMQ для связи между этими работниками.