Rx.net, как запустить последовательность (или иметь побочный эффект) только на первой подписке?

У меня есть экземпляр 'Conversation', который принимает поток сообщений, обрабатывает сообщения в соответствии с внутренним состоянием и выставляет свойство, которое можно наблюдать из сообщений.

Внутренне он использует State Pattern для делегирования обработки потока в соответствии с конечным автоматом.

Разговор имеет Action<> представляет изменение состояния, и поток составляется с использованием FromEvent привести к наблюдаемым изменениям состояния, которое подает в Select(state=>state.MessageStream(source)) а затем в Switch(), чтобы превратить конечный автомат в поток сообщений.

Когда разговор сначала создается, а наблюдаемое настроено, как описано выше, я хочу, чтобы разговор начался с установки начального состояния, но я думаю, что мне нужно избегать условий гонки, убедившись, что самый первый наблюдатель (я обычно ожидайте только одного, но для гибкости....) звонков Start() на разговоре, и последующие наблюдатели должны получить тот же поток (и подписаться на него) без этого побочного эффекта.

То, как я это реализовал, похоже на это, но интуитивное чувство подсказывает мне, что это неправильный подход к проблеме. Есть ли более краткий способ сделать это в Rx? (Код ниже не проверен, я просто в процессе продумывания подхода)

   return Observable.Defer<Message>(() =>
            {
                lock (gate)
                {
                    if (!Started)
                    {
                        try
                        {
                            Start();
                            Started = true;
                        }
                        catch (Exception ex)
                        {
                            throw new Exception("Could not start conversation:" + ex.ToString());
                        }

                    }
                    //this is the previously created observable based on FromEvent, etc.
                    return observable;

                }
            });

0 ответов

Другие вопросы по тегам