Как TTL (время жизни) применяется к пространству имен?
Apache Pulsar имеет функцию TTL, как описано в разделе " Сохранение и истечение срока действия официальной документации". Однако я не могу определить, где в конфигурации указано, как часто выполняется эта проверка. Используя стандарт bin/pulsar standalone
команда, с настраиваемым пространством имен, с ttl, настроенным на 5 секунд bin/pulsar-admin namespaces set-message-ttl public/ttl-test --messageTTL 5
,
Я вижу, что срок действия сообщений истекает только через заданный интервал, и в консоль выводится следующее сообщение журнала:
15: 11: 59.337 [pulsar-msg-expiry-monitor-52-1] ИНФОРМАЦИЯ org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor - [persistent://public/ttl-test/my-topic][spark-shell] Начальная проверка истечения срока действия сообщения, ttl= 5 секунд
Суть моего вопроса заключается в следующем: как я могу увеличить скорость проверки сообщений на предмет того, превышают ли они TTL?
2 ответа
Конфигурация messageExpiryCheckIntervalInMinutes
в брокере определяет, как часто темы пространства имен проверяются на наличие сообщений с истекшим сроком действия.
Согласно официальной документации по конфигурации
Используйте команду set-message-ttl и укажите имя пространства имен (по умолчанию public/default для постоянной темы) и время.
bin/pulsar-admin namespaces set-message-ttl public/default --messageTTL 120
Пример кода производителя и потребителя для достижения TTL (клиент Python)
import pulsar
client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer('my-topic-reader1')
producer.send(('Hello-Pulsar1').encode('utf-8'))
producer.send(('Hello-Pulsar2').encode('utf-8'))
producer.send(('Hello-Pulsar3').encode('utf-8'))
producer.close()
client.close()
Вы можете отправить несколько сообщений, используя метод отправки. Название темы должно быть одинаковым в обоих классах.
import pulsar
client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe("my-topic-reader1", "my-subscription")
//receive all the messages.whatever we publish
msg = consumer.receive()
print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
//Here we are not acknowledge all the messages.
//close the consumer and client
consumer.close()
client.close()
В течение 120 секунд мы снова открываем клиента и потребителя и пытаемся прочитать те же сообщения, которые не публикуются. Затем мы снова закрываем клиента и потребителя.
Позже (через 120 секунд) снова откроем клиента и потребителя, затем попытаемся получить сообщение. Но это не должно прийти. В этом состоянии вы достигаете Время жить.