Ошибка обратного давления интеграции пружины при разделении большого потока

Целью является потоковая передача большого файла json.gz (4 ГБ сжатых, около 12 ГБ несжатых, 12 миллионов строк) с веб-сервера напрямую в базу данных без локальной загрузки. Поскольку исходящий шлюз интеграции Spring не поддерживает формат gzip, я делаю это сам, используя okhttp, который автоматически распаковывает ответ:

body = response.body().byteStream(); // thanks okhttp
reader = new InputStreamReader(body, StandardCharsets.UTF_8);
br = new BufferedReader(reader, bufferSize);

Flux<String> flux = Flux.fromStream(br.lines())
    .onBackpressureBuffer(10000, x -> log.error("Buffer overrun!"))
    .doAfterTerminate(() -> closeQuietly(closeables))
    .doOnError(t -> log.error(...))

В потоке интеграции:

.handle(new MessageTransformingHandler(new GzipToFluxTransformer(...)))
.split()
.log(LoggingHandler.Level.DEBUG, CLASS_NAME, Message::getHeaders)
.channel(repositoryInputChannel())

Но

2017-12-08 22:48:47.846 [task-scheduler-7] [ERROR] c.n.d.y.s.GzipToFluxTransformer - Buffer overrun!
2017-12-08 22:48:48.337 [task-scheduler-7] [ERROR] o.s.i.h.LoggingHandler - org.springframework.messaging.MessageHandlingException: 
error occurred in message handler [org.springframework.integration.splitter.DefaultMessageSplitter#1]; 
nested exception is reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...), 
failedMessage=...}]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:153)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)

Выходной канал подключается во время выполнения, используя неограниченную очередь, опрашиваемую мостом. Это облегчает тестирование, так что очередь может быть заменена DirectChannel для тестирования.

@Bean(name = "${...}")
public PollableChannel streamingOutputChannel() {
    return new QueueChannel();
}

@Bean
public IntegrationFlow srcToSinkBridge() {
    return IntegrationFlows.from(streamingOutputChannel())
        .bridge(e -> e.poller(Pollers.fixedDelay(500)))
        .channel(repositoryInputChannel())
        .get();
}

У меня есть пара сомнений здесь.

  1. Я не уверен, что динамическое связывание с использованием SPEL в имени компонента работает, но я не знаю, как это проверить.
  2. Поскольку очередь не ограничена, я могу думать только о том, что опрос не достаточно быстрый. Тем не менее, исключение предполагает, что сплиттер имеет проблемы с продолжением работы.

1 ответ

Решение

Проблема заключается в log заявление! Он использует прослушивание, чтобы изменить выходной канал сплиттера на DirectChannel что портит логику AbstractMessageSplitter.

boolean reactive = getOutputChannel() instanceof ReactiveStreamsSubscribableChannel;

Цитируя документ:

Начиная с версии 5.0, ... если выходной канал Splitter является экземпляром ReactiveStreamsSubscribeableChannel, AbstractMessageSplitter создает результат потока вместо итератора, а выходной канал подписывается на этот поток для разделения на основе обратного давления по требованию потока в нисходящем направлении.

Рабочий код такой, как показано ниже: простое перемещение оператора log сразу после сплиттера до конца решило проблему с обратным давлением:

IntegrationFlows.from(inputChannel)
.filter(Message.class, msg -> msg.getHeaders().containsKey(FILE_TYPE_HEADER))
.handle(new GzipToFluxTransformer(...))
.transform((Flux<String> payload) -> payload
        .onBackpressureBuffer(getOnBackpressureBufferSize(),
                s -> log.error("Buffer overrun!")))
.split()
.channel(c -> c.flux(outputChannel))
.log(LoggingHandler.Level.DEBUG, CLASS_NAME, Message::getHeaders)
.get();

Я открыл выпуск 2302 о весенней интеграции GitHub.