Наблюдаемый оператор ZIP зависает при использовании http-клиента vertx

Что я делаю:

Я использую http-клиент vertx rx для выполнения большого количества HTTP-запросов. в этом конкретном случае я вызываю "метод А", который возвращает список идентификаторов. чтобы получить все идентификаторы, мне нужно несколько раз вызвать метод A, чтобы получить следующий пакет результатов. (каждый раз, когда я указываю другой номер страницы, которую я хочу получить)

Чтобы улучшить производительность и как можно больше выполнять параллельные вызовы, я создаю список (RxJava) объектов Observables, каждый из которых представляет результат запроса одной страницы. когда я закончу создавать этот список, я вызываю оператор Obserable.zip и передаю список наблюдаемых.

Проблема:

При использовании http-клиента vertx без специальных настроек все работает, но довольно медленно. например, 3000 http запросов обрабатываются за 5 минут.

Я попытался улучшить производительность, установив параметры http-клиента vertx следующим образом:

 HttpClientOptions options = new HttpClientOptions();

 options.setMaxPoolSize(50)
        .setKeepAlive(true)
        .setPipelining(true)
        .setTcpKeepAlive(true)
        .setPipeliningLimit(25)
        .setMaxWaitQueueSize(10000);

но когда я делаю это, я получаю нестабильные результаты: иногда все работает нормально, и я могу получить все ответы менее чем за 20 секунд. однако иногда внешний сервер, который я все вызываю, закрывает соединение, и в журнале отображается следующая ошибка:

io.vertx.core.http.impl.HttpClientRequestImpl
SEVERE: io.vertx.core.VertxException: Connection was closed
  • Нет обработчик ошибок в моем коде не называется
  • При появлении этой ошибки оператор zip зависает

Вот код, который создает запрос HttpClientRequest

public Observable<HttpRestResponse> postWithResponse(String url, Map<String, String> headers, String body) {
        Observable<HttpRestResponse> bufferObservable = Observable.create(subscriber -> {
            try {
                HttpClientRequest request = httpClient.postAbs(url);
                addHeadersToRequest(headers, request);
                sendRequest(url, subscriber, request, body);
            }catch (Exception e) {
                try {
                    subscriber.onError(e);
                }catch (Exception ex) {
                    logger.error("error calling onError for subscriber",ex);
                }finally {
                    subscriber.onCompleted();
                }
            }
        });
        return bufferObservable;
    }

private void sendRequest(String requestUrl, Subscriber<? super HttpRestResponse> subscriber, HttpClientRequest request, String bodyData) {
        final long requestId = reqNumber.getAndIncrement();

        if (bodyData != null) {
            request.putHeader("Content-Length", String.valueOf(bodyData.getBytes().length);
        }

        request.putHeader("Accept-Encoding", "gzip,deflate");

        Observable<HttpRestResponse> retVal = request.toObservable()
                .doOnError(throwable -> {
                    logger.error("<<< #: " + requestId + " HTTP call failed. requestUrl [" + requestUrl + "] reason:" + throwable.getMessage());
                }).doOnNext(response -> {
                    if (response != null) {
                        logger.debug(" <<< #: " + requestId + " " + response.statusCode() + " " + response.statusMessage() + " " + requestUrl);
                    }
                }).flatMap(httpClientResponse -> {
                    try {
                        if (httpClientResponse != null && doCheckResponse(httpClientResponse, requestUrl, requestId, bodyData)) {
                            Observable<Buffer> bufferObservable = httpClientResponse.toObservable()
                                    .reduce(Buffer.buffer(1000), (result, buffer) -> result.appendBuffer(buffer));

                            return bufferObservable.flatMap(buffer -> Observable.just(new HttpRestResponse(buffer, httpClientResponse)));
                        }
                    } catch (Exception e) {
                        logger.error("error in RestHttpClient", e);
                    }

                    return Observable.just(new HttpRestResponse(null, httpClientResponse));
                });

        retVal.subscribe(subscriber);

        if (bodyData != null) {
            request.end(bodyData); // write post data
        } else {
            request.end();
        }
    }

asdasdasd

2 ответа

Если вы думаете, что можете подключить свою логику исключений следующим образом

 try {
            HttpClientRequest request = httpClient.postAbs(url);
            addHeadersToRequest(headers, request);
            sendRequest(url, subscriber, request, body);
        }catch (Exception e) {
            try {
                subscriber.onError(e);
            }catch (Exception ex) {
                logger.error("error calling onError for subscriber",ex);
            }finally {
                subscriber.onCompleted();
            }
        }

Вы не получите никакой ошибки, потому что по существу вся ваша обработка сейчас находится в Rx-экосистеме и, следовательно, вам ничего не будет сообщено здесь, в ваших блоках try catch.

Ошибки с этого момента будут приходить к вам от вашего

bufferObservable.onErrorReturn()

или же

bufferObservable.subscribe(success, error)

Итак, наконец, я понял это. похоже, что я передал метод Observable.zip пустой список...

проблема здесь в том, что onNext или onError не вызываются для возвращаемого наблюдаемого объекта метода "zip". в таком случае вызывается только onComplete, который я не удосужился создать обработчик для...

Большое спасибо всем, кто интересовался и пытался помочь.

Янов

Другие вопросы по тегам