Недокументированная очередь задач управляемой виртуальной машины RPCFailedError
Я столкнулся с очень специфической и недокументированной проблемой с управляемой виртуальной машиной GAE и очередями задач. Я понимаю, что служба Managed VM находится в бета-версии, поэтому этот вопрос может быть не актуален вечно, но сейчас он определенно вызывает у меня сильную головную боль.
Основным симптомом проблемы является то, что в определенных (не полностью мне известных) обстоятельствах я вижу следующую ошибку / трассировку:
File "/home/vmagent/my_app/some_file.py", line 265, in some_ndb_tasklet
res = yield some_task.add_async('some-task-queue-name')
File "/home/vmagent/python_vm_runtime/google/appengine/ext/ndb/tasklets.py", line 472, in _on_rpc_completion
result = rpc.get_result()
File "/home/vmagent/python_vm_runtime/google/appengine/api/apiproxy_stub_map.py", line 613, in get_result
return self.__get_result_hook(self)
File "/home/vmagent/python_vm_runtime/google/appengine/api/taskqueue/taskqueue.py", line 1948, in ResultHook
rpc.check_success()
File "/home/vmagent/python_vm_runtime/google/appengine/api/apiproxy_stub_map.py", line 579, in check_success
self.__rpc.CheckSuccess()
File "/home/vmagent/python_vm_runtime/google/appengine/ext/vmruntime/vmstub.py", line 312, in _WaitImpl
raise self._ErrorException(*_DEFAULT_EXCEPTION)
RPCFailedError: The remote RPC to the application server failed for call taskqueue.BulkAdd().
Я прошел через свой локальный App Engine SDK, чтобы проследить это, и я могу перейти к последней строке трассировки, но google/appengine/ext/vmruntime/
не существует на моей машине вообще, поэтому я понятия не имею, что происходит в vmstub.py
, Смотря на местный код, some_task.add_async('the-queue')
раскручивает RPC и ждет его завершения, но эта ошибка не except apiproxy_errors.ApplicationError, e:
в строке 1949 года taskqueue.py ожидает...
Код, который генерирует ошибку, выглядит примерно так:
@ndb.tasklet
def kickoff_tasks(batch_of_payloads):
for task_payload in batch_of_payloads:
# task_payload is a dict
task = taskqueue.Task(
url='/the/handler/url',
params=payload)
res = yield task.add_async('some-valid-task-queue-name')
Другие вещи, которые стоит отметить:
- сам этот код выполняется в обработчике задачи, запущенном другой задачей.
- Я впервые увидел эту ошибку, прежде чем реализовывать такого рода пакетирование, и предположил, что проблема была в том, что я добавил слишком много задач из обработчика задач.
- В некоторых случаях я могу выполнить это успешно с размером пакета 100, но в других, он последовательно терпит неудачу (в зависимости от данных в полезных нагрузках) на уровне 100, а иногда успешно при размере пакета 50.
- Сами полезные данные задачи включают партии элементов и настроены так, чтобы быть достаточно маленькими, чтобы соответствовать задаче. App Engine объявляет максимальный размер задачи 100 КБ, поэтому сейчас я сохраняю полезную нагрузку до 90000 байт. Уменьшение размера еще больше, кажется, не помогает никому.
- Я также попытался реализовать экспоненциальный откат, чтобы повторить
kickoff_tasks
метод, когда появляется эта ошибка, но кажется, что как только ошибка возникла, я не могу добавить другие задачи из одного и того же обработчика (т.е. я не могу запустить задачу "продолжить с того места, где вы остановились", я просто нужно дать этому сбою и перезапустить себя)
Итак, мой вопрос, что на самом деле вызывает эту ошибку? Как я могу избежать этого или исправить это так, чтобы я справился с этим правильно?
2 ответа
Это известная проблема, над которой ведется работа. На самом деле есть две проблемы - сам сбой RPC и отсутствие обработки SDK исключения RPCFailedError.
Здесь есть публичное обсуждение этой проблемы.
Если вы используете App Engine Flexible и python-compat-multicore
image, появилась новая ошибка, связанная с App Engine с использованием более новой версии библиотеки запросов, которая нарушила связь между App Engine Flexible и хранилищем данных. Вы можете исправить эту ошибку, обезьяна исправляя библиотеку в вашем appengine_config.py
файл.
Добавьте следующий код в appengine_config.py
:
try:
import appengine.ext.vmruntime.vmstub as vmstub
except ImportError:
pass
else:
if isinstance(vmstub.DEFAULT_TIMEOUT, (int, long)):
# Newer requests libraries do not accept integers as header values.
# Be sure to convert the header value before sending.
# See Support Case ID 11235929.
vmstub.DEFAULT_TIMEOUT = bytes(vmstub.DEFAULT_TIMEOUT)
Обратите внимание, что если у вас нет appengine_config.py
файл, вы можете просто создать его в каталоге вашего базового проекта (где бы вы ни app.yaml
файл). Этот файл запускается при запуске App Engine.