Очередь со справедливым удалением из нескольких ключей производителя
У меня есть такой сценарий, когда я получаю события из тысяч источников. Каждый источник отправляет информацию о своем текущем статусе. Хотя я хочу обработать все события, более важно сначала обработать последнее событие каждого источника, чтобы текущее представление было актуальным. Так что я думал об использовании ConcurrentHashMap
с идентификатором каждого источника в качестве ключа и очередью LIFO (стеком) в качестве значения. Затем я бы перебрал ключи Map
и просто вытолкнуть один предмет из стека каждого источника.
Меня беспокоит то, что, пока я перебираю ключи и убираю элементы из очереди каждого ключа, производитель может публиковать новые события в очередях, что может создать проблемы параллелизма. Производитель может также добавить новые ключи на карту и выполнить итерации entrySet
из Map
кажется слабо последовательным. Что не является большой проблемой, потому что новый элемент будет обработан в следующей итерации. В идеале я мог бы также использовать некоторую параллельную обработку в потоке entrySet
ускорить процесс.
Мне интересно, есть ли более чистый подход к этому. На самом деле я мог бы использовать LIFO BlockingDequeue
и сначала обрабатывал последние события, но проблема этого подхода в том, что существует риск того, что один источник может отправить больше событий, чем другие, и, таким образом, возможно, обработать больше событий, чем другие.
Есть ли какая-либо другая структура данных, которую я мог бы рассмотреть, которая обеспечивает такое поведение? По сути, я ищу способ расставить приоритеты для событий из каждого источника и в то же время дать реальную возможность каждому источнику обрабатываться потребителем.
2 ответа
Вы думали о очереди FIFO очередей LIFO? Каждый источник добавляет в свою очередь LIFO, и для обработки вы берете первую очередь LIFO из очереди FIFO, обрабатываете одно событие и затем помещаете его обратно в очередь FIFO. Таким образом, у вас также не должно быть проблем с новыми источниками, поскольку их очередь LIFO будет просто добавлена в очередь FIFO.
Для добавления событий в правильную очередь LIFO вы можете поддерживать дополнительный HashMap, который знает очередь для каждого источника, и если возникает новый источник, которого еще нет в Map, вы знаете, что нужно добавить его очередь LIFO в очередь FIFO.
Я рекомендую создать собственную структуру для управления этим, поскольку она добавляет гибкость (и скорость) для вашего конкретного случая использования.
Я бы пошел с круговой очередью для хранения каждой очереди LIFO (стека). Круговая очередь - это та, где вы добавляете элементы в хвост и читаете (но не удаляете) из головы. Как только голова = хвост, вы начинаете все сначала.
Вы можете создать свою собственную очередь, используя простой массив. Не так сложно управлять синхронизацией операций, таких как добавление очередей в массив и расширение по мере необходимости. Я считаю, что добавление очередей в массив - это не то, что вы делаете очень часто.
Этим легко управлять, и вы можете расширить круговую очередь, чтобы рассчитать частоту доступа к записям, и уменьшить частоту доступа к его записям (добавляя / удаляя потоки пользователей, или даже заставляя их немного подождать, прежде чем использовать из стек управляется записью).
Вы даже можете избежать блокировки потоков при чтении элементов из циклической очереди с использованием нескольких потоков, заставляя их вызывать операцию "register" перед использованием из стека: каждый поток имеет свой идентификатор, а когда он "регистрирует", идентификатор сохраняется в заданном запись в очереди. Перед регистрацией и перед извлечением из стека поток выполняет операцию "чтения регистрационного идентификатора", и возвращаемый идентификатор должен соответствовать своему собственному идентификатору. Это означает, что только поток, который "владеет" данной записью в очереди, может извлекаться из этого стека. Если регистрация / подтверждение процесса регистрации завершается неудачей, это означает, что другой поток потребляет эту запись, поэтому текущий поток переходит к следующей доступной записи.
Я использовал такую стратегию в прошлом, и она масштабировалась как очарование. Я надеюсь, что это имеет смысл для вас.