Тупик с помощью Aggregator + Redis

Этот пост связан с этим

Spring тупик интеграции с помощью Aggregator + MessageStoreReaper + Redis?

но это сообщение слишком длинное для публикации. Я продолжаю с оригинальным постом

Я обновился до последней версии Java 7 1.7.0_60-b19, но проблема все еще существует.

Я сделал еще один дамп потока и обнаружил ту же проблему: все DefaultMessageListenerContainers (количество 20) заблокированы taskScheduler (entityScheduler-3) в вызове блокировки AbstractCorrelatingMessageHandler.

Вот конфигурация планировщика и агрегатора:

<task:scheduled-tasks scheduler="entityScheduler">
    <task:scheduled ref="entityReaperBean" method="run" fixed-rate="${entity.aggregator.timeout}" />
</task:scheduled-tasks>

<bean id="entityReaperBean" class="org.springframework.integration.store.MessageGroupStoreReaper">
    <property name="messageGroupStore" ref="entityAggregatorRedisMessageStore" />
    <property name="timeout" value="${entity.aggregator.reaper.timeout}" />
</bean>

<bean id="entityAggregatorRedisMessageStore"
    class="org.springframework.integration.redis.store.RedisMessageStore">
    <constructor-arg ref="entityRedisConnectionFactory" />
</bean>

<int:aggregator id="entityAggregator" send-partial-result-on-expiry="true"
        release-strategy-expression="size() >= ${transactions-receiver.batch_size}" correlation-strategy-expression="payload.entityCode"
        expire-groups-upon-completion="true" message-store="entityAggregatorRedisMessageStore" />

Вот раздел дампа потока:

"entityScheduler-3" - Thread t@188
 java.lang.Thread.State: WAITING
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for <545079d6> (a    java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1085)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Locked ownable synchronizers:
 - locked <66d9180c> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)

"DefaultMessageListenerContainer-5" - Thread t@48
java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Native Method)
- waiting to lock <66d9180c> (a java.util.concurrent.locks.ReentrantLock$NonfairSync) owned by "entityScheduler-3" t@188
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:894)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1221)
at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:340)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:223)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
at sun.reflect.GeneratedMethodAccessor81.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.handleMessage(SimpleMessageHandlerMetrics.java:106)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.invoke(SimpleMessageHandlerMetrics.java:86)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
at com.sun.proxy.$Proxy16.handleMessage(Unknown Source)
at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:148)
at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:330)
at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:169)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:228)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:212)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:177)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:171)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:149)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
at sun.reflect.GeneratedMethodAccessor81.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.handleMessage(SimpleMessageHandlerMetrics.java:106)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.invoke(SimpleMessageHandlerMetrics.java:86)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
at com.sun.proxy.$Proxy17.handleMessage(Unknown Source)
at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:148)
at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:330)
at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:169)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:228)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:212)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:177)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:171)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:149)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
at sun.reflect.GeneratedMethodAccessor81.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.handleMessage(SimpleMessageHandlerMetrics.java:106)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.invoke(SimpleMessageHandlerMetrics.java:86)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
at com.sun.proxy.$Proxy15.handleMessage(Unknown Source)
at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:148)
at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:330)
at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:169)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:228)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:212)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:177)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:171)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:149)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
at sun.reflect.GeneratedMethodAccessor81.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.handleMessage(SimpleMessageHandlerMetrics.java:106)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.invoke(SimpleMessageHandlerMetrics.java:86)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
at com.sun.proxy.$Proxy15.handleMessage(Unknown Source)
at org.springframework.integration.handler.MessageHandlerChain.handleMessageInternal(MessageHandlerChain.java:131)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
at sun.reflect.GeneratedMethodAccessor81.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.handleMessage(SimpleMessageHandlerMetrics.java:106)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.invoke(SimpleMessageHandlerMetrics.java:86)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
at com.sun.proxy.$Proxy18.handleMessage(Unknown Source)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:102)
at org.springframework.integration.jms.SubscribableJmsChannel$DispatchingMessageListener.onMessage(SubscribableJmsChannel.java:148)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:562)
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:500)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:468)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:325)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:263)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1103)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1095)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:992)
at java.lang.Thread.run(Thread.java:745)

 Locked ownable synchronizers:
  - None

Возможно ли, что по какой-то причине метод handleMessageInternal объекта AbstractCorrelatingMessageHandler не разблокируется? Я использую хранилище сообщений Redis и вижу, что MsgStoreReaper также использует блокировку. Может ли это быть проблемой?

Большое спасибо за вашу помощь здесь! С уважением Гусман

ОБНОВИТЬ:

Я нашел это сообщения в журнале:

 2014-07-23 22:47:26,967 ERROR TaskUtils$LoggingErrorHandler:95 - Unexpected error occurred in scheduled task.
 2014-07-23 22:47:26,968 ERROR RedisMessageStore:122 - Exception in expiry callback
 2014-07-23 22:47:26,971 ERROR TaskUtils$LoggingErrorHandler:95 - Unexpected error occurred in scheduled task.

но не исключение, ни отслеживание стека. Ошибка TaskUtils вам что-то говорит?

1 ответ

Решение

На самом деле, я вижу одно место...

        finally {
            if (removeGroup) {
                this.remove(group);
            }
            lock.unlock();
        }

... если хранилище сообщений выдает исключение во время удаления, мы бы пропустили разблокировку - вы видите что-нибудь в журнале?

Другие вопросы по тегам