Сортировка буферизованных наблюдаемых

У меня есть поток токенов, которые создаются очень быстро, и процессор, который работает относительно медленно. Токены делятся на три подтипа, и я бы предпочел, чтобы они обрабатывались по приоритету. Итак, я хотел бы, чтобы токены были буферизованы после того, как они были произведены и ожидают обработки и того буфера, отсортированного по приоритету.

Вот мои занятия:

public enum Priority
{
    High   = 3,
    Medium = 2,
    Low    = 1
}

public class Base : IComparable<Base>
{
    public int Id { get; set; }

    public int CompareTo(Base other)
    {
        return Id.CompareTo(other.Id);
    }
}

public class Foo : Base { }
public class Bar : Base { }
public class Baz : Base { }

public class Token : IComparable<Token>
{
    private readonly string _toString;

    public Foo Foo { get; }

    public Bar Bar { get; }

    public Baz Baz { get; }

    public Priority Priority =>
        Baz == null
            ? Bar == null
                ? Priority.High
                : Priority.Medium
            : Priority.Low;

    public int CompareTo(Token other)
    {
        if (Priority > other.Priority)
        {
            return -1;
        }

        if (Priority < other.Priority)
        {
            return 1;
        }

        switch (Priority)
        {
            case Priority.High:
                return Foo.CompareTo(other.Foo);
            case Priority.Medium:
                return Bar.CompareTo(other.Bar);
            case Priority.Low:
                return Baz.CompareTo(other.Baz);
            default:
                throw new ArgumentOutOfRangeException();
        }
    }

    public override string ToString()
    {
        return _toString;
    }

    public Token(Foo foo)
    {
        _toString = $"{nameof(Foo)}:{foo.Id}";
        Foo = foo;
    }

    public Token(Foo foo, Bar bar) : this(foo)
    {
        _toString += $":{nameof(Bar)}:{bar.Id}";
        Bar = bar;
    }

    public Token(Foo foo, Baz baz) : this(foo)
    {
        _toString += $":{nameof(Baz)}:{baz.Id}";
        Baz = baz;
    }
}

И вот мой код производителя:

var random = new Random();
var bazId = 0;
var barId = 0;

var fooTokens = (from id in Observable.Interval(TimeSpan.FromSeconds(1))
                                      .Select(Convert.ToInt32)
                                      .Take(3)
                 select new Token(new Foo { Id = id }))
                .Publish();

var barTokens = (from fooToken in fooTokens
                 from id in Observable.Range(0, random.Next(5, 10))
                                      .Select(_ => Interlocked.Increment(ref barId))
                 select new Token(fooToken.Foo, new Bar { Id = id }))
                .Publish();

var bazTokens = (from barToken in barTokens
                 from id in Observable.Range(0, random.Next(1, 5))
                                      .Select(_ => Interlocked.Increment(ref bazId))
                 select new Token(barToken.Foo, new Baz { Id = id }))
                .Publish();

var tokens = bazTokens.Merge(barTokens)
                      .Merge(fooTokens)
                      .Do(dt =>
                      {
                          Console.ForegroundColor = ConsoleColor.Red;
                          Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
                      });

// Subscription

bazTokens.Connect();
barTokens.Connect();
fooTokens.Connect();

Однако я немного застрял в том, как буферизовать и сортировать токены. Если я сделаю это, токены, похоже, будут производиться и потребляться одновременно, что говорит о том, что за кулисами происходит некоторая буферизация, но я не могу это контролировать.

tokens.Subscribe(dt =>
{
    Thread.Sleep(TimeSpan.FromMilliseconds(250));
    Console.ForegroundColor = ConsoleColor.Green;
    Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
});

Если я использую поток данных TPL ActionBlockЯ вижу, что токены правильно создаются и обрабатываются, но я все еще не уверен, как выполнить сортировку.

var proc = new ActionBlock<Token>(dt =>
{
    Thread.Sleep(TimeSpan.FromMilliseconds(250));
    Console.ForegroundColor = ConsoleColor.Green;
    Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
});

tokens.Subscribe(dt => proc.Post(dt));

Любые идеи или указатели, куда идти дальше, будут оценены!

Обновить:

У меня есть что-то для работы. Я добавил помощника для очистки кода для отображения тестовых данных:

private static void Display(Token dt, ConsoleColor col, int? wait = null)
{
    if (wait.HasValue)
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(wait.Value));
    }
    Console.ForegroundColor = col;
    Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
}

Я добавил SortedSet:

var set = new SortedSet<Token>();

var tokens = bazTokens
    .Merge(barTokens)
    .Merge(fooTokens)
    .Do(dt => Display(dt, ConsoleColor.Red));

tokens.Subscribe(dt => set.Add(dt));

И я также добавил потребителя, хотя я не фанат моей реализации:

var source = new CancellationTokenSource();

Task.Run(() =>
{
    while (!source.IsCancellationRequested)
    {
        var dt = set.FirstOrDefault();
        if (dt == null)
        {
            continue;
        }

        if (set.Remove(dt))
        {
            Display(dt, ConsoleColor.Green, 250);
        }
    }
}, source.Token);

Итак, теперь я получаю именно те результаты, которые я ищу, но а) Я не доволен while опрос и б) Если я хочу нескольких потребителей, я столкнусь с условиями гонки. Итак, я все еще ищу лучшие реализации, если у кого-то есть!

4 ответа

Решение

Хорошо, я использовал нормальный lock для доступа к SortedSetзатем увеличил число потребителей, и, похоже, он работает нормально, поэтому, хотя я не смог придумать решение с полным RX или разделенным RX / TPL DataFlow, теперь это делает то, что я хочу, поэтому я буду просто покажите изменения, которые я сделал в дополнение к обновлению в исходном вопросе, и оставьте его там.

var set = new SortedSet<Token>();
var locker = new object();

var tokens = bazTokens
    .Merge(barTokens)
    .Merge(fooTokens)
    .Do(dt => Display(dt, ConsoleColor.Red));

tokens.Subscribe(dt =>
{
    lock (locker)
    {
        set.Add(dt);
    }
});

for (var i = 0; i < Environment.ProcessorCount; i++)
{
    Task.Run(() =>
    {
        while (!source.IsCancellationRequested)
        {
            Token dt;

            lock (locker)
            {
                dt = set.FirstOrDefault();
            }

            if (dt == null)
            {
                continue;
            }

            bool removed;

            lock (locker)
            {
                removed = set.Remove(dt);
            }

            if (removed)
            {
                Display(dt, ConsoleColor.Green, 750);
            }
        }
    }, source.Token);
}

Спасибо людям, которые разместили решения, я ценю время, которое вы потратили.

Контейнер, который вы хотите, является приоритетной очередью, к сожалению, во время выполнения.net реализации нет (есть в C++ stl/cli, но priority_queue не доступен для других языков из этого).

Существуют контейнеры, не относящиеся к MS, которые выполняют эту роль, вам нужно будет искать и просматривать результаты, чтобы выбрать тот, который соответствует вашим потребностям.

Используя Dataflow, вы можете фильтровать токены так, чтобы каждый уровень приоритета проходил по другому пути в вашем конвейере. Токены фильтруются с помощью предиката для каждой приоритетной ссылки. Тогда вам решать, как вы хотите отдавать предпочтение в зависимости от приоритета.

Сортировка:

var highPriority = new ActionBlock<Token>(dt =>
{
    Thread.Sleep(TimeSpan.FromMilliseconds(250));
    Console.ForegroundColor = ConsoleColor.Green;
    Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
});

var midPriority = new ActionBlock<Token>(dt =>
{
    Thread.Sleep(TimeSpan.FromMilliseconds(250));
    Console.ForegroundColor = ConsoleColor.Green;
    Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
});

var lowPriority = new ActionBlock<Token>(dt =>
{
    Thread.Sleep(TimeSpan.FromMilliseconds(250));
    Console.ForegroundColor = ConsoleColor.Green;
    Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
});

var proc = new BufferBlock<Token>();

proc.LinkTo(highPriority, dt => dt.Priority == Priority.High);
proc.LinkTo(midPriority, dt => dt.Priority == Priority.Medium);
proc.LinkTo(lowPriority, dt => dt.Priority == Priority.Low);

tokens.Subscribe(dt => proc.Post(dt));

Один из способов отдать предпочтение элементам с более высоким приоритетом - разрешить последовательную обработку, отличную от стандартной. Вы можете сделать это, установив MaxDegreeOfParallelism для каждого приоритетного блока.

Предоставление предпочтения:

var highPriOptions = new DataflowLinkOptions(){MaxDegreeOfParallelism = 3}
var highPriority = new ActionBlock<Token>(dt =>
{
    Thread.Sleep(TimeSpan.FromMilliseconds(250));
    Console.ForegroundColor = ConsoleColor.Green;
    Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
}, highPriOptions);

var midPriOptions = new DataflowLinkOptions(){MaxDegreeOfParallelism = 2}   
var midPriority = new ActionBlock<Token>(dt =>
{
    Thread.Sleep(TimeSpan.FromMilliseconds(250));
    Console.ForegroundColor = ConsoleColor.Green;
    Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
}, midPriOptions);

var lowPriority = new ActionBlock<Token>(dt =>
{
    Thread.Sleep(TimeSpan.FromMilliseconds(250));
    Console.ForegroundColor = ConsoleColor.Green;
    Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
});

var proc = new BufferBlock<Token>();

proc.LinkTo(highPriority, dt => dt.Priority == Priority.High);
proc.LinkTo(midPriority, dt => dt.Priority == Priority.Medium);
proc.LinkTo(lowPriority, dt => dt.Priority == Priority.Low);

tokens.Subscribe(dt => proc.Post(dt));

Эти образцы ни в коем случае не являются полными, но, по крайней мере, должны дать вам представление.

Я думаю, что загадка здесь в том, что то, что вы, по-видимому, действительно ищете, - это результаты модели, основанной на быстрых, горячих, толкающих источниках. То, что вы, похоже, хотите, - это "самый высокий" приоритет, который еще получен, но вопрос "получен чем?" Если бы у вас было несколько подписчиков, работающих с разной скоростью, у каждого из них могло бы быть собственное представление о том, что было "самым высоким".

Таким образом, я вижу, что вы хотите объединить источники в своего рода реактивную приоритетную (отсортированную) очередь, из которой вы извлекаете результаты, когда наблюдатель готов.

Я подошел к этому, используя сигнал обратно в буфер, говоря, что "мой единственный наблюдатель теперь готов увидеть состояние списка приоритетов". Это достигается с помощью перегрузки буфера, которая принимает наблюдаемый сигнал закрытия. Этот буфер содержит новый список полученных элементов, который я просто объединяю с последним списком, без 'наивысшего'.

Код только для демонстрации кода для целей этого вопроса - возможно, есть ошибки:

 using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace RxTests
{
    class Program
    {
        static void Main(string[] args)
        {
            var p = new Program();
            p.TestPrioritisedBuffer();
            Console.ReadKey();


        }

        void TestPrioritisedBuffer()
        {
            var source1 = Observable.Interval(TimeSpan.FromSeconds(1)).Do((source) => Console.WriteLine("Source1:"+source));
            var source2 = Observable.Interval(TimeSpan.FromSeconds(5)).Scan((x,y)=>(x+100)).Do((source) => Console.WriteLine("Source2:" + source)); ;

            BehaviorSubject<bool> closingSelector = new BehaviorSubject<bool>(true);



            var m = Observable.Merge(source1, source2).
                Buffer(closingSelector).
                Select(s => new { list =s.ToList(), max=(long)0 }).
               Scan((x, y) =>
               {
                   var list = x.list.Union(y.list).OrderBy(k=>k);

                   var max = list.LastOrDefault();


                   var res = new
                   {
                       list = list.Take(list.Count()-1).ToList(),
                       max= max
                   };

                   return res;



               }
               ).
               Do((sorted) => Console.WriteLine("Sorted max:" + sorted.max + ".  Priority queue length:" + sorted.list.Count)).
               ObserveOn(Scheduler.Default); //observe on other thread

            m.Subscribe((v)=> { Console.WriteLine("Observed: "+v.max); Thread.Sleep(3000); closingSelector.OnNext(true); }) ;
        }
    }
}
Другие вопросы по тегам