google.cloud.pubsub - Потоковое извлечение сообщений PubSub
В настоящее время я провожу некоторые тесты на последней google-cloud-pubsub==0.35.4
релиз pubsub. Мое намерение состоит в том, чтобы обработать бесконечный поток (изменяющийся в нагрузке), используя динамическое количество клиентских клиентов.
Тем не менее, когда у меня есть очередь скажем.. 600 сообщений и 1 клиент работает, а затем добавить дополнительные клиенты:
- Ожидаемый: все оставшиеся сообщения распределяются равномерно по всем клиентам
- Наблюдаемые: только новые сообщения распределяются по клиентам, все старые сообщения отправляются ранее существующим клиентам.
Ниже приведена упрощенная версия того, что я использую для своих клиентов (для справки мы будем использовать только тему с низким приоритетом). Я не буду включать издателя, поскольку он не имеет никакого отношения.
PRIORITY_HIGH = 1
PRIORITY_MEDIUM = 2
PRIORITY_LOW = 3
MESSAGE_LIMIT = 10
ACKS_PER_MIN = 100.00
ACKS_RATIO = {
PRIORITY_LOW: 100,
}
PRIORITY_TOPICS = {
PRIORITY_LOW: 'test_low',
}
PRIORITY_SEQUENCES = {
PRIORITY_LOW: [PRIORITY_LOW, PRIORITY_MEDIUM, PRIORITY_HIGH],
}
class Subscriber:
subscriber_client = None
subscriptions = {}
priority_queue = defaultdict(Queue.Queue)
priorities = []
def __init__(self):
logging.basicConfig()
self.subscriber_client = pubsub_v1.SubscriberClient()
for option, percentage in ACKS_RATIO.iteritems():
self.priorities += [option] * percentage
def subscribe_to_topic(self, topic, max_messages=10):
self.subscriptions[topic] = self.subscriber_client.subscribe(
BASE_TOPIC_PATH.format(project=PROJECT, topic=topic,),
self.process_message,
flow_control=pubsub_v1.types.FlowControl(
max_messages=max_messages,
),
)
def un_subscribe_from_topic(self, topic):
subscription = self.subscriptions.get(topic)
if subscription:
subscription.cancel()
del self.subscriptions[topic]
def process_message(self, message):
json_message = json.loads(message.data.decode('utf8'))
self.priority_queue[json_message['priority']].put(message)
def retrieve_message(self):
message = None
priority = random.choice(self.priorities)
ack_priorities = PRIORITY_SEQUENCES[priority]
for ack_priority in ack_priorities:
try:
message = self.priority_queue[ack_priority].get(block=False)
break
except Queue.Empty:
pass
return message
if __name__ == '__main__':
messages_acked = 0
pub_sub = Subscriber()
pub_sub.subscribe_to_topic(PRIORITY_TOPICS[PRIORITY_LOW], MESSAGE_LIMIT * 3)
while True:
msg = pub_sub.retrieve_message()
if msg:
json_msg = json.loads(msg.data.decode('utf8'))
msg.ack()
print ("%s - Akked Priority %s , High %s, Medium %s, Low %s" % (
datetime.datetime.now().strftime('%H:%M:%S'),
json_msg['priority'],
pub_sub.priority_queue[PRIORITY_HIGH].qsize(),
pub_sub.priority_queue[PRIORITY_MEDIUM].qsize(),
pub_sub.priority_queue[PRIORITY_LOW].qsize(),
))
time.sleep(60.0 / ACKS_PER_MIN)
Мне интересно, если это поведение присуще тому, как работает потоковое извлечение или существуют конфигурации, которые могут изменить это поведение.
Ура!
1 ответ
Учитывая документацию Cloud Pub/Sub, Cloud Pub/sub доставляет каждое опубликованное сообщение как минимум один раз для каждой подписки, тем не менее, есть некоторые исключения из этого поведения:
- Сообщение, которое не может быть доставлено в течение максимального срока хранения 7 дней, будет удалено.
- Сообщение, опубликованное до создания подписки, доставлено не будет.
Другими словами, служба будет доставлять сообщения в подписки, созданные до публикации сообщения, поэтому старые сообщения не будут доступны для новых подписок. Насколько я знаю, Cloud Pub/Sub не предоставляет возможности для изменения этого поведения.