Запуск интегратора Spring с неправильным интервалом времени
Код: https://github.com/giuliopulina/spring-integration-poller
У меня возникла проблема при попытке создать JDBC Poller с интеграцией Spring.
Когда я добавляю в таблицу новые данные, обработка идет медленнее, чем ожидалось: все работает нормально, за исключением того факта, что опрос запускается каждые 60 секунд, и я не могу понять, почему.
2015-05-27 10: 50: 40,234 DEBUG ExpressionEvaluatingSqlParameterSourceFactory - Разрешенное выражение #root.![Pk] to (список pks)
2015-05-27 10: 51: 40,234 DEBUG ExpressionEvaluatingSqlParameterSourceFactory - Разрешенное выражение #root.![Pk] to (список pks)
Это соответствующая часть конфигурации xml интеграции пружины:
<task:executor id="pollerPool" pool-size="5-20" queue-capacity="0" rejection-policy="CALLER_RUNS" keep-alive="5"/>
<!--<task:executor id="processingPool" pool-size="5-20" queue-capacity="0" rejection-policy="CALLER_RUNS" keep-alive="5"/> -->
<bean id="jdbcSource" class="org.springframework.integration.jdbc.JdbcPollingChannelAdapter">
<constructor-arg ref="dataSource"/>
<constructor-arg value="XXXXXXXXXXXXXX"/>
<property name="updateSql" value="XXXXXXXXXXXXXXXX"/>
<property name="maxRowsPerPoll" value="50"/>
</bean>
<int:inbound-channel-adapter send-timeout="10000" auto-startup="false" id="inboundAdapter" ref="jdbcSource" channel="jdbcOutputChannel">
<int:poller receive-timeout="3000" time-unit="MILLISECONDS" fixed-rate="0" error-channel="errorChannel" task-executor="pollerPool">
<int:advice-chain>
<ref bean="threadPrepareInterceptor"/>
<ref bean="txAdvice"/>
</int:advice-chain>
</int:poller>
</int:inbound-channel-adapter>
<int:service-activator id="serviceActivator" input-channel="jdbcOutputChannel" ref="someServiceActivatorBean"/>
<tx:advice id="txAdvice" transaction-manager="txManager">
<tx:attributes>
<tx:method name="get*" read-only="true"/>
<tx:method name="*"/>
</tx:attributes>
</tx:advice>
<int:channel id="jdbcOutputChannel" >
<!-- using direct channel -->
<!--<int:dispatcher task-executor="processingPool"/>-->
</int:channel>
Не могли бы вы помочь мне понять проблему?
ОБНОВИТЬ:
По поводу предложения "jdbcOutputChannel" о транзакциях я согласен и изменил свою конфигурацию в соответствии с вашей подсказкой, поскольку она более чистая (во всяком случае, активатор службы также выполнялся в отдельной транзакции, даже если это не было указано в примере xml).
Что касается проблемы, с которой я столкнулся, я попытался удалить все остальные компоненты интеграции Spring, и программа опроса непрерывно запускалась, как я и ожидала (я знаю, что значение fixed-rate=0 слишком велико:)) Вместо этого, когда в проекте настроены другие программы опроса вот так и мой поллер, похоже, наследует тот же тайм-аут:
<int:service-activator id="someOtherServiceActivator">
<int:poller fixed-rate="0" error-channel="someOtherPollerErrorChannel" receive-timeout="60000" />
</int:service-activator>
Переключение тайм-аута других опрошенных на 10000 мс, также мой поллер срабатывает каждые 10 секунд (вместо 60). Я не могу поделиться полной конфигурацией интеграции с пружиной, но я хотел бы спросить: может быть, полностью разделенный поллер может изменять поведение друг друга?
ОБНОВЛЕНИЕ 2: Я создал отдельный проект, пытаясь воспроизвести проблему, но все же мне не удалось это сделать. Итак, я попытался удалить следующую конфигурацию, которая была введена для того, чтобы запускать опросы только тогда, когда приложение полностью запущено и работает:
<int:publish-subscribe-channel id="startupChannel" />
<int:control-bus input-channel="controlBusChannel" />
<int-event:inbound-channel-adapter channel="startupChannel" event-types="org.springframework.context.event.ContextRefreshedEvent"/>
<int:transformer input-channel="startupChannel" expression="'@inboundAdapter.start()'" output-channel="controlBusChannel" />
<int:transformer input-channel="startupChannel" expression="'@someOtherServiceActivator.start()'" output-channel="controlBusChannel" />
И проблема исчезла, даже если это я могу полностью понять причину. В любом случае, создание другого startupChannel для моего опроса работает отлично:
<int:publish-subscribe-channel id="globalStartupChannel" />
<int:publish-subscribe-channel id="myStartupChannel" />
<int:control-bus input-channel="controlBusChannel" />
<int-event:inbound-channel-adapter channel="globalStartupChannel" event-types="org.springframework.context.event.ContextRefreshedEvent"/>
<int-event:inbound-channel-adapter channel="myStartupChannel" event-types="org.springframework.context.event.ContextRefreshedEvent"/>
<int:transformer input-channel="globalStartupChannel" expression="'@someOtherServiceActivator.start()'" output-channel="controlBusChannel" />
<int:transformer input-channel="myStartupChannel" expression="'@inboundAdapter.start()'" output-channel="controlBusChannel" />
ОБНОВЛЕНИЕ 3:
При подготовке проекта с кодом для вас я заметил следующий журнал:
Информация: Ни один бин с именем taskScheduler не был явно определен. Поэтому будет создан ThreadPoolTaskScheduler по умолчанию.
Итак, я добавил следующую конфигурацию, и теперь все работает нормально:
<task:scheduler id="taskScheduler" pool-size="20" />
Я предполагаю, что по умолчанию размер пула равен 10, поэтому в некотором роде конфигурация перезаписывается при наличии totalNumberOfPollers > taskScheduler.size(). Я прав?
Спасибо Джулио
1 ответ
Я не могу воспроизвести вашу ситуацию; Я предлагаю вам сделать дамп потока между опросами, чтобы посмотреть, что этот поток делает.
Тем не менее, fixed-rate
из 0 невероятно агрессивен; Ваши администраторы базы данных, скорее всего, без задержек проведут подобный опрос.
Также, jdbcOutputChannel
будучи ExecutorChannel
, означает, что транзакция будет зафиксирована сразу после отправки сообщения на этот канал. Если вы хотите, чтобы поток выполнялся в транзакциях, вы не должны использовать диспетчер здесь.
РЕДАКТИРОВАТЬ:
Я до сих пор не могу воспроизвести вашу ситуацию с этим...
<int:control-bus input-channel="input"/>
<int-event:inbound-channel-adapter channel="ps" event-types="org.springframework.context.event.ContextRefreshedEvent"/>
<int:publish-subscribe-channel id="ps" />
<int:transformer input-channel="ps" output-channel="input" expression="'@foo.start()'" />
<int:transformer input-channel="ps" output-channel="input" expression="'@sa.start()'" />
<int:inbound-channel-adapter id="foo" channel="bar" expression="'foo'" auto-startup="false">
<int:poller fixed-rate="1000" />
</int:inbound-channel-adapter>
<int:channel id="bar">
<int:queue />
</int:channel>
<int:service-activator id="sa" input-channel="bar" output-channel="baz" auto-startup="false"
expression="payload.toUpperCase()">
<int:poller fixed-rate="6000" receive-timeout="0" />
</int:service-activator>
<int:logging-channel-adapter id="baz" level="ERROR"/>
... как и ожидалось, я вижу 6 FOO
каждые 6 секунд (i-c-a
опрашивается один раз в секунду, а sa запускается один раз каждые 6 секунд).
EDIT2:
Я посмотрел на ваш проект, и основная причина вашей проблемы, как вы говорите, во многих опрошенных конечных точках, но на самом деле, это так:
fixed-rate="0" receive-timeout="60000"
При такой конфигурации ресурсы планировщика (потоки) блокируются в QueueChannel
и, как вы обнаружили, вы исчерпали все ресурсы.
Одним из решений является увеличение количества потоков в пуле планировщика.
В этой конфигурации кажется, что вы пытаетесь получить по запросу обмен сообщениями с нулевой задержкой с помощью опроса, когда опрашивающий постоянно ожидает в очереди метод receive().
Если вы не можете позволить себе задержку, подумайте об использовании DirectChannel
вместо. Если вы не хотите, чтобы нижняя конечная точка работала в потоке вызывающего, используйте ExecutorChannel
s...
<task:executor id="exec" pool-size="100"/>
<int:channel id="otherMessageChannel1">
<int:dispatcher task-executor="exec" />
</int:channel>
Обычно это предпочтительнее текущей настройки.