RxJava2: выполнить асинхронную функцию для каждого элемента в списке и ожидать обратного вызова

Я борюсь с RxJava2. Я хочу выполнить функцию для каждого элемента списка. Эта функция:

public void function(final Result result) {

    FirebaseFirestore.getInstance().collection(COLLECTION_NAME).document(result.getId()).get().addOnSuccessListener(new OnSuccessListener<DocumentSnapshot>() {
        @Override
        public void onSuccess(DocumentSnapshot documentSnapshot) {
            // do some operation
        }
    });
}

Эта функция асинхронная и использует FirebaseFirestore.

Поэтому я попытался использовать RxJava2 в своем списке для вызова функции для каждого элемента:

Observable.fromIterable(resultList)
                    .concatMap(result -> Observable.fromCallable(new Callable<String>() {
                        @Override
                        public String call() throws Exception {
                            function(result);
                            return "ok";
                        }
                    }))
                    .subscribe(r -> {
                        // do some operation when all firebase async tasks are done
                    });

ConcatMap работает, и функция вызывается для каждого элемента списка. Проблема в том, что мне нужен обратный вызов, когда все асинхронные задачи Firebase выполнены.

Любая помощь приветствуется.

1 ответ

Решение

Я постараюсь нарисовать возможное решение:

public class Callback implements OnSuccessListener<DocumentSnapshot> {
     private final ObservableEmitter<DocumentSnapshot> emitter;
     private final boolean last;

     public Callback(boolean lastvalue, ObservableEmitter<DocumentSnapshot> e) {
        this.last = lastvalue;
        this.emitter = e;
     }

     @Override
     public void onSuccess(DocumentSnapshot value) {
         emitter.onNext(value);
         if (last) {
             emitter.onComplete();
         }
     }
}


Observable<DocumentSnapshot> observable = Observable.create(new ObservableOnSubscribe<DocumentSnapshot>() {
        @Override
        public void subscribe(ObservableEmitter<DocumentSnapshot> e) throws Exception {
            int i = 1;
            for (Result result : resultList) {
                /* callback object now knows which is the last request so it can emit the onComplete */
                Callback callbackInstance = new Callback(resultList.size() == i, e);
                i++;
                FirebaseFirestore.getInstance().collection(COLLECTION_NAME)
                            .document(result.getId()).get().addOnSuccessListener(callbackInstance);
                }
            }
        });

тогда, когда подписчик onComplete действие выполнено, все запросы к Firebase должны быть выполнены.

Другие вопросы по тегам