Реализовать ограниченный и буферизованный исполнитель заданий

Я хочу реализовать ограниченный и буферизованный исполнитель заданий.

У него будет один метод:

public class CappedBufferedExecutor {
  public CappedBufferedExecutor(int bufferCapping, int fillTimeInMillisec);
  public Task<bool> EnqueueAsync(string val);
}

Идея состоит в том, что значения ставятся в асинхронном порядке, и один раз fillTimeInMillisec Проходят миллисекунды, или буфер заполняется до предела уникальных значений, выполнение выполняется на практике, и все асинхронные задачи завершаются. В то время как выполнение выполнено (что может занять много времени), буфер может быть заново заполнен и могут быть выполнены новые асинхронные выполнения.

Я думал о чем-то в строках следующего псевдокода

  • Используя Timerподожди fillTime пройти, как только прошло, создайте новое задание, которое будет выполнять работу (см. ниже).
  • На новое значение, заблокируйте rwlock для чтения. Проверьте, заполнен ли буфер, если это так, подождите ManualResetEvent или TaskCompletionSource,
  • Добавить новое значение в буфер (HashSet<string>).
  • Если буфер заполнен, создайте новую задачу выполнения, которая заблокирует rwlock для записи, выполните работу со всеми собранными значениями и разбудите все нерешенные задачи, используя TaskCompletionSource,
  • Подожди TaskCompletionSource для буферизованной задачи (упомянутой в предыдущем шаге), которая будет выполнена.

Мои проблемы: как синхронизировать Timer и проверка заполненного буфера, как ждать, когда буфер заполнен, как переключаться между TaskCompletionSource случаи, когда начинается выполнение и допускается поступление новых значений.

2 ответа

Решение

