Почему мой пример разрушителя такой медленный?

Я взял пример кода из примера Disruptor.NET с вопросом о переполнении стека и изменил его, чтобы "измерить" время. Полный список ниже:

using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Disruptor;
using Disruptor.Dsl;

namespace DisruptorTest
{
    public sealed class ValueEntry
    {
        public long Value { get; set; }

        public ValueEntry()
        {
            Console.WriteLine("New ValueEntry created");
        }
    }

    public class ValueAdditionHandler : IEventHandler<ValueEntry>
    {
        public void OnNext(ValueEntry data, long sequence, bool endOfBatch)
        {
            Program.sw.Stop();
            long microseconds = Program.sw.ElapsedTicks / (Stopwatch.Frequency / (1000L * 1000L));
            Console.WriteLine("elapsed microseconds = " + microseconds);
            Console.WriteLine("Event handled: Value = {0} (processed event {1}", data.Value, sequence);
        }
    }

    class Program
    {
        public static Stopwatch sw = Stopwatch.StartNew();

        private static readonly Random _random = new Random();
        private static readonly int _ringSize = 16;  // Must be multiple of 2

        static void Main(string[] args)
        {
            var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), _ringSize, TaskScheduler.Default);

            disruptor.HandleEventsWith(new ValueAdditionHandler());

            var ringBuffer = disruptor.Start();

            while (true)
            {
                var valueToSet = _random.Next();
                long sequenceNo = ringBuffer.Next();

                ValueEntry entry = ringBuffer[sequenceNo];

                entry.Value = valueToSet;

                sw.Restart();
                ringBuffer.Publish(sequenceNo);

                Console.WriteLine("Published entry {0}, value {1}", sequenceNo, entry.Value);

                Thread.Sleep(1000);
            }
        }
    }
}

И вывод:

