Redis надежные очереди для многопоточной обработки
Для моего текущего проекта я использую Redis для рассылки сообщений по нескольким процессам. Теперь я должен сделать их надежными.
Я рассматриваю возможность использования шаблона Reliable queue через команду BRPOPLPUSH. Этот шаблон предполагает, что поток обработки удаляет лишнюю копию сообщения из "списка обработки" с помощью команды lrem после успешного завершения задания.
Поскольку я использую несколько потоков для выталкивания, дополнительные копии извлеченного элемента попадают в список обработки из нескольких потоков. То есть очередь обработки содержит элементы, вытолкнутые несколькими потоками. Как следствие, если поток завершает свою работу, он не может знать, какой элемент удалить из "очереди обработки".
Чтобы преодолеть эту проблему, я думаю, что я должен поддерживать несколько очередей обработки (по одной для каждого потока) на основе threadId. Итак, мой BRPOPLPUSH будет:
BRPOPLPUSH <primary-queue> <thread-specific-processing-queue>
Затем для очистки объектов тайм-аута мой поток мониторинга должен будет отслеживать все эти специфичные для потока очереди обработки.
Есть ли лучшие подходы к этой проблеме, чем задуманный выше?
1 ответ
@user779159
Для поддержки надежного механизма очереди мы используем следующий подход:
- two data structures
-- Redis List (the original queue from which items are popped regularly)
-- a Redis z-set, which temporarily stores the popped item.
Алгоритм:
-- When an element is popped, we store in z-set
-- If the task that picked the item completed its job, it will delete the entry from z-set.
-- If the task couldn't complete it, the item will be hanging around in z-set. So we know, whether a task was done within expected time or not.
-- Now, another background process periodically scans this z-set, picks up items which are timedout, and then puts them back to queue
Как это сделать:
- мы используем zset для хранения всплывающего элемента (обычно используя скрипт lua).
- Мы храним значение тайм-аута как ранг / счет этого элемента.
- Другой процесс сканера будет периодически (скажем, каждую минуту) запускать команду z-set zrangebyscore, чтобы выбрать элементы между (сейчас и в течение 1 минуты).
- Если по вышеуказанной команде найдены элементы, это означает, что процесс, который вытолкнул элемент (через brpop), не выполнил свою задачу вовремя.
- Таким образом, этот 2-й процесс поместит элемент обратно в очередь (список повторного просмотра), где он изначально принадлежал.