Опрос сообщений Spring Integration

У меня есть настройка конфигурации Spring для опроса сообщений из очереди БД:

<int:annotation-config default-publisher-channel="messageChannel" />

<task:executor id="messageTaskExecutor" pool-size="1"
    queue-capacity="1" rejection-policy="CALLER_RUNS" />

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit expression="@messageSessionStore.removeFromIdCache(headers.id.toString())" />
    <int:after-rollback expression="@messageSessionStore.removeFromIdCache(headers.id.toString())" />
</int:transaction-synchronization-factory>

<bean id="messageQueryProvider"
    class="org.springframework.integration.jdbc.store.channel.OracleChannelMessageStoreQueryProvider" />

<bean id="messageSessionStore"
    class="org.springframework.integration.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource" />
    <property name="channelMessageStoreQueryProvider" ref="messageQueryProvider" />
    <property name="tablePrefix" value="QUEUE_" />
    <property name="usingIdCache" value="true" />
</bean>

<int:channel id="messageChannel">
    <int:queue message-store="messageSessionStore" />
</int:channel>

<int:poller id="defaultPoller" fixed-delay="500" max-messages-per-poll="1" task-executor="messageTaskExecutor" default="true">
    <int:transactional propagation="REQUIRED" synchronization-factory="syncFactory" isolation="READ_COMMITTED" transaction-manager="eosTransactionManager"/>
</int:poller>

Тем не менее, приложение работает на нескольких узлах. Когда сервер перезагружается, кажется, что сообщения перехватываются более чем 1 узлом (все узлы одновременно закрываются и перезапускаются последовательно). Есть ли способ избежать множественной обработки сообщений?

1 ответ

Решение

Это как-то не возможно, используя OracleChannelMessageStoreQueryProvider, Просто потому, что мы полагаемся на FOR UPDATE SKIP LOCKED, Поэтому, когда SELECT выполняется одним узлом, записи блокируются, а следующий переходит к следующим свободным строкам в таблице.

В JavaDoc нет setUsingIdCache():

 * <p>If using the provided {@link OracleChannelMessageStoreQueryProvider}, don't set {@link #usingIdCache}
 * to true, as the Oracle query will ignore locked rows.</p>

Но я думаю, что это совершенно не связано. Удаление этого параметра и <int:transaction-synchronization-factory> Вы упростите свою конфигурацию, но поведение не должно быть изменено.

Я думаю, что вы видите, как round-robin: один узел получает первый ряд, следующий пропускает его и получает следующий.

Я почему-то не верю, что разные узлы получают одно и то же сообщение, когда это Oracle.

Другие вопросы по тегам