RxJS: рекурсивный список наблюдаемых и один наблюдатель

У меня были некоторые проблемы с рекурсивной цепочкой наблюдаемых.

Я работаю с RxJS, которая в настоящее время находится в версии 1.0.10621 и содержит основную функциональность Rx в сочетании с Rx для jQuery.

Позвольте мне представить примерный сценарий для моей проблемы: я опрашиваю API поиска в Twitter (ответ JSON) для твитов / обновлений, содержащих определенное ключевое слово. Ответ также включает "refresh_url", который следует использовать для генерации последующего запроса. Ответ на этот последующий запрос снова будет содержать новый refresh_url и т. Д.

Rx.jQuery позволяет мне заставить API поиска в Твиттере вызывать наблюдаемое событие, которое генерирует одно onNext и затем завершается. До сих пор я пытался, чтобы обработчик onNext запомнил refresh_url и использовал его в обработчике onCompleted для создания нового наблюдаемого и соответствующего наблюдателя для следующего запроса. Таким образом, одна наблюдаемая пара + наблюдатель следует за другой бесконечно.

Проблема с этим подходом заключается в следующем:

  1. Наблюдаемые наблюдения / наблюдатели уже живы, когда их предшественники еще не были уничтожены.

  2. Я должен сделать много неприятного бухгалтерии, чтобы сохранить действительную ссылку на ныне живущего наблюдателя, которых на самом деле может быть два. (Один в onCompleted, а другой где-то еще в его жизненном цикле). Эта ссылка, конечно, необходима для отмены подписки / удаления наблюдателя. Альтернативой бухгалтерии было бы реализовать побочный эффект с помощью "все еще работает?"- логического значения, как я сделал в своем примере.

Пример кода:

            running = true;
            twitterUrl = "http://search.twitter.com/search.json";
            twitterQuery = "?rpp=10&q=" + encodeURIComponent(text);
            twitterMaxId = 0; //actually twitter ignores its since_id parameter

            newTweetObserver = function () {
                return Rx.Observer.create(
                        function (tweet) {
                            if (tweet.id > twitterMaxId) {
                                twitterMaxId = tweet.id;
                                displayTweet(tweet);
                            }
                        }
                    );
            }

            createTwitterObserver = function() {
                twitterObserver = Rx.Observer.create(
                        function (response) {
                            if (response.textStatus == "success") {
                                var data = response.data;
                                if (data.error == undefined) {
                                    twitterQuery = data.refresh_url;
                                    var tweetObservable;
                                    tweetObservable = Rx.Observable.fromArray(data.results.reverse());
                                    tweetObservable.subscribe(newTweetObserver());
                                }
                            }
                        },
                        function(error) { alert(error); },
                        function () {
                            //create and listen to new observer that includes a delay 
                            if (running) {
                                twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery).delay(3000);
                                twitterObservable.subscribe(createTwitterObserver());
                            }
                        } 
                    );
                return twitterObserver;
            }
            twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery);
            twitterObservable.subscribe(createTwitterObserver());

Не обманывайтесь двойным слоем наблюдаемых / наблюдателей от запросов к твитам. Мой пример касается в основном первого уровня: запрос данных из Twitter. Если при решении этой проблемы второй слой (преобразование ответов в твиты) может стать одним с первым, это было бы фантастически; Но я думаю, что это совсем другое. Теперь.

Эрик Мейер указал мне на оператор Expand (см. Пример ниже) и предложил шаблоны Join в качестве альтернативы.

var ys = Observable.Expand
(new[]{0}.ToObservable() // initial sequence
                   , i => ( i == 10 ? Observable.Empty<int>() // terminate
         : new[]{i+1}.ToObservable() // recurse
 )
);

ys.ToArray().Select(a => string.Join(",", a)).DumpLive();

Это должно быть скопировано в LINQPad. Он предполагает одноэлементные наблюдаемые и производит одного конечного наблюдателя.

Итак, мой вопрос: как я могу сделать лучший трюк расширения в RxJS?

РЕДАКТИРОВАТЬ:
Оператор расширения, вероятно, может быть реализован, как показано в этом потоке. Но нужны генераторы (а у меня только JS < 1.6).
К сожалению, RxJS 2.0.20304-бета не реализует метод Extend.

1 ответ

Поэтому я собираюсь попытаться решить вашу проблему немного иначе, чем вы, и взять некоторые вольности, которые вы можете решить проще.

Итак, я не могу сказать, что вы пытаетесь сделать следующие шаги

  • Получить первый список твитов, который содержит следующий URL
  • После получения списка твитов, onNext текущий наблюдатель и получить следующий набор твитов
    • Делать это до бесконечности

Или есть действие пользователя (получить больше / прокрутка вниз). В любом случае, это действительно та же проблема. Возможно, я неправильно читаю вашу проблему. Вот ответ на этот вопрос.

function getMyTweets(headUrl) {
    return Rx.Observable.create(function(observer) {

        innerRequest(headUrl);
        function innerRequest(url) {
            var next = '';

            // Some magic get ajax function
            Rx.get(url).subscribe(function(res) {
                observer.onNext(res);
                next = res.refresh_url;
            },
            function() {
                // Some sweet handling code
                // Perhaps get head?
            },
            function() {
                innerRequest(next);
            });
        }
    });
}

Это может быть не тот ответ, который вы просили. Если нет, извините!


Изменить: После просмотра вашего кода, похоже, что вы хотите взять результаты в виде массива и наблюдать его.

// From the results perform a select then a merge (if ordering does not matter).
getMyTweets('url')
    .selectMany(function(data) {
        return Rx.Observable.fromArray(data.results.reverse());
    });

// Ensures ordering
getMyTweets('url')
    .select(function(data) {
        return Rx.Observable.fromArray(data.results.reverse());
    })
    .concat();
Другие вопросы по тегам