RxJava объединяет последовательность запросов

Эта проблема

У меня есть два Apis. Api 1 дает мне список предметов, а Api 2 дает мне более подробную информацию для каждого из предметов, которые я получил от Api 1. То, как я решил это до сих пор, приводит к плохой производительности.

Вопрос

Эффективное и быстрое решение этой проблемы с помощью Retrofit и RxJava.

Мой подход

На данный момент мое решение выглядит так:

Шаг 1: Модернизация выполняется Single<ArrayList<Information>> от Api 1.

Шаг 2: Я перебираю эти пункты и делаю запрос для каждого к Api 2.

Шаг 3: Retrofit Returns Последовательно выполняется Single<ExtendedInformation> за каждый предмет

Шаг 4: После того, как все вызовы формы Api 2 полностью выполнены, я создаю новый Объект для всех Предметов, объединяя информацию и Расширенную информацию.

Мой код

 public void addExtendedInformations(final Information[] informations) {
        final ArrayList<InformationDetail> informationDetailArrayList = new ArrayList<>();
        final JSONRequestRatingHelper.RatingRequestListener ratingRequestListener = new JSONRequestRatingHelper.RatingRequestListener() {
            @Override
            public void onDownloadFinished(Information baseInformation, ExtendedInformation extendedInformation) {
                informationDetailArrayList.add(new InformationDetail(baseInformation, extendedInformation));
                if (informationDetailArrayList.size() >= informations.length){
                    listener.onAllExtendedInformationLoadedAndCombined(informationDetailArrayList);
                }
            }
        };

        for (Information information : informations) {
            getExtendedInformation(ratingRequestListener, information);
        }
    }

    public void getRatingsByTitle(final JSONRequestRatingHelper.RatingRequestListener ratingRequestListener, final Information information) {
        Single<ExtendedInformation> repos = service.findForTitle(information.title);
        disposable.add(repos.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribeWith(new DisposableSingleObserver<ExtendedInformation>() {
            @Override
            public void onSuccess(ExtendedInformation extendedInformation) {
                    ratingRequestListener.onDownloadFinished(information, extendedInformation);
            }

            @Override
            public void onError(Throwable e) {
                ExtendedInformation extendedInformation = new ExtendedInformation();
                ratingRequestListener.onDownloadFinished(extendedInformation, information);
            }
        }));
    }

    public interface RatingRequestListener {

        void onDownloadFinished(Information information, ExtendedInformation extendedInformation);

    }

5 ответов

Решение

TL; Dr использовать concatMapEager или же flatMap и выполнять суб-вызовы асинхронно или по расписанию.


длинная история

Я не разработчик Android, поэтому мой вопрос будет ограничен чисто RxJava (версия 1 и версия 2).

Если я правильно понял картинку, то необходимый поток:

some query param 
  \--> Execute query on API_1 -> list of items
          |-> Execute query for item 1 on API_2 -> extended info of item1
          |-> Execute query for item 2 on API_2 -> extended info of item1
          |-> Execute query for item 3 on API_2 -> extended info of item1
          ...
          \-> Execute query for item n on API_2 -> extended info of item1
  \----------------------------------------------------------------------/
      |
      \--> stream (or list) of extended item info for the query param

Предполагая, что Retrofit генерирует клиентов для

interface Api1 {
    @GET("/api1") Observable<List<Item>> items(@Query("param") String param);
}

interface Api2 {
    @GET("/api2/{item_id}") Observable<ItemExtended> extendedInfo(@Path("item_id") String item_id);
}

Если порядок товара не важен, то можно использовать flatMap только:

api1.items(queryParam)
    .flatMap(itemList -> Observable.fromIterable(itemList)))
    .flatMap(item -> api2.extendedInfo(item.id()))
    .subscribe(...)

