Spring WebFlux: тестирование событий отправки сервером WebClient

На самом деле я даже не уверен, возможно ли сделать то, что я хочу: я использую поток событий (Event.class) от брокера (MQTT) с помощью Spring Integration и хотите перенаправить поток в другой микросервис. Оба сервиса должны быть горизонтально масштабируемыми. Таким образом, в другом сервисе у меня есть конечная точка POST, за исключением Flux<Event> как тело (используя Spring Cloud Функции: spring-cloud-starter-function-webflux). В службе пересылки я хочу использовать ленту и WebClient,

Что у меня пока (упрощенно):

@Configuration
public class EventFlowConfiguration{ 

@Autowired
private Webclient webClient;

@Bean
public MessageProducerSupport inboundAdapter() {
    final MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(MqttAsyncClient.generateClientId(), this.mqttClientFactory, "/topic");
    adapter.setConverter(new DefaultPahoMessageConverter());
    return adapter;
}

@Bean
Publisher<Message<Event>> eventFlow() {
    return IntegrationFlows
            .from(inboundAdapter())
            .handle((payload, header) -> this.eventHandler((String) payload))
            .toReactivePublisher();
}

private Event eventHandler(String payload) {
    // parsing, store in database ...
    return event;
}

@PostConstruct
public void setTrigger() {
    buildTrigger(webClient, "/receiveEventStream", eventFlow(), Event.class);
}

private <T, E extends Event> void buildTrigger(WebClient webClient, String uri, Publisher<Message<T>> publisher, Class<E> eventClass) {
    webClient
            .post()
            .uri(uri)
            .contentType(MediaType.APPLICATION_STREAM_JSON)
            .body(
                    Flux.from(publisher)
                            .retry()
                            .map(Message::getPayload)
                    ,
                    eventClass
            )
            .retrieve()
            .bodyToMono(Void.class)
            .retry()
            .subscribe();
}
}

Мои вопросы:

(1) Есть ли у них шанс проверить это вообще? Как бы я это сделал? Я, что о OkHttp, WireMock, внутреннее приложение весенней загрузки в тесте, который имеет /eventReceiverEndpoint но только утверждает Тем не менее, на самом деле не получил нигде. На самом деле я даже не получаю ошибку. Я бы ожидал 4xx, так как нет сервера памяти при запуске.

(2) сделать два retry() вызовы действительно повторяют проблемы соединения с обеих сторон? Я теряю сообщения в случае отключения? Как я могу это проверить?

(3) Может ли лента в сочетании с этим долгоживущим Flux все еще выполнять балансировку нагрузки после установления соединения (например, когда нагрузка конечного потребителя достигает высокой или около того). Я полагаю, это должно было бы автоматически повторно подключить Flux к другому экземпляру?

0 ответов