.net ядро ​​AsyncLocal теряет контекст с System.Reactive

Я хочу использовать AsyncLocal для передачи информации через асинхронные рабочие процессы для целей трассировки. Теперь я столкнулся с проблемой с RX.
Тиос мой тестовый код:

using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

public class RxTest
{
    private readonly Subject<int> test = new Subject<int>();

    private readonly AsyncLocal<int> asyncContext = new AsyncLocal<int>();

    public void Test()
    {
        this.test
             // .ObserveOn(Scheduler.Default)
            .Subscribe(this.OnNextNormal);
        this.test
             // .ObserveOn(Scheduler.Default)
            .Delay(TimeSpan.FromMilliseconds(1))
            .Subscribe(this.OnNextDelayed);

        for (var i = 0; i < 2; i++)
        {
            var index = i;
            Task.Run(() =>
            {
                this.asyncContext.Value = index;
                Console.WriteLine(
                    $"Main\t\t{index} (Thread: {Thread.CurrentThread.ManagedThreadId}): AsyncLocal.Value => {this.asyncContext.Value}");
                this.test.OnNext(index);
            });
        }

        Console.ReadKey();
    }

    private void OnNextNormal(int obj)
    {
        Console.WriteLine(
            $"OnNextNormal\t{obj} (Thread: {Thread.CurrentThread.ManagedThreadId}): AsyncLocal.Value => {this.asyncContext.Value}");
    }

    private void OnNextDelayed(int obj)
    {
        Console.WriteLine(
            $"OnNextDelayed\t{obj} (Thread: {Thread.CurrentThread.ManagedThreadId}): AsyncLocal.Value => {this.asyncContext.Value}");
    }
}

Выход:

Main 0 (Тема: 5): AsyncLocal.Value => 0
Основной 1 (поток: 6): AsyncLocal.Value => 1
OnNextNormal 0 (поток: 5): AsyncLocal.Value => 0
OnNextNormal 1 (поток: 6): AsyncLocal.Value => 1
OnNextDelayed 0 (поток: 4): AsyncLocal.Value => 0
OnNextDelayed 1 (поток: 4): AsyncLocal.Value => 0

Как видите, AsyncLocal.Value не передается отложенным подписанным методам.
=> AsyncValue теряется на задержанной дорожке

Насколько я понимаю, обычный Subscribe() не использует планировщик, а Delay() использует планировщик.
Когда я использую ObserveOn() для обоих вызовов, вывод для обоих выглядит следующим образом

Main 0 (Тема: 5): AsyncLocal.Value => 0
Основной 1 (Тема: 7): AsyncLocal.Value => 1
OnNextNormal 0 (поток: 9): AsyncLocal.Value => 0
OnNextNormal 1 (поток: 9): AsyncLocal.Value => 0
OnNextDelayed 0 (поток: 4): AsyncLocal.Value => 0
OnNextDelayed 1 (поток: 4): AsyncLocal.Value => 0

=> AsyncValue теряется на каждом треке

Есть ли способ, как позволить ExecutionContext течь с RX?
Я только нашел это, но вот проблема, наоборот. Они решили вопрос о том, как протекает контекст наблюдателя. Я хочу передать контекст издателя.

Чего я хочу достичь, так это:

  1. Сообщение извне приходит ко мне на службу
  2. Распространить сообщение внутри службы (RX)
  3. При регистрации сообщения отформатируйте сообщение журнала с помощью MessageId.
  4. Я не хочу передавать сообщения где-либо

Заранее спасибо за ответы.

1 ответ

Свободно текущий контекст выполнения в Rx - это то, что делает его великолепным для большинства многопоточных сценариев. Вы можете применить контекст потока, обойдя запланированные методы, например так:

public static class Extensions
{
    public static IObservable<T> TaskPoolDelay<T>(this IObservable<T> observable, TimeSpan delay)
    {
        return Observable.Create<T>(
            observer => observable.Subscribe(
                onNext: value => Task.Delay(delay).ContinueWith(_ => observer.OnNext(value)),
                onError: observer.OnError,
                onCompleted: observer.OnCompleted
            )
        );
    }
}

Выход:

OnNextDelayed   2 (Thread: 6): AsyncLocal.Value => 2
OnNextDelayed   3 (Thread: 10): AsyncLocal.Value => 3
OnNextDelayed   1 (Thread: 7): AsyncLocal.Value => 1
OnNextDelayed   0 (Thread: 5): AsyncLocal.Value => 0

Это переносит контекст, но быстро усложняется для больших запросов. Я не уверен, если реализация IScheduler который сохраняет контекст, когда он уведомляет, будет работать хорошо. Если копирование сообщений не слишком много накладных расходов, это может быть лучше всего подходит для Rx.

Другие вопросы по тегам