Это просто концепция, так что не ожидайте многого:-)

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp
{
    class Program
    {
        static void Main (string[] args)
        {
            var buffer = CreateBuffer (); 

            var executor = new Executor<string> (SomeWork, buffer);
            executor.ProcessingStarted += Executor_ProcessingStarted;

            string userInput = null;

            do
            {
                userInput = Console.ReadLine ();

                buffer.Enqueue (userInput);
            }
            while (!string.IsNullOrWhiteSpace (userInput));

            executor.Dispose ();
        }

        //----------------------------------------------------------------------------------------------------------------------------------

        private static IBuffer<string> CreateBuffer ()
        {
            var buffer = new UniqueItemsBuffer<string> (3);

            buffer.DataAvailable += (items) => Console.WriteLine ("BUFFER :: data available raised.");

            var alert = new Alert ();

            var bufferWithTimeout = new BufferWithTimeout<string> (buffer, alert, TimeSpan.FromSeconds (5));

            return bufferWithTimeout;
        }

        //----------------------------------------------------------------------------------------------------------------------------------

        static Random rnd = new Random (); // must be outside, to avoid creating Random too quick because it will use the same seed for all tasks

        public static bool SomeWork (string x)
        {
            int delay = rnd.Next (1000, 8000);

            Console.WriteLine ($"  +++ Starting SomeWork for: {x}, delay: {delay} ms");

            Thread.Sleep (delay);

            Console.WriteLine ($"  --- SomeWork for: {x} - finished.");

            return true;
        }

        //----------------------------------------------------------------------------------------------------------------------------------

        private static void Executor_ProcessingStarted (IReadOnlyList<Task<bool>> items)
        {
            Task.Run (() =>
            {
                Task.WaitAll (items.ToArray ());
                Console.WriteLine ("Finished processing tasks, count = " + items.Count);
            });
        }
    }

    //====== actual code ===================================================================================================================

    public delegate void ItemsAvailable<T> (IReadOnlyList<T> items); // new type to simplify code

    public delegate bool ProcessItem<T> (T item); // processes the given item and returns true if job is done with success

    //======================================================================================================================================

    public interface IDataAvailableEvent<T>
    {
        event ItemsAvailable<T> DataAvailable; // occurs when buffer need to be processed (also before raising this event, buffer should be cleared)
    }

    //======================================================================================================================================

    public interface IProcessingStartedEvent<T>
    {
        event ItemsAvailable<Task<bool>> ProcessingStarted; // executor raises this event when all tasks are created and started
    }

    //======================================================================================================================================

    public interface IBuffer<T> : IDataAvailableEvent<T>
    {
        bool Enqueue (T item); // adds new item to buffer (but sometimes it can ignore item, for example if we need only unique items in list)
                               // returns: true = buffer is not empty, false = is emtpy

        void FlushBuffer ();   // should clear buffer and raise event (or not raise if buffer was already empty)
    }

    //======================================================================================================================================

    // raises DataAvailable event when buffer cap is reached
    // ignores duplicates

    // you can only use this class from one thread

    public class UniqueItemsBuffer<T> : IBuffer<T>
    {
        public event ItemsAvailable<T> DataAvailable;

        readonly int capacity;
        HashSet<T> items = new HashSet<T> ();

        public UniqueItemsBuffer (int capacity = 10)
        {
            this.capacity = capacity;
        }

        public bool Enqueue (T item)
        {
            if (items.Add (item) && items.Count == capacity)
            {
                FlushBuffer ();
            }

            return items.Count > 0;
        }

        public void FlushBuffer ()
        {
            Console.WriteLine ("BUFFER :: flush, item count = " + items.Count);

            if (items.Count > 0)
            {
                var itemsCopy = items.ToList ();
                items.Clear ();

                DataAvailable?.Invoke (itemsCopy);
            }
        }
    }

    //======================================================================================================================================

    public class Executor<T> : IProcessingStartedEvent<T>, IDisposable
    {
        public event ItemsAvailable<Task<bool>> ProcessingStarted;

        readonly ProcessItem<T> work;
        readonly IDataAvailableEvent<T> dataEvent;

        public Executor (ProcessItem<T> work, IDataAvailableEvent<T> dataEvent)
        {
            this.work = work;
            this.dataEvent = dataEvent;

            dataEvent.DataAvailable += DataEvent_DataAvailable;
        }

        private void DataEvent_DataAvailable (IReadOnlyList<T> items)
        {
            Console.WriteLine ("EXECUTOR :: new items to process available, count = " + items.Count);

            var list = new List<Task<bool>> ();

            foreach (var item in items)
            {
                var task = Task.Run (() => work (item));

                list.Add (task);
            }

            Console.WriteLine ("EXECUTOR :: raising processing started event (this msg can appear later than messages from SomeWork)");

            ProcessingStarted?.Invoke (list);
        }

        public void Dispose ()
        {
            dataEvent.DataAvailable -= DataEvent_DataAvailable;
        }
    }

    //======================================================================================================================================

    // if you want to fill buffer using many threads - use this decorator

    public sealed class ThreadSafeBuffer<T> : IBuffer<T>
    {
        public event ItemsAvailable<T> DataAvailable;

        readonly IBuffer<T> target;
        readonly object sync = new object ();

        private ThreadSafeBuffer (IBuffer<T> target)
        {
            this.target = target;
            this.target.DataAvailable += (items) => DataAvailable?.Invoke (items); // TODO: unpin event :P
        }

        public bool Enqueue (T item)
        {
            lock (sync) return target.Enqueue (item);
        }

        public void FlushBuffer ()
        {
            lock (sync) target.FlushBuffer ();
        }

        public static IBuffer<T> MakeThreadSafe (IBuffer<T> target)
        {
            if (target is ThreadSafeBuffer<T>) return target;

            return new ThreadSafeBuffer<T> (target);
        }
    }

    //======================================================================================================================================

    // and now if you want to process buffer after elapsed time

    public interface IAlert
    {
        CancellationTokenSource CreateAlert (TimeSpan delay, Action action); // will execute 'action' after given delay (non blocking)
    }

    // I didn't use much timers, so idk is this code good

    public class Alert : IAlert
    {
        List<System.Timers.Timer> timers = new List<System.Timers.Timer> (); // we need to keep reference to timer to avoid dispose

        public CancellationTokenSource CreateAlert (TimeSpan delay, Action action)
        {
            var cts = new CancellationTokenSource ();

            var timer = new System.Timers.Timer (delay.TotalMilliseconds);
            timers.Add (timer);

            timer.Elapsed += (sender, e) =>
            {
                timers.Remove (timer);

                timer.Dispose ();

                if (cts.Token.IsCancellationRequested) return;

                action.Invoke ();
            };

            timer.AutoReset = false; // just one tick
            timer.Enabled = true;

            return cts;
        }
    }

    // thread safe (maybe :-D)

    public class BufferWithTimeout<T> : IBuffer<T>
    {
        public event ItemsAvailable<T> DataAvailable;

        readonly IBuffer<T> target;
        readonly IAlert     alert;
        readonly TimeSpan   timeout;

        CancellationTokenSource cts;

        readonly object sync = new object ();

        public BufferWithTimeout (IBuffer<T> target, IAlert alert, TimeSpan timeout)
        {
            this.target  = ThreadSafeBuffer<T>.MakeThreadSafe (target); // alert can be raised from different thread
            this.alert   = alert;
            this.timeout = timeout;

            target.DataAvailable += Target_DataAvailable; // TODO: unpin event
        }

        private void Target_DataAvailable (IReadOnlyList<T> items)
        {
            lock (sync)
            {
                DisableTimer ();
            }

            DataAvailable?.Invoke (items);
        }

        public bool Enqueue (T item)
        {
            lock (sync)
            {
                bool hasItems = target.Enqueue (item); // can raise underlying flush -> dataAvailable event (will disable timer)

                // and now if buffer is empty, we cannot start timer

                if (hasItems && cts == null) // if timer is not enabled
                {
                    Console.WriteLine ("TIMER :: created alert");
                    cts = alert.CreateAlert (timeout, HandleAlert);
                }

                return hasItems;
            }
        }

        public void FlushBuffer ()
        {
            lock (sync)
            {
                DisableTimer ();
                target.FlushBuffer ();
            }
        }

        private void HandleAlert ()
        {
            lock (sync)
            {
                Console.WriteLine ("TIMER :: handler, will call buffer flush");
                target.FlushBuffer ();
            }
        }

        private void DisableTimer ()
        {
            cts?.Cancel ();
            cts = null;

            Console.WriteLine ("TIMER :: disable");
        }
    }
}

