Реализовать ограниченный и буферизованный исполнитель заданий
Я хочу реализовать ограниченный и буферизованный исполнитель заданий.
У него будет один метод:
public class CappedBufferedExecutor {
public CappedBufferedExecutor(int bufferCapping, int fillTimeInMillisec);
public Task<bool> EnqueueAsync(string val);
}
Идея состоит в том, что значения ставятся в асинхронном порядке, и один раз fillTimeInMillisec
Проходят миллисекунды, или буфер заполняется до предела уникальных значений, выполнение выполняется на практике, и все асинхронные задачи завершаются. В то время как выполнение выполнено (что может занять много времени), буфер может быть заново заполнен и могут быть выполнены новые асинхронные выполнения.
Я думал о чем-то в строках следующего псевдокода
- Используя
Timer
подождиfillTime
пройти, как только прошло, создайте новое задание, которое будет выполнять работу (см. ниже). - На новое значение, заблокируйте
rwlock
для чтения. Проверьте, заполнен ли буфер, если это так, подождитеManualResetEvent
илиTaskCompletionSource
, - Добавить новое значение в буфер (
HashSet<string>
). - Если буфер заполнен, создайте новую задачу выполнения, которая заблокирует
rwlock
для записи, выполните работу со всеми собранными значениями и разбудите все нерешенные задачи, используяTaskCompletionSource
, - Подожди
TaskCompletionSource
для буферизованной задачи (упомянутой в предыдущем шаге), которая будет выполнена.
Мои проблемы: как синхронизировать Timer
и проверка заполненного буфера, как ждать, когда буфер заполнен, как переключаться между TaskCompletionSource
случаи, когда начинается выполнение и допускается поступление новых значений.
2 ответа
Это просто концепция, так что не ожидайте многого:-)
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp
{
class Program
{
static void Main (string[] args)
{
var buffer = CreateBuffer ();
var executor = new Executor<string> (SomeWork, buffer);
executor.ProcessingStarted += Executor_ProcessingStarted;
string userInput = null;
do
{
userInput = Console.ReadLine ();
buffer.Enqueue (userInput);
}
while (!string.IsNullOrWhiteSpace (userInput));
executor.Dispose ();
}
//----------------------------------------------------------------------------------------------------------------------------------
private static IBuffer<string> CreateBuffer ()
{
var buffer = new UniqueItemsBuffer<string> (3);
buffer.DataAvailable += (items) => Console.WriteLine ("BUFFER :: data available raised.");
var alert = new Alert ();
var bufferWithTimeout = new BufferWithTimeout<string> (buffer, alert, TimeSpan.FromSeconds (5));
return bufferWithTimeout;
}
//----------------------------------------------------------------------------------------------------------------------------------
static Random rnd = new Random (); // must be outside, to avoid creating Random too quick because it will use the same seed for all tasks
public static bool SomeWork (string x)
{
int delay = rnd.Next (1000, 8000);
Console.WriteLine ($" +++ Starting SomeWork for: {x}, delay: {delay} ms");
Thread.Sleep (delay);
Console.WriteLine ($" --- SomeWork for: {x} - finished.");
return true;
}
//----------------------------------------------------------------------------------------------------------------------------------
private static void Executor_ProcessingStarted (IReadOnlyList<Task<bool>> items)
{
Task.Run (() =>
{
Task.WaitAll (items.ToArray ());
Console.WriteLine ("Finished processing tasks, count = " + items.Count);
});
}
}
//====== actual code ===================================================================================================================
public delegate void ItemsAvailable<T> (IReadOnlyList<T> items); // new type to simplify code
public delegate bool ProcessItem<T> (T item); // processes the given item and returns true if job is done with success
//======================================================================================================================================
public interface IDataAvailableEvent<T>
{
event ItemsAvailable<T> DataAvailable; // occurs when buffer need to be processed (also before raising this event, buffer should be cleared)
}
//======================================================================================================================================
public interface IProcessingStartedEvent<T>
{
event ItemsAvailable<Task<bool>> ProcessingStarted; // executor raises this event when all tasks are created and started
}
//======================================================================================================================================
public interface IBuffer<T> : IDataAvailableEvent<T>
{
bool Enqueue (T item); // adds new item to buffer (but sometimes it can ignore item, for example if we need only unique items in list)
// returns: true = buffer is not empty, false = is emtpy
void FlushBuffer (); // should clear buffer and raise event (or not raise if buffer was already empty)
}
//======================================================================================================================================
// raises DataAvailable event when buffer cap is reached
// ignores duplicates
// you can only use this class from one thread
public class UniqueItemsBuffer<T> : IBuffer<T>
{
public event ItemsAvailable<T> DataAvailable;
readonly int capacity;
HashSet<T> items = new HashSet<T> ();
public UniqueItemsBuffer (int capacity = 10)
{
this.capacity = capacity;
}
public bool Enqueue (T item)
{
if (items.Add (item) && items.Count == capacity)
{
FlushBuffer ();
}
return items.Count > 0;
}
public void FlushBuffer ()
{
Console.WriteLine ("BUFFER :: flush, item count = " + items.Count);
if (items.Count > 0)
{
var itemsCopy = items.ToList ();
items.Clear ();
DataAvailable?.Invoke (itemsCopy);
}
}
}
//======================================================================================================================================
public class Executor<T> : IProcessingStartedEvent<T>, IDisposable
{
public event ItemsAvailable<Task<bool>> ProcessingStarted;
readonly ProcessItem<T> work;
readonly IDataAvailableEvent<T> dataEvent;
public Executor (ProcessItem<T> work, IDataAvailableEvent<T> dataEvent)
{
this.work = work;
this.dataEvent = dataEvent;
dataEvent.DataAvailable += DataEvent_DataAvailable;
}
private void DataEvent_DataAvailable (IReadOnlyList<T> items)
{
Console.WriteLine ("EXECUTOR :: new items to process available, count = " + items.Count);
var list = new List<Task<bool>> ();
foreach (var item in items)
{
var task = Task.Run (() => work (item));
list.Add (task);
}
Console.WriteLine ("EXECUTOR :: raising processing started event (this msg can appear later than messages from SomeWork)");
ProcessingStarted?.Invoke (list);
}
public void Dispose ()
{
dataEvent.DataAvailable -= DataEvent_DataAvailable;
}
}
//======================================================================================================================================
// if you want to fill buffer using many threads - use this decorator
public sealed class ThreadSafeBuffer<T> : IBuffer<T>
{
public event ItemsAvailable<T> DataAvailable;
readonly IBuffer<T> target;
readonly object sync = new object ();
private ThreadSafeBuffer (IBuffer<T> target)
{
this.target = target;
this.target.DataAvailable += (items) => DataAvailable?.Invoke (items); // TODO: unpin event :P
}
public bool Enqueue (T item)
{
lock (sync) return target.Enqueue (item);
}
public void FlushBuffer ()
{
lock (sync) target.FlushBuffer ();
}
public static IBuffer<T> MakeThreadSafe (IBuffer<T> target)
{
if (target is ThreadSafeBuffer<T>) return target;
return new ThreadSafeBuffer<T> (target);
}
}
//======================================================================================================================================
// and now if you want to process buffer after elapsed time
public interface IAlert
{
CancellationTokenSource CreateAlert (TimeSpan delay, Action action); // will execute 'action' after given delay (non blocking)
}
// I didn't use much timers, so idk is this code good
public class Alert : IAlert
{
List<System.Timers.Timer> timers = new List<System.Timers.Timer> (); // we need to keep reference to timer to avoid dispose
public CancellationTokenSource CreateAlert (TimeSpan delay, Action action)
{
var cts = new CancellationTokenSource ();
var timer = new System.Timers.Timer (delay.TotalMilliseconds);
timers.Add (timer);
timer.Elapsed += (sender, e) =>
{
timers.Remove (timer);
timer.Dispose ();
if (cts.Token.IsCancellationRequested) return;
action.Invoke ();
};
timer.AutoReset = false; // just one tick
timer.Enabled = true;
return cts;
}
}
// thread safe (maybe :-D)
public class BufferWithTimeout<T> : IBuffer<T>
{
public event ItemsAvailable<T> DataAvailable;
readonly IBuffer<T> target;
readonly IAlert alert;
readonly TimeSpan timeout;
CancellationTokenSource cts;
readonly object sync = new object ();
public BufferWithTimeout (IBuffer<T> target, IAlert alert, TimeSpan timeout)
{
this.target = ThreadSafeBuffer<T>.MakeThreadSafe (target); // alert can be raised from different thread
this.alert = alert;
this.timeout = timeout;
target.DataAvailable += Target_DataAvailable; // TODO: unpin event
}
private void Target_DataAvailable (IReadOnlyList<T> items)
{
lock (sync)
{
DisableTimer ();
}
DataAvailable?.Invoke (items);
}
public bool Enqueue (T item)
{
lock (sync)
{
bool hasItems = target.Enqueue (item); // can raise underlying flush -> dataAvailable event (will disable timer)
// and now if buffer is empty, we cannot start timer
if (hasItems && cts == null) // if timer is not enabled
{
Console.WriteLine ("TIMER :: created alert");
cts = alert.CreateAlert (timeout, HandleAlert);
}
return hasItems;
}
}
public void FlushBuffer ()
{
lock (sync)
{
DisableTimer ();
target.FlushBuffer ();
}
}
private void HandleAlert ()
{
lock (sync)
{
Console.WriteLine ("TIMER :: handler, will call buffer flush");
target.FlushBuffer ();
}
}
private void DisableTimer ()
{
cts?.Cancel ();
cts = null;
Console.WriteLine ("TIMER :: disable");
}
}
}
Вы можете сделать что-то легко, используя Reactive Extensions. Базовый пример использования Buffer
метод:
void Main()
{
var c = new Processor();
c.SetupBufferedProcessor(2, TimeSpan.FromMilliseconds(1000));
c.Enqueue("A");
c.Enqueue("B");
c.Enqueue("C");
Console.ReadLine();
// When application has ended, flush the buffer
c.Dispose();
}
public sealed class Processor : IDisposable
{
private IDisposable subscription;
private Subject<string> subject = new Subject<string>();
public void Enqueue(string item)
{
subject.OnNext(item);
}
public void SetupBufferedProcessor(int bufferSize, TimeSpan bufferCloseTimespan)
{
// Create a subscription that will produce a set of strings every second
// or when buffer has 2 items, whatever comes first
subscription = subject.AsObservable()
.Buffer(bufferCloseTimespan, bufferSize)
.Where(list => list.Any()) // suppress empty list (no items enqueued for 1 second)
.Subscribe(async list =>
{
await Task.Run(() =>
{
Console.WriteLine(string.Join(",", list));
Thread.Sleep(2000); // For demo purposes, to demonstrate processing takes place parallel with other batches.
});
});
}
public void Dispose()
{
subscription?.Dispose();
}
}
Это будет выводить
A,B
и через одну секунду
C
Код для rx находится на GitHub Больше на rx: http://www.introtorx.com/
Этот пример может быть улучшен для хранения ссылок на созданные Task
объекты, чтобы их можно было правильно ожидать перед завершением приложения, но это даст вам общее представление.