Как сериализовать Observables в облако и обратно

Мне нужно разделить последовательность обработки (как в этом вопросе Как организовать последовательность обработчиков данных с помощью.net RX) на несколько вычислительных блоков в среде Azure.
Идея состоит в том, чтобы сериализовать наблюдаемую последовательность в очереди Azure (или служебную шину) и десериализовать ее обратно.
Если производитель или потребитель потерпел неудачу, другая сторона должна иметь возможность продолжать производить / потреблять.

Может ли кто-нибудь предложить элегантный способ сделать это и что использовать (очереди Azure или служебная шина)?
Кто-нибудь использовал поставщика TCP Observable - http://rxx.codeplex.com/wikipage?title=TCP%20Qbservable%20Provider для таких проблем это безопасно для сбоев одной из сторон?

2 ответа

Решение

Представьте, что у вас есть очередь сообщений со следующим API

class MQ {

    public MQ();

    // send a single message from your message queue
    public void send(string keyPath, string msg);

    // Receive a single message from your message queue
    public async Task<string> receive(keyPath);

}

Чтобы сделать этот RX-совместимым

class MQRX: IObserver<string> {
    MQ _mq;
    string _keyPath

    MQRX(string keyPath){
        _mq = mq;
        _keyPath = keyPath;
    }

    IObservable<string> Observe(){
        return Observable.Defer(()=> mq.receive(keyPath).ToObservable() ).Repeat();
    }

    void OnNext(string msg){
        _mq.send(msg);
    }

    void OnError(Exception e){
        // The message queue might not
        // support serializing exceptions
        // or it might or you might build
        // a protocol for it.
    }
}

Использовать его отказоустойчивым способом. Примечание. Повторная попытка будет повторной подпиской, если в OnError было сгенерировано исключение.

new MQRX("users/1/article/2").
    Retry().
    Subscribe((msg)=>Console.Writeln(msg));

Например, на стороне записи вы можете отправлять сообщение каждые две секунды и повторять подписку на генератор в случае ошибки. Обратите внимание, что вряд ли будет ошибка в Observable.Interval, который просто генерирует сообщение каждый промежуток времени, но представляет чтение из файла или какой-либо другой очереди сообщений.

var mq = new MQRX("users/1/article/2");

Observable.Interval(TimeSpan.FromSeconds(2)).
    Select((x)=>x.ToString()).

Обратите внимание, что вам, вероятно, следует использовать метод расширения IObservable Catch, а не повторять вслепую, поскольку вы можете снова и снова получать одну и ту же ошибку. Повторите попытку (). Подписка (кв.м.);

Я написал свой собственный UDP для оболочки RX в 30 строк кода VB, и он обрабатывает тайм-ауты. TCP обертки были бы похожи, я думаю.

Imports System.Reactive
Imports System.Reactive.Linq
Imports System.Reactive.Threading.Tasks
Imports System.Threading
Imports System.Net
Imports System.Net.Sockets

Public Module UDP
    ''' <summary>
    ''' Generates an IObservable as a UDP stream on the IP endpoint with an optional
    ''' timeout value between results.
    ''' </summary>
    ''' <param name="timeout"></param>
    ''' <returns></returns>
    ''' <remarks></remarks>
    Public Function StreamObserver(
                localPort As Integer, 
                Optional timeout As Nullable(Of TimeSpan) = Nothing
                ) As IObservable(Of UdpReceiveResult)

        Return Linq.Observable.Using(Of UdpReceiveResult, UdpClient)(
            Function() New UdpClient(localPort),
            Function(udpClient)
                Dim o = Observable.Defer(Function() udpClient.
                                                                ReceiveAsync().
                                                                ToObservable())
                If Not timeout Is Nothing Then
                    Return Observable.Timeout(o.Repeat(), timeout.Value)
                Else
                    Return o.Repeat()
                End If
            End Function
        )
    End Function
End Module

Вы можете сделать то же самое для стороны записи, если вам нужно. Затем вы просто используете обычные приемные методы для сериализации ваших данных в кадры UDP.

new UDP.StreamObserver(8080, TimeSpan.FromSeconds(2)).
        Select( function(udpResult) MyDeserializeFunction(udpResult)).
        Subscribe( sub (result) DoSomething(result), 
                   sub(error) DoSomethingWithError ) 
Другие вопросы по тегам