Rx.net, как запустить последовательность (или иметь побочный эффект) только на первой подписке?
У меня есть экземпляр 'Conversation', который принимает поток сообщений, обрабатывает сообщения в соответствии с внутренним состоянием и выставляет свойство, которое можно наблюдать из сообщений.
Внутренне он использует State Pattern для делегирования обработки потока в соответствии с конечным автоматом.
Разговор имеет Action<>
представляет изменение состояния, и поток составляется с использованием FromEvent
привести к наблюдаемым изменениям состояния, которое подает в Select(state=>state.MessageStream(source))
а затем в Switch(), чтобы превратить конечный автомат в поток сообщений.
Когда разговор сначала создается, а наблюдаемое настроено, как описано выше, я хочу, чтобы разговор начался с установки начального состояния, но я думаю, что мне нужно избегать условий гонки, убедившись, что самый первый наблюдатель (я обычно ожидайте только одного, но для гибкости....) звонков Start()
на разговоре, и последующие наблюдатели должны получить тот же поток (и подписаться на него) без этого побочного эффекта.
То, как я это реализовал, похоже на это, но интуитивное чувство подсказывает мне, что это неправильный подход к проблеме. Есть ли более краткий способ сделать это в Rx? (Код ниже не проверен, я просто в процессе продумывания подхода)
return Observable.Defer<Message>(() =>
{
lock (gate)
{
if (!Started)
{
try
{
Start();
Started = true;
}
catch (Exception ex)
{
throw new Exception("Could not start conversation:" + ex.ToString());
}
}
//this is the previously created observable based on FromEvent, etc.
return observable;
}
});