New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
Published entry 0, value 1510145842
elapsed microseconds = 2205
Event handled: Value = 1510145842 (processed event 0
Published entry 1, value 1718075893
elapsed microseconds = 85
Event handled: Value = 1718075893 (processed event 1
Published entry 2, value 1675907645
elapsed microseconds = 32
Event handled: Value = 1675907645 (processed event 2
Published entry 3, value 1563009446
elapsed microseconds = 75
Event handled: Value = 1563009446 (processed event 3
Published entry 4, value 1782914062
elapsed microseconds = 34
Event handled: Value = 1782914062 (processed event 4
Published entry 5, value 1516398244
elapsed microseconds = 50
Event handled: Value = 1516398244 (processed event 5
Published entry 6, value 76829327
elapsed microseconds = 50
Event handled: Value = 76829327 (processed event 6

Таким образом, передача данных из одного потока в другой занимает около 50 микросекунд. Но это совсем не быстро! "Текущая версия Disruptor может делать ~50 нс между потоками со скоростью 1 миллион сообщений в секунду". Так что мои результаты в 1000 раз медленнее, чем ожидалось.

Что не так с моим примером и как добиться скорости 50 нс?

Я изменил программу выше и теперь получаю задержку в 1 микросекунду, что намного лучше. Тем не менее, я все еще жду ответа от disruptor шаблонные эксперты. Я ищу пример, который может доказать, что я действительно могу передавать данные за 50 нс.

Также я написал тот же тест, используя BlockingCollection и получил в среднем 14 микросекунд, что доказывает, что Disruptor быстрее:

Использование BlockingCollection:

average = 14 minimum = 0 0-5 = 890558, 5-10 = 1773781, 10-30 = 6900128, >30 = 435433

Используя Disruptor:

average = 0 minimum = 0 0-5 = 9908469, 5-10 = 64464, 10-30 = 19902, >30 = 7065

Код BlockingCollection:

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace DisruptorTest
{
    public sealed class ValueEntry
    {
        public int Value { get; set; }

        public ValueEntry()
        {
            //   Console.WriteLine("New ValueEntry created");
        }
    }

    //public class ValueAdditionHandler : IEventHandler<ValueEntry>
    //{
    //    public void OnNext(ValueEntry data, long sequence, bool endOfBatch)
    //    {

    //        long microseconds = Program.sw[data.Value].ElapsedTicks / (Stopwatch.Frequency / (1000L * 1000L));
    //        Program.results[data.Value] = microseconds;
    //        //Console.WriteLine("elapsed microseconds = " + microseconds);
    //        //Console.WriteLine("Event handled: Value = {0} (processed event {1}", data.Value, sequence);
    //    }
    //}

    class Program
    {
        public const int length = 10000000;
        public static Stopwatch[] sw = new Stopwatch[length];
        public static long[] results = new long[length];

        static BlockingCollection<ValueEntry> dataItems = new BlockingCollection<ValueEntry>(150);

        static void Main(string[] args)
        {
            for (int i = 0; i < length; i++)
            {
                sw[i] = Stopwatch.StartNew();
            }

            // A simple blocking consumer with no cancellation.
            Task.Factory.StartNew(() =>
            {
                while (!dataItems.IsCompleted)
                {

                    ValueEntry ve = null;
                    try
                    {
                        ve = dataItems.Take();
                        long microseconds = sw[ve.Value].ElapsedTicks / (Stopwatch.Frequency / (1000L * 1000L));
                        results[ve.Value] = microseconds;

                        //Console.WriteLine("elapsed microseconds = " + microseconds);
                        //Console.WriteLine("Event handled: Value = {0} (processed event {1}", ve.Value, ve.Value);
                    }
                    catch (InvalidOperationException) { }
                }
            }, TaskCreationOptions.LongRunning);

            for (int i = 0; i < length; i++)
            {
                var valueToSet = i;

                ValueEntry entry = new ValueEntry();
                entry.Value = valueToSet;

                sw[i].Restart();
                dataItems.Add(entry);

                //Console.WriteLine("Published entry {0}, value {1}", valueToSet, entry.Value);
                //Thread.Sleep(1000);
            }

            // Wait until all events are delivered
            Thread.Sleep(5000);

            long average = 0;
            long minimum = 10000000000;
            int firstFive = 0;
            int fiveToTen = 0;
            int tenToThirty = 0;
            int moreThenThirty = 0;

            // Do not count first 100 items because they could be extremely slow
            for (int i = 100; i < length; i++)
            {
                average += results[i];
                if (results[i] < minimum)
                {
                    minimum = results[i];
                }
                if (results[i] < 5)
                {
                    firstFive++;
                }
                else if (results[i] < 10)
                {
                    fiveToTen++;
                }
                else if (results[i] < 30)
                {
                    tenToThirty++;
                } else
                {
                    moreThenThirty++;
                }
            }
            average /= (length - 100);
            Console.WriteLine("average = {0} minimum = {1} 0-5 = {2}, 5-10 = {3}, 10-30 = {4}, >30 = {5}", average, minimum, firstFive, fiveToTen, tenToThirty, moreThenThirty);
        }
    }
}

Код нарушителя:

using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Disruptor;
using Disruptor.Dsl;

namespace DisruptorTest
{
    public sealed class ValueEntry
    {
        public int Value { get; set; }

        public ValueEntry()
        {
            //   Console.WriteLine("New ValueEntry created");
        }
    }

    public class ValueAdditionHandler : IEventHandler<ValueEntry>
    {
        public void OnNext(ValueEntry data, long sequence, bool endOfBatch)
        {

            long microseconds = Program.sw[data.Value].ElapsedTicks / (Stopwatch.Frequency / (1000L * 1000L));
            Program.results[data.Value] = microseconds;
            //Console.WriteLine("elapsed microseconds = " + microseconds);
            //Console.WriteLine("Event handled: Value = {0} (processed event {1}", data.Value, sequence);
        }
    }

    class Program
    {
        public const int length = 10000000;
        public static Stopwatch[] sw = new Stopwatch[length];
        public static long[] results = new long[length];

        private static readonly Random _random = new Random();
        private static readonly int _ringSize = 1024;  // Must be multiple of 2

        static void Main(string[] args)
        {
            for (int i = 0; i < length; i++)
            {
                sw[i] = Stopwatch.StartNew();
            }

            var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), _ringSize, TaskScheduler.Default);

            disruptor.HandleEventsWith(new ValueAdditionHandler());

            var ringBuffer = disruptor.Start();

            for (int i = 0; i < length; i++)
            {
                var valueToSet = i;
                long sequenceNo = ringBuffer.Next();

                ValueEntry entry = ringBuffer[sequenceNo];

                entry.Value = valueToSet;

                sw[i].Restart();
                ringBuffer.Publish(sequenceNo);

                //Console.WriteLine("Published entry {0}, value {1}", sequenceNo, entry.Value);

                //Thread.Sleep(1000);
            }

            // wait until all events are delivered
            Thread.Sleep(5000);

            long average = 0;
            long minimum = 10000000000;
            int firstFive = 0;
            int fiveToTen = 0;
            int tenToThirty = 0;
            int moreThenThirty = 0;

            // Do not count first 100 items because they could be extremely slow
            for (int i = 100; i < length; i++)
            {
                average += results[i];
                if (results[i] < minimum)
                {
                    minimum = results[i];
                }
                if (results[i] < 5)
                {
                    firstFive++;
                }
                else if (results[i] < 10)
                {
                    fiveToTen++;
                }
                else if (results[i] < 30)
                {
                    tenToThirty++;
                }
                else
                {
                    moreThenThirty++;
                }
            }
            average /= (length - 100);
            Console.WriteLine("average = {0} minimum = {1} 0-5 = {2}, 5-10 = {3}, 10-30 = {4}, >30 = {5}", average, minimum, firstFive, fiveToTen, tenToThirty, moreThenThirty);
        }
    }
}

1 ответ

Я прочитал код BlockingCollecton, вы добавляете много Console.WriteLine в Disruptor, но никого в BlockingCollection, Console.WriteLine медленный, внутри есть замок.

Ваш RingBufferSize слишком мала, это влияет на производительность, должно быть 1024 или больше.

а также while (!dataItems.IsCompleted) может возникнуть проблема, BlockCollection всегда находится в состоянии добавления, это приведет к преждевременному завершению потока.

Task.Factory.StartNew(() => {
    while (!dataItems.IsCompleted)
    {

        ValueEntry ve = null;
        try
        {
    ve = dataItems.Take();
    long microseconds = sw[ve.Value].ElapsedTicks / (Stopwatch.Frequency / (1000L * 1000L));
    results[ve.Value] = microseconds;

    //Console.WriteLine("elapsed microseconds = " + microseconds);
    //Console.WriteLine("Event handled: Value = {0} (processed event {1}", ve.Value, ve.Value);
        }
        catch (InvalidOperationException) { }
    }
}, TaskCreationOptions.LongRunning);


for (int i = 0; i < length; i++)
{
    var valueToSet = i;

    ValueEntry entry = new ValueEntry();
    entry.Value = valueToSet;

    sw[i].Restart();
    dataItems.Add(entry);

    //Console.WriteLine("Published entry {0}, value {1}", valueToSet, entry.Value);
    //Thread.Sleep(1000);
}

Я переписал ваш код, Disruptor в 10 раз быстрее, чем BlockingCollection с несколькими производителями (10 параллельных продуктов), в 2 раза быстрее, чем BlockingCollection с одним производителем:

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Disruptor;
using Disruptor.Dsl;
using NUnit.Framework;

namespace DisruptorTest.Ds
{
    public sealed class ValueEntry
    {
        internal int Id { get; set; }
    }

    class MyHandler : IEventHandler<ValueEntry>
    {
        public void OnEvent(ValueEntry data, long sequence, bool endOfBatch)
        {
        }
    }

    [TestFixture]
    public class DisruptorPerformanceTest
    {
        private volatile bool collectionAddEnded;

        private int producerCount = 10;
        private int runCount = 1000000;
        private int RingBufferAndCapacitySize = 1024;

        [TestCase()]
        public async Task TestBoth()
        {
            for (int i = 0; i < 1; i++)
            {
                foreach (var rs in new int[] {64, 512, 1024, 2048 /*,4096,4096*2*/})
                {
                    Console.WriteLine($"RingBufferAndCapacitySize:{rs}, producerCount:{producerCount}, runCount:{runCount} of {i}");
                    RingBufferAndCapacitySize = rs;
                    await DisruptorTest();
                    await BlockingCollectionTest();
                }
            }
        }

        [TestCase()]
        public async Task BlockingCollectionTest()
        {
            var sw = new Stopwatch();
            BlockingCollection<ValueEntry> dataItems = new BlockingCollection<ValueEntry>(RingBufferAndCapacitySize);

            sw.Start();

            collectionAddEnded = false;

            // A simple blocking consumer with no cancellation.
            var task = Task.Factory.StartNew(() =>
            {
                while (!collectionAddEnded && !dataItems.IsCompleted)
                {
                    //if (!dataItems.IsCompleted && dataItems.TryTake(out var ve))
                    if (dataItems.TryTake(out var ve))
                    {
                    }
                }
            }, TaskCreationOptions.LongRunning);


            var tasks = new Task[producerCount];
            for (int t = 0; t < producerCount; t++)
            {
                tasks[t] = Task.Run(() =>
                {
                    for (int i = 0; i < runCount; i++)
                    {
                        ValueEntry entry = new ValueEntry();
                        entry.Id = i;
                        dataItems.Add(entry);
                    }
                });
            }

            await Task.WhenAll(tasks);

            collectionAddEnded = true;
            await task;

            sw.Stop();

            Console.WriteLine($"BlockingCollectionTest Time:{sw.ElapsedMilliseconds/1000d}");
        }


        [TestCase()]
        public async Task DisruptorTest()
        {
            var disruptor =
                new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), RingBufferAndCapacitySize, TaskScheduler.Default,
                    producerCount > 1 ? ProducerType.Multi : ProducerType.Single, new BlockingWaitStrategy());
            disruptor.HandleEventsWith(new MyHandler());

            var _ringBuffer = disruptor.Start();

            Stopwatch sw = Stopwatch.StartNew();

            sw.Start();


            var tasks = new Task[producerCount];
            for (int t = 0; t < producerCount; t++)
            {
                tasks[t] = Task.Run(() =>
                {
                    for (int i = 0; i < runCount; i++)
                    {
                        long sequenceNo = _ringBuffer.Next();
                        _ringBuffer[sequenceNo].Id = 0;
                        _ringBuffer.Publish(sequenceNo);
                    }
                });
            }


            await Task.WhenAll(tasks);


            disruptor.Shutdown();

            sw.Stop();
            Console.WriteLine($"DisruptorTest Time:{sw.ElapsedMilliseconds/1000d}s");
        }
    }
}

BlockingCollectionTest с общим экземпляром ValueEntry (без нового ValueEntry() в цикле for)

  • RingBufferAndCapacitySize:64, ProducerCount: 10, runCount: 1000000 из 0

    Время тестирования Disruptor:16.962 с.

    BlockingCollection Время тестирования:18.399

  • RingBufferAndCapacitySize:512, ProducerCount: 10, runCount: 1000000 из 0 Время DisruptorTest:6,101 с

    BlockingCollection Время тестирования:19.526

  • RingBufferAndCapacitySize: 1024, ProducerCount: 10, runCount: 1000000 из 0

    Время тестирования Disruptor: 2,928 с

    BlockingCollection: время тестирования: 20.25

  • RingBufferAndCapacitySize: 2048, ProducerCount: 10, runCount: 1000000 из 0

    Время тестирования Disruptor: 2,448 с.

    BlockingCollection Время тестирования:20.649

BlockingCollectionTest создает новый ValueEntry() в цикле for

  • RingBufferAndCapacitySize:64, ProducerCount: 10, runCount: 1000000 из 0

    Время тестирования Disruptor:27,374 с.

    BlockingCollection Время тестирования:21.955

  • RingBufferAndCapacitySize:512, ProducerCount: 10, runCount: 1000000 из 0

    Время тестирования Disruptor:5,011 с

    BlockingCollection Время тестирования: 20,127

  • RingBufferAndCapacitySize: 1024, ProducerCount: 10, runCount: 1000000 из 0

    Время тестирования Disruptor: 2,877 с

    BlockingCollection Время тестирования:22.656

  • RingBufferAndCapacitySize: 2048, ProducerCount: 10, runCount: 1000000 из 0

    Время тестирования Disruptor: 2,384 с.

    BlockingCollection Время тестирования:23.567

https://www.cnblogs.com/darklx/p/11755686.html

Здесь я исправил ваш код:

using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Disruptor;
using Disruptor.Dsl;

namespace DisruptorTest
{
    public sealed class ValueEntry
    {
        public int Value { get; set; }

        public ValueEntry()
        {
         //   Console.WriteLine("New ValueEntry created");
        }
    }

    class Program
    {
        public const int length = 1000000;
        public static Stopwatch sw;

        private static readonly Random _random = new Random();
        private static readonly int _ringSize = 1024;  // Must be multiple of 2

        static void Main(string[] args)
        {
            sw = Stopwatch.StartNew();

            var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), _ringSize, TaskScheduler.Default);

            var ringBuffer = disruptor.Start();

            for (int i = 0; i < length; i++)
            {
                var valueToSet = i;
                long sequenceNo = ringBuffer.Next();

                ValueEntry entry = ringBuffer[sequenceNo];

                entry.Value = valueToSet;

                ringBuffer.Publish(sequenceNo);

                //Console.WriteLine("Published entry {0}, value {1}", sequenceNo, entry.Value);

                //Thread.Sleep(1000);
            }

            var elapsed = sw.Elapsed.Miliseconds();
            // wait until all events are delivered
            Thread.Sleep(10000);

            double average = /(double)length;
            Console.WriteLine("average = " + average);
        }
    }
}

Это должно правильно проверить, сколько времени требуется для каждого элемента.

Другие вопросы по тегам