.net ядро AsyncLocal теряет контекст с System.Reactive
Я хочу использовать AsyncLocal для передачи информации через асинхронные рабочие процессы для целей трассировки. Теперь я столкнулся с проблемой с RX.
Тиос мой тестовый код:
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;
public class RxTest
{
private readonly Subject<int> test = new Subject<int>();
private readonly AsyncLocal<int> asyncContext = new AsyncLocal<int>();
public void Test()
{
this.test
// .ObserveOn(Scheduler.Default)
.Subscribe(this.OnNextNormal);
this.test
// .ObserveOn(Scheduler.Default)
.Delay(TimeSpan.FromMilliseconds(1))
.Subscribe(this.OnNextDelayed);
for (var i = 0; i < 2; i++)
{
var index = i;
Task.Run(() =>
{
this.asyncContext.Value = index;
Console.WriteLine(
$"Main\t\t{index} (Thread: {Thread.CurrentThread.ManagedThreadId}): AsyncLocal.Value => {this.asyncContext.Value}");
this.test.OnNext(index);
});
}
Console.ReadKey();
}
private void OnNextNormal(int obj)
{
Console.WriteLine(
$"OnNextNormal\t{obj} (Thread: {Thread.CurrentThread.ManagedThreadId}): AsyncLocal.Value => {this.asyncContext.Value}");
}
private void OnNextDelayed(int obj)
{
Console.WriteLine(
$"OnNextDelayed\t{obj} (Thread: {Thread.CurrentThread.ManagedThreadId}): AsyncLocal.Value => {this.asyncContext.Value}");
}
}
Выход:
Main 0 (Тема: 5): AsyncLocal.Value => 0
Основной 1 (поток: 6): AsyncLocal.Value => 1
OnNextNormal 0 (поток: 5): AsyncLocal.Value => 0
OnNextNormal 1 (поток: 6): AsyncLocal.Value => 1
OnNextDelayed 0 (поток: 4): AsyncLocal.Value => 0
OnNextDelayed 1 (поток: 4): AsyncLocal.Value => 0
Как видите, AsyncLocal.Value не передается отложенным подписанным методам.
=> AsyncValue теряется на задержанной дорожке
Насколько я понимаю, обычный Subscribe() не использует планировщик, а Delay() использует планировщик.
Когда я использую ObserveOn() для обоих вызовов, вывод для обоих выглядит следующим образом
Main 0 (Тема: 5): AsyncLocal.Value => 0
Основной 1 (Тема: 7): AsyncLocal.Value => 1
OnNextNormal 0 (поток: 9): AsyncLocal.Value => 0
OnNextNormal 1 (поток: 9): AsyncLocal.Value => 0
OnNextDelayed 0 (поток: 4): AsyncLocal.Value => 0
OnNextDelayed 1 (поток: 4): AsyncLocal.Value => 0
=> AsyncValue теряется на каждом треке
Есть ли способ, как позволить ExecutionContext течь с RX?
Я только нашел это, но вот проблема, наоборот. Они решили вопрос о том, как протекает контекст наблюдателя. Я хочу передать контекст издателя.
Чего я хочу достичь, так это:
- Сообщение извне приходит ко мне на службу
- Распространить сообщение внутри службы (RX)
- При регистрации сообщения отформатируйте сообщение журнала с помощью MessageId.
- Я не хочу передавать сообщения где-либо
Заранее спасибо за ответы.
1 ответ
Свободно текущий контекст выполнения в Rx - это то, что делает его великолепным для большинства многопоточных сценариев. Вы можете применить контекст потока, обойдя запланированные методы, например так:
public static class Extensions
{
public static IObservable<T> TaskPoolDelay<T>(this IObservable<T> observable, TimeSpan delay)
{
return Observable.Create<T>(
observer => observable.Subscribe(
onNext: value => Task.Delay(delay).ContinueWith(_ => observer.OnNext(value)),
onError: observer.OnError,
onCompleted: observer.OnCompleted
)
);
}
}
Выход:
OnNextDelayed 2 (Thread: 6): AsyncLocal.Value => 2
OnNextDelayed 3 (Thread: 10): AsyncLocal.Value => 3
OnNextDelayed 1 (Thread: 7): AsyncLocal.Value => 1
OnNextDelayed 0 (Thread: 5): AsyncLocal.Value => 0
Это переносит контекст, но быстро усложняется для больших запросов. Я не уверен, если реализация IScheduler
который сохраняет контекст, когда он уведомляет, будет работать хорошо. Если копирование сообщений не слишком много накладных расходов, это может быть лучше всего подходит для Rx.