Почему не вызывается doOnNext()?
Ни onNext(), ни onCompleted() не будут вызваны для моего подписчика ниже. Я попытался реализовать подписчика с помощью doOnNext()/doOnTerminate(). Я также попытался сделать doAfterTerminate(). Я также попытался явно определить подписчика, и ни onNext(), ни onCompleted() не были вызваны для этого.
Согласно RxJS, уменьшение не продолжается, это сокращение (), которое не заканчивается, поэтому я попытался добавить дубль (1), но это не сработало. В том же вопросе stackru кто-то сказал, что проблема может быть в том, что мой поток никогда не закрывается. Помимо take (1), возможно, есть другой способ закрыть поток, но я пока недостаточно хорошо понимаю ReactiveX.
Согласно Почему OnComplete не вызывается в этом коде? (RxAndroid), возможно, исходный поток в серии не заканчивается. Но я не понимаю, почему это будет иметь значение, если я вызываю take (1), который, как мне кажется, должен выдавать сигнал завершения.
По сути, почему не выполняется следующая строка?
System.out.println("doOnNext map.size()=" + map.size());
Хотя следующая строка кода выполняется 98 раз:
map.put(e.getKey(), e.getValue());
JsonObjectObservableRequest.java
import com.android.volley.toolbox.JsonObjectRequest;
...
public class JsonObjectObservableRequest {
public JsonObjectObservableRequest(int method, String url, JSONObject request) {
jsonObjectRequest = new JsonObjectRequest(method, url, request, getResponseListener(), getResponseErrorListener());
}
private Response.Listener<JSONObject> getResponseListener() {
return new Response.Listener<JSONObject>() {
@Override
public void onResponse(JSONObject response) {
publishSubject.onNext(Observable.just(response));
}
};
}
private Response.ErrorListener getResponseErrorListener() {
return new Response.ErrorListener() {
@Override
public void onErrorResponse(VolleyError error) {
Observable<JSONObject> myError = Observable.error(error);
publishSubject.onNext(myError);
}
};
}
public JsonObjectRequest getJsonObjectRequest() {
return jsonObjectRequest;
}
public Observable<JSONObject> getObservable() {
return publishSubject.flatMap(new Func1<Observable<JSONObject>, Observable< JSONObject>>() {
@Override
public Observable<JSONObject> call(Observable<JSONObject> jsonObjectObservable) {
return jsonObjectObservable;
}
});
}
}
JsonObjectObservableRequest вызывающий код
import com.android.volley.Request;
import com.android.volley.RequestQueue;
...
JsonObjectObservableRequest jsonObjectObservableRequest = new JsonObjectObservableRequest(Request.Method.GET, idURLString, null, keyId, key);
Observable<JSONObject> jsonObjectObservable = jsonObjectObservableRequest.getObservable();
jsonObjectObservable
.map(json -> {
try {
return NetworkAccountIdDatasource.parseIdJSON(json);
} catch (JSONException e) {
e.printStackTrace();
return null;
}
})
.flatMapIterable(x -> x)
.map(s -> new AbstractMap.SimpleEntry<String, String>("Name of " + s, "short id for " + s.substring(4)))
.reduce(new HashMap<String, String>(), (map, e) -> {
map.put(e.getKey(), e.getValue());
return map;
})
.take(1)
.doOnNext(map -> {
System.out.println("doOnNext map.size()=" + map.size());
})
.doOnTerminate(() -> {
System.out.println("doOnTerminate");
})
.subscribe();
final RequestQueue queue = Volley.newRequestQueue(context);
queue.add(jsonObjectObservableRequest.getJsonObjectRequest());
2 ответа
Я нашел способ получить doOnTerminate() и doOnNext() для вызова. Он должен был поместить оператор take(1) выше при обработке потока. Я получил эту идею, основываясь на комментарии @yosriz. Хотелось бы, чтобы я понял, почему это решило проблему, но я не понимаю, потому что думал, что сигнал завершения будет только что распространен, но я думаю, что мне нужно еще много узнать о ReactiveX / реактивном функциональном программировании / и т.д..
JsonObjectObservableRequest вызывающий код
import com.android.volley.Request;
import com.android.volley.RequestQueue;
...
JsonObjectObservableRequest jsonObjectObservableRequest = new JsonObjectObservableRequest(Request.Method.GET, idURLString, null, keyId, key);
Observable<JSONObject> jsonObjectObservable = jsonObjectObservableRequest.getObservable();
jsonObjectObservable
.map(json -> {
try {
return NetworkAccountIdDatasource.parseIdJSON(json);
} catch (JSONException e) {
e.printStackTrace();
return null;
}
})
.take(1) // SOLUTION: MOVED THIS UP FROM BELOW reduce()
.flatMapIterable(x -> x)
.map(s -> new AbstractMap.SimpleEntry<String, String>("Name of " + s, "short id for " + s.substring(4)))
.reduce(new HashMap<String, String>(), (map, e) -> {
map.put(e.getKey(), e.getValue());
return map;
})
// .take(1) // SOLUTION: MOVE THIS ABOVE flatMapIterable()
.doOnNext(map -> {
System.out.println("doOnNext map.size()=" + map.size());
})
.doOnTerminate(() -> {
System.out.println("doOnTerminate");
})
.subscribe();
final RequestQueue queue = Volley.newRequestQueue(context);
queue.add(jsonObjectObservableRequest.getJsonObjectRequest());
Кроме того, это работает также, если я возьму take(1) в качестве самой первой операции, перед первой операцией map () (той, которая вызывает parseIdJSON()). Тем не менее, если я добавлю take(1) в качестве операции непосредственно перед redu (), она будет работать только частично; хорошо то, что doOnNext() вызывается, но плохо то, что map.size() всегда равен 1, когда для моего набора данных должно быть 98.
Вы правильно определили проблему, вы не звоните onCompleted()
в исходной сети Observable
(на JsonObjectObservableRequest
класс), в то время как take(1)
тоже не поможет, потому что вы поставили его перед reduce
,
Как вы, вероятно, поняли, сокращение должно работать на Observable
с конечным числом выбросов, поскольку он испускает накопленный элемент, когда все элементы были выпущены, и, таким образом, он полагается на onCompleted()
событие, чтобы узнать, когда наблюдаемый поток заканчивается.
- Я думаю, что в вашем случае лучше всего изменить способ
JsonObjectObservableRequest
работает, вам не нужноSubject
для этого вы можете использовать методы создания, чтобы обернуть обратный вызов (вы можете увидеть мой ответ здесь и узнать больше о соединении между миром обратного вызова в RxJava здесь. - Кроме того, вам не нужно выделять
Observable
чего-то, а затемflatmap
это к предметам, вы можете просто испустить предмет сonNext()
и выдать ошибку сonError()
на самом деле вы скрываете ошибки и конвертируете их в эмиссию, это может затруднить обработку ошибок в процессе. - Вам следует позвонить
onCompleted()
вonResponse()
после звонкаonNext()
чтобы сообщить о завершении. а в случае ошибки сигнализацияonError
уведомит о прекращении потока. - Другая проблема - отмена запроса, который, кажется, не обрабатывается таким образом, вы можете прочитать источники, чтобы узнать, как это можно сделать при переносе вызова callbacl.