Spring тупик интеграции с помощью Aggregator + MessageStoreReaper + Redis?

Этот вопрос связан с этим сообщением на форуме СИ, но поскольку форум закрыт, я размещаю его здесь, чтобы продолжить обсуждение:

http://forum.spring.io/forum/spring-projects/integration/748192-messages-not-flowing-when-using-jms-channels

Подводя итог, у меня есть агрегатор с хранилищем сообщений Redis и жаткой, запланированной каждые 60 секунд. Сообщения отправляются в агрегатор с использованием JMS-канала. Вот конфиг:

<bean id="jedisPoolConfigBean" class="redis.clients.jedis.JedisPoolConfig">
    <property name="maxActive" value="10" />
    <property name="maxIdle" value="5" />
    <property name="minIdle" value="1" />
    <property name="testOnBorrow" value="true" />
    <property name="testOnReturn" value="true" />
    <property name="testWhileIdle" value="true" />
</bean>

<bean id="loyaltyRedisConnectionFactory"
    class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
    <property name="hostName" value="${redis.hostName}" />
    <property name="database" value="${redis.loyalty.database}" />
    <property name="port" value="${redis.port}" />
    <property name="poolConfig" ref="jedisPoolConfigBean" />
</bean>

<bean id="loyaltyAggregatorRedisMessageStore"
    class="org.springframework.integration.redis.store.RedisMessageStore">
    <constructor-arg ref="loyaltyRedisConnectionFactory" />
</bean>

<task:scheduler id="loyaltyScheduler" 
      pool-size="${loyalty.aggregator.reaper.pool_size}"/>

<task:scheduled-tasks scheduler="loyaltyScheduler">
    <task:scheduled ref="loyaltyReaperBean" method="run" fixed-rate="60000" />
</task:scheduled-tasks>

<bean id="loyaltyReaperBean" class="org.springframework.integration.store.MessageGroupStoreReaper">
    <property name="messageGroupStore" ref="loyaltyAggregatorRedisMessageStore" />
    <property name="timeout" value="30000" />
</bean>

Я немного новичок с дампами потоков, но насколько я вижу, потоки DefaultMessageListener застряли в taskScheduler, который запускает MessageReaper. В частности, в классе ReentrantLock.

Любая идея? Может быть, я должен сделать некоторые другие конфигурации, чтобы избежать этого

Любая помощь приветствуется

Заранее спасибо! Гусман

"loyaltyScheduler-1" - Thread t@57
java.lang.Thread.State: TIMED_WAITING
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <47e54701> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1090)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)

  Locked ownable synchronizers:
  - locked <368788cf> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)


"DefaultMessageListenerContainer-6" - Thread t@73
java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Native Method)
- waiting to lock <368788cf> (a java.util.concurrent.locks.ReentrantLock$NonfairSync) owned by "loyaltyScheduler-1" t@57
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:894)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1221)
at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:340)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:223)

"DefaultMessageListenerContainer-7" - Thread t@80
 java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Native Method)
- waiting to lock <368788cf> (a java.util.concurrent.locks.ReentrantLock$NonfairSync) owned by "loyaltyScheduler-1" t@57
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:894)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1221)
at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:340)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:223)

"DefaultMessageListenerContainer-8" - Thread t@83
java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Native Method)
- waiting to lock <368788cf> (a java.util.concurrent.locks.ReentrantLock$NonfairSync) owned by "loyaltyScheduler-1" t@57
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:894)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1221)
at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:340)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:223)

0 ответов

Другие вопросы по тегам