Rx.Net: наблюдать асинхронные события бесконечно
У меня есть вспомогательный класс, который сохраняет текстовые сообщения в локальной файловой системе. Этот метод возвращает Task
объект, и является асинхронным по определению.
Я хочу иметь возможность наблюдать, когда вызывается этот метод, поэтому я могу постоянно отслеживать размер и длину буфера и принимать решения на основе этого.
Я пытаюсь реализовать это с помощью Reactive Extension для.NET. Тем не менее, я не могу придумать дизайн, который позволяет мне постоянно слушать сообщения, добавляемые в буфер. Ниже моя текущая реализация:
public IObservable<Unit> Receive(InternalMessage message)
{
var observable = FileBuffer.BufferMessage(message.MessageId.ToString(), message, DateTime.UtcNow).ToObservable(); //This returns a Task, which I convert into an Observable
return observable;
}
Вот как я подписываюсь на наблюдаемое:
IObservable<Unit> receiverObservable = batchHandler.Receive(message);
receiverObservable.Subscribe(
x => Console.WriteLine("On next"),
ex => //TODO,
() => // Completed);
Я хочу, чтобы абонент вызывался каждый раз, когда метод Receive
называется. Тем не менее, AFAIK, как только этот метод вызывается, наблюдаемое завершается и последовательность завершается, поэтому будущие вызовы Receive
не будет слушать
Может ли кто-нибудь порекомендовать способ использования библиотек Rx.Net для реализации этого наблюдаемого шаблона, который я ищу, то есть как сохранить последовательность открытой и передать ее результаты для асинхронных методов?
1 ответ
Receive
как вы это закодировали, возвращает IObservable<Unit>
, представляющий завершение единственной задачи. Вы хотите подписаться на то, что возвращает IObservable<IObservable<Unit>>
представляющий поток завершений задач.
Есть несколько способов сделать это, лучший из которых, вероятно, зависит от того, как настроен ваш класс и как вы его называете.
Вот самый ленивый:
Вы объявляете переменную уровня класса subject
это представляет поток ваших звонков:
Subject<IObservable<Unit>> subject = new Subject<IObservable<Unit>>();
subject.Merge().Subscribe(
x => Console.WriteLine("On next"),
ex => { }, //TODO
() => { } // Completed
);
Затем, когда у вас есть новый вызов, вы просто добавляете его в тему.
IObservable<Unit> receiverObservable = batchHandler.Receive(message);
subject.OnNext(receiverObservable);
Причина, по которой это действительно лениво, заключается в том, что Rx является функциональным по своей сути, что имеет тенденцию смотреть свысока на переменные изменяемого состояния. Subjects
в основном изменчивое состояние.
Лучший способ сделать это - выяснить, когда / почему вы звоните Receive
и структурировать это как наблюдаемое. Как только это будет сделано, вы можете работать с этим:
IObservable<Unit> sourceReasonsToCallReceive; // Most likely sourced from event
sourceReasonsToCallReceive.SelectMany(_ => batchHandler.Receive(message))
.SubScribe(
x => Console.WriteLine("On next"),
ex => { }, //TODO
() => { } // Completed
);
Надеюсь, это поможет.