Но только если модификатор сборки настроен с

  • Либо с помощью асинхронного адаптера (вызовы будут помещены в очередь во внутреннем исполнителе okhttp). Я лично думаю, что это не очень хорошая идея, потому что вы не можете контролировать этого исполнителя.

    .addCallAdapterFactory(RxJava2CallAdapterFactory.createAsync()
    
  • Или с помощью адаптера на основе планировщика (вызовы будут планироваться в планировщике RxJava). Это был бы мой предпочтительный вариант, поскольку вы явно выбираете, какой планировщик используется, скорее всего, это будет планировщик ввода-вывода, но вы можете попробовать другой.

    .addCallAdapterFactory(RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io()))
    

Причина в том, что flatMap подпишется на каждую наблюдаемую, созданную api2.extendedInfo(...) и объединить их в полученную наблюдаемую. Таким образом, результаты будут отображаться в порядке их получения.

Если модифицированный клиент не настроен на асинхронную работу или настроен на работу в планировщике, можно установить его:

api1.items(queryParam)
    .flatMap(itemList -> Observable.fromIterable(itemList)))
    .flatMap(item -> api2.extendedInfo(item.id()).subscribeOn(Schedulers.io()))
    .subscribe(...)

Эта структура почти идентична предыдущей, за исключением того, что она указывает локально, на каком планировщике каждый api2.extendedInfo должен бежать.

Можно настроить maxConcurrency параметр flatMap контролировать, сколько запросов вы хотите выполнить одновременно. Хотя я буду осторожен с этим, вы не хотите запускать все запросы одновременно. Обычно по умолчанию maxConcurrency достаточно хорош (128).

Теперь, если порядок оригинального запроса имеет значение. concatMap обычно оператор, который делает то же самое, что и flatMap по порядку, но последовательно, что оказывается медленным, если код должен ждать выполнения всех подзапросов. Решение, однако, является еще одним шагом вперед с concatMapEagerэтот подпишется на наблюдаемый по порядку и буферизует результаты по мере необходимости.

Предполагая, что клиенты дооснащения асинхронны или работают на определенном планировщике:

api1.items(queryParam)
    .flatMap(itemList -> Observable.fromIterable(itemList)))
    .concatMapEager(item -> api2.extendedInfo(item.id()))
    .subscribe(...)

Или, если планировщик должен быть установлен локально:

api1.items(queryParam)
    .flatMap(itemList -> Observable.fromIterable(itemList)))
    .concatMapEager(item -> api2.extendedInfo(item.id()).subscribeOn(Schedulers.io()))
    .subscribe(...)

Также возможно настроить параллелизм в этом операторе.


Дополнительно, если Api возвращается Flowable, можно использовать .parallel это все еще в бета-версии в настоящее время в RxJava 2.1.7. Но тогда результаты не в порядке, и я не знаю способ (пока?), Чтобы заказать их без сортировки после.

api.items(queryParam) // Flowable<Item>
   .parallel(10)
   .runOn(Schedulers.io())
   .map(item -> api2.extendedInfo(item.id()))
   .sequential();     // Flowable<ItemExtended>

flatMap Оператор предназначен для удовлетворения этих типов рабочих процессов.

я обрисую широкие мазки на простом примере из пяти шагов. надеюсь, вы сможете легко восстановить те же принципы в своем коде:

@Test fun flatMapExample() {
    // (1) constructing a fake stream that emits a list of values
    Observable.just(listOf(1, 2, 3, 4, 5))
            // (2) convert our List emission into a stream of its constituent values 
            .flatMap { numbers -> Observable.fromIterable(numbers) }
            // (3) subsequently convert each individual value emission into an Observable of some 
            //     newly calculated type
            .flatMap { number ->
                when(number) {
                       1 -> Observable.just("A1")
                       2 -> Observable.just("B2")
                       3 -> Observable.just("C3")
                       4 -> Observable.just("D4")
                       5 -> Observable.just("E5")
                    else -> throw RuntimeException("Unexpected value for number [$number]")
                }
            }
            // (4) collect all the final emissions into a list
            .toList()
            .subscribeBy(
                    onSuccess = {
                        // (5) handle all the combined results (in list form) here
                        println("## onNext($it)")
                    },
                    onError = { error ->
                        println("## onError(${error.message})")
                    }
            )
}

