Отключить фильтр в весеннем облачном шлюзе и не пересылать запрос
Я использую Spring Cloud Gateway для создания Api Gateway. В настоящее время я сопоставляю 10 REST-сервисов и при получении ответа отправляю тело запроса в Kafka Stream.
У меня есть 2 глобальных фильтра. Один фильтр используется для отправки (или нет) запроса в нашу внутреннюю службу.
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
if (enabled) {
logger.debug("Service switch is enabled. The Request if forwarded to the service.");
return chain.filter(exchange);
} else {
logger.debug("Service switch is disabled. A 404 error is returned.");
exchange.getResponse().setStatusCode(HttpStatus.NOT_FOUND);
return exchange.getResponse().writeWith(Mono.just(exchange.getResponse().bufferFactory().allocateBuffer(0)));
}
}
Второй фильтр используется для отправки тела запроса в Kafka.
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
CachedServerHttpRequestDecorator request = new CachedServerHttpRequestDecorator(exchange.getRequest());
ServerHttpResponseDecorator response = new ServerHttpResponseDecorator(exchange.getResponse()) {
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
try {
customerProducer.sendMessage("1234", request.getCachedBody());
super.setStatusCode(OK);
return super.writeWith(body);
} catch (Exception ex) {
throw new DataLakeException(ex);
}
}
};
if (enabled) {
LOGGER.debug("Datalake switch is enabled. The request body will be send to the datalake");
return chain.filter(exchange.mutate().request(request).response(response).build());
} else {
LOGGER.debug("Datalake switch is disabled. Nothing is sent to the datalake.");
return chain.filter(exchange);
}
}
Объект CachedServerHttpRequestDecorator является декоратором запросов, который кэширует содержимое тела, когда оно читается.
открытый класс CachedServerHttpRequestDecorator extends ServerHttpRequestDecorator {
private static final Logger LOGGER = LoggerFactory.getLogger(CachedServerHttpRequestDecorator.class);
private StringBuilder cached = new StringBuilder();
public CachedServerHttpRequestDecorator(ServerHttpRequest delegate) {
super(delegate);
}
@Override
public Flux<DataBuffer> getBody() {
return super.getBody()
//.publishOn(Schedulers.single())
.doOnNext(this::cache)
.doOnComplete(() -> LOGGER.debug("Request cached : {}", cached.toString()));
}
public String getCachedBody() {
return cached.toString();
}
private void cache(DataBuffer dataBuffer) {
byte[] data = new byte[dataBuffer.readableByteCount()];
dataBuffer.asByteBuffer().get(data);
String chunk = new String(data, StandardCharsets.UTF_8);
LOGGER.debug("Receiving a message chunk : {}", chunk);
cached.append(chunk);
}
}
Моя проблема возникает в первом фильтре, когда я использую случай, когда не хочу направлять запрос в службу отдыха. В этом случае тело запроса никогда не читается, поэтому во втором фильтре я отправляю пустое сообщение Кафке.
Когда я пытаюсь прочитать тело в моем первом фильтре с помощью что-то вроде exchange.getRequest.getBody.subscribe(), он читает только первый фрагмент моего сообщения. Мое сообщение может быть очень большим, поэтому оно делится на 10 и более частей.
Поэтому мой вопрос заключается в том, как я могу прочитать все мое сообщение вручную, не направляя запрос в службу REST?