Ошибка обратного давления интеграции пружины при разделении большого потока
Целью является потоковая передача большого файла json.gz (4 ГБ сжатых, около 12 ГБ несжатых, 12 миллионов строк) с веб-сервера напрямую в базу данных без локальной загрузки. Поскольку исходящий шлюз интеграции Spring не поддерживает формат gzip, я делаю это сам, используя okhttp, который автоматически распаковывает ответ:
body = response.body().byteStream(); // thanks okhttp
reader = new InputStreamReader(body, StandardCharsets.UTF_8);
br = new BufferedReader(reader, bufferSize);
Flux<String> flux = Flux.fromStream(br.lines())
.onBackpressureBuffer(10000, x -> log.error("Buffer overrun!"))
.doAfterTerminate(() -> closeQuietly(closeables))
.doOnError(t -> log.error(...))
В потоке интеграции:
.handle(new MessageTransformingHandler(new GzipToFluxTransformer(...)))
.split()
.log(LoggingHandler.Level.DEBUG, CLASS_NAME, Message::getHeaders)
.channel(repositoryInputChannel())
Но
2017-12-08 22:48:47.846 [task-scheduler-7] [ERROR] c.n.d.y.s.GzipToFluxTransformer - Buffer overrun!
2017-12-08 22:48:48.337 [task-scheduler-7] [ERROR] o.s.i.h.LoggingHandler - org.springframework.messaging.MessageHandlingException:
error occurred in message handler [org.springframework.integration.splitter.DefaultMessageSplitter#1];
nested exception is reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...),
failedMessage=...}]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:153)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
Выходной канал подключается во время выполнения, используя неограниченную очередь, опрашиваемую мостом. Это облегчает тестирование, так что очередь может быть заменена DirectChannel
для тестирования.
@Bean(name = "${...}")
public PollableChannel streamingOutputChannel() {
return new QueueChannel();
}
@Bean
public IntegrationFlow srcToSinkBridge() {
return IntegrationFlows.from(streamingOutputChannel())
.bridge(e -> e.poller(Pollers.fixedDelay(500)))
.channel(repositoryInputChannel())
.get();
}
У меня есть пара сомнений здесь.
- Я не уверен, что динамическое связывание с использованием SPEL в имени компонента работает, но я не знаю, как это проверить.
- Поскольку очередь не ограничена, я могу думать только о том, что опрос не достаточно быстрый. Тем не менее, исключение предполагает, что сплиттер имеет проблемы с продолжением работы.
1 ответ
Проблема заключается в log
заявление! Он использует прослушивание, чтобы изменить выходной канал сплиттера на DirectChannel
что портит логику AbstractMessageSplitter.
boolean reactive = getOutputChannel() instanceof ReactiveStreamsSubscribableChannel;
Цитируя документ:
Начиная с версии 5.0, ... если выходной канал Splitter является экземпляром ReactiveStreamsSubscribeableChannel, AbstractMessageSplitter создает результат потока вместо итератора, а выходной канал подписывается на этот поток для разделения на основе обратного давления по требованию потока в нисходящем направлении.
Рабочий код такой, как показано ниже: простое перемещение оператора log сразу после сплиттера до конца решило проблему с обратным давлением:
IntegrationFlows.from(inputChannel)
.filter(Message.class, msg -> msg.getHeaders().containsKey(FILE_TYPE_HEADER))
.handle(new GzipToFluxTransformer(...))
.transform((Flux<String> payload) -> payload
.onBackpressureBuffer(getOnBackpressureBufferSize(),
s -> log.error("Buffer overrun!")))
.split()
.channel(c -> c.flux(outputChannel))
.log(LoggingHandler.Level.DEBUG, CLASS_NAME, Message::getHeaders)
.get();
Я открыл выпуск 2302 о весенней интеграции GitHub.