gunicorn с работниками Gevent: использование общего глобального списка

Я пытаюсь реализовать отправленные сервером события в моем приложении Flask, следуя этому простому рецепту: http://flask.pocoo.org/snippets/116/

Для обслуживания приложения я использую Gunicorn с работниками Gevent.

Минимальная версия моего кода выглядит так:

import multiprocessing

from gevent.queue import Queue
from gunicorn.app.base import BaseApplication
from flask import Flask, Response

app = Flask('minimal')
# NOTE: This is the global list of subscribers
subscriptions = []


class ServerSentEvent(object):
    def __init__(self, data):
        self.data = data
        self.event = None
        self.id = None
        self.desc_map = {
            self.data: "data",
            self.event: "event",
            self.id: "id"
        }

    def encode(self):
        if not self.data:
            return ""
        lines = ["%s: %s" % (v, k)
                 for k, v in self.desc_map.iteritems() if k]
        return "%s\n\n" % "\n".join(lines)


@app.route('/api/events')
def subscribe_events():
    def gen():
        q = Queue()
        print "New subscription!"
        subscriptions.append(q)
        print len(subscriptions)
        print id(subscriptions)
        try:
            while True:
                print "Waiting for data"
                result = q.get()
                print "Got data: " + result
                ev = ServerSentEvent(unicode(result))
                yield ev.encode()
        except GeneratorExit:
            print "Removing subscription"
            subscriptions.remove(q)
    return Response(gen(), mimetype="text/event-stream")


@app.route('/api/test')
def push_event():
    print len(subscriptions)
    print id(subscriptions)
    for sub in subscriptions:
        sub.put("test")
    return "OK"


class GunicornApplication(BaseApplication):
    def __init__(self, wsgi_app, port=5000):
        self.options = {
            'bind': "0.0.0.0:{port}".format(port=port),
            'workers': multiprocessing.cpu_count() + 1,
            'worker_class': 'gevent',
            'preload_app': True,
        }
        self.application = wsgi_app
        super(GunicornApplication, self).__init__()

    def load_config(self):
        config = dict([(key, value) for key, value in self.options.iteritems()
                       if key in self.cfg.settings and value is not None])
        for key, value in config.iteritems():
            self.cfg.set(key.lower(), value)

    def load(self):
        return self.application


if __name__ == '__main__':
    gapp = GunicornApplication(app)
    gapp.run()

Проблема в том, что список подписчиков, кажется, различен для каждого работника. Это означает, что если работник № 1 обрабатывает /api/events конечная точка и добавляет нового подписчика в список, клиент будет получать только те события, которые добавляются, когда работник #1 также обрабатывает /api/test конечная точка.

Любопытно, что фактический объект списка кажется одинаковым для каждого работника, так как id(subscriptions) возвращает одинаковое значение в каждом работнике.

Это можно обойти? Я знаю, что мог бы просто использовать Redis, но приложение должно быть максимально автономным, поэтому я стараюсь избегать каких-либо внешних сервисов.

Обновление: причина проблемы, кажется, в моем встраивании gunicorn.app.base.BaseApplication (что является новой функцией в v0.19). При запуске приложения из командной строки с gunicorn -k gevent minimal:app все работает как положено

Обновление 2: предыдущее подозрение оказалось неверным, единственная причина, по которой оно сработало, заключалось в том, что число рабочих процессов по умолчанию от gunicorn 1 при настройке номера в соответствии с кодом через -w параметр, он демонстрирует такое же поведение.

1 ответ

Ты говоришь:

фактический объект списка кажется одинаковым для каждого работника, поскольку id(подписки) возвращает одинаковое значение для каждого работника.

но я думаю, что это не так, subscriptions на каждого работника не один и тот же объект. Каждый работник представляет собой отдельный процесс, имеет свое пространство памяти.

Для автономной системы вы можете разработать крошечную систему, функционирующую как простая версия Redis, Например, используя SQLite или ZeroMQ для связи между этими работниками.

Другие вопросы по тегам