Веб-сервис Python, подписанный на реактивный источник, вызывает странное поведение в объекте
Я реализовал веб-сервис, используя Falcon. Эта служба хранит конечный автомат (pytransitions), который передается ресурсам службы в конструкторе. Служба работает с Gunicorn.
Веб-сервис запускает процесс при запуске с использованием RxPy. Событие вернулось в on_next(event)
используется для запуска перехода в конечный автомат.
БАГ
Я ожидаю, что конечный автомат имеет согласованное состояние как в службе, так и в ресурсах, но кажется, что в ресурсах состояние никогда не изменяется.
У нас есть тест, который пытается воспроизвести это поведение, но на удивление тест работает
class TochoLevel(object):
def __init__(self, tochine):
self.tochine = tochine
def on_get(self, req, res):
res.status = falcon.HTTP_200
res.body = self.tochine.state
def get_machine():
states = ["low", "medium", "high"]
transitions = [
{'trigger': 'to_medium', 'source': ['low', 'medium', 'high'], 'dest': 'medium'},
{'trigger': 'to_high', 'source': ['low', 'medium', 'high'], 'dest': 'high'},
{'trigger': 'to_low', 'source': ['low', 'medium', 'high'], 'dest': 'low'}
]
locked_factory = MachineFactory.get_predefined(locked=True)
return locked_factory(
states=states,
transitions=transitions,
initial='low',
auto_transitions=False,
queued=False
)
def _level_observable(observer):
for i in range(1, 21):
sleep(0.1)
next_val = 'to_low'
if 8 <= i <= 15:
next_val = 'to_medium'
elif i > 15:
next_val = 'to_high'
observer.on_next(next_val)
observer.on_completed()
def get_level_observable():
return Observable.create(_level_observable)
class NotBlockingService(falcon.API):
def __init__(self):
super(NotBlockingService, self).__init__()
self.tochine = get_machine()
self.add_route('/tochez', TochoLevel(self.tochine))
def _run_machine(self, val):
self.tochine.trigger(val)
print('machine exec: {}, state: {}'.format(val, self.tochine.state))
return self.tochine.state
def start(self):
source = get_level_observable()
(source.subscribe_on(ThreadPoolScheduler(2))
.subscribe(self._run_machine))
def test_can_query_falcon_service_while_being_susbcribed_as_observer():
svc = NotBlockingService()
client = testing.TestClient(svc)
assert client.simulate_get('/tochez').text == 'low'
start = time()
svc.start()
sleep(1.2)
assert client.simulate_get('/tochez').text == 'medium'
end = time()
sleep(1.2)
assert client.simulate_get('/tochez').text == 'high'
assert (end - start) < 2
ВОПРОС
Почему конечный автомат не меняет состояние в ресурсе TochoLevel
когда я запускаю службу с Gunicorn и размножать государства в on_next
метод RXPY?
1 ответ
Конечно, когда вы выполняете свой сервис в режиме разработки, вы используете только один форк (один процесс выполнения). Когда вы используете программное обеспечение, такое как Gunicorn, вы используете стратегию предварительной обработки для надежного обслуживания в производственной среде.
Стратегия предварительной обработки генерирует множество подпроцессов для разрешения запроса, и логика является независимой, работая каждый разветвление в автономном режиме между различными запросами.
Gunicorn, благодаря стандартизированной схеме приложения для WSGI в Python ( Python2_PEP-333 & Python3_PEP-3333), получает объект APP. Gunicorn запускает столько экземпляров (префорков), сколько указано в его конфигурации. Gunicorn называет таких работников вилками и по умолчанию использует 1 работника. Каждый работник будет работать со своим статусом, и, возможно, Gunicorn также создает новый экземпляр объекта App для каждого запроса...
Это причина вашего состояния машины без настойчивости.
Совет: попробуйте сначала запустить Gunicorn с 1 рабочим и проверьте постоянство состояния конечного автомата. Если вы достигнете устойчивости конечного автомата, то второй проблемой, которую необходимо решить, будет синхронизация конечного автомата по всем рабочим.