ConcurrentQueue с многопоточностью
Я новичок в многопоточности концепций. Мне нужно добавить определенное количество строк в очередь и обрабатывать их с несколькими потоками. С помощью ConcurrentQueue
что потокобезопасно.
Это то, что я пытался. Но все элементы, добавленные в параллельную очередь, не обрабатываются. обрабатываются только первые 4 элемента.
class Program
{
ConcurrentQueue<string> iQ = new ConcurrentQueue<string>();
static void Main(string[] args)
{
new Program().run();
}
void run()
{
int threadCount = 4;
Task[] workers = new Task[threadCount];
for (int i = 0; i < threadCount; ++i)
{
int workerId = i;
Task task = new Task(() => worker(workerId));
workers[i] = task;
task.Start();
}
for (int i = 0; i < 100; i++)
{
iQ.Enqueue("Item" + i);
}
Task.WaitAll(workers);
Console.WriteLine("Done.");
Console.ReadLine();
}
void worker(int workerId)
{
Console.WriteLine("Worker {0} is starting.", workerId);
string op;
if(iQ.TryDequeue(out op))
{
Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
}
Console.WriteLine("Worker {0} is stopping.", workerId);
}
}
3 ответа
Есть несколько проблем с вашей реализацией. Первое и очевидное, что worker
Метод удаляет только ноль или один элемент, а затем останавливается:
if(iQ.TryDequeue(out op))
{
Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
}
Так должно быть:
while(iQ.TryDequeue(out op))
{
Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
}
Однако этого будет недостаточно для правильной работы вашей программы. Если ваши рабочие удаляют очереди быстрее, чем ставит в очередь основной поток, они остановятся, пока основная задача все еще ставится в очередь. Вы должны дать сигнал рабочим, что они могут остановиться. Вы можете определить логическую переменную, которая будет установлена в true
после постановки в очередь:
for (int i = 0; i < 100; i++)
{
iQ.Enqueue("Item" + i);
}
Volatile.Write(ref doneEnqueueing, true);
Рабочие проверят значение:
void worker(int workerId)
{
Console.WriteLine("Worker {0} is starting.", workerId);
do {
string op;
while(iQ.TryDequeue(out op))
{
Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
}
SpinWait.SpinUntil(() => Volatile.Read(ref doneEnqueueing) || (iQ.Count > 0));
}
while (!Volatile.Read(ref doneEnqueueing) || (iQ.Count > 0))
Console.WriteLine("Worker {0} is stopping.", workerId);
}
Ваши работники берут один предмет из queue
а затем закончите работу, просто дайте им работать до queue
пустой.
замещать if
в рабочей функции с while
void worker(int workerId)
{
Console.WriteLine("Worker {0} is starting.", workerId);
string op;
while (iQ.TryDequeue(out op))
{
Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
}
Console.WriteLine("Worker {0} is stopping.", workerId);
}
Как вы запустите его, вы увидите, что около всех предметов будут обрабатываться два рабочих. Причина: ваш процессор имеет два ядра, оба работают, и нет "свободного слота" для создания нового задания. Если вы хотите, чтобы все ваши 4 задачи обрабатывали элементы, вы можете добавить задержку, чтобы дать вашему процессору время для создания других задач, например:
while (iQ.TryDequeue(out op))
{
Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
Task.Delay(TimeSpan.FromMilliseconds(1)).Wait();
}
это дает вам вывод, что вы хотите:
...
Worker 0 is processing item Item8
Worker 1 is processing item Item9
Worker 2 is processing item Item10
Worker 3 is processing item Item11
Worker 3 is processing item Item13
Worker 1 is processing item Item12
...
Я на самом деле работал с ConcurrentQueue
совсем недавно и думал, что поделюсь этим. Я создал кастом ConcurrentQueue
называется CQItems
у этого есть методы, чтобы построить себя с данными параметрами. Внутренне, когда вы говорите ему построить х количество у элементов, он делает Parallel.For
вызов конструкторов элементов. Преимущество здесь в том, что когда вызывается метод или функция CQItems myCQ = CQItems.Generate(x, y)
этот вызов поступает из основного потока приложения, что означает, что ничто не может смотреть в очередь, пока она не закончит сборку. Но внутри класса очереди он строится с потоками и значительно быстрее, чем просто использование List<>
или же Queue<>
, В основном он генерирует вещи из воздуха, но иногда (на основе параметров) создает элементы из SQL - в основном генерирует объекты на основе существующих данных. Во всяком случае, эти два метода в CQItems
класс, который может помочь с этим:
public void Generate(int numberOfItems = 1, ItemOptions options = ItemOptions.NONE)
{
try
{
Type t = typeof(T);
if (t == typeof(Item))
throw new ArgumentException("Type " + t + " is not valid for generation. Please contact QA.");
else
Parallel.For(0, numberOfItems, (i, loopState) =>
{
try
{
GenerateItemByType(typeof(T), options);
}
catch
{
loopState.Stop();
throw;
}
});
}
catch (AggregateException ae)
{
ae.Handle((x) =>
{
if (x is SQLNullResultsException)
{
throw x;
}
else if (x is ImageNotTIFFException)
{
throw x;
}
else
{
throw x;
}
return true;
});
}
catch
{
throw;
}
finally
{
ItemManager.Instance.Clear();
}
}
private void GenerateItemByType(Type t, ItemOptions options = ItemOptions.NONE)
{
try
{
if (t == typeof(ItemA))
{
if ((options & ItemOptions.DUPLICATE) != 0)
{
this.Enqueue(new ItemB(options));
}
else
{
this.Enqueue(new ItemA(options));
}
}
else if (t == typeof(ItemC))
{
this.Enqueue(new ItemC(options));
}
}
catch
{
throw;
}
finally { }
}
Некоторые полезные заметки:
Поставка loopState
переменная в Parallel.For()
позволяет нам установить состояние, чтобы остановить, если исключение поймано. Это хорошо, потому что если ваш цикл запрашивается 1000 вещей, а 5-я итерация выдает исключение, он будет продолжать цикл. Вы можете этого захотеть, но в моем случае исключение должно выйти из многопоточного цикла. Вы все равно будете в конечном итоге с AggregateException
исходя из этого (очевидно, именно это и происходит, когда потоки выдают исключение). Разбор тех и только отправка первого может сэкономить МНОГО времени и головных болей, пытающихся прополоть гигантскую группу исключений, где более поздние исключения могут (или не могут) быть вызваны первыми в любом случае.
Что касается повторных бросков, я пытаюсь добавить оператор catch для большинства ожидаемых типов исключений, даже если я все равно планирую просто выбросить их в стек. Часть этого предназначена для устранения неполадок (возможность разбить на определенные исключения может быть удобной). Частично это связано с тем, что иногда я хочу иметь возможность делать другие вещи, такие как остановка цикла, изменение или добавление к сообщению об исключении, или в случае разрыва AggregateException
, отправьте только одно исключение в стек, а не весь агрегат. Просто пояснение для тех, кто может смотреть на это.
Наконец, если это сбивает с толку, Type(T)
ценность исходит от моего CQItems
сам класс:
public class CQItems<T> : ConcurrentQueue<Item>, IDisposable