Пример разрушителя с 1 издателем и 4 параллельными потребителями

В этом примере /questions/43591398/primer-disruptornet/43591404#43591404 и здесь Почему мой пример с прерывателем такой медленный? (в конце вопроса) есть 1 издатель, который публикует товары, и 1 потребитель.

Но в моем случае работа с потребителем намного сложнее и занимает некоторое время. Поэтому я хочу, чтобы 4 потребителя обрабатывали данные параллельно.

Так, например, если производитель производит цифры: 1,2,3,4,5,6,7,8,9,10,11.

Я хочу, чтобы customer1 поймал 1,5,9,... consumer2 поймал 2,6,10,... consumer3 поймал 3,7,11,... consumer4 поймал 4,8,12... (ну, не совсем эти цифры, идея в том, что данные должны обрабатываться параллельно, мне все равно, какое определенное число обрабатывается на каком потребителе)

И помните, что это нужно делать параллельно, потому что в реальном приложении работа с потребителем довольно дорогая. Я ожидаю, что потребители будут выполняться в разных потоках, чтобы использовать возможности многоядерных систем.

Конечно, я могу просто создать 4 кольцевых буфера и подключить 1 потребителя к одному кольцевому буферу. Таким образом, я могу использовать оригинальный пример. Но я чувствую, что это не будет правильно. Скорее всего, было бы правильно создать 1 издателя (1 рингбуфер) и 4 потребителя - это то, что мне нужно.

Добавление ссылки на очень простой вопрос в группах Google: https://groups.google.com/forum/

Итак, у нас есть два варианта:

  • один звонок - много потребителей (каждый потребитель будет "просыпаться" при каждом добавлении, все потребители должны иметь одинаковую WaitStrategy)
  • много "один звонок - один потребитель" (каждый потребитель проснется только от данных, которые он должен обработать. у каждого потребителя может быть собственный WaitStrategy).

2 ответа

Решение

РЕДАКТИРОВАТЬ: я забыл упомянуть, что код частично взят из FAQ. Я понятия не имею, лучше или хуже этот подход, чем предложение Фрэнка.

Проект сильно недооценен, обидно, как выглядит красиво.
В любом случае попробуйте следующий фрагмент кода (основанный на вашей первой ссылке) - протестирован на моно и, кажется, все в порядке:

using System;
using System.Threading.Tasks;
using Disruptor;
using Disruptor.Dsl;

namespace DisruptorTest
{
    public sealed class ValueEntry
    {
        public long Value { get; set; }
    }

    public class MyHandler : IEventHandler<ValueEntry>
    {
        private static int _consumers = 0;
        private readonly int _ordinal;

        public MyHandler()
        {
            this._ordinal = _consumers++;
        }

        public void OnNext(ValueEntry data, long sequence, bool endOfBatch)
        {
            if ((sequence % _consumers) == _ordinal)
                Console.WriteLine("Event handled: Value = {0}, event {1} processed by {2}", data.Value, sequence, _ordinal);
            else
                Console.WriteLine("Event {0} rejected by {1}", sequence, _ordinal);                     
        }
    }

    class Program
    {
        private static readonly Random _random = new Random();
        private const int SIZE = 16;  // Must be multiple of 2
        private const int WORKERS = 4; 

        static void Main()
        {
            var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), SIZE, TaskScheduler.Default);
            for (int i=0; i < WORKERS; i++)
                disruptor.HandleEventsWith(new MyHandler());
            var ringBuffer = disruptor.Start();

            while (true)
            {
                long sequenceNo = ringBuffer.Next();
                ringBuffer[sequenceNo].Value =  _random.Next();;
                ringBuffer.Publish(sequenceNo);
                Console.WriteLine("Published entry {0}, value {1}", sequenceNo, ringBuffer[sequenceNo].Value);
                Console.ReadKey();
            }
        }
    }
}

Из спецификации кольцевого буфера вы увидите, что каждый потребитель будет пытаться обработать ваш ValueEvent, в твоем случае тебе это не нужно.

Я решил это так:

Добавьте обработанное поле к вашему ValueEvent и когда потребитель принимает событие, которое он тестирует в этом поле, если оно уже обработано, он переходит к следующему полю.

Не самый красивый способ, но это как работает буфер.

Другие вопросы по тегам