Вы можете сделать что-то легко, используя Reactive Extensions. Базовый пример использования Buffer метод:

void Main()
{
    var c = new Processor();
    c.SetupBufferedProcessor(2, TimeSpan.FromMilliseconds(1000));

    c.Enqueue("A");
    c.Enqueue("B");
    c.Enqueue("C");

    Console.ReadLine(); 

    // When application has ended, flush the buffer
    c.Dispose(); 
}


public sealed class Processor : IDisposable
{
    private IDisposable subscription;
    private Subject<string> subject = new Subject<string>();

    public void Enqueue(string item)
    {
        subject.OnNext(item);       
    }

    public void SetupBufferedProcessor(int bufferSize, TimeSpan bufferCloseTimespan)
    {
        // Create a subscription that will produce a set of strings every second 
        // or when buffer has 2 items, whatever comes first
        subscription = subject.AsObservable()
            .Buffer(bufferCloseTimespan, bufferSize)
            .Where(list => list.Any()) // suppress empty list (no items enqueued for 1 second)
            .Subscribe(async list =>
            {
                await Task.Run(() =>
                {
                    Console.WriteLine(string.Join(",", list)); 
                    Thread.Sleep(2000); // For demo purposes, to demonstrate processing takes place parallel with other batches.
                });
            });
    }

    public void Dispose()
    {
        subscription?.Dispose();
    }
}

Это будет выводить

A,B

и через одну секунду

C

Код для rx находится на GitHub Больше на rx: http://www.introtorx.com/

Этот пример может быть улучшен для хранения ссылок на созданные Task объекты, чтобы их можно было правильно ожидать перед завершением приложения, но это даст вам общее представление.

Другие вопросы по тегам