Rx.Net: наблюдать асинхронные события бесконечно

У меня есть вспомогательный класс, который сохраняет текстовые сообщения в локальной файловой системе. Этот метод возвращает Task объект, и является асинхронным по определению.

Я хочу иметь возможность наблюдать, когда вызывается этот метод, поэтому я могу постоянно отслеживать размер и длину буфера и принимать решения на основе этого.

Я пытаюсь реализовать это с помощью Reactive Extension для.NET. Тем не менее, я не могу придумать дизайн, который позволяет мне постоянно слушать сообщения, добавляемые в буфер. Ниже моя текущая реализация:

public IObservable<Unit> Receive(InternalMessage message)
        {
            var observable = FileBuffer.BufferMessage(message.MessageId.ToString(), message, DateTime.UtcNow).ToObservable(); //This returns a Task, which I convert into an Observable
            return observable;
        }

Вот как я подписываюсь на наблюдаемое:

IObservable<Unit> receiverObservable = batchHandler.Receive(message);
            receiverObservable.Subscribe(
                x => Console.WriteLine("On next"),
                ex => //TODO,
                () => // Completed);

Я хочу, чтобы абонент вызывался каждый раз, когда метод Receive называется. Тем не менее, AFAIK, как только этот метод вызывается, наблюдаемое завершается и последовательность завершается, поэтому будущие вызовы Receive не будет слушать

Может ли кто-нибудь порекомендовать способ использования библиотек Rx.Net для реализации этого наблюдаемого шаблона, который я ищу, то есть как сохранить последовательность открытой и передать ее результаты для асинхронных методов?

1 ответ

Receive как вы это закодировали, возвращает IObservable<Unit>, представляющий завершение единственной задачи. Вы хотите подписаться на то, что возвращает IObservable<IObservable<Unit>> представляющий поток завершений задач.

Есть несколько способов сделать это, лучший из которых, вероятно, зависит от того, как настроен ваш класс и как вы его называете.

Вот самый ленивый:

Вы объявляете переменную уровня класса subject это представляет поток ваших звонков:

Subject<IObservable<Unit>> subject = new Subject<IObservable<Unit>>();
subject.Merge().Subscribe(
    x => Console.WriteLine("On next"),
    ex => { },  //TODO
    () => { }   // Completed
);

Затем, когда у вас есть новый вызов, вы просто добавляете его в тему.

IObservable<Unit> receiverObservable = batchHandler.Receive(message);
subject.OnNext(receiverObservable);

Причина, по которой это действительно лениво, заключается в том, что Rx является функциональным по своей сути, что имеет тенденцию смотреть свысока на переменные изменяемого состояния. Subjects в основном изменчивое состояние.

Лучший способ сделать это - выяснить, когда / почему вы звоните Receiveи структурировать это как наблюдаемое. Как только это будет сделано, вы можете работать с этим:

IObservable<Unit> sourceReasonsToCallReceive; // Most likely sourced from event

sourceReasonsToCallReceive.SelectMany(_ => batchHandler.Receive(message))
    .SubScribe(
    x => Console.WriteLine("On next"),
    ex => { },  //TODO
    () => { }   // Completed
);

Надеюсь, это поможет.

Другие вопросы по тегам