Тупик с помощью Aggregator + Redis
Этот пост связан с этим
Spring тупик интеграции с помощью Aggregator + MessageStoreReaper + Redis?
но это сообщение слишком длинное для публикации. Я продолжаю с оригинальным постом
Я обновился до последней версии Java 7 1.7.0_60-b19, но проблема все еще существует.
Я сделал еще один дамп потока и обнаружил ту же проблему: все DefaultMessageListenerContainers (количество 20) заблокированы taskScheduler (entityScheduler-3) в вызове блокировки AbstractCorrelatingMessageHandler.
Вот конфигурация планировщика и агрегатора:
<task:scheduled-tasks scheduler="entityScheduler">
<task:scheduled ref="entityReaperBean" method="run" fixed-rate="${entity.aggregator.timeout}" />
</task:scheduled-tasks>
<bean id="entityReaperBean" class="org.springframework.integration.store.MessageGroupStoreReaper">
<property name="messageGroupStore" ref="entityAggregatorRedisMessageStore" />
<property name="timeout" value="${entity.aggregator.reaper.timeout}" />
</bean>
<bean id="entityAggregatorRedisMessageStore"
class="org.springframework.integration.redis.store.RedisMessageStore">
<constructor-arg ref="entityRedisConnectionFactory" />
</bean>
<int:aggregator id="entityAggregator" send-partial-result-on-expiry="true"
release-strategy-expression="size() >= ${transactions-receiver.batch_size}" correlation-strategy-expression="payload.entityCode"
expire-groups-upon-completion="true" message-store="entityAggregatorRedisMessageStore" />
Вот раздел дампа потока:
"entityScheduler-3" - Thread t@188
java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <545079d6> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1085)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Locked ownable synchronizers:
- locked <66d9180c> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
"DefaultMessageListenerContainer-5" - Thread t@48
java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Native Method)
- waiting to lock <66d9180c> (a java.util.concurrent.locks.ReentrantLock$NonfairSync) owned by "entityScheduler-3" t@188
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:894)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1221)
at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:340)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:223)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
at sun.reflect.GeneratedMethodAccessor81.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.handleMessage(SimpleMessageHandlerMetrics.java:106)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.invoke(SimpleMessageHandlerMetrics.java:86)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
at com.sun.proxy.$Proxy16.handleMessage(Unknown Source)
at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:148)
at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:330)
at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:169)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:228)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:212)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:177)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:171)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:149)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
at sun.reflect.GeneratedMethodAccessor81.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.handleMessage(SimpleMessageHandlerMetrics.java:106)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.invoke(SimpleMessageHandlerMetrics.java:86)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
at com.sun.proxy.$Proxy17.handleMessage(Unknown Source)
at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:148)
at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:330)
at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:169)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:228)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:212)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:177)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:171)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:149)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
at sun.reflect.GeneratedMethodAccessor81.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.handleMessage(SimpleMessageHandlerMetrics.java:106)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.invoke(SimpleMessageHandlerMetrics.java:86)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
at com.sun.proxy.$Proxy15.handleMessage(Unknown Source)
at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:148)
at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:330)
at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:169)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:228)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:212)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:177)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:171)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:149)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
at sun.reflect.GeneratedMethodAccessor81.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.handleMessage(SimpleMessageHandlerMetrics.java:106)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.invoke(SimpleMessageHandlerMetrics.java:86)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
at com.sun.proxy.$Proxy15.handleMessage(Unknown Source)
at org.springframework.integration.handler.MessageHandlerChain.handleMessageInternal(MessageHandlerChain.java:131)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
at sun.reflect.GeneratedMethodAccessor81.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.handleMessage(SimpleMessageHandlerMetrics.java:106)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.invoke(SimpleMessageHandlerMetrics.java:86)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
at com.sun.proxy.$Proxy18.handleMessage(Unknown Source)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:102)
at org.springframework.integration.jms.SubscribableJmsChannel$DispatchingMessageListener.onMessage(SubscribableJmsChannel.java:148)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:562)
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:500)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:468)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:325)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:263)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1103)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1095)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:992)
at java.lang.Thread.run(Thread.java:745)
Locked ownable synchronizers:
- None
Возможно ли, что по какой-то причине метод handleMessageInternal объекта AbstractCorrelatingMessageHandler не разблокируется? Я использую хранилище сообщений Redis и вижу, что MsgStoreReaper также использует блокировку. Может ли это быть проблемой?
Большое спасибо за вашу помощь здесь! С уважением Гусман
ОБНОВИТЬ:
Я нашел это сообщения в журнале:
2014-07-23 22:47:26,967 ERROR TaskUtils$LoggingErrorHandler:95 - Unexpected error occurred in scheduled task.
2014-07-23 22:47:26,968 ERROR RedisMessageStore:122 - Exception in expiry callback
2014-07-23 22:47:26,971 ERROR TaskUtils$LoggingErrorHandler:95 - Unexpected error occurred in scheduled task.
но не исключение, ни отслеживание стека. Ошибка TaskUtils вам что-то говорит?
1 ответ
На самом деле, я вижу одно место...
finally {
if (removeGroup) {
this.remove(group);
}
lock.unlock();
}
... если хранилище сообщений выдает исключение во время удаления, мы бы пропустили разблокировку - вы видите что-нибудь в журнале?