Почему не вызывается doOnNext()?

Ни onNext(), ни onCompleted() не будут вызваны для моего подписчика ниже. Я попытался реализовать подписчика с помощью doOnNext()/doOnTerminate(). Я также попытался сделать doAfterTerminate(). Я также попытался явно определить подписчика, и ни onNext(), ни onCompleted() не были вызваны для этого.

Согласно RxJS, уменьшение не продолжается, это сокращение (), которое не заканчивается, поэтому я попытался добавить дубль (1), но это не сработало. В том же вопросе stackru кто-то сказал, что проблема может быть в том, что мой поток никогда не закрывается. Помимо take (1), возможно, есть другой способ закрыть поток, но я пока недостаточно хорошо понимаю ReactiveX.

Согласно Почему OnComplete не вызывается в этом коде? (RxAndroid), возможно, исходный поток в серии не заканчивается. Но я не понимаю, почему это будет иметь значение, если я вызываю take (1), который, как мне кажется, должен выдавать сигнал завершения.

По сути, почему не выполняется следующая строка?

System.out.println("doOnNext map.size()=" + map.size());

Хотя следующая строка кода выполняется 98 раз:

map.put(e.getKey(), e.getValue());

JsonObjectObservableRequest.java

import com.android.volley.toolbox.JsonObjectRequest;
...
public class JsonObjectObservableRequest {

    public JsonObjectObservableRequest(int method, String url, JSONObject request) {
    jsonObjectRequest = new JsonObjectRequest(method, url, request, getResponseListener(),     getResponseErrorListener());
    }

    private Response.Listener<JSONObject> getResponseListener() {
        return new Response.Listener<JSONObject>() {
            @Override
            public void onResponse(JSONObject response) {
                publishSubject.onNext(Observable.just(response));
            }
        };
    }

    private Response.ErrorListener getResponseErrorListener() {
        return new Response.ErrorListener() {
            @Override
            public void onErrorResponse(VolleyError error) {
                Observable<JSONObject> myError = Observable.error(error);
                publishSubject.onNext(myError);
            }
        };
    }

    public JsonObjectRequest getJsonObjectRequest() {
        return jsonObjectRequest;
    }

    public Observable<JSONObject> getObservable() {
    return publishSubject.flatMap(new Func1<Observable<JSONObject>, Observable<    JSONObject>>() {
            @Override
            public Observable<JSONObject> call(Observable<JSONObject> jsonObjectObservable) {
                return jsonObjectObservable;
            }
        });
    }
}

JsonObjectObservableRequest вызывающий код

import com.android.volley.Request;
import com.android.volley.RequestQueue;
...
    JsonObjectObservableRequest jsonObjectObservableRequest = new JsonObjectObservableRequest(Request.Method.GET, idURLString, null, keyId, key);
    Observable<JSONObject> jsonObjectObservable = jsonObjectObservableRequest.getObservable();

    jsonObjectObservable
            .map(json -> {
                try {
                    return NetworkAccountIdDatasource.parseIdJSON(json);
                } catch (JSONException e) {
                    e.printStackTrace();
                    return null;
                }
            })
            .flatMapIterable(x -> x)
            .map(s -> new AbstractMap.SimpleEntry<String, String>("Name of " + s, "short id for " + s.substring(4)))
            .reduce(new HashMap<String, String>(), (map, e) -> {
                map.put(e.getKey(), e.getValue());
                return map;
            })
            .take(1)
            .doOnNext(map -> {
                System.out.println("doOnNext map.size()=" + map.size());
            })
            .doOnTerminate(() -> {
                System.out.println("doOnTerminate");
            })
            .subscribe();

    final RequestQueue queue = Volley.newRequestQueue(context);
    queue.add(jsonObjectObservableRequest.getJsonObjectRequest());

2 ответа

Решение

Я нашел способ получить doOnTerminate() и doOnNext() для вызова. Он должен был поместить оператор take(1) выше при обработке потока. Я получил эту идею, основываясь на комментарии @yosriz. Хотелось бы, чтобы я понял, почему это решило проблему, но я не понимаю, потому что думал, что сигнал завершения будет только что распространен, но я думаю, что мне нужно еще много узнать о ReactiveX / реактивном функциональном программировании / и т.д..

JsonObjectObservableRequest вызывающий код

import com.android.volley.Request;
import com.android.volley.RequestQueue;
...
    JsonObjectObservableRequest jsonObjectObservableRequest = new JsonObjectObservableRequest(Request.Method.GET, idURLString, null, keyId, key);
    Observable<JSONObject> jsonObjectObservable = jsonObjectObservableRequest.getObservable();

jsonObjectObservable
        .map(json -> {
            try {
                return NetworkAccountIdDatasource.parseIdJSON(json);
            } catch (JSONException e) {
                e.printStackTrace();
                return null;
            }
        })
        .take(1) // SOLUTION: MOVED THIS UP FROM BELOW reduce()
        .flatMapIterable(x -> x)
        .map(s -> new AbstractMap.SimpleEntry<String, String>("Name of " + s, "short id for " + s.substring(4)))
        .reduce(new HashMap<String, String>(), (map, e) -> {
            map.put(e.getKey(), e.getValue());
            return map;
        })
        // .take(1) // SOLUTION: MOVE THIS ABOVE flatMapIterable() 
        .doOnNext(map -> {
            System.out.println("doOnNext map.size()=" + map.size());
        })
        .doOnTerminate(() -> {
            System.out.println("doOnTerminate");
        })
        .subscribe();

final RequestQueue queue = Volley.newRequestQueue(context);
queue.add(jsonObjectObservableRequest.getJsonObjectRequest());

Кроме того, это работает также, если я возьму take(1) в качестве самой первой операции, перед первой операцией map () (той, которая вызывает parseIdJSON()). Тем не менее, если я добавлю take(1) в качестве операции непосредственно перед redu (), она будет работать только частично; хорошо то, что doOnNext() вызывается, но плохо то, что map.size() всегда равен 1, когда для моего набора данных должно быть 98.

Вы правильно определили проблему, вы не звоните onCompleted() в исходной сети Observable (на JsonObjectObservableRequest класс), в то время как take(1) тоже не поможет, потому что вы поставили его перед reduce,
Как вы, вероятно, поняли, сокращение должно работать на Observable с конечным числом выбросов, поскольку он испускает накопленный элемент, когда все элементы были выпущены, и, таким образом, он полагается на onCompleted() событие, чтобы узнать, когда наблюдаемый поток заканчивается.

  • Я думаю, что в вашем случае лучше всего изменить способ JsonObjectObservableRequest работает, вам не нужно Subject для этого вы можете использовать методы создания, чтобы обернуть обратный вызов (вы можете увидеть мой ответ здесь и узнать больше о соединении между миром обратного вызова в RxJava здесь.
  • Кроме того, вам не нужно выделять Observable чего-то, а затем flatmap это к предметам, вы можете просто испустить предмет с onNext() и выдать ошибку с onError()на самом деле вы скрываете ошибки и конвертируете их в эмиссию, это может затруднить обработку ошибок в процессе.
  • Вам следует позвонитьonCompleted() в onResponse() после звонка onNext() чтобы сообщить о завершении. а в случае ошибки сигнализация onError уведомит о прекращении потока.
  • Другая проблема - отмена запроса, который, кажется, не обрабатывается таким образом, вы можете прочитать источники, чтобы узнать, как это можно сделать при переносе вызова callbacl.
Другие вопросы по тегам