Исключение тайм-аута сообщения асинхронного производителя Kafka
У нас есть приложение Springboot, которое использует spring-cloud-stream-binder-kafka для отправки сообщений в тему Kafka. Мы отправляем сообщение в синхронном режиме. Мы используем синхронный режим, потому что мы хотим выполнить некоторые действия, когда возникает ошибка во время отправка сообщения. Некоторое время это работало нормально. Мы даже провели нагрузочное тестирование со 100 тыс. Сообщений в час, и у нас не было ни одного исключения TimeOut. Но в последнее время мы наблюдаем некоторые исключения из-за тайм-аута. Тайм-аут в основном происходит, когда мы отправляем одно сообщение. Интересная вещь, хотя мы получают исключение TimeOut, сообщение все еще доходит до темы Kafka. Как только мы получаем исключение тайм-аута, следующий набор сообщений проходит без тайм-аута. Исключение тайм-аута непоследовательно.
Код:
@Service
@EnableBinding(Source.class)
{
try
{
source .output().send(MessageBuilder.withPayLoad(JsonStringMessage).build();
}
catch(MessageTimeOutException ex)
{
log.error("Kafka MessageTimeOut");
}
}
application .yml :
spring:
cloud:
stream:
binders:
kafka1:
type: Kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: broker name
configuration:
sasl.jass.config: kafkaconnectionstring
ssl.mechanism: PLAIN
security.protocol: SASL_SSL
retries: 3
bindings:
output:
producer:
sync:true
bindings:
output:
binder: kafka1
destination: Kafka topic name
Версия Springboot: 2.1.5
Весна-облако-ручей-связыватель-кафка: 2.1.2
Детали исключения:
org.springframework.integration.MessageTimeOutException: Тайм-аут ожидания ответа от Kafkaproducer; вложенное исключение - java.util.concurrent.TimeoutException
Полный StackTrace:
"org.springframework.integration.MessageTimeoutException: Тайм-аут ожидания ответа от KafkaProducer; вложенное исключение - это java.util.concurrent.TimeoutException, org.springframework.integration.kafka.outbound.KafkaProducerMessageHandlerProducer.processSend.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:386)\ тат org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)\ тат org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169)\tat org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1095)\tat org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169)\tat org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatcher.integration.integration.integration: dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)\tat org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)\tat org.springframework.integration.Ascribennn (AbstractSpring. 73)\tat org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)\tat org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel..invoke(NativeMethodAccessorImpl.java:62)\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\tat java.lang.reflect.Method.invoke(Method.java:498)\tat org.springframework.web.metho.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:190)\tat org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java.forRequest.invokeAndHandle(ServletInvocableHandlerMethod.java:104)\tat org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.invokeHandlerMethod (RequestMappingHandlerAdapter.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:797)\tat org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)\tat org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1039)\tat org.springframework.DletispatcherServlet: \tat org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1005)\tat org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:908)\tletp javax.servlet. служба (HttpServlet.java:660)\tat org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:882)\tat javax.servlet.http.HttpServlet.service(HttpServlet. \ tatpServlet.service (HttpServlet.java:7g41).apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)\tat org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)\tat org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)\tat org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)\tat org.apache.catalina.core. catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)\tat org.springframework.boot.actuate.web.trace.servlet.HttpTraceFilter.doFilterInternal(HttpTraceFilter.java orgrame.spring) \ tatpTraceFilter.java:90.spring.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)\tat org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)\tat org.apache.catalina.core.Chalication.Application. \tat org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:320)\tat org.springframework.security.web.access.intercept.FilterSecurityInterceptor.invoke(FilterSecurityInterceptor.java:127)\tat org.springframework.security.web.access.intercept.FilterSecurityInterceptor.doFilter(FilterSecurityInterceptor.java:91)\tat org.security.framework.framework. FilterChainProxy.java:334)\tat org.springframework.security.web.access.ExceptionTranslationFilter.doFilter(ExceptionTranslationFilter.java:119)\tat org.springframework.security.web.FilterChainProxy$.ChoinFilter: VirtualFilterChainProxy3. \tat org.springframework.security.web.session.SessionManagementFilter.doFilter(SessionManagementFilter.java:137)\tat org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(Filterjavaframeing).security.web.authentication.AnonymousAuthenticationFilter.doFilter(AnonymousAuthenticationFilter.java:111)\tat org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)\tat org.springframework.security.ConfilderArtware.security.ConnectRequest. 170)\tat org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)\tat org.springframework.security.web.savedrequest.RequestCacheAwareFilter.web.savedrequest.RequestCacheAwareFilter.doFilware: или springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)\tat org.springframework.security.oauth2.provider.authentication.OAuth2AuthenticationProcessingFilter.doFilter.AuthenticationProcessingFilter.doFilter. Интернет.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)\tat org.springframework.security.web.authentication.logout.LogoutFilter.doFilter(LogoutFilter.java:116)\tat org.security.framework.Project.doFilter(FilterChainProxy.java:334)\tat org.springframework.security.web.header.HeaderWriterFilter.doFilterInternal(HeaderWriterFilter.java:74)\tat org.springframework.web.filter.Filter.OncePer10 \tat org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)\tat org.springframework.security.web.context.SecurityContextPersistenceFilter.doFilistencefilter.context.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)\tat org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter.doFilterInternal(WebAsyncManagerIntegrationFilter.java:56)\tat org.springframework.web.filter.OncePerRequestFilter10.java.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)\tat org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:215)\tat org.security.doFilter.java:178)\tat org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:357)\tat org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilter2 org.java).catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)\tat org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)\tat org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99)\tat org.web.filter.java:99) \ tat org.spring.Framework(java:107)\tat org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)\tat org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.javaat orramework.s) \ web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:92)\tat org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)\tat org.apache.Catalation (Application) \ tat org.apache.Catal java:193)\tat org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)\tat org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:93)\tat org.springframework.web.filter.OncePerRequestFilter10.doFilter.OncePerRequestFilter10.doFilter.ua core.ApplicationFilterChain.internalDoFilter (ApplicationFilterChain.java:193) \ tat org.apache.catalina.core.ApplicationFilterChain.doFilter (ApplicationFilterChain.java:166)\ tat org.springframework.vletMilm.webate.filterAndRecordMetrics(WebMvcMetricsFilter.java:117)\tat org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:106.subscribe.com или FilterPilter.java:106.java:107)\tat org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)\tat org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)\tat org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingframe.subscribe.com).filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)\tat org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)\tat org.apache.catalina.FilterChain: Application(ApplicationFilterChain.FilterChain. 166)\ tat org.cloudfoundry.router.ClientCertificateMapper.doFilter (ClientCertificateMapper.java:77) \ tat org.apache.catalina.core.ApplicationFilterChain.internalDoFilter (ApplicationFilterChain.java:193.confilter (ApplicationFilterChain.java:193. ApplicationFilterChain.doFilter (ApplicationFilterChain.java:166)\ tat org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:200)\tat org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)\tat org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase)\tat org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:139)\tat org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)\tat org.apache.catalinaEngine.core.Standalve.invoke(StandardEngineValve.java:74)\tat org.apache.catalina.valves.RemoteIpValve.invoke(RemoteIpValve.java:679)\tat org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343) org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:408)\tat org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66)\tat org.apache.coyote.AbstractProtocol. $Connection процесс (AbstractProtocol.java:836)\tat org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1747)\tat org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\