Rx Observables: испускать дополнительный предмет для каждого оригинального предмета, уменьшать его до другого типа, потреблять

У меня проблемы с выполнением следующих действий с использованием Couchbase Java client 2.2.2 и Rx Observables 1.0.15:

  • У меня есть список строк, которые являются именами документов
  • Наряду с каждым оригинальным документом для имени документа я хотел бы загрузить другой документ (выводимый из оригинального имени документа), чтобы получить пару документов. Если какой-либо из этих двух документов не существует, больше не используйте эту пару.
  • Если пара действительна (т.е. оба документа существуют), используйте оба документа для создания из них пользовательского объекта.
  • объединить эти преобразованные элементы в список

То, что я придумала до сих пор, выглядит действительно значимым:

List<E> resultList = new ArrayList<>();

Observable
    .from(originalDocumentNames)
    .flatmap(key -> {
        Observable firstDocument = bucket.async().get(key);
        Observable secondDocument = bucket.async().get(getSecondKeyNameFrom(key));
        return Observable.merge(firstDocument, secondDocument);
    })
    .reduce((jsonDocument1, jsonDocument2) -> {
        if (jsonDocument1 == null || jsonDocument2 == null) {
            return null;
        }
        resultList.add(createCustomObject(jsonDocument1, jsonDocument2);
        return null;
    })
    .filter(Objects.nonNull)
    .singleOrDefault(null)
    .subscribe(new Subscriber<E>() {
        public void onComplete() {
            //use resultList in a callback function
        }
    });

Это не работает. Я не знаю, где, но я думаю, что я использую Observable.merge неправильный путь. Также я думаю, что подхожу ко всей проблеме неправильно.

Итак, основные вопросы, по-видимому, таковы:

  • Как отправить дополнительный элемент в наблюдаемый поток?
  • Как я могу уменьшить два элемента в элемент другого типа? (уменьшить (T, T, T) не позволяет)
  • я ошибаюсь?

2 ответа

Решение

Вы могли бы использовать zip внутри плоской карты. Zip будет излучать столько же предметов, сколько Observable с наименьшим количеством предметов. Поэтому, если один из документов отсутствует, его последовательность будет пустой, и zip пропустит его.

Observable
.from(originalDocumentNames)
.flatmap(key -> {
    //the stream of 0-1 original document
    Observable firstDocument = bucket.async().get(key);
    //the stream of 0-1 associated document
    Observable secondDocument = bucket.async().get(getSecondKeyNameFrom(key));

    //using zip and the createCustomObject method reference as a zip function to combine pairs of documents
    return Observable.zip(firstDocument, secondDocument, this::createCustomObject);
})
.toList() //let RxJava aggregate into a List
.subscribe(
    //the "callback" function, onNext will be called only once with toList
    list -> doSomething(list), 
    //always try to define onError (best practice)
    e -> processErrors(e)
);

В этом коде есть несколько проблем:

  1. побочный эффект, reduce операция добавляет в список за пределами Observable цепь, это неправильно. reduce должен либо вернуть список, либо его вообще не существует, так как Rx имеет toList операция. Также из-за операции уменьшения, которая возвращает null следующие операции должны обрабатывать это; это довольно не элегантно.

  2. merge операция не так, вы должны вместо zip в flatmap и построить пару / совокупность.

  3. Необязательный момент: операция flatmap не обрабатывается, если любая из операций get вернет несколько элементов (возможно, это де-факто случай с couchbase)

Обратите внимание, у меня нет IDE, поэтому пока нет кода. Но с моей точки зрения замена merge от zip и удаление reduce конечно должно помочь.

Другие вопросы по тегам