Наблюдаемый оператор ZIP зависает при использовании http-клиента vertx
Что я делаю:
Я использую http-клиент vertx rx для выполнения большого количества HTTP-запросов. в этом конкретном случае я вызываю "метод А", который возвращает список идентификаторов. чтобы получить все идентификаторы, мне нужно несколько раз вызвать метод A, чтобы получить следующий пакет результатов. (каждый раз, когда я указываю другой номер страницы, которую я хочу получить)
Чтобы улучшить производительность и как можно больше выполнять параллельные вызовы, я создаю список (RxJava) объектов Observables, каждый из которых представляет результат запроса одной страницы. когда я закончу создавать этот список, я вызываю оператор Obserable.zip и передаю список наблюдаемых.
Проблема:
При использовании http-клиента vertx без специальных настроек все работает, но довольно медленно. например, 3000 http запросов обрабатываются за 5 минут.
Я попытался улучшить производительность, установив параметры http-клиента vertx следующим образом:
HttpClientOptions options = new HttpClientOptions();
options.setMaxPoolSize(50)
.setKeepAlive(true)
.setPipelining(true)
.setTcpKeepAlive(true)
.setPipeliningLimit(25)
.setMaxWaitQueueSize(10000);
но когда я делаю это, я получаю нестабильные результаты: иногда все работает нормально, и я могу получить все ответы менее чем за 20 секунд. однако иногда внешний сервер, который я все вызываю, закрывает соединение, и в журнале отображается следующая ошибка:
io.vertx.core.http.impl.HttpClientRequestImpl
SEVERE: io.vertx.core.VertxException: Connection was closed
- Нет обработчик ошибок в моем коде не называется
- При появлении этой ошибки оператор zip зависает
Вот код, который создает запрос HttpClientRequest
public Observable<HttpRestResponse> postWithResponse(String url, Map<String, String> headers, String body) {
Observable<HttpRestResponse> bufferObservable = Observable.create(subscriber -> {
try {
HttpClientRequest request = httpClient.postAbs(url);
addHeadersToRequest(headers, request);
sendRequest(url, subscriber, request, body);
}catch (Exception e) {
try {
subscriber.onError(e);
}catch (Exception ex) {
logger.error("error calling onError for subscriber",ex);
}finally {
subscriber.onCompleted();
}
}
});
return bufferObservable;
}
private void sendRequest(String requestUrl, Subscriber<? super HttpRestResponse> subscriber, HttpClientRequest request, String bodyData) {
final long requestId = reqNumber.getAndIncrement();
if (bodyData != null) {
request.putHeader("Content-Length", String.valueOf(bodyData.getBytes().length);
}
request.putHeader("Accept-Encoding", "gzip,deflate");
Observable<HttpRestResponse> retVal = request.toObservable()
.doOnError(throwable -> {
logger.error("<<< #: " + requestId + " HTTP call failed. requestUrl [" + requestUrl + "] reason:" + throwable.getMessage());
}).doOnNext(response -> {
if (response != null) {
logger.debug(" <<< #: " + requestId + " " + response.statusCode() + " " + response.statusMessage() + " " + requestUrl);
}
}).flatMap(httpClientResponse -> {
try {
if (httpClientResponse != null && doCheckResponse(httpClientResponse, requestUrl, requestId, bodyData)) {
Observable<Buffer> bufferObservable = httpClientResponse.toObservable()
.reduce(Buffer.buffer(1000), (result, buffer) -> result.appendBuffer(buffer));
return bufferObservable.flatMap(buffer -> Observable.just(new HttpRestResponse(buffer, httpClientResponse)));
}
} catch (Exception e) {
logger.error("error in RestHttpClient", e);
}
return Observable.just(new HttpRestResponse(null, httpClientResponse));
});
retVal.subscribe(subscriber);
if (bodyData != null) {
request.end(bodyData); // write post data
} else {
request.end();
}
}
asdasdasd
2 ответа
Если вы думаете, что можете подключить свою логику исключений следующим образом
try {
HttpClientRequest request = httpClient.postAbs(url);
addHeadersToRequest(headers, request);
sendRequest(url, subscriber, request, body);
}catch (Exception e) {
try {
subscriber.onError(e);
}catch (Exception ex) {
logger.error("error calling onError for subscriber",ex);
}finally {
subscriber.onCompleted();
}
}
Вы не получите никакой ошибки, потому что по существу вся ваша обработка сейчас находится в Rx-экосистеме и, следовательно, вам ничего не будет сообщено здесь, в ваших блоках try catch.
Ошибки с этого момента будут приходить к вам от вашего
bufferObservable.onErrorReturn()
или же
bufferObservable.subscribe(success, error)
Итак, наконец, я понял это. похоже, что я передал метод Observable.zip пустой список...
проблема здесь в том, что onNext или onError не вызываются для возвращаемого наблюдаемого объекта метода "zip". в таком случае вызывается только onComplete, который я не удосужился создать обработчик для...
Большое спасибо всем, кто интересовался и пытался помочь.
Янов