RxJS: рекурсивный список наблюдаемых и один наблюдатель
У меня были некоторые проблемы с рекурсивной цепочкой наблюдаемых.
Я работаю с RxJS, которая в настоящее время находится в версии 1.0.10621 и содержит основную функциональность Rx в сочетании с Rx для jQuery.
Позвольте мне представить примерный сценарий для моей проблемы: я опрашиваю API поиска в Twitter (ответ JSON) для твитов / обновлений, содержащих определенное ключевое слово. Ответ также включает "refresh_url", который следует использовать для генерации последующего запроса. Ответ на этот последующий запрос снова будет содержать новый refresh_url и т. Д.
Rx.jQuery позволяет мне заставить API поиска в Твиттере вызывать наблюдаемое событие, которое генерирует одно onNext и затем завершается. До сих пор я пытался, чтобы обработчик onNext запомнил refresh_url и использовал его в обработчике onCompleted для создания нового наблюдаемого и соответствующего наблюдателя для следующего запроса. Таким образом, одна наблюдаемая пара + наблюдатель следует за другой бесконечно.
Проблема с этим подходом заключается в следующем:
Наблюдаемые наблюдения / наблюдатели уже живы, когда их предшественники еще не были уничтожены.
Я должен сделать много неприятного бухгалтерии, чтобы сохранить действительную ссылку на ныне живущего наблюдателя, которых на самом деле может быть два. (Один в onCompleted, а другой где-то еще в его жизненном цикле). Эта ссылка, конечно, необходима для отмены подписки / удаления наблюдателя. Альтернативой бухгалтерии было бы реализовать побочный эффект с помощью "все еще работает?"- логического значения, как я сделал в своем примере.
Пример кода:
running = true;
twitterUrl = "http://search.twitter.com/search.json";
twitterQuery = "?rpp=10&q=" + encodeURIComponent(text);
twitterMaxId = 0; //actually twitter ignores its since_id parameter
newTweetObserver = function () {
return Rx.Observer.create(
function (tweet) {
if (tweet.id > twitterMaxId) {
twitterMaxId = tweet.id;
displayTweet(tweet);
}
}
);
}
createTwitterObserver = function() {
twitterObserver = Rx.Observer.create(
function (response) {
if (response.textStatus == "success") {
var data = response.data;
if (data.error == undefined) {
twitterQuery = data.refresh_url;
var tweetObservable;
tweetObservable = Rx.Observable.fromArray(data.results.reverse());
tweetObservable.subscribe(newTweetObserver());
}
}
},
function(error) { alert(error); },
function () {
//create and listen to new observer that includes a delay
if (running) {
twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery).delay(3000);
twitterObservable.subscribe(createTwitterObserver());
}
}
);
return twitterObserver;
}
twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery);
twitterObservable.subscribe(createTwitterObserver());
Не обманывайтесь двойным слоем наблюдаемых / наблюдателей от запросов к твитам. Мой пример касается в основном первого уровня: запрос данных из Twitter. Если при решении этой проблемы второй слой (преобразование ответов в твиты) может стать одним с первым, это было бы фантастически; Но я думаю, что это совсем другое. Теперь.
Эрик Мейер указал мне на оператор Expand (см. Пример ниже) и предложил шаблоны Join в качестве альтернативы.
var ys = Observable.Expand
(new[]{0}.ToObservable() // initial sequence
, i => ( i == 10 ? Observable.Empty<int>() // terminate
: new[]{i+1}.ToObservable() // recurse
)
);
ys.ToArray().Select(a => string.Join(",", a)).DumpLive();
Это должно быть скопировано в LINQPad. Он предполагает одноэлементные наблюдаемые и производит одного конечного наблюдателя.
Итак, мой вопрос: как я могу сделать лучший трюк расширения в RxJS?
РЕДАКТИРОВАТЬ:
Оператор расширения, вероятно, может быть реализован, как показано в этом потоке. Но нужны генераторы (а у меня только JS < 1.6).
К сожалению, RxJS 2.0.20304-бета не реализует метод Extend.
1 ответ
Поэтому я собираюсь попытаться решить вашу проблему немного иначе, чем вы, и взять некоторые вольности, которые вы можете решить проще.
Итак, я не могу сказать, что вы пытаетесь сделать следующие шаги
- Получить первый список твитов, который содержит следующий URL
- После получения списка твитов, onNext текущий наблюдатель и получить следующий набор твитов
- Делать это до бесконечности
Или есть действие пользователя (получить больше / прокрутка вниз). В любом случае, это действительно та же проблема. Возможно, я неправильно читаю вашу проблему. Вот ответ на этот вопрос.
function getMyTweets(headUrl) {
return Rx.Observable.create(function(observer) {
innerRequest(headUrl);
function innerRequest(url) {
var next = '';
// Some magic get ajax function
Rx.get(url).subscribe(function(res) {
observer.onNext(res);
next = res.refresh_url;
},
function() {
// Some sweet handling code
// Perhaps get head?
},
function() {
innerRequest(next);
});
}
});
}
Это может быть не тот ответ, который вы просили. Если нет, извините!
Изменить: После просмотра вашего кода, похоже, что вы хотите взять результаты в виде массива и наблюдать его.
// From the results perform a select then a merge (if ordering does not matter).
getMyTweets('url')
.selectMany(function(data) {
return Rx.Observable.fromArray(data.results.reverse());
});
// Ensures ordering
getMyTweets('url')
.select(function(data) {
return Rx.Observable.fromArray(data.results.reverse());
})
.concat();