Как потоки Async по сравнению с реактивным расширением?
Как сравнить следующие два? Rx более мощный?
Реактивное расширение:
var observable = Observable.Create<char>(async (observer, cancel) =>
{
while (true)
{
string line = await sr.ReadLineAsync();
if (line == null)
break;
observer.OnNext(line);
}
});
observable.Subscribe(
c => Console.WriteLine(c.ToString()),
() => end.Dispose());
Асинхронные потоки:
public async void Run(string path)
{
await foreach (var line in TestAsync())
{
Console.WriteLine(line);
}
}
private async IAsyncEnumerable<string> TestAsync()
{
while (true)
{
string line = await sr.ReadLineAsync();
if (line == null)
break;
yield return line;
}
}
1 ответ
The two features work together. PS: Forget about async streams
, think about await foreach
.
Async streams
Async streams are a (relatively) low level feature that allows asynchronous iteration. By itself, it doesn't offer any other capabilities like filtering, aggregation etc. It's pull based while Rx is push based.
You can use LINQ operators on an async stream through the System.Linq.Async library found in..... the ReacticeX.NET Github repo. It's fast, but doesn't offer the event processing functionality of Rx.
There's no notion of time time for example, much less a way to use a custom scheduler. There are no subscriptions, no error events. GroupBy will consume the entire source and emit group items as separate IAsyncEnumerable
instances, while Rx's GroupBy will emit separate Observables for each group.
In the question's example, IAsyncEnumerable is a natural fit since there's no event logic involved, just iterating over an asynchronous iterator.
If the example tried to poll eg a remote service and detect failure spikes (ie more failures per interval than a threshold) IAsyncEnumerable would be inappropriate as it would block waiting for all responses. In fact, we could't aggregate events per time at all.
Threading
None really - an IAsyncEnumerable or await foreach
call don't specify how events are produced or consumed. If we want to use a separate task to process an item, we have to create it ourselves, eg:
public async Task Run(string path)
{
await foreach (var line in LoadStockTrades())
{
var result = await Task.Run(()=>AnalyzeTrade(line));
Console.WriteLine($"{result} : {line});
}
}
Reactive Extensions
Reactive Extensions is a high level library that deals with event streams. It's push based, it understands time, but it's also slower than lower-level constructs like Async Streams or Channels.
In the question's example, Rx would be overkill. Polling and detecting spikes though is easy, with multiple windowing options.
System.Linq.Async can create an Observable from an IAsyncEnumerable with ToObservable, which means an IAsyncEnumerable can be used as a source for Rx.
Threading
By default, Rx is single threaded, which makes perfect sense for its main scenario - event stream processing.
On the other hand, Rx allows the publisher, subscriber and operators to run on the same or separate threads. In languages that don't have async/await
or DataFlow (eg Java,JavaScript), Rx is used to emulate concurrent processing pipelines by running the publisher and subscribers on different threads.