Как обрабатывать элементы, испускаемые Observable с доступом к значениям из другого?

Мне нужно выполнить асинхронный call_1Лови свою Наблюдаемую reply_1затем сделайте еще одну асинхронную операцию call_2 и при обработке его reply_2 Мне также нужен доступ к reply_1,

Я пробовал что-то вроде:

public rx.Observable<Game> findGame(long templateId, GameModelType game_model, GameStateType state) {

rx.Observable<RxMessage<byte[]>> ebs = context.getGameTemplate(templateId);

return context.findGame(templateId, state) // findGame returns rx.Observable<RxMessage<byte[]>>

    .flatMap(new Func1<RxMessage<byte[]>, rx.Observable<Game>>() {

        @Override
        public Observable<Game> call(RxMessage<byte[]> gameRawReply) {

            Game game = null;

            switch(game_model) {

                case SINGLE: {

                    ebs.subscribe(new Action1<RxMessage<byte[]>>() {

                        @Override
                        public void call(RxMessage<byte[]> t1) {

                            game = singleGames.get(0);

                        }
                    });
                }
            }

            return rx.Observable.from(game);
        }
    });
}

У меня все еще есть проблема при компиляции этого метода из-за final вопросы game,

Это правильный способ работы над этой проблемой или есть более естественный способ выполнить то, что я пытаюсь сделать.

1 ответ

Решение

Если я понимаю, что вы хотите сделать правильно, я думаю, что естественный способ решить эту проблему будет zip:

У вас есть две наблюдаемые, которые излучают свои результаты асинхронно, а именно ebs и возвращаемое значение context.findGame(...),

Вы можете объединить их результат, выполнив что-то вроде этого:

public rx.Observable<Game> findGame(long templateId, GameModelType game_model, GameStateType state) {

    rx.Observable<RxMessage<byte[]>> ebs = context.getGameTemplate(templateId);
    rx.Observable<RxMessage<byte[]>> gameObs = context.findGame(templateId, state);

    return Observable.zip(gameObs, ebs, new Func2<RxMessage<byte[]>, RxMessage<byte[]>, Game>() {

        @Override
        public Game call(RxMessage<byte[]> gameRawReply, RxMessage<byte[]> t1) {

            Game game = null;

            switch(game_model) {
                case SINGLE: {
                    game = singleGames.get(0);
                }
            }

            return game;
        }
    });
}

Func2 - третий аргумент zip - будет вызван, когда обе ваши исходные Observables назвали свои onNext, Он будет использоваться для объединения значений, которые они излучают, с новым значением типа Game, и это будет передаваться подписчикам результирующего Observable.

РЕДАКТИРОВАТЬ: Еще немного информации...

Обратите внимание, что я изменил возврат вызова () с Observable<Game> просто игра. В противном случае результат zip не был бы Observable<Game> но Observable<Observable<Game>>, В отличие от map и flatMap, в rx есть только zip, а не flatZip. Но так как вы всегда хотите испустить ровно одну игру для каждой пары входных элементов (один из ebs, один из gameObs), в этом случае это не проблема.

Кроме того, call() Func2 теперь может быть еще более упрощен до:

@Override
public Game call(RxMessage<byte[]> gameRawReply, RxMessage<byte[]> t1) {

    switch(game_model) {
        case SINGLE: {
            return singleGames.get(0);
        }
    }
}
Другие вопросы по тегам