Пример разрушителя с 1 издателем и 4 параллельными потребителями
В этом примере /questions/43591398/primer-disruptornet/43591404#43591404 и здесь Почему мой пример с прерывателем такой медленный? (в конце вопроса) есть 1 издатель, который публикует товары, и 1 потребитель.
Но в моем случае работа с потребителем намного сложнее и занимает некоторое время. Поэтому я хочу, чтобы 4 потребителя обрабатывали данные параллельно.
Так, например, если производитель производит цифры: 1,2,3,4,5,6,7,8,9,10,11.
Я хочу, чтобы customer1 поймал 1,5,9,... consumer2 поймал 2,6,10,... consumer3 поймал 3,7,11,... consumer4 поймал 4,8,12... (ну, не совсем эти цифры, идея в том, что данные должны обрабатываться параллельно, мне все равно, какое определенное число обрабатывается на каком потребителе)
И помните, что это нужно делать параллельно, потому что в реальном приложении работа с потребителем довольно дорогая. Я ожидаю, что потребители будут выполняться в разных потоках, чтобы использовать возможности многоядерных систем.
Конечно, я могу просто создать 4 кольцевых буфера и подключить 1 потребителя к одному кольцевому буферу. Таким образом, я могу использовать оригинальный пример. Но я чувствую, что это не будет правильно. Скорее всего, было бы правильно создать 1 издателя (1 рингбуфер) и 4 потребителя - это то, что мне нужно.
Добавление ссылки на очень простой вопрос в группах Google: https://groups.google.com/forum/
Итак, у нас есть два варианта:
- один звонок - много потребителей (каждый потребитель будет "просыпаться" при каждом добавлении, все потребители должны иметь одинаковую WaitStrategy)
- много "один звонок - один потребитель" (каждый потребитель проснется только от данных, которые он должен обработать. у каждого потребителя может быть собственный WaitStrategy).
2 ответа
РЕДАКТИРОВАТЬ: я забыл упомянуть, что код частично взят из FAQ. Я понятия не имею, лучше или хуже этот подход, чем предложение Фрэнка.
Проект сильно недооценен, обидно, как выглядит красиво.
В любом случае попробуйте следующий фрагмент кода (основанный на вашей первой ссылке) - протестирован на моно и, кажется, все в порядке:
using System;
using System.Threading.Tasks;
using Disruptor;
using Disruptor.Dsl;
namespace DisruptorTest
{
public sealed class ValueEntry
{
public long Value { get; set; }
}
public class MyHandler : IEventHandler<ValueEntry>
{
private static int _consumers = 0;
private readonly int _ordinal;
public MyHandler()
{
this._ordinal = _consumers++;
}
public void OnNext(ValueEntry data, long sequence, bool endOfBatch)
{
if ((sequence % _consumers) == _ordinal)
Console.WriteLine("Event handled: Value = {0}, event {1} processed by {2}", data.Value, sequence, _ordinal);
else
Console.WriteLine("Event {0} rejected by {1}", sequence, _ordinal);
}
}
class Program
{
private static readonly Random _random = new Random();
private const int SIZE = 16; // Must be multiple of 2
private const int WORKERS = 4;
static void Main()
{
var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), SIZE, TaskScheduler.Default);
for (int i=0; i < WORKERS; i++)
disruptor.HandleEventsWith(new MyHandler());
var ringBuffer = disruptor.Start();
while (true)
{
long sequenceNo = ringBuffer.Next();
ringBuffer[sequenceNo].Value = _random.Next();;
ringBuffer.Publish(sequenceNo);
Console.WriteLine("Published entry {0}, value {1}", sequenceNo, ringBuffer[sequenceNo].Value);
Console.ReadKey();
}
}
}
}
Из спецификации кольцевого буфера вы увидите, что каждый потребитель будет пытаться обработать ваш ValueEvent
, в твоем случае тебе это не нужно.
Я решил это так:
Добавьте обработанное поле к вашему ValueEvent
и когда потребитель принимает событие, которое он тестирует в этом поле, если оно уже обработано, он переходит к следующему полю.
Не самый красивый способ, но это как работает буфер.