(кстати, если порядок выбросов имеет значение, посмотрите на использование concatMap вместо).

Надеюсь, это поможет.

Проверьте ниже, это работает.

Скажем, у вас есть несколько сетевых вызовов, которые вам нужно сделать - например, для получения информации о пользователе Github и пользовательских событий Github.

И вы хотите дождаться возвращения каждого из них, прежде чем обновлять пользовательский интерфейс. RxJava может помочь вам здесь. Давайте сначала определим наш объект Retrofit для доступа к API Github, а затем настроим две наблюдаемые для вызова двух сетевых запросов.

Retrofit repo = new Retrofit.Builder()
        .baseUrl("https://api.github.com")
        .addConverterFactory(GsonConverterFactory.create())
        .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
        .build();

Observable<JsonObject> userObservable = repo
        .create(GitHubUser.class)
        .getUser(loginName)
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread());

Observable<JsonArray> eventsObservable = repo
        .create(GitHubEvents.class)
        .listEvents(loginName)
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread());

Используемый интерфейс для этого, как показано ниже:

public interface GitHubUser {
  @GET("users/{user}")
  Observable<JsonObject> getUser(@Path("user") String user);
}

public interface GitHubEvents {
  @GET("users/{user}/events")
  Observable<JsonArray> listEvents(@Path("user") String user);
}

После того, как мы используем метод zx в RxJava, чтобы объединить наши два Observables и дождаться их завершения, прежде чем создавать новый Observable.

Observable<UserAndEvents> combined = Observable.zip(userObservable, eventsObservable, new Func2<JsonObject, JsonArray, UserAndEvents>() {
  @Override
  public UserAndEvents call(JsonObject jsonObject, JsonArray jsonElements) {
    return new UserAndEvents(jsonObject, jsonElements);
  }
});

Наконец, давайте вызовем метод подписки для нашего нового комбинированного Observable:

combined.subscribe(new Subscriber<UserAndEvents>() {
          ...
          @Override
          public void onNext(UserAndEvents o) {
            // You can access the results of the 
            // two observabes via the POJO now
          }
        });

Больше не нужно ждать в темах и т. Д. Для завершения сетевых вызовов RxJava сделал все это для вас в zip(). надеюсь, мой ответ поможет вам.

Я решил аналогичную проблему с RxJava2. Выполнение запросов для Api 2 параллельно немного ускоряет работу.

private InformationRepository informationRepository;

//init....

public Single<List<FullInformation>> getFullInformation() {
    return informationRepository.getInformationList()
            .subscribeOn(Schedulers.io())//I usually write subscribeOn() in the repository, here - for clarity
            .flatMapObservable(Observable::fromIterable)
            .flatMapSingle(this::getFullInformation)
            .collect(ArrayList::new, List::add);

}

private Single<FullInformation> getFullInformation(Information information) {
    return informationRepository.getExtendedInformation(information)
            .map(extendedInformation -> new FullInformation(information, extendedInformation))
            .subscribeOn(Schedulers.io());//execute requests in parallel
}

Информационный репозиторий - просто интерфейс. Его реализация нам не интересна.

public interface InformationRepository {

    Single<List<Information>> getInformationList();

    Single<ExtendedInformation> getExtendedInformation(Information information);
}

FullInformation - контейнер для результата.

public class FullInformation {

    private Information information;
    private ExtendedInformation extendedInformation;

    public FullInformation(Information information, ExtendedInformation extendedInformation) {
        this.information = information;
        this.extendedInformation = extendedInformation;
    }
}

Попробуйте использовать Observable.zip() оператор. Он будет ждать завершения обоих вызовов Api, прежде чем продолжить поток. Затем вы можете вставить некоторую логику, вызвав flatMap() после этого.

http://reactivex.io/documentation/operators/zip.html

Другие вопросы по тегам