Установить число потоков подписчика Python Pub/Sub для асинхронных подписчиков
Я реализовал асинхронный пул подписчик, используя Python. Это основной код
def receive_messages(project, subscription_name):
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project, subscription_name)
def callback(message):
print ("A")
time.sleep(2)
print('Received message: {}'.format(message))
message.ack()
print ("B")
subscriber.subscribe(subscription_path, callback=callback)
print('Listening for messages on {}'.format(subscription_path))
while True:
time.sleep(60)
Мне нужно напечатать как
A,
сообщение
В
сообщение
В
(Мне нужно запускать последовательно) или получать сообщения через данные потоков. Я не нахожу способ ограничить никакие темы. Моя программа выдала ошибку сегментации из-за множества потоков.
Как я контролирую нет потоков для получения сообщений.
3 ответа
Если вам требуется, чтобы ваши обратные вызовы обработки выполнялись последовательно, вам лучше использовать модель передачи сообщений, чем модифицировать внутренние компоненты подписчика. Если вы отправляете полученные сообщения в явную очередь. В очереди вы можете убедиться, что только один работник вытягивает из этой очереди, и только одно обрабатывается за один раз. Тем не менее, обратите внимание, что, хотя это дает вам гарантию "по одному" для обработки, если есть только одна подписка, она не дает вам никаких гарантий заказа. Сообщения могут по-прежнему обрабатываться в любом произвольном порядке относительно порядка их публикации.
Проблема может быть решена с помощью политики
from google.cloud import pubsub_v1
from concurrent import futures
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project, subscription_name)
def callback(message):
print (str(message.data) + " " + str(threading.current_thread()))
message.ack()
flow_control = pubsub_v1.types.FlowControl(max_messages=10)
executor = futures.ThreadPoolExecutor(max_workers=5)
policy = pubsub_v1.subscriber.policy.thread.Policy(subscriber, subscription_path, executor=executor, flow_control=flow_control)
policy.open(callback)
Мы можем установить максимальное количество потоков, используя max_workers. Также можно настроить параметры управления потоком.
Если кто-то ищет более новую версию
from concurrent import futures
from google.cloud import pubsub_v1
executor = futures.ThreadPoolExecutor(max_workers=1)
scheduler = pubsub_v1.subscriber.scheduler.ThreadScheduler(executor)
with pubsub_v1.SubscriberClient() as subscriber:
streaming_pull_future = subscriber.subscribe(subscription_name, callback, scheduler=scheduler, await_callbacks_on_shutdown=True)
timeout = 5 * 60 # seconds
try:
streaming_pull_future.result(timeout=timeout)
except Exception:
streaming_pull_future.cancel() # Trigger the shutdown.
streaming_pull_future.result() # Block until the shutdown is complete.