Опрос сообщений Spring Integration
У меня есть настройка конфигурации Spring для опроса сообщений из очереди БД:
<int:annotation-config default-publisher-channel="messageChannel" />
<task:executor id="messageTaskExecutor" pool-size="1"
queue-capacity="1" rejection-policy="CALLER_RUNS" />
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="@messageSessionStore.removeFromIdCache(headers.id.toString())" />
<int:after-rollback expression="@messageSessionStore.removeFromIdCache(headers.id.toString())" />
</int:transaction-synchronization-factory>
<bean id="messageQueryProvider"
class="org.springframework.integration.jdbc.store.channel.OracleChannelMessageStoreQueryProvider" />
<bean id="messageSessionStore"
class="org.springframework.integration.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource" />
<property name="channelMessageStoreQueryProvider" ref="messageQueryProvider" />
<property name="tablePrefix" value="QUEUE_" />
<property name="usingIdCache" value="true" />
</bean>
<int:channel id="messageChannel">
<int:queue message-store="messageSessionStore" />
</int:channel>
<int:poller id="defaultPoller" fixed-delay="500" max-messages-per-poll="1" task-executor="messageTaskExecutor" default="true">
<int:transactional propagation="REQUIRED" synchronization-factory="syncFactory" isolation="READ_COMMITTED" transaction-manager="eosTransactionManager"/>
</int:poller>
Тем не менее, приложение работает на нескольких узлах. Когда сервер перезагружается, кажется, что сообщения перехватываются более чем 1 узлом (все узлы одновременно закрываются и перезапускаются последовательно). Есть ли способ избежать множественной обработки сообщений?
1 ответ
Это как-то не возможно, используя OracleChannelMessageStoreQueryProvider
, Просто потому, что мы полагаемся на FOR UPDATE SKIP LOCKED
, Поэтому, когда SELECT
выполняется одним узлом, записи блокируются, а следующий переходит к следующим свободным строкам в таблице.
В JavaDoc нет setUsingIdCache()
:
* <p>If using the provided {@link OracleChannelMessageStoreQueryProvider}, don't set {@link #usingIdCache}
* to true, as the Oracle query will ignore locked rows.</p>
Но я думаю, что это совершенно не связано. Удаление этого параметра и <int:transaction-synchronization-factory>
Вы упростите свою конфигурацию, но поведение не должно быть изменено.
Я думаю, что вы видите, как round-robin
: один узел получает первый ряд, следующий пропускает его и получает следующий.
Я почему-то не верю, что разные узлы получают одно и то же сообщение, когда это Oracle.