Сделайте Http вызов с использованием ReactiveX для Java

Я новичок в ReactiveX для Java, и у меня есть следующий блок кода, который выполняет внешний вызов HTTP, но это не асинхронный. Мы используем rxjava 1.2 и Java 1.8

  private ResponseEntity<String> callExternalUrl(String url, String json, HttpMethod method) {

    RestTemplate restTemplate;
    HttpEntity request;

      request = new HttpEntity(jsonContent, httpHeaders);

    return restTemplate.exchange(url, httpMethod, request, String.class);

  }

У меня есть следующий блок кода, который я нашел в Интернете, но я не мог полностью понять его и как я могу применить его к моей базе кода.

private RxClient<RxObservableInvoker> httpClient;
public <T> Observable<T> fetchResult(String url, Func1<Response, T> mapper) {

    return httpClient.target(url)
        .request()
        .rx()
        .get()
        .subscribeOn(Schedulers.io())
        .map(mapper);
  }

1 ответ

Решение

Если я вас правильно понимаю, вам нужно что-то вроде этого, чтобы обернуть свои существующие callExternalUrl

static Observable<String> callExternalUrlAsync(String url, String json, HttpMethod method)
{
    return Observable.fromCallable(() -> callExternalUrl(url, json, method))
            .subscribeOn(Schedulers.io())
            .flatMap(re -> {
                         if (re.hasBody())
                             return Observable.just(re.getBody());
                         else
                             return Observable.error(new RuntimeException("Bad response status " + re.getStatusCode()));
                     },
                     e -> Observable.error(e),
                     (Func0<Observable<? extends String>>) (() -> Observable.empty())) // I need explicit cast or it won't compile :-(
            .observeOn(Schedulers.computation());
}

Краткое описание кода:

  1. Планирует выполнение существующих callExternalUrl на Schedulers.io
  2. Минимальное преобразование ResponseEntity<T> в успешном T и случаи ошибок. Это происходит на io Планировщик тоже, но это не важно, так как он действительно короткий. (Если было исключение внутри callExternalUrl, передается как есть.)
  3. Делает подписчика на результат, который будет выполнен на Schedulers.computation

Предостережения:

  1. Вы, вероятно, хотите использовать свои собственные планировщики для обоих subscribeOn а также observeOn
  2. Вы, вероятно, хотите иметь лучшую логику в первой лямбде flatMap чтобы различать успех и ошибку и определенно вам нужен более конкретный тип исключения.

Магия высшего порядка

Если вы хотите использовать функции более высокого порядка и тратить немного производительности на меньшее дублирование кода, вы можете сделать что-то вроде этого:

// Universal wrapper method
static <T> Observable<T> wrapCallExternalAsAsync(Func3<String, String, HttpMethod, ResponseEntity<T>> externalCall, String url, String json, HttpMethod method)
{
    return Observable.fromCallable(() -> externalCall.call(url, json, method))
            .subscribeOn(Schedulers.io())
            .flatMap(re -> {
                         if (re.hasBody())
                             return Observable.just(re.getBody());
                         else
                             return Observable.error(new RuntimeException("Bad response status " + re.getStatusCode()));
                     },
                     e -> Observable.error(e),
                     (Func0<Observable<? extends T>>) (() -> Observable.empty())) // I need explicit cast or it won't compile :-(
            .observeOn(Schedulers.computation());
}

static Observable<String> callExternalUrlAsync_HigherOrder(String url, String json, HttpMethod method)
{
    return wrapCallExternalAsAsync(MyClass::callExternalUrl, url, json, method);
}

куда MyClass где ты callExternalUrl является.


Обновление (только асинхронные вызовы)

приватная статическая RxClient httpClient = Rx.newClient(RxObservableInvoker.class); // здесь вы можете передать пользовательский ExecutorService

private <T> Observable<String> executeHttpAsync(String url, String httpMethod, Entity<T> entity) {
    return httpClient.target(url)
            .request()
            .headers(httpHeaders) // assuming httpHeaders is something global as in your example
            .rx()
            .method(httpMethod, entity)
            .map(resp -> {
                if (200 != resp.getStatus()) {
                    throw new RuntimeException("Bad status code " + resp.getStatus());
                } else {
                    if (!resp.hasEntity()) {
                        // return null; // or error?
                        throw new RuntimeException("Empty response"); // or empty?
                    } else {
                        try {
                            return resp.readEntity(String.class);
                        } catch (Exception ex) {
                            throw new RuntimeException(ex); // wrap exception into unchecked
                        }
                    }
                }
            })
            .observeOn(Schedulers.computation());
}

private Observable<String> executeGetAsync(String url) {
    return executeHttpAsync(url, "GET", null);
}

private Observable<String> executePostAsync(String url, String json) {
    return executeHttpAsync(url, "POST", Entity.json(json));
}

Снова аналогичные оговорки применяются:

  1. Вы, вероятно, хотите использовать свои собственные планировщики для обоих newClient позвонить и observeOn
  2. Возможно, вы хотите иметь лучшую логику для обработки ошибок, чем просто проверять, является ли это HTTP 200 или нет, и определенно вам нужен более конкретный тип исключения. Но это все специфично для бизнес-логики, так что решать вам.

Также из вашего примера не ясно, как именно тело запроса (HttpEntity) является ли сборка и всегда ли вы хотите String в качестве ответа, как это в вашем оригинальном примере. Тем не менее я просто повторил вашу логику как есть. Если вам нужно что-то еще, вам, вероятно, следует обратиться к документации по адресу https://jersey.java.net/documentation/2.25/media.html

Другие вопросы по тегам