Почему темы не рекомендуются в.NET Reactive Extensions?
В настоящее время я знакомлюсь с платформой Reactive Extensions для.NET и прорабатываю различные вводные ресурсы, которые я нашел (в основном http://www.introtorx.com/)
Наше приложение включает в себя несколько аппаратных интерфейсов, которые обнаруживают сетевые фреймы, это будут мои IObservables, тогда у меня будет множество компонентов, которые будут использовать эти фреймы или выполнять какое-то преобразование данных и создавать фреймы нового типа. Также будут другие компоненты, которые должны отображать каждый n-й кадр, например. Я убежден, что Rx будет полезен для нашего приложения, однако я борюсь с деталями реализации интерфейса IObserver.
В большинстве (если не во всех) ресурсах, которые я читал, говорилось, что я не должен сам реализовывать интерфейс IObservable, а использовать одну из предоставленных функций или классов. Из моего исследования видно, что создание Subject<IBaseFrame>
предоставил бы мне то, что мне нужно, у меня был бы мой единственный поток, который читает данные от аппаратного интерфейса и затем вызывает функцию OnNext моего Subject<IBaseFrame>
пример. Различные компоненты IObserver будут получать свои уведомления от этого субъекта.
Мое замешательство исходит из совета, приведенного в приложении к этому уроку, где говорится:
Избегайте использования типов предметов. Rx - фактически парадигма функционального программирования. Использование предметов означает, что мы теперь управляем государством, которое потенциально мутирует. Работать с изменяющимся состоянием и асинхронным программированием в одно и то же время очень сложно. Кроме того, многие операторы (методы расширения) были тщательно написаны, чтобы обеспечить правильное и согласованное время жизни подписок и последовательностей; когда вы вводите предметы, вы можете сломать это. В будущих выпусках может также наблюдаться значительное снижение производительности, если вы явно используете темы.
Мое приложение очень критично к производительности, и я собираюсь проверить производительность использования шаблонов Rx, прежде чем оно перейдет в рабочий код; однако меня беспокоит то, что я делаю что-то, что противоречит духу инфраструктуры Rx, используя класс Subject, и что будущая версия этой платформы будет снижать производительность.
Есть ли лучший способ сделать то, что я хочу? Поток аппаратного опроса будет работать непрерывно, независимо от того, есть ли наблюдатели или нет (HW-буфер будет резервировать в противном случае), так что это очень горячая последовательность. Затем мне нужно передать полученные кадры нескольким наблюдателям.
Любой совет будет принята с благодарностью.
5 ответов
Хорошо, если мы игнорируем мои догматические пути и игнорируем "предметы хороши / плохи" все вместе. Давайте посмотрим на проблемное пространство.
Могу поспорить, у вас есть один из двух стилей системы, в которую вы должны войти.
- Система вызывает событие или обратный звонок, когда приходит сообщение
- Вам нужно опросить систему, чтобы увидеть, есть ли какие-либо сообщения для обработки
Для варианта 1, просто, мы просто обертываем его соответствующим методом FromEvent, и все готово. В паб!
Для варианта 2 теперь нам нужно рассмотреть, как мы опрашиваем это и как это сделать эффективно. Кроме того, когда мы получаем значение, как мы публикуем его?
Я хотел бы представить, что вы хотите отдельную тему для голосования. Вы не хотели бы, чтобы какой-то другой кодер ударил ThreadPool/TaskPool и оставил вас в ситуации истощения ThreadPool. В качестве альтернативы вы не хотите хлопот переключения контекста (я думаю). Итак, предположим, что у нас есть свой собственный поток, у нас, вероятно, будет какой-то цикл While/Sleep, который мы будем использовать для опроса. Когда проверка находит некоторые сообщения, мы публикуем их. Ну, все это звучит идеально для Observable.Create. Теперь мы, вероятно, не можем использовать цикл While, поскольку это не позволит нам когда-либо возвращать Disposable, чтобы разрешить отмену. К счастью, вы прочитали всю книгу, поэтому разбираетесь в рекурсивном планировании!
Я представляю, что-то подобное может сработать. #Не испытано
public class MessageListener
{
private readonly IObservable<IMessage> _messages;
private readonly IScheduler _scheduler;
public MessageListener()
{
_scheduler = new EventLoopScheduler();
var messages = ListenToMessages()
.SubscribeOn(_scheduler)
.Publish();
_messages = messages;
messages.Connect();
}
public IObservable<IMessage> Messages
{
get {return _messages;}
}
private IObservable<IMessage> ListenToMessages()
{
return Observable.Create<IMessage>(o=>
{
return _scheduler.Schedule(recurse=>
{
try
{
var messages = GetMessages();
foreach (var msg in messages)
{
o.OnNext(msg);
}
recurse();
}
catch (Exception ex)
{
o.OnError(ex);
}
});
});
}
private IEnumerable<IMessage> GetMessages()
{
//Do some work here that gets messages from a queue,
// file system, database or other system that cant push
// new data at us.
//
//This may return an empty result when no new data is found.
}
}
Причина, по которой я действительно не люблю Предметы, заключается в том, что разработчик обычно не совсем понимает проблему. Взломайте тему, ткните ее здесь и всюду, а затем дайте догадке бедного помощника поддержки по поводу WTF. Когда вы используете методы Create/Generate и т. Д., Вы локализуете эффекты последовательности. Вы можете увидеть все это в одном методе, и вы знаете, что никто другой не добавляет неприятного побочного эффекта. Если я вижу предметные поля, мне нужно искать все места в классе, который он использует. Если какой-то MFer выставляет один публично, то все ставки сняты, кто знает, как эта последовательность используется! Async/Concurrency/Rx - это сложно. Вам не нужно усложнять задачу, позволяя побочным эффектам и программированию причинно-следственных связей еще больше крутить голову.
В общем, вы должны избегать использования Subject
Однако за то, что вы делаете здесь, я думаю, что они работают довольно хорошо. Я задал похожий вопрос, когда наткнулся на сообщение "избегать тем" в учебниках по Rx.
Цитировать Дейва Секстона (из Rxx)
"Субъекты являются компонентами Rx с отслеживанием состояния. Они полезны, когда вам нужно создать наблюдаемую в виде события наблюдаемую в качестве поля или локальной переменной".
Я склонен использовать их в качестве точки входа в Rx. Поэтому, если у меня есть код, который должен сказать "что-то случилось" (как у вас), я бы использовал Subject
и позвонить OnNext
, Затем разоблачить это как IObservable
для других, чтобы подписаться (вы можете использовать AsObservable()
на вашу тему, чтобы убедиться, что никто не может бросить тему и все испортить).
Вы также можете достичь этого с помощью события.NET и использовать FromEventPattern
, но если я только собираюсь превратить событие в IObservable
во всяком случае, я не вижу выгоды от проведения мероприятия вместо Subject
(что может означать, что я что-то здесь упускаю)
Однако, чего вам следует избегать, так это подписаться на IObservable
с Subject
не проходите Subject
в IObservable.Subscribe
метод.
Часто, когда вы управляете Субъектом, вы на самом деле просто реализуете функции уже в Rx, и, вероятно, не таким надежным, простым и расширяемым способом.
Когда вы пытаетесь адаптировать некоторый асинхронный поток данных в Rx (или создать асинхронный поток данных из того, который в настоящее время не асинхронный), наиболее распространенными случаями обычно являются:
Источником данных является событие: как говорит Ли, это самый простой случай: используйте FromEvent и направляйтесь в паб.
Источником данных является синхронная операция, и вам нужны обновления по запросу (например, веб-сервис или вызов базы данных): в этом случае вы можете использовать предложенный Ли подход, или для простых случаев вы можете использовать что-то вроде
Observable.Interval.Select(_ => <db fetch>)
, Возможно, вы захотите использовать DistinctUntilChanged(), чтобы предотвратить публикацию обновлений, когда ничего не изменилось в исходных данных.Источником данных является своего рода асинхронный API, который вызывает ваш обратный вызов: в этом случае используйте Observable.Create, чтобы подключить ваш обратный вызов для вызова OnNext/OnError/OnComplete для наблюдателя.
Источником данных является вызов, который блокируется до тех пор, пока не станут доступны новые данные (например, некоторые синхронные операции чтения сокетов): в этом случае вы можете использовать Observable.Create, чтобы обернуть императивный код, который читает из сокета и публикует его в Observer.OnNext. когда данные читаются. Это может быть похоже на то, что вы делаете с субъектом.
Использование Observable.Create против создания класса, который управляет Subject, вполне эквивалентно использованию ключевого слова yield против создания целого класса, реализующего IEnumerator. Конечно, вы можете написать IEnumerator, чтобы он был таким же чистым и хорошим гражданином, как и код выхода, но какой из них лучше инкапсулирован и чувствует более аккуратный дизайн? То же самое верно для Observable.Create против управления Предметами.
Observable.Create дает вам чистый шаблон для ленивой настройки и чистого демонтажа. Как вы достигаете этого с помощью класса, охватывающего тему? Вам нужен какой-то метод Start... как вы знаете, когда его вызывать? Или вы просто всегда запускаете его, даже когда никто не слушает? И когда вы закончите, как вы получите его, чтобы остановить чтение из сокета / опроса базы данных и т. Д.? У вас должен быть какой-то метод Stop, и вы все равно должны иметь доступ не только к IObservable, на который вы подписаны, но и к классу, который создал Subject в первую очередь.
С Observable.Create, все это в одном месте. Тело Observable.Create не запускается, пока кто-то не подпишется, поэтому, если никто не подписывается, вы никогда не используете свой ресурс. И Observable.Create возвращает Disposable, который может полностью отключить ваш ресурс / обратные вызовы и т. Д. - он вызывается, когда Observer отписывается. Время жизни ресурсов, которые вы используете для создания Observable, аккуратно связано с временем жизни самого Observable.
Цитированный текст блока в значительной степени объясняет, почему вы не должны использовать Subject<T>
Проще говоря, вы комбинируете функции наблюдателя и наблюдаемого, в то же время вводя какое-то состояние между ними (независимо от того, инкапсулируете вы или расширяете).
Здесь вы сталкиваетесь с неприятностями; эти обязанности должны быть отдельными и отличными друг от друга.
Тем не менее, в вашем конкретном случае, я бы порекомендовал вам разбить ваши проблемы на более мелкие части.
Во-первых, у вас есть горячая нить, и вы всегда отслеживаете аппаратные средства на наличие сигналов, на которые нужно подавать уведомления. Как бы вы сделали это нормально? События Итак, начнем с этого.
Давайте определим EventArgs
что ваше событие сработает.
// The event args that has the information.
public class BaseFrameEventArgs : EventArgs
{
public BaseFrameEventArgs(IBaseFrame baseFrame)
{
// Validate parameters.
if (baseFrame == null) throw new ArgumentNullException("IBaseFrame");
// Set values.
BaseFrame = baseFrame;
}
// Poor man's immutability.
public IBaseFrame BaseFrame { get; private set; }
}
Теперь класс, который будет запускать событие. Обратите внимание, что это может быть статический класс (поскольку у вас всегда есть поток, выполняющий мониторинг аппаратного буфера), или что-то, что вы вызываете по требованию, которое подписывается на это. Вы должны будете изменить это по мере необходимости.
public class BaseFrameMonitor
{
// You want to make this access thread safe
public event EventHandler<BaseFrameEventArgs> HardwareEvent;
public BaseFrameMonitor()
{
// Create/subscribe to your thread that
// drains hardware signals.
}
}
Итак, теперь у вас есть класс, который представляет событие. Observables хорошо работают с событиями. Настолько, что существует первоклассная поддержка для преобразования потоков событий (представьте, что поток событий представляет собой множественное срабатывание события) в IObservable<T>
реализации, если вы будете следовать стандартному шаблону событий, через статический FromEventPattern
метод на Observable
класс
С источником ваших событий, и FromEventPattern
метод, мы можем создать IObservable<EventPattern<BaseFrameEventArgs>>
легко (EventPattern<TEventArgs>
Класс воплощает то, что вы увидите в событии.NET, в частности, экземпляр, производный от EventArgs
и объект, представляющий отправителя), например так:
// The event source.
// Or you might not need this if your class is static and exposes
// the event as a static event.
var source = new BaseFrameMonitor();
// Create the observable. It's going to be hot
// as the events are hot.
IObservable<EventPattern<BaseFrameEventArgs>> observable = Observable.
FromEventPattern<BaseFrameEventArgs>(
h => source.HardwareEvent += h,
h => source.HardwareEvent -= h);
Конечно, вы хотите IObservable<IBaseFrame>
, но это легко, используя Select
метод расширения на Observable
класс для создания проекции (как в LINQ, и мы можем обернуть все это простым в использовании методом):
public IObservable<IBaseFrame> CreateHardwareObservable()
{
// The event source.
// Or you might not need this if your class is static and exposes
// the event as a static event.
var source = new BaseFrameMonitor();
// Create the observable. It's going to be hot
// as the events are hot.
IObservable<EventPattern<BaseFrameEventArgs>> observable = Observable.
FromEventPattern<BaseFrameEventArgs>(
h => source.HardwareEvent += h,
h => source.HardwareEvent -= h);
// Return the observable, but projected.
return observable.Select(i => i.EventArgs.BaseFrame);
}
Плохо обобщать, что темы не подходят для публичного интерфейса. Хотя это, безусловно, правда, что подход, основанный на реактивном программировании, не должен выглядеть так, но он определенно является хорошим вариантом улучшения / рефакторинга для вашего классического кода.
Если у вас есть обычное свойство с общедоступным аксессором и вы хотите уведомить об изменениях, ничего не говорится против его замены на BehaviorSubject. INPC или другие другие мероприятия просто не так чисты, и это лично меня утомляет. Для этого вы можете и должны использовать BehaviorSubjects в качестве открытых свойств вместо обычных свойств и канавы INPC или других событий.
Кроме того, Subject-интерфейс делает пользователей вашего интерфейса более осведомленными о функциональности ваших свойств и с большей вероятностью подпишется, чем просто получит значение.
Лучше всего использовать, если вы хотите, чтобы другие слушали / подписывались на изменения свойства.