Выборочная очередь несвязанных сообщений в Oracle Advanced Queuing
Этот вопрос относится к исключению из очереди сообщений в Oracle Streams Advanced Queuing.
Мне нужно убедиться, что сообщения, связанные друг с другом, обрабатываются последовательно.
Например, предположим, что очередь заполнена четырьмя сообщениями, имеющими связанное с бизнесом поле, называемое ссылкой транзакции (txn_ref), и два сообщения (1,3) принадлежат одной и той же транзакции (000001):
id | txn_ref |
---+---------+
1 | 000001 |
2 | 000002 |
3 | 000001 |
4 | 000003 |
Предположим также, что я запускаю 4 потока / процесса, которые хотят удалить из этой очереди. Должно произойти следующее:
- поток 1 удаляет сообщение #1
- поток 2 удаляет сообщение № 2
- Поток 3 исключает сообщение № 4 (поскольку сообщение № 3 относится к № 1, а № 1 еще не завершен).
- поток 4 блока, ожидающих сообщения
- поток 1 фиксирует свою работу для сообщения #1
- поток 4 (или, возможно, поток 1) удаляет сообщение № 3.
Первоначально я думал, что я мог бы достичь этого с условием ожидания, когда ENQ_TIME (время постановки в очередь) не позднее, чем любой другой ENQ_TIME из всех сообщений, имеющих одинаковый TXN_REF. Но моя проблема в том, как сослаться на TXN_REF сообщения, которое я еще не выбрал, чтобы выбрать его. например
// Java API
String condition = "ENQ_TIME = (select min(ENQ_TIME) from AQ_TABLE1 where ??";
dequeueOption.setCondition(condition);
Можно ли здесь добиться того, чего я хочу?
2 ответа
Чтобы ответить на ваш прямой вопрос, это может быть достигнуто с помощью correlation
поле (называется CORRID
в таблице), которая предназначена для этой цели.
Итак, в очереди вы бы использовали AQMessageProperties.setCorrelation()
метод со значением TXN_REF в качестве параметра. Затем в вашем состоянии вы бы сделали что-то вроде этого:
// Java API
String condition = "tab.ENQ_TIME = (select min(AQ_TABLE1.ENQ_TIME) from AQ_TABLE1 self where tab.CORRID=AQ_TABLE1.CORRID)";
dequeueOption.setCondition(condition);
Стратегия, которую вы можете попробовать, если это возможно, использует группы сообщений. Документация Oracle описывает это кратко, но я нашел эту статью о мире жаб гораздо более полезной. По сути, вы настраиваете таблицу очередей, чтобы обрабатывать все сообщения, отправленные одновременно, как одну "группу". При снятии очереди только один пользователь может одновременно исключать из "группы" сообщений.