Недокументированная очередь задач управляемой виртуальной машины RPCFailedError

Я столкнулся с очень специфической и недокументированной проблемой с управляемой виртуальной машиной GAE и очередями задач. Я понимаю, что служба Managed VM находится в бета-версии, поэтому этот вопрос может быть не актуален вечно, но сейчас он определенно вызывает у меня сильную головную боль.

Основным симптомом проблемы является то, что в определенных (не полностью мне известных) обстоятельствах я вижу следующую ошибку / трассировку:

  File "/home/vmagent/my_app/some_file.py", line 265, in some_ndb_tasklet
    res = yield some_task.add_async('some-task-queue-name')
  File "/home/vmagent/python_vm_runtime/google/appengine/ext/ndb/tasklets.py", line 472, in _on_rpc_completion
    result = rpc.get_result()
  File "/home/vmagent/python_vm_runtime/google/appengine/api/apiproxy_stub_map.py", line 613, in get_result
    return self.__get_result_hook(self)
  File "/home/vmagent/python_vm_runtime/google/appengine/api/taskqueue/taskqueue.py", line 1948, in ResultHook
    rpc.check_success()
  File "/home/vmagent/python_vm_runtime/google/appengine/api/apiproxy_stub_map.py", line 579, in check_success
    self.__rpc.CheckSuccess()
  File "/home/vmagent/python_vm_runtime/google/appengine/ext/vmruntime/vmstub.py", line 312, in _WaitImpl
    raise self._ErrorException(*_DEFAULT_EXCEPTION)
RPCFailedError: The remote RPC to the application server failed for call taskqueue.BulkAdd().

Я прошел через свой локальный App Engine SDK, чтобы проследить это, и я могу перейти к последней строке трассировки, но google/appengine/ext/vmruntime/ не существует на моей машине вообще, поэтому я понятия не имею, что происходит в vmstub.py, Смотря на местный код, some_task.add_async('the-queue') раскручивает RPC и ждет его завершения, но эта ошибка не except apiproxy_errors.ApplicationError, e: в строке 1949 года taskqueue.py ожидает...

Код, который генерирует ошибку, выглядит примерно так:

@ndb.tasklet
def kickoff_tasks(batch_of_payloads):
    for task_payload in batch_of_payloads:
        # task_payload is a dict
        task = taskqueue.Task(
            url='/the/handler/url',
            params=payload)
        res = yield task.add_async('some-valid-task-queue-name')

Другие вещи, которые стоит отметить:

  • сам этот код выполняется в обработчике задачи, запущенном другой задачей.
  • Я впервые увидел эту ошибку, прежде чем реализовывать такого рода пакетирование, и предположил, что проблема была в том, что я добавил слишком много задач из обработчика задач.
  • В некоторых случаях я могу выполнить это успешно с размером пакета 100, но в других, он последовательно терпит неудачу (в зависимости от данных в полезных нагрузках) на уровне 100, а иногда успешно при размере пакета 50.
  • Сами полезные данные задачи включают партии элементов и настроены так, чтобы быть достаточно маленькими, чтобы соответствовать задаче. App Engine объявляет максимальный размер задачи 100 КБ, поэтому сейчас я сохраняю полезную нагрузку до 90000 байт. Уменьшение размера еще больше, кажется, не помогает никому.
  • Я также попытался реализовать экспоненциальный откат, чтобы повторить kickoff_tasks метод, когда появляется эта ошибка, но кажется, что как только ошибка возникла, я не могу добавить другие задачи из одного и того же обработчика (т.е. я не могу запустить задачу "продолжить с того места, где вы остановились", я просто нужно дать этому сбою и перезапустить себя)

Итак, мой вопрос, что на самом деле вызывает эту ошибку? Как я могу избежать этого или исправить это так, чтобы я справился с этим правильно?

2 ответа

Решение

Это известная проблема, над которой ведется работа. На самом деле есть две проблемы - сам сбой RPC и отсутствие обработки SDK исключения RPCFailedError.

Здесь есть публичное обсуждение этой проблемы.

Если вы используете App Engine Flexible и python-compat-multicore image, появилась новая ошибка, связанная с App Engine с использованием более новой версии библиотеки запросов, которая нарушила связь между App Engine Flexible и хранилищем данных. Вы можете исправить эту ошибку, обезьяна исправляя библиотеку в вашем appengine_config.py файл.

Добавьте следующий код в appengine_config.py:

try:
    import appengine.ext.vmruntime.vmstub as vmstub
except ImportError:
    pass
else:
    if isinstance(vmstub.DEFAULT_TIMEOUT, (int, long)):
        # Newer requests libraries do not accept integers as header values. 
        # Be sure to convert the header value before sending. 
        # See Support Case ID 11235929.
        vmstub.DEFAULT_TIMEOUT = bytes(vmstub.DEFAULT_TIMEOUT)

Обратите внимание, что если у вас нет appengine_config.py файл, вы можете просто создать его в каталоге вашего базового проекта (где бы вы ни app.yaml файл). Этот файл запускается при запуске App Engine.

Другие вопросы по тегам