Сообщения Google Cloud PubSub, не обработанные обратным вызовом
Я пытаюсь использовать Google PubSub для передачи и получения сообщений между двумя службами. Однако некоторые из отправленных сообщений, по-видимому, отбрасываются случайным образом и не обрабатываются методом обратного вызова подписчика.
При отправке сообщений около половины сообщений обрабатываются методом обратного вызова. Для другой половины метод обратного вызова, кажется, не вызывается вообще (информация не регистрируется). Тем не менее, сообщения по-прежнему исчезают из темы и не пересылаются.
Код, используемый для запуска подписчика:
logger = logging.getLogger(LOGGER_NAME)
logger.info('Starting the pubsub subscriber')
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(GOOGLE_CLOUD_PROJECT, SUBSCRIPTION_NAME)
subscriber.subscribe(subscription_path, callback=callback)
while True:
try:
sleep(60)
except Exception as e:
// Log exception
Метод обратного вызова:
def callback(message):
logger = logging.getLogger(LOGGER_NAME)
logger.info(f'Recieved callback with message: {message}', extra = {'callback_message': message} )
// Process message
Ошибка, кажется, на стороне подписчика. Сообщения отправляются от издателя, и если подписчик не подключен к теме, сообщения не исчезают.
Я пытался использовать Flow Control для контроля количества сообщений, получаемых подписчиком, но, похоже, это не имеет никакого эффекта.
Могут ли сообщения обрабатываться без вызова метода обратного вызова? Существуют ли другие причины, по которым сообщения могут исчезать из темы?
РЕДАКТИРОВАТЬ: Оказывается, другой сервис читал из той же подписки, обрабатывая пропущенные сообщения.
1 ответ
Я знаю, что вы нашли ответ на свою проблему, но я подумал, что было бы целесообразно перечислить некоторые полезные шаги для отладки этого типа проблемы:
- Убедитесь, что сообщения действительно были опубликованы. Когда публикация завершается успешно, ответ должен включать идентификатор сообщения, например, в виде строки, полученной APIFuture в методе публикации Java.
- Проверьте, не создается ли резерв сообщений. Вы можете просмотреть
subscription/oldest_unacked_message_age
а такжеsubscription/num_undelivered_messages
через Stackdriver. - Проверьте, установлен ли у вашего подписчика контроль потока, который не позволяет вам своевременно получать все сообщения. Если у вас установлено управление потоком данных, и оно препятствует доставке всех сообщений, вы, скорее всего, увидите, что количество недоставленных сообщений увеличивается в Stackdriver.
- Убедитесь, что у вас нет дополнительных клиентов, подписывающихся на сообщения для той же подписки. Например, возможно, вы используете инструмент gcloud для просмотра и просмотра сообщений. В этой ситуации вы, вероятно, не увидите увеличения количества недоставленных сообщений в Stackdriver.
Если после проверки всего этого вы не уверены, что происходит с вашими сообщениями, лучше связаться со службой поддержки, указав название вашего проекта и подписку, а также идентификаторы любых сообщений, которые, по вашему мнению, не были доставлены.