Как сериализовать Observables в облако и обратно
Мне нужно разделить последовательность обработки (как в этом вопросе Как организовать последовательность обработчиков данных с помощью.net RX) на несколько вычислительных блоков в среде Azure.
Идея состоит в том, чтобы сериализовать наблюдаемую последовательность в очереди Azure (или служебную шину) и десериализовать ее обратно.
Если производитель или потребитель потерпел неудачу, другая сторона должна иметь возможность продолжать производить / потреблять.
Может ли кто-нибудь предложить элегантный способ сделать это и что использовать (очереди Azure или служебная шина)?
Кто-нибудь использовал поставщика TCP Observable - http://rxx.codeplex.com/wikipage?title=TCP%20Qbservable%20Provider для таких проблем это безопасно для сбоев одной из сторон?
2 ответа
Представьте, что у вас есть очередь сообщений со следующим API
class MQ {
public MQ();
// send a single message from your message queue
public void send(string keyPath, string msg);
// Receive a single message from your message queue
public async Task<string> receive(keyPath);
}
Чтобы сделать этот RX-совместимым
class MQRX: IObserver<string> {
MQ _mq;
string _keyPath
MQRX(string keyPath){
_mq = mq;
_keyPath = keyPath;
}
IObservable<string> Observe(){
return Observable.Defer(()=> mq.receive(keyPath).ToObservable() ).Repeat();
}
void OnNext(string msg){
_mq.send(msg);
}
void OnError(Exception e){
// The message queue might not
// support serializing exceptions
// or it might or you might build
// a protocol for it.
}
}
Использовать его отказоустойчивым способом. Примечание. Повторная попытка будет повторной подпиской, если в OnError было сгенерировано исключение.
new MQRX("users/1/article/2").
Retry().
Subscribe((msg)=>Console.Writeln(msg));
Например, на стороне записи вы можете отправлять сообщение каждые две секунды и повторять подписку на генератор в случае ошибки. Обратите внимание, что вряд ли будет ошибка в Observable.Interval, который просто генерирует сообщение каждый промежуток времени, но представляет чтение из файла или какой-либо другой очереди сообщений.
var mq = new MQRX("users/1/article/2");
Observable.Interval(TimeSpan.FromSeconds(2)).
Select((x)=>x.ToString()).
Обратите внимание, что вам, вероятно, следует использовать метод расширения IObservable Catch, а не повторять вслепую, поскольку вы можете снова и снова получать одну и ту же ошибку. Повторите попытку (). Подписка (кв.м.);
Я написал свой собственный UDP для оболочки RX в 30 строк кода VB, и он обрабатывает тайм-ауты. TCP обертки были бы похожи, я думаю.
Imports System.Reactive
Imports System.Reactive.Linq
Imports System.Reactive.Threading.Tasks
Imports System.Threading
Imports System.Net
Imports System.Net.Sockets
Public Module UDP
''' <summary>
''' Generates an IObservable as a UDP stream on the IP endpoint with an optional
''' timeout value between results.
''' </summary>
''' <param name="timeout"></param>
''' <returns></returns>
''' <remarks></remarks>
Public Function StreamObserver(
localPort As Integer,
Optional timeout As Nullable(Of TimeSpan) = Nothing
) As IObservable(Of UdpReceiveResult)
Return Linq.Observable.Using(Of UdpReceiveResult, UdpClient)(
Function() New UdpClient(localPort),
Function(udpClient)
Dim o = Observable.Defer(Function() udpClient.
ReceiveAsync().
ToObservable())
If Not timeout Is Nothing Then
Return Observable.Timeout(o.Repeat(), timeout.Value)
Else
Return o.Repeat()
End If
End Function
)
End Function
End Module
Вы можете сделать то же самое для стороны записи, если вам нужно. Затем вы просто используете обычные приемные методы для сериализации ваших данных в кадры UDP.
new UDP.StreamObserver(8080, TimeSpan.FromSeconds(2)).
Select( function(udpResult) MyDeserializeFunction(udpResult)).
Subscribe( sub (result) DoSomething(result),
sub(error) DoSomethingWithError )