Запуск интегратора Spring с неправильным интервалом времени

Код: https://github.com/giuliopulina/spring-integration-poller

У меня возникла проблема при попытке создать JDBC Poller с интеграцией Spring.

Когда я добавляю в таблицу новые данные, обработка идет медленнее, чем ожидалось: все работает нормально, за исключением того факта, что опрос запускается каждые 60 секунд, и я не могу понять, почему.

2015-05-27 10: 50: 40,234 DEBUG ExpressionEvaluatingSqlParameterSourceFactory - Разрешенное выражение #root.![Pk] to (список pks)

2015-05-27 10: 51: 40,234 DEBUG ExpressionEvaluatingSqlParameterSourceFactory - Разрешенное выражение #root.![Pk] to (список pks)

Это соответствующая часть конфигурации xml интеграции пружины:

<task:executor id="pollerPool" pool-size="5-20" queue-capacity="0" rejection-policy="CALLER_RUNS" keep-alive="5"/>

<!--<task:executor id="processingPool" pool-size="5-20" queue-capacity="0" rejection-policy="CALLER_RUNS" keep-alive="5"/> -->

<bean id="jdbcSource" class="org.springframework.integration.jdbc.JdbcPollingChannelAdapter">
    <constructor-arg ref="dataSource"/>
    <constructor-arg value="XXXXXXXXXXXXXX"/>
    <property name="updateSql" value="XXXXXXXXXXXXXXXX"/>
    <property name="maxRowsPerPoll" value="50"/>
</bean>

<int:inbound-channel-adapter send-timeout="10000" auto-startup="false" id="inboundAdapter" ref="jdbcSource" channel="jdbcOutputChannel">
    <int:poller receive-timeout="3000" time-unit="MILLISECONDS" fixed-rate="0" error-channel="errorChannel" task-executor="pollerPool">
        <int:advice-chain>
            <ref bean="threadPrepareInterceptor"/>
            <ref bean="txAdvice"/>
        </int:advice-chain>
    </int:poller>
</int:inbound-channel-adapter>

<int:service-activator id="serviceActivator" input-channel="jdbcOutputChannel" ref="someServiceActivatorBean"/>

<tx:advice id="txAdvice" transaction-manager="txManager">
    <tx:attributes>
        <tx:method name="get*" read-only="true"/>
        <tx:method name="*"/>
    </tx:attributes>
</tx:advice>

<int:channel id="jdbcOutputChannel"  >
    <!-- using direct channel -->
    <!--<int:dispatcher task-executor="processingPool"/>-->
</int:channel>

Не могли бы вы помочь мне понять проблему?

ОБНОВИТЬ:

По поводу предложения "jdbcOutputChannel" о транзакциях я согласен и изменил свою конфигурацию в соответствии с вашей подсказкой, поскольку она более чистая (во всяком случае, активатор службы также выполнялся в отдельной транзакции, даже если это не было указано в примере xml).

Что касается проблемы, с которой я столкнулся, я попытался удалить все остальные компоненты интеграции Spring, и программа опроса непрерывно запускалась, как я и ожидала (я знаю, что значение fixed-rate=0 слишком велико:)) Вместо этого, когда в проекте настроены другие программы опроса вот так и мой поллер, похоже, наследует тот же тайм-аут:

<int:service-activator id="someOtherServiceActivator">
    <int:poller fixed-rate="0" error-channel="someOtherPollerErrorChannel" receive-timeout="60000" />
</int:service-activator>

Переключение тайм-аута других опрошенных на 10000 мс, также мой поллер срабатывает каждые 10 секунд (вместо 60). Я не могу поделиться полной конфигурацией интеграции с пружиной, но я хотел бы спросить: может быть, полностью разделенный поллер может изменять поведение друг друга?

ОБНОВЛЕНИЕ 2: Я создал отдельный проект, пытаясь воспроизвести проблему, но все же мне не удалось это сделать. Итак, я попытался удалить следующую конфигурацию, которая была введена для того, чтобы запускать опросы только тогда, когда приложение полностью запущено и работает:

<int:publish-subscribe-channel id="startupChannel" />
<int:control-bus input-channel="controlBusChannel" />

<int-event:inbound-channel-adapter channel="startupChannel" event-types="org.springframework.context.event.ContextRefreshedEvent"/>

<int:transformer input-channel="startupChannel" expression="'@inboundAdapter.start()'" output-channel="controlBusChannel" />
<int:transformer input-channel="startupChannel" expression="'@someOtherServiceActivator.start()'" output-channel="controlBusChannel" /> 

И проблема исчезла, даже если это я могу полностью понять причину. В любом случае, создание другого startupChannel для моего опроса работает отлично:

<int:publish-subscribe-channel id="globalStartupChannel" />
<int:publish-subscribe-channel id="myStartupChannel" />
<int:control-bus input-channel="controlBusChannel" />

<int-event:inbound-channel-adapter channel="globalStartupChannel" event-types="org.springframework.context.event.ContextRefreshedEvent"/>
<int-event:inbound-channel-adapter channel="myStartupChannel" event-types="org.springframework.context.event.ContextRefreshedEvent"/>

<int:transformer input-channel="globalStartupChannel" expression="'@someOtherServiceActivator.start()'" output-channel="controlBusChannel" />
<int:transformer input-channel="myStartupChannel" expression="'@inboundAdapter.start()'" output-channel="controlBusChannel" />

ОБНОВЛЕНИЕ 3:

При подготовке проекта с кодом для вас я заметил следующий журнал:

Информация: Ни один бин с именем taskScheduler не был явно определен. Поэтому будет создан ThreadPoolTaskScheduler по умолчанию.

Итак, я добавил следующую конфигурацию, и теперь все работает нормально:

<task:scheduler id="taskScheduler" pool-size="20" />

Я предполагаю, что по умолчанию размер пула равен 10, поэтому в некотором роде конфигурация перезаписывается при наличии totalNumberOfPollers > taskScheduler.size(). Я прав?

Спасибо Джулио

1 ответ

Решение

Я не могу воспроизвести вашу ситуацию; Я предлагаю вам сделать дамп потока между опросами, чтобы посмотреть, что этот поток делает.

Тем не менее, fixed-rate из 0 невероятно агрессивен; Ваши администраторы базы данных, скорее всего, без задержек проведут подобный опрос.

Также, jdbcOutputChannelбудучи ExecutorChannel, означает, что транзакция будет зафиксирована сразу после отправки сообщения на этот канал. Если вы хотите, чтобы поток выполнялся в транзакциях, вы не должны использовать диспетчер здесь.

РЕДАКТИРОВАТЬ:

Я до сих пор не могу воспроизвести вашу ситуацию с этим...

<int:control-bus input-channel="input"/>

<int-event:inbound-channel-adapter channel="ps" event-types="org.springframework.context.event.ContextRefreshedEvent"/>

<int:publish-subscribe-channel id="ps" />

<int:transformer input-channel="ps" output-channel="input" expression="'@foo.start()'" />

<int:transformer input-channel="ps" output-channel="input" expression="'@sa.start()'" />

<int:inbound-channel-adapter id="foo" channel="bar" expression="'foo'" auto-startup="false">
    <int:poller fixed-rate="1000" />
</int:inbound-channel-adapter>

<int:channel id="bar">
    <int:queue />
</int:channel>

<int:service-activator id="sa" input-channel="bar" output-channel="baz" auto-startup="false"
        expression="payload.toUpperCase()">
    <int:poller fixed-rate="6000" receive-timeout="0" />
</int:service-activator>

<int:logging-channel-adapter id="baz" level="ERROR"/>

... как и ожидалось, я вижу 6 FOOкаждые 6 секунд (i-c-a опрашивается один раз в секунду, а sa запускается один раз каждые 6 секунд).

EDIT2:

Я посмотрел на ваш проект, и основная причина вашей проблемы, как вы говорите, во многих опрошенных конечных точках, но на самом деле, это так:

fixed-rate="0" receive-timeout="60000"

При такой конфигурации ресурсы планировщика (потоки) блокируются в QueueChannelи, как вы обнаружили, вы исчерпали все ресурсы.

Одним из решений является увеличение количества потоков в пуле планировщика.

В этой конфигурации кажется, что вы пытаетесь получить по запросу обмен сообщениями с нулевой задержкой с помощью опроса, когда опрашивающий постоянно ожидает в очереди метод receive().

Если вы не можете позволить себе задержку, подумайте об использовании DirectChannelвместо. Если вы не хотите, чтобы нижняя конечная точка работала в потоке вызывающего, используйте ExecutorChannels...

<task:executor id="exec" pool-size="100"/>

<int:channel id="otherMessageChannel1">
    <int:dispatcher task-executor="exec" />
</int:channel>

Обычно это предпочтительнее текущей настройки.

Другие вопросы по тегам