Rx Observables: испускать дополнительный предмет для каждого оригинального предмета, уменьшать его до другого типа, потреблять
У меня проблемы с выполнением следующих действий с использованием Couchbase Java client 2.2.2 и Rx Observables 1.0.15:
- У меня есть список строк, которые являются именами документов
- Наряду с каждым оригинальным документом для имени документа я хотел бы загрузить другой документ (выводимый из оригинального имени документа), чтобы получить пару документов. Если какой-либо из этих двух документов не существует, больше не используйте эту пару.
- Если пара действительна (т.е. оба документа существуют), используйте оба документа для создания из них пользовательского объекта.
- объединить эти преобразованные элементы в список
То, что я придумала до сих пор, выглядит действительно значимым:
List<E> resultList = new ArrayList<>();
Observable
.from(originalDocumentNames)
.flatmap(key -> {
Observable firstDocument = bucket.async().get(key);
Observable secondDocument = bucket.async().get(getSecondKeyNameFrom(key));
return Observable.merge(firstDocument, secondDocument);
})
.reduce((jsonDocument1, jsonDocument2) -> {
if (jsonDocument1 == null || jsonDocument2 == null) {
return null;
}
resultList.add(createCustomObject(jsonDocument1, jsonDocument2);
return null;
})
.filter(Objects.nonNull)
.singleOrDefault(null)
.subscribe(new Subscriber<E>() {
public void onComplete() {
//use resultList in a callback function
}
});
Это не работает. Я не знаю, где, но я думаю, что я использую Observable.merge
неправильный путь. Также я думаю, что подхожу ко всей проблеме неправильно.
Итак, основные вопросы, по-видимому, таковы:
- Как отправить дополнительный элемент в наблюдаемый поток?
- Как я могу уменьшить два элемента в элемент другого типа? (уменьшить (T, T, T) не позволяет)
- я ошибаюсь?
2 ответа
Вы могли бы использовать zip
внутри плоской карты. Zip будет излучать столько же предметов, сколько Observable
с наименьшим количеством предметов. Поэтому, если один из документов отсутствует, его последовательность будет пустой, и zip пропустит его.
Observable
.from(originalDocumentNames)
.flatmap(key -> {
//the stream of 0-1 original document
Observable firstDocument = bucket.async().get(key);
//the stream of 0-1 associated document
Observable secondDocument = bucket.async().get(getSecondKeyNameFrom(key));
//using zip and the createCustomObject method reference as a zip function to combine pairs of documents
return Observable.zip(firstDocument, secondDocument, this::createCustomObject);
})
.toList() //let RxJava aggregate into a List
.subscribe(
//the "callback" function, onNext will be called only once with toList
list -> doSomething(list),
//always try to define onError (best practice)
e -> processErrors(e)
);
В этом коде есть несколько проблем:
побочный эффект,
reduce
операция добавляет в список за пределамиObservable
цепь, это неправильно.reduce
должен либо вернуть список, либо его вообще не существует, так как Rx имеетtoList
операция. Также из-за операции уменьшения, которая возвращаетnull
следующие операции должны обрабатывать это; это довольно не элегантно.merge
операция не так, вы должны вместоzip
вflatmap
и построить пару / совокупность.Необязательный момент: операция flatmap не обрабатывается, если любая из операций get вернет несколько элементов (возможно, это де-факто случай с couchbase)
Обратите внимание, у меня нет IDE, поэтому пока нет кода. Но с моей точки зрения замена merge
от zip
и удаление reduce
конечно должно помочь.