Поведение Spring WebFlux Flux с не потоковым приложением /json

Я оцениваю использование Spring Webflux, но мы должны поддерживать клиентов, которые ожидают приложения / JSON, а не приложения / потока + JSON. Мне неясно, как Spring WebFlux обрабатывает сериализацию Flux в случае клиента, которому требуется application / json.

Если Flux сериализуется как application / json, а не application / stream + json, это блокирующая операция?

Ниже я собрал образец контроллера, чтобы продемонстрировать, что я вижу. Когда поток бесконечен и создает application / json, браузер ничего не возвращает. Это кажется разумным, так как он, вероятно, ожидает завершения потока. Когда поток бесконечен и создает application / stream + json, я постоянно вижу объекты JSON в браузере, как и ожидалось. Когда Flux конечен, скажем, на 100 элементов, а тип - application / json, он рендерится, как и ожидалось, одновременно. Вопрос в том, нужно ли ждать завершения потока, прежде чем сериализовать, и вызывает ли это операцию блокировки. Каковы последствия для производительности и масштабируемости с помощью Flux при возврате обычного приложения / json?

@RestController
public class ReactiveController {

    /* Note: In the browser this sits forever and never renders */
    @GetMapping(path = "/nonStreaming", produces = MediaType.APPLICATION_JSON_VALUE)
    public Flux<Person> getPeopleNonStreaming() {
        return Flux.interval(Duration.ofMillis(100))
                .map(tick -> new Person("Dude", "Dude", tick));
    }

    /* Note: This renders in the browser in chunks forever */
    @GetMapping(path = "/streaming", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<Person> getPeopleStreaming() {
        return Flux.interval(Duration.ofMillis(100))
                .map(tick -> new Person("Dude", "Dude", tick));
    }

    /* Note: This returns, but I can't tell if it is done in a non blocking manner. It
     * appears to gather everything before serializing. */
    @GetMapping(path = "/finiteFlux", produces = MediaType.APPLICATION_JSON_VALUE)
    public Flux<Person> finiteFlux() {
        return Flux.range(0, 100)
                .map(tick -> new Person("Dude", "Dude", tick));
    }
}

ОБНОВИТЬ:

Я добавил дополнительную информацию о регистрации ниже:

Потоковая передача, кажется, использует два разных потока

2019-02-13 16:53:07.363 DEBUG 3416 --- [ctor-http-nio-2] o.s.w.s.adapter.HttpWebHandlerAdapter    : [dac80fd4] HTTP GET "/streaming"
2019-02-13 16:53:07.384 DEBUG 3416 --- [ctor-http-nio-2] s.w.r.r.m.a.RequestMappingHandlerMapping : [dac80fd4] Mapped to public reactor.core.publisher.Flux<io.jkratz.reactivedemo.Person> io.jkratz.reactivedemo.ReactiveController.getPeopleStreaming()
2019-02-13 16:53:07.398 DEBUG 3416 --- [ctor-http-nio-2] o.s.w.r.r.m.a.ResponseBodyResultHandler  : Using 'application/stream+json;q=0.8' given [text/html, application/xhtml+xml, image/webp, image/apng, application/xml;q=0.9, */*;q=0.8] and supported [application/stream+json]
2019-02-13 16:53:07.398 DEBUG 3416 --- [ctor-http-nio-2] o.s.w.r.r.m.a.ResponseBodyResultHandler  : [dac80fd4] 0..N [io.jkratz.reactivedemo.Person]
2019-02-13 16:53:07.532 DEBUG 3416 --- [     parallel-1] o.s.http.codec.json.Jackson2JsonEncoder  : [dac80fd4] Encoding [io.jkratz.reactivedemo.Person@6b3e843d]
2019-02-13 16:53:07.566 DEBUG 3416 --- [ctor-http-nio-2] r.n.channel.ChannelOperationsHandler     : [id: 0xdac80fd4, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:52398] Writing object DefaultHttpResponse(decodeResult: success, version: HTTP/1.1)
HTTP/1.1 200 OK
transfer-encoding: chunked
Content-Type: application/stream+json;q=0.8;charset=UTF-8
2019-02-13 16:53:07.591 DEBUG 3416 --- [ctor-http-nio-2] r.n.channel.ChannelOperationsHandler     : [id: 0xdac80fd4, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:52398] Writing object 
2019-02-13 16:53:07.629 DEBUG 3416 --- [     parallel-1] o.s.http.codec.json.Jackson2JsonEncoder  : [dac80fd4] Encoding [io.jkratz.reactivedemo.Person@217d62db]
2019-02-13 16:53:07.630 DEBUG 3416 --- [ctor-http-nio-2] r.n.channel.ChannelOperationsHandler     : [id: 0xdac80fd4, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:52398] Writing object 
2019-02-13 16:53:07.732 DEBUG 3416 --- [     parallel-1] o.s.http.codec.json.Jackson2JsonEncoder  : [dac80fd4] Encoding [io.jkratz.reactivedemo.Person@741c0c88]
2019-02-13 16:53:07.732 DEBUG 3416 --- [ctor-http-nio-2] r.n.channel.ChannelOperationsHandler     : [id: 0xdac80fd4, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:52398] Writing object 
2019-02-13 16:53:07.832 DEBUG 3416 --- [     parallel-1] o.s.http.codec.json.Jackson2JsonEncoder  : [dac80fd4] Encoding [io.jkratz.reactivedemo.Person@7b8532e5]

В то время как конечно с JSON используется только один поток.

2019-02-13 16:55:34.431 DEBUG 3416 --- [ctor-http-nio-3] o.s.w.s.adapter.HttpWebHandlerAdapter    : [5b048f46] HTTP GET "/finiteFlux"
2019-02-13 16:55:34.432 DEBUG 3416 --- [ctor-http-nio-3] s.w.r.r.m.a.RequestMappingHandlerMapping : [5b048f46] Mapped to public reactor.core.publisher.Flux<io.jkratz.reactivedemo.Person> io.jkratz.reactivedemo.ReactiveController.finiteFlux()
2019-02-13 16:55:34.434 DEBUG 3416 --- [ctor-http-nio-3] o.s.w.r.r.m.a.ResponseBodyResultHandler  : Using 'application/json;q=0.8' given [text/html, application/xhtml+xml, image/webp, image/apng, application/xml;q=0.9, */*;q=0.8] and supported [application/json]
2019-02-13 16:55:34.435 DEBUG 3416 --- [ctor-http-nio-3] o.s.w.r.r.m.a.ResponseBodyResultHandler  : [5b048f46] 0..N [io.jkratz.reactivedemo.Person]
2019-02-13 16:55:34.439 DEBUG 3416 --- [ctor-http-nio-3] o.s.http.codec.json.Jackson2JsonEncoder  : [5b048f46] Encoding [[io.jkratz.reactivedemo.Person@425c8296, io.jkratz.reactivedemo.Person@22ae73df, io.jkratz.reactived (truncated)...]
2019-02-13 16:55:34.448 DEBUG 3416 --- [ctor-http-nio-3] r.n.channel.ChannelOperationsHandler     : [id: 0x5b048f46, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:52991] Writing object DefaultHttpResponse(decodeResult: success, version: HTTP/1.1)
HTTP/1.1 200 OK
transfer-encoding: chunked
Content-Type: application/json;q=0.8;charset=UTF-8
2019-02-13 16:55:34.448 DEBUG 3416 --- [ctor-http-nio-3] r.n.channel.ChannelOperationsHandler     : [id: 0x5b048f46, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:52991] Writing object 
2019-02-13 16:55:34.450 DEBUG 3416 --- [ctor-http-nio-3] o.s.w.s.adapter.HttpWebHandlerAdapter    : [5b048f46] Completed 200 OK
2019-02-13 16:55:34.450 DEBUG 3416 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id: 0x5b048f46, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:52991] Last HTTP response frame
2019-02-13 16:55:34.450 DEBUG 3416 --- [ctor-http-nio-3] r.n.channel.ChannelOperationsHandler     : [id: 0x5b048f46, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:52991] Writing object EmptyLastHttpContent
2019-02-13 16:55:34.450 DEBUG 3416 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id: 0x5b048f46, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:52991] Decreasing pending responses, now 0
2019-02-13 16:55:34.451 DEBUG 3416 --- [ctor-http-nio-3] r.n.channel.ChannelOperationsHandler     : [id: 0x5b048f46, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:52991] No ChannelOperation attached. Dropping: EmptyLastHttpContent

1 ответ

Решение

В случае потокового миметипа (application/stream+json), кодек JSON, настроенный по умолчанию в Spring WebFlux, будет сериализован в JSON и сбрасывает в сети каждый элемент Flux вход. Такое поведение удобно, когда поток бесконечен или когда вы хотите отправить информацию клиенту, как только она станет доступной. Обратите внимание, что это приводит к снижению производительности, поскольку вызов сериализатора и очистка несколько раз потребляют ресурсы.

В случае не потокового типа (application/json), кодек JSON, сконфигурированный по умолчанию в Spring WebFlux, будет сериализован в JSON и мгновенно подключится к сети. Это буферизует Flux<YourObject> в памяти и сериализовать его за один проход. Это не означает, что операция блокируется, поскольку Flux<Databuffer> написано в реактивном ключе к сети. здесь ничего не блокируется.

Это всего лишь компромисс между "потоковой передачей данных и использованием большего количества ресурсов" по ​​сравнению с "буферизацией и более эффективным использованием ресурсов".

В случае потоковой передачи вещи с большей вероятностью будут обрабатываться разными рабочими потоками, поскольку рабочие элементы доступны через разные интервалы. В случае простого ответа JSON - он может обрабатываться также одним или несколькими потоками: это зависит от размера полезной нагрузки, если удаленный клиент медленный или нет.

Кажется, что вся магия происходит внутри метода кодирования AbstractJackson2Encoder#. Это код для обычногоapplication/json сериализация:

// non-streaming

return Flux.from(inputStream)
            .collectList() // make Mono<List<YourClass>> from Flux<YourClass>
            .map(list -> encodeValue(list, bufferFactory, listType, mimeType, hints)) // serialize list to JSON and write to DataBuffer
            .flux(); // get Flux<DataBuffer> from Mono<DataBuffer>

Итак, да, он ждет завершения Flux перед сериализацией.

Улучшения производительности сомнительны, потому что он всегда должен ждать сериализации всех данных. Так что особой разницы между Flux и обычнымList на случай, если application/json тип СМИ

Другие вопросы по тегам