Как TTL (время жизни) применяется к пространству имен?

Apache Pulsar имеет функцию TTL, как описано в разделе " Сохранение и истечение срока действия официальной документации". Однако я не могу определить, где в конфигурации указано, как часто выполняется эта проверка. Используя стандарт bin/pulsar standalone команда, с настраиваемым пространством имен, с ttl, настроенным на 5 секунд bin/pulsar-admin namespaces set-message-ttl public/ttl-test --messageTTL 5,

Я вижу, что срок действия сообщений истекает только через заданный интервал, и в консоль выводится следующее сообщение журнала:

15: 11: 59.337 [pulsar-msg-expiry-monitor-52-1] ИНФОРМАЦИЯ org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor - [persistent://public/ttl-test/my-topic][spark-shell] Начальная проверка истечения срока действия сообщения, ttl= 5 секунд

Суть моего вопроса заключается в следующем: как я могу увеличить скорость проверки сообщений на предмет того, превышают ли они TTL?

2 ответа

Решение

Конфигурация messageExpiryCheckIntervalInMinutes в брокере определяет, как часто темы пространства имен проверяются на наличие сообщений с истекшим сроком действия.

Согласно официальной документации по конфигурации

Используйте команду set-message-ttl и укажите имя пространства имен (по умолчанию public/default для постоянной темы) и время.

bin/pulsar-admin namespaces set-message-ttl public/default --messageTTL 120

Пример кода производителя и потребителя для достижения TTL (клиент Python)

import pulsar
client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer('my-topic-reader1')

producer.send(('Hello-Pulsar1').encode('utf-8'))
producer.send(('Hello-Pulsar2').encode('utf-8'))
producer.send(('Hello-Pulsar3').encode('utf-8'))

producer.close()
client.close()

Вы можете отправить несколько сообщений, используя метод отправки. Название темы должно быть одинаковым в обоих классах.

import pulsar
client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe("my-topic-reader1", "my-subscription")

//receive all the messages.whatever we publish
msg = consumer.receive()
print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))

//Here we are not acknowledge all the messages.

//close the consumer and client
consumer.close()
client.close()

В течение 120 секунд мы снова открываем клиента и потребителя и пытаемся прочитать те же сообщения, которые не публикуются. Затем мы снова закрываем клиента и потребителя.

Позже (через 120 секунд) снова откроем клиента и потребителя, затем попытаемся получить сообщение. Но это не должно прийти. В этом состоянии вы достигаете Время жить.

Другие вопросы по тегам