Spring WebFlux: тестирование событий отправки сервером WebClient
На самом деле я даже не уверен, возможно ли сделать то, что я хочу: я использую поток событий (Event.class
) от брокера (MQTT) с помощью Spring Integration и хотите перенаправить поток в другой микросервис. Оба сервиса должны быть горизонтально масштабируемыми. Таким образом, в другом сервисе у меня есть конечная точка POST, за исключением Flux<Event>
как тело (используя Spring Cloud Функции: spring-cloud-starter-function-webflux
). В службе пересылки я хочу использовать ленту и WebClient
,
Что у меня пока (упрощенно):
@Configuration
public class EventFlowConfiguration{
@Autowired
private Webclient webClient;
@Bean
public MessageProducerSupport inboundAdapter() {
final MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(MqttAsyncClient.generateClientId(), this.mqttClientFactory, "/topic");
adapter.setConverter(new DefaultPahoMessageConverter());
return adapter;
}
@Bean
Publisher<Message<Event>> eventFlow() {
return IntegrationFlows
.from(inboundAdapter())
.handle((payload, header) -> this.eventHandler((String) payload))
.toReactivePublisher();
}
private Event eventHandler(String payload) {
// parsing, store in database ...
return event;
}
@PostConstruct
public void setTrigger() {
buildTrigger(webClient, "/receiveEventStream", eventFlow(), Event.class);
}
private <T, E extends Event> void buildTrigger(WebClient webClient, String uri, Publisher<Message<T>> publisher, Class<E> eventClass) {
webClient
.post()
.uri(uri)
.contentType(MediaType.APPLICATION_STREAM_JSON)
.body(
Flux.from(publisher)
.retry()
.map(Message::getPayload)
,
eventClass
)
.retrieve()
.bodyToMono(Void.class)
.retry()
.subscribe();
}
}
Мои вопросы:
(1) Есть ли у них шанс проверить это вообще? Как бы я это сделал? Я, что о OkHttp, WireMock, внутреннее приложение весенней загрузки в тесте, который имеет /eventReceiverEndpoint
но только утверждает Тем не менее, на самом деле не получил нигде. На самом деле я даже не получаю ошибку. Я бы ожидал 4xx, так как нет сервера памяти при запуске.
(2) сделать два retry()
вызовы действительно повторяют проблемы соединения с обеих сторон? Я теряю сообщения в случае отключения? Как я могу это проверить?
(3) Может ли лента в сочетании с этим долгоживущим Flux все еще выполнять балансировку нагрузки после установления соединения (например, когда нагрузка конечного потребителя достигает высокой или около того). Я полагаю, это должно было бы автоматически повторно подключить Flux к другому экземпляру?