Реализовать ограниченный и буферизованный исполнитель заданий

Я хочу реализовать ограниченный и буферизованный исполнитель заданий.

У него будет один метод:

public class CappedBufferedExecutor {
  public CappedBufferedExecutor(int bufferCapping, int fillTimeInMillisec);
  public Task<bool> EnqueueAsync(string val);

Идея состоит в том, что значения ставятся в асинхронном порядке, и один раз fillTimeInMillisec Проходят миллисекунды, или буфер заполняется до предела уникальных значений, выполнение выполняется на практике, и все асинхронные задачи завершаются. В то время как выполнение выполнено (что может занять много времени), буфер может быть заново заполнен и могут быть выполнены новые асинхронные выполнения.

Я думал о чем-то в строках следующего псевдокода

  • Используя Timerподожди fillTime пройти, как только прошло, создайте новое задание, которое будет выполнять работу (см. ниже).
  • На новое значение, заблокируйте rwlock для чтения. Проверьте, заполнен ли буфер, если это так, подождите ManualResetEvent или TaskCompletionSource,
  • Добавить новое значение в буфер (HashSet<string>).
  • Если буфер заполнен, создайте новую задачу выполнения, которая заблокирует rwlock для записи, выполните работу со всеми собранными значениями и разбудите все нерешенные задачи, используя TaskCompletionSource,
  • Подожди TaskCompletionSource для буферизованной задачи (упомянутой в предыдущем шаге), которая будет выполнена.

Мои проблемы: как синхронизировать Timer и проверка заполненного буфера, как ждать, когда буфер заполнен, как переключаться между TaskCompletionSource случаи, когда начинается выполнение и допускается поступление новых значений.

Это просто концепция, так что не ожидайте многого:-)

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp
    class Program
        static void Main (string[] args)
            var buffer = CreateBuffer (); 

            var executor = new Executor<string> (SomeWork, buffer);
            executor.ProcessingStarted += Executor_ProcessingStarted;

            string userInput = null;

                userInput = Console.ReadLine ();

                buffer.Enqueue (userInput);
            while (!string.IsNullOrWhiteSpace (userInput));

            executor.Dispose ();


        private static IBuffer<string> CreateBuffer ()
            var buffer = new UniqueItemsBuffer<string> (3);

            buffer.DataAvailable += (items) => Console.WriteLine ("BUFFER :: data available raised.");

            var alert = new Alert ();

            var bufferWithTimeout = new BufferWithTimeout<string> (buffer, alert, TimeSpan.FromSeconds (5));

            return bufferWithTimeout;


        static Random rnd = new Random (); // must be outside, to avoid creating Random too quick because it will use the same seed for all tasks

        public static bool SomeWork (string x)
            int delay = rnd.Next (1000, 8000);

            Console.WriteLine ($"  +++ Starting SomeWork for: {x}, delay: {delay} ms");

            Thread.Sleep (delay);

            Console.WriteLine ($"  --- SomeWork for: {x} - finished.");

            return true;


        private static void Executor_ProcessingStarted (IReadOnlyList<Task<bool>> items)
            Task.Run (() =>
                Task.WaitAll (items.ToArray ());
                Console.WriteLine ("Finished processing tasks, count = " + items.Count);

    //====== actual code ===================================================================================================================

    public delegate void ItemsAvailable<T> (IReadOnlyList<T> items); // new type to simplify code

    public delegate bool ProcessItem<T> (T item); // processes the given item and returns true if job is done with success


    public interface IDataAvailableEvent<T>
        event ItemsAvailable<T> DataAvailable; // occurs when buffer need to be processed (also before raising this event, buffer should be cleared)


    public interface IProcessingStartedEvent<T>
        event ItemsAvailable<Task<bool>> ProcessingStarted; // executor raises this event when all tasks are created and started


    public interface IBuffer<T> : IDataAvailableEvent<T>
        bool Enqueue (T item); // adds new item to buffer (but sometimes it can ignore item, for example if we need only unique items in list)
                               // returns: true = buffer is not empty, false = is emtpy

        void FlushBuffer ();   // should clear buffer and raise event (or not raise if buffer was already empty)


    // raises DataAvailable event when buffer cap is reached
    // ignores duplicates

    // you can only use this class from one thread

    public class UniqueItemsBuffer<T> : IBuffer<T>
        public event ItemsAvailable<T> DataAvailable;

        readonly int capacity;
        HashSet<T> items = new HashSet<T> ();

        public UniqueItemsBuffer (int capacity = 10)
            this.capacity = capacity;

        public bool Enqueue (T item)
            if (items.Add (item) && items.Count == capacity)
                FlushBuffer ();

            return items.Count > 0;

        public void FlushBuffer ()
            Console.WriteLine ("BUFFER :: flush, item count = " + items.Count);

            if (items.Count > 0)
                var itemsCopy = items.ToList ();
                items.Clear ();

                DataAvailable?.Invoke (itemsCopy);


    public class Executor<T> : IProcessingStartedEvent<T>, IDisposable
        public event ItemsAvailable<Task<bool>> ProcessingStarted;

        readonly ProcessItem<T> work;
        readonly IDataAvailableEvent<T> dataEvent;

        public Executor (ProcessItem<T> work, IDataAvailableEvent<T> dataEvent)
            this.work = work;
            this.dataEvent = dataEvent;

            dataEvent.DataAvailable += DataEvent_DataAvailable;

        private void DataEvent_DataAvailable (IReadOnlyList<T> items)
            Console.WriteLine ("EXECUTOR :: new items to process available, count = " + items.Count);

            var list = new List<Task<bool>> ();

            foreach (var item in items)
                var task = Task.Run (() => work (item));

                list.Add (task);

            Console.WriteLine ("EXECUTOR :: raising processing started event (this msg can appear later than messages from SomeWork)");

            ProcessingStarted?.Invoke (list);

        public void Dispose ()
            dataEvent.DataAvailable -= DataEvent_DataAvailable;


    // if you want to fill buffer using many threads - use this decorator

    public sealed class ThreadSafeBuffer<T> : IBuffer<T>
        public event ItemsAvailable<T> DataAvailable;

        readonly IBuffer<T> target;
        readonly object sync = new object ();

        private ThreadSafeBuffer (IBuffer<T> target)
            this.target = target;
            this.target.DataAvailable += (items) => DataAvailable?.Invoke (items); // TODO: unpin event :P

        public bool Enqueue (T item)
            lock (sync) return target.Enqueue (item);

        public void FlushBuffer ()
            lock (sync) target.FlushBuffer ();

        public static IBuffer<T> MakeThreadSafe (IBuffer<T> target)
            if (target is ThreadSafeBuffer<T>) return target;

            return new ThreadSafeBuffer<T> (target);


    // and now if you want to process buffer after elapsed time

    public interface IAlert
        CancellationTokenSource CreateAlert (TimeSpan delay, Action action); // will execute 'action' after given delay (non blocking)

    // I didn't use much timers, so idk is this code good

    public class Alert : IAlert
        List<System.Timers.Timer> timers = new List<System.Timers.Timer> (); // we need to keep reference to timer to avoid dispose

        public CancellationTokenSource CreateAlert (TimeSpan delay, Action action)
            var cts = new CancellationTokenSource ();

            var timer = new System.Timers.Timer (delay.TotalMilliseconds);
            timers.Add (timer);

            timer.Elapsed += (sender, e) =>
                timers.Remove (timer);

                timer.Dispose ();

                if (cts.Token.IsCancellationRequested) return;

                action.Invoke ();

            timer.AutoReset = false; // just one tick
            timer.Enabled = true;

            return cts;

    // thread safe (maybe :-D)

    public class BufferWithTimeout<T> : IBuffer<T>
        public event ItemsAvailable<T> DataAvailable;

        readonly IBuffer<T> target;
        readonly IAlert     alert;
        readonly TimeSpan   timeout;

        CancellationTokenSource cts;

        readonly object sync = new object ();

        public BufferWithTimeout (IBuffer<T> target, IAlert alert, TimeSpan timeout)
            this.target  = ThreadSafeBuffer<T>.MakeThreadSafe (target); // alert can be raised from different thread
            this.alert   = alert;
            this.timeout = timeout;

            target.DataAvailable += Target_DataAvailable; // TODO: unpin event

        private void Target_DataAvailable (IReadOnlyList<T> items)
            lock (sync)
                DisableTimer ();

            DataAvailable?.Invoke (items);

        public bool Enqueue (T item)
            lock (sync)
                bool hasItems = target.Enqueue (item); // can raise underlying flush -> dataAvailable event (will disable timer)

                // and now if buffer is empty, we cannot start timer

                if (hasItems && cts == null) // if timer is not enabled
                    Console.WriteLine ("TIMER :: created alert");
                    cts = alert.CreateAlert (timeout, HandleAlert);

                return hasItems;

        public void FlushBuffer ()
            lock (sync)
                DisableTimer ();
                target.FlushBuffer ();

        private void HandleAlert ()
            lock (sync)
                Console.WriteLine ("TIMER :: handler, will call buffer flush");
                target.FlushBuffer ();

        private void DisableTimer ()
            cts?.Cancel ();
            cts = null;

            Console.WriteLine ("TIMER :: disable");

Вы можете сделать что-то легко, используя Reactive Extensions. Базовый пример использования Buffer метод:

void Main()
    var c = new Processor();
    c.SetupBufferedProcessor(2, TimeSpan.FromMilliseconds(1000));



    // When application has ended, flush the buffer

public sealed class Processor : IDisposable
    private IDisposable subscription;
    private Subject<string> subject = new Subject<string>();

    public void Enqueue(string item)

    public void SetupBufferedProcessor(int bufferSize, TimeSpan bufferCloseTimespan)
        // Create a subscription that will produce a set of strings every second 
        // or when buffer has 2 items, whatever comes first
        subscription = subject.AsObservable()
            .Buffer(bufferCloseTimespan, bufferSize)
            .Where(list => list.Any()) // suppress empty list (no items enqueued for 1 second)
            .Subscribe(async list =>
                await Task.Run(() =>
                    Console.WriteLine(string.Join(",", list)); 
                    Thread.Sleep(2000); // For demo purposes, to demonstrate processing takes place parallel with other batches.

    public void Dispose()

Это будет выводить


и через одну секунду


Код для rx находится на GitHub Больше на rx: http://www.introtorx.com/

Этот пример может быть улучшен для хранения ссылок на созданные Task объекты, чтобы их можно было правильно ожидать перед завершением приложения, но это даст вам общее представление.

