Отключить фильтр в весеннем облачном шлюзе и не пересылать запрос

Я использую Spring Cloud Gateway для создания Api Gateway. В настоящее время я сопоставляю 10 REST-сервисов и при получении ответа отправляю тело запроса в Kafka Stream.

У меня есть 2 глобальных фильтра. Один фильтр используется для отправки (или нет) запроса в нашу внутреннюю службу.

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    if (enabled) {
        logger.debug("Service switch is enabled. The Request if forwarded to the service.");
        return chain.filter(exchange);
    } else {
        logger.debug("Service switch is disabled. A 404 error is returned.");
        exchange.getResponse().setStatusCode(HttpStatus.NOT_FOUND);
        return exchange.getResponse().writeWith(Mono.just(exchange.getResponse().bufferFactory().allocateBuffer(0)));
    }
}

Второй фильтр используется для отправки тела запроса в Kafka.

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {

    CachedServerHttpRequestDecorator request = new CachedServerHttpRequestDecorator(exchange.getRequest());

    ServerHttpResponseDecorator response = new ServerHttpResponseDecorator(exchange.getResponse()) {
        @Override
        public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
            try {
                customerProducer.sendMessage("1234", request.getCachedBody());
                super.setStatusCode(OK);
                return super.writeWith(body);
            } catch (Exception ex) {
                throw new DataLakeException(ex);
            }
        }
    };

    if (enabled) {
        LOGGER.debug("Datalake switch is enabled. The request body will be send to the datalake");
        return chain.filter(exchange.mutate().request(request).response(response).build());
    } else {
        LOGGER.debug("Datalake switch is disabled. Nothing is sent to the datalake.");
        return chain.filter(exchange);
    }
}

Объект CachedServerHttpRequestDecorator является декоратором запросов, который кэширует содержимое тела, когда оно читается.

открытый класс CachedServerHttpRequestDecorator extends ServerHttpRequestDecorator {

private static final Logger LOGGER = LoggerFactory.getLogger(CachedServerHttpRequestDecorator.class);

private StringBuilder cached = new StringBuilder();

public CachedServerHttpRequestDecorator(ServerHttpRequest delegate) {
    super(delegate);
}

@Override
public Flux<DataBuffer> getBody() {
    return super.getBody()
            //.publishOn(Schedulers.single())
            .doOnNext(this::cache)
            .doOnComplete(() -> LOGGER.debug("Request cached : {}", cached.toString()));
}

public String getCachedBody() {
    return cached.toString();
}

private void cache(DataBuffer dataBuffer) {
    byte[] data = new byte[dataBuffer.readableByteCount()];
    dataBuffer.asByteBuffer().get(data);
    String chunk = new String(data, StandardCharsets.UTF_8);
    LOGGER.debug("Receiving a message chunk : {}", chunk);
    cached.append(chunk);
}

}

Моя проблема возникает в первом фильтре, когда я использую случай, когда не хочу направлять запрос в службу отдыха. В этом случае тело запроса никогда не читается, поэтому во втором фильтре я отправляю пустое сообщение Кафке.

Когда я пытаюсь прочитать тело в моем первом фильтре с помощью что-то вроде exchange.getRequest.getBody.subscribe(), он читает только первый фрагмент моего сообщения. Мое сообщение может быть очень большим, поэтому оно делится на 10 и более частей.

Поэтому мой вопрос заключается в том, как я могу прочитать все мое сообщение вручную, не направляя запрос в службу REST?

0 ответов

Другие вопросы по тегам