Как организовать последовательность обработки данных с помощью.net RX
Каков наилучший способ организовать последовательность процессоров данных с.net RX?
- а. Вызовите методы на наблюдаемых, как observable.Do(log).Select(transformation).Do(work).Aggregate(someState)...
- б. Реализация пользовательских наблюдателей, если так - как их связать
- в. Другой вариант... А также, что является лучшим вариантом для обработки возможных исключений в самой наблюдаемой (см. Мои проблемы выше) и для обработки исключений внутри Do, Select и т. Д. (Как я знаю, лучшая практика заключается в том, что подписчики не должны выдавать).
Также мне иногда нужно разрешить возвращение исключений в виде некоторых элементов наблюдаемой последовательности без остановки последовательности (см. Этот вопрос Обработка исключений в Reactive Extensions без остановки последовательности)
1 ответ
Кажется, вам нужен рабочий процесс вместо Rx. Похоже, на основе ваших других вопросов * вы пытаетесь взять то, что выглядит очень хорошо подходящим для сценария организации очередей ProducerConsumer, и вводите его в Rx.
Похоже на тебя
- хотите прочитать из очереди и заблокировать до получения значения, а затем повторно подписаться. Просто используйте функции очереди BlockingCollection. По мере поступления значений их можно вставить в коллекцию из любого потока.
- на самом деле не нужно иметь последовательность значений, но рабочий процесс, который может дать сбой при любом заданном значении, перенаправить его и затем обработать следующее значение. Просто используйте очередь. Обработайте каждое значение и поместите результаты в следующую подходящую очередь (Ошибка / Успех) -> См. Шаблоны интеграции предприятия, в частности, Недопустимый канал сообщений и Канал мертвой буквы.
- потенциально обрабатывать эти значения параллельно. Посмотрите BlockingCollection.GetConsumingEnumerable или Disruptor для реализации высокой производительности. С этими инструментами вы можете иметь много производителей и потребителей. Конечно, вы можете сделать это с помощью Rx, но это просто опрос и связывание потока. Я думаю, что лучше говорить об этом
Я думаю, что ваше использование Rx должно окупиться, и вы не должны сражаться с ним (как любая другая технология или фреймворк).
* другие вопросы, такие как Обработка исключений в Reactive Extensions без остановки последовательности и Как сериализовать Observables в облако и обратно