Сделайте Http вызов с использованием ReactiveX для Java
Я новичок в ReactiveX для Java, и у меня есть следующий блок кода, который выполняет внешний вызов HTTP, но это не асинхронный. Мы используем rxjava 1.2 и Java 1.8
private ResponseEntity<String> callExternalUrl(String url, String json, HttpMethod method) {
RestTemplate restTemplate;
HttpEntity request;
request = new HttpEntity(jsonContent, httpHeaders);
return restTemplate.exchange(url, httpMethod, request, String.class);
}
У меня есть следующий блок кода, который я нашел в Интернете, но я не мог полностью понять его и как я могу применить его к моей базе кода.
private RxClient<RxObservableInvoker> httpClient;
public <T> Observable<T> fetchResult(String url, Func1<Response, T> mapper) {
return httpClient.target(url)
.request()
.rx()
.get()
.subscribeOn(Schedulers.io())
.map(mapper);
}
1 ответ
Если я вас правильно понимаю, вам нужно что-то вроде этого, чтобы обернуть свои существующие callExternalUrl
static Observable<String> callExternalUrlAsync(String url, String json, HttpMethod method)
{
return Observable.fromCallable(() -> callExternalUrl(url, json, method))
.subscribeOn(Schedulers.io())
.flatMap(re -> {
if (re.hasBody())
return Observable.just(re.getBody());
else
return Observable.error(new RuntimeException("Bad response status " + re.getStatusCode()));
},
e -> Observable.error(e),
(Func0<Observable<? extends String>>) (() -> Observable.empty())) // I need explicit cast or it won't compile :-(
.observeOn(Schedulers.computation());
}
Краткое описание кода:
- Планирует выполнение существующих
callExternalUrl
наSchedulers.io
- Минимальное преобразование
ResponseEntity<T>
в успешномT
и случаи ошибок. Это происходит наio
Планировщик тоже, но это не важно, так как он действительно короткий. (Если было исключение внутриcallExternalUrl
, передается как есть.) - Делает подписчика на результат, который будет выполнен на
Schedulers.computation
Предостережения:
- Вы, вероятно, хотите использовать свои собственные планировщики для обоих
subscribeOn
а такжеobserveOn
- Вы, вероятно, хотите иметь лучшую логику в первой лямбде
flatMap
чтобы различать успех и ошибку и определенно вам нужен более конкретный тип исключения.
Магия высшего порядка
Если вы хотите использовать функции более высокого порядка и тратить немного производительности на меньшее дублирование кода, вы можете сделать что-то вроде этого:
// Universal wrapper method
static <T> Observable<T> wrapCallExternalAsAsync(Func3<String, String, HttpMethod, ResponseEntity<T>> externalCall, String url, String json, HttpMethod method)
{
return Observable.fromCallable(() -> externalCall.call(url, json, method))
.subscribeOn(Schedulers.io())
.flatMap(re -> {
if (re.hasBody())
return Observable.just(re.getBody());
else
return Observable.error(new RuntimeException("Bad response status " + re.getStatusCode()));
},
e -> Observable.error(e),
(Func0<Observable<? extends T>>) (() -> Observable.empty())) // I need explicit cast or it won't compile :-(
.observeOn(Schedulers.computation());
}
static Observable<String> callExternalUrlAsync_HigherOrder(String url, String json, HttpMethod method)
{
return wrapCallExternalAsAsync(MyClass::callExternalUrl, url, json, method);
}
куда MyClass
где ты callExternalUrl
является.
Обновление (только асинхронные вызовы)
приватная статическая RxClient httpClient = Rx.newClient(RxObservableInvoker.class); // здесь вы можете передать пользовательский ExecutorService
private <T> Observable<String> executeHttpAsync(String url, String httpMethod, Entity<T> entity) {
return httpClient.target(url)
.request()
.headers(httpHeaders) // assuming httpHeaders is something global as in your example
.rx()
.method(httpMethod, entity)
.map(resp -> {
if (200 != resp.getStatus()) {
throw new RuntimeException("Bad status code " + resp.getStatus());
} else {
if (!resp.hasEntity()) {
// return null; // or error?
throw new RuntimeException("Empty response"); // or empty?
} else {
try {
return resp.readEntity(String.class);
} catch (Exception ex) {
throw new RuntimeException(ex); // wrap exception into unchecked
}
}
}
})
.observeOn(Schedulers.computation());
}
private Observable<String> executeGetAsync(String url) {
return executeHttpAsync(url, "GET", null);
}
private Observable<String> executePostAsync(String url, String json) {
return executeHttpAsync(url, "POST", Entity.json(json));
}
Снова аналогичные оговорки применяются:
- Вы, вероятно, хотите использовать свои собственные планировщики для обоих
newClient
позвонить иobserveOn
- Возможно, вы хотите иметь лучшую логику для обработки ошибок, чем просто проверять, является ли это HTTP 200 или нет, и определенно вам нужен более конкретный тип исключения. Но это все специфично для бизнес-логики, так что решать вам.
Также из вашего примера не ясно, как именно тело запроса (HttpEntity
) является ли сборка и всегда ли вы хотите String
в качестве ответа, как это в вашем оригинальном примере. Тем не менее я просто повторил вашу логику как есть. Если вам нужно что-то еще, вам, вероятно, следует обратиться к документации по адресу https://jersey.java.net/documentation/2.25/media.html