Сортировка буферизованных наблюдаемых
У меня есть поток токенов, которые создаются очень быстро, и процессор, который работает относительно медленно. Токены делятся на три подтипа, и я бы предпочел, чтобы они обрабатывались по приоритету. Итак, я хотел бы, чтобы токены были буферизованы после того, как они были произведены и ожидают обработки и того буфера, отсортированного по приоритету.
Вот мои занятия:
public enum Priority
{
High = 3,
Medium = 2,
Low = 1
}
public class Base : IComparable<Base>
{
public int Id { get; set; }
public int CompareTo(Base other)
{
return Id.CompareTo(other.Id);
}
}
public class Foo : Base { }
public class Bar : Base { }
public class Baz : Base { }
public class Token : IComparable<Token>
{
private readonly string _toString;
public Foo Foo { get; }
public Bar Bar { get; }
public Baz Baz { get; }
public Priority Priority =>
Baz == null
? Bar == null
? Priority.High
: Priority.Medium
: Priority.Low;
public int CompareTo(Token other)
{
if (Priority > other.Priority)
{
return -1;
}
if (Priority < other.Priority)
{
return 1;
}
switch (Priority)
{
case Priority.High:
return Foo.CompareTo(other.Foo);
case Priority.Medium:
return Bar.CompareTo(other.Bar);
case Priority.Low:
return Baz.CompareTo(other.Baz);
default:
throw new ArgumentOutOfRangeException();
}
}
public override string ToString()
{
return _toString;
}
public Token(Foo foo)
{
_toString = $"{nameof(Foo)}:{foo.Id}";
Foo = foo;
}
public Token(Foo foo, Bar bar) : this(foo)
{
_toString += $":{nameof(Bar)}:{bar.Id}";
Bar = bar;
}
public Token(Foo foo, Baz baz) : this(foo)
{
_toString += $":{nameof(Baz)}:{baz.Id}";
Baz = baz;
}
}
И вот мой код производителя:
var random = new Random();
var bazId = 0;
var barId = 0;
var fooTokens = (from id in Observable.Interval(TimeSpan.FromSeconds(1))
.Select(Convert.ToInt32)
.Take(3)
select new Token(new Foo { Id = id }))
.Publish();
var barTokens = (from fooToken in fooTokens
from id in Observable.Range(0, random.Next(5, 10))
.Select(_ => Interlocked.Increment(ref barId))
select new Token(fooToken.Foo, new Bar { Id = id }))
.Publish();
var bazTokens = (from barToken in barTokens
from id in Observable.Range(0, random.Next(1, 5))
.Select(_ => Interlocked.Increment(ref bazId))
select new Token(barToken.Foo, new Baz { Id = id }))
.Publish();
var tokens = bazTokens.Merge(barTokens)
.Merge(fooTokens)
.Do(dt =>
{
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
});
// Subscription
bazTokens.Connect();
barTokens.Connect();
fooTokens.Connect();
Однако я немного застрял в том, как буферизовать и сортировать токены. Если я сделаю это, токены, похоже, будут производиться и потребляться одновременно, что говорит о том, что за кулисами происходит некоторая буферизация, но я не могу это контролировать.
tokens.Subscribe(dt =>
{
Thread.Sleep(TimeSpan.FromMilliseconds(250));
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
});
Если я использую поток данных TPL ActionBlock
Я вижу, что токены правильно создаются и обрабатываются, но я все еще не уверен, как выполнить сортировку.
var proc = new ActionBlock<Token>(dt =>
{
Thread.Sleep(TimeSpan.FromMilliseconds(250));
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
});
tokens.Subscribe(dt => proc.Post(dt));
Любые идеи или указатели, куда идти дальше, будут оценены!
Обновить:
У меня есть что-то для работы. Я добавил помощника для очистки кода для отображения тестовых данных:
private static void Display(Token dt, ConsoleColor col, int? wait = null)
{
if (wait.HasValue)
{
Thread.Sleep(TimeSpan.FromMilliseconds(wait.Value));
}
Console.ForegroundColor = col;
Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
}
Я добавил SortedSet
:
var set = new SortedSet<Token>();
var tokens = bazTokens
.Merge(barTokens)
.Merge(fooTokens)
.Do(dt => Display(dt, ConsoleColor.Red));
tokens.Subscribe(dt => set.Add(dt));
И я также добавил потребителя, хотя я не фанат моей реализации:
var source = new CancellationTokenSource();
Task.Run(() =>
{
while (!source.IsCancellationRequested)
{
var dt = set.FirstOrDefault();
if (dt == null)
{
continue;
}
if (set.Remove(dt))
{
Display(dt, ConsoleColor.Green, 250);
}
}
}, source.Token);
Итак, теперь я получаю именно те результаты, которые я ищу, но а) Я не доволен while
опрос и б) Если я хочу нескольких потребителей, я столкнусь с условиями гонки. Итак, я все еще ищу лучшие реализации, если у кого-то есть!
4 ответа
Хорошо, я использовал нормальный lock
для доступа к SortedSet
затем увеличил число потребителей, и, похоже, он работает нормально, поэтому, хотя я не смог придумать решение с полным RX или разделенным RX / TPL DataFlow, теперь это делает то, что я хочу, поэтому я буду просто покажите изменения, которые я сделал в дополнение к обновлению в исходном вопросе, и оставьте его там.
var set = new SortedSet<Token>();
var locker = new object();
var tokens = bazTokens
.Merge(barTokens)
.Merge(fooTokens)
.Do(dt => Display(dt, ConsoleColor.Red));
tokens.Subscribe(dt =>
{
lock (locker)
{
set.Add(dt);
}
});
for (var i = 0; i < Environment.ProcessorCount; i++)
{
Task.Run(() =>
{
while (!source.IsCancellationRequested)
{
Token dt;
lock (locker)
{
dt = set.FirstOrDefault();
}
if (dt == null)
{
continue;
}
bool removed;
lock (locker)
{
removed = set.Remove(dt);
}
if (removed)
{
Display(dt, ConsoleColor.Green, 750);
}
}
}, source.Token);
}
Спасибо людям, которые разместили решения, я ценю время, которое вы потратили.
Контейнер, который вы хотите, является приоритетной очередью, к сожалению, во время выполнения.net реализации нет (есть в C++ stl/cli, но priority_queue не доступен для других языков из этого).
Существуют контейнеры, не относящиеся к MS, которые выполняют эту роль, вам нужно будет искать и просматривать результаты, чтобы выбрать тот, который соответствует вашим потребностям.
Используя Dataflow, вы можете фильтровать токены так, чтобы каждый уровень приоритета проходил по другому пути в вашем конвейере. Токены фильтруются с помощью предиката для каждой приоритетной ссылки. Тогда вам решать, как вы хотите отдавать предпочтение в зависимости от приоритета.
Сортировка:
var highPriority = new ActionBlock<Token>(dt =>
{
Thread.Sleep(TimeSpan.FromMilliseconds(250));
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
});
var midPriority = new ActionBlock<Token>(dt =>
{
Thread.Sleep(TimeSpan.FromMilliseconds(250));
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
});
var lowPriority = new ActionBlock<Token>(dt =>
{
Thread.Sleep(TimeSpan.FromMilliseconds(250));
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
});
var proc = new BufferBlock<Token>();
proc.LinkTo(highPriority, dt => dt.Priority == Priority.High);
proc.LinkTo(midPriority, dt => dt.Priority == Priority.Medium);
proc.LinkTo(lowPriority, dt => dt.Priority == Priority.Low);
tokens.Subscribe(dt => proc.Post(dt));
Один из способов отдать предпочтение элементам с более высоким приоритетом - разрешить последовательную обработку, отличную от стандартной. Вы можете сделать это, установив MaxDegreeOfParallelism
для каждого приоритетного блока.
Предоставление предпочтения:
var highPriOptions = new DataflowLinkOptions(){MaxDegreeOfParallelism = 3}
var highPriority = new ActionBlock<Token>(dt =>
{
Thread.Sleep(TimeSpan.FromMilliseconds(250));
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
}, highPriOptions);
var midPriOptions = new DataflowLinkOptions(){MaxDegreeOfParallelism = 2}
var midPriority = new ActionBlock<Token>(dt =>
{
Thread.Sleep(TimeSpan.FromMilliseconds(250));
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
}, midPriOptions);
var lowPriority = new ActionBlock<Token>(dt =>
{
Thread.Sleep(TimeSpan.FromMilliseconds(250));
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
});
var proc = new BufferBlock<Token>();
proc.LinkTo(highPriority, dt => dt.Priority == Priority.High);
proc.LinkTo(midPriority, dt => dt.Priority == Priority.Medium);
proc.LinkTo(lowPriority, dt => dt.Priority == Priority.Low);
tokens.Subscribe(dt => proc.Post(dt));
Эти образцы ни в коем случае не являются полными, но, по крайней мере, должны дать вам представление.
Я думаю, что загадка здесь в том, что то, что вы, по-видимому, действительно ищете, - это результаты модели, основанной на быстрых, горячих, толкающих источниках. То, что вы, похоже, хотите, - это "самый высокий" приоритет, который еще получен, но вопрос "получен чем?" Если бы у вас было несколько подписчиков, работающих с разной скоростью, у каждого из них могло бы быть собственное представление о том, что было "самым высоким".
Таким образом, я вижу, что вы хотите объединить источники в своего рода реактивную приоритетную (отсортированную) очередь, из которой вы извлекаете результаты, когда наблюдатель готов.
Я подошел к этому, используя сигнал обратно в буфер, говоря, что "мой единственный наблюдатель теперь готов увидеть состояние списка приоритетов". Это достигается с помощью перегрузки буфера, которая принимает наблюдаемый сигнал закрытия. Этот буфер содержит новый список полученных элементов, который я просто объединяю с последним списком, без 'наивысшего'.
Код только для демонстрации кода для целей этого вопроса - возможно, есть ошибки:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace RxTests
{
class Program
{
static void Main(string[] args)
{
var p = new Program();
p.TestPrioritisedBuffer();
Console.ReadKey();
}
void TestPrioritisedBuffer()
{
var source1 = Observable.Interval(TimeSpan.FromSeconds(1)).Do((source) => Console.WriteLine("Source1:"+source));
var source2 = Observable.Interval(TimeSpan.FromSeconds(5)).Scan((x,y)=>(x+100)).Do((source) => Console.WriteLine("Source2:" + source)); ;
BehaviorSubject<bool> closingSelector = new BehaviorSubject<bool>(true);
var m = Observable.Merge(source1, source2).
Buffer(closingSelector).
Select(s => new { list =s.ToList(), max=(long)0 }).
Scan((x, y) =>
{
var list = x.list.Union(y.list).OrderBy(k=>k);
var max = list.LastOrDefault();
var res = new
{
list = list.Take(list.Count()-1).ToList(),
max= max
};
return res;
}
).
Do((sorted) => Console.WriteLine("Sorted max:" + sorted.max + ". Priority queue length:" + sorted.list.Count)).
ObserveOn(Scheduler.Default); //observe on other thread
m.Subscribe((v)=> { Console.WriteLine("Observed: "+v.max); Thread.Sleep(3000); closingSelector.OnNext(true); }) ;
}
}
}