Обходной путь для ограничения обработки WaitHandle.WaitAll 64?
Мое приложение порождает множество разных небольших рабочих потоков через ThreadPool.QueueUserWorkItem
который я отслеживаю через несколько ManualResetEvent
экземпляров. Я использую WaitHandle.WaitAll
метод, чтобы заблокировать мое приложение от закрытия, пока эти потоки не завершены.
У меня никогда не было проблем раньше, однако, поскольку мое приложение загружается больше, то есть создается больше потоков, я сейчас начинаю получать это исключение:
WaitHandles must be less than or equal to 64 - missing documentation
Каково лучшее альтернативное решение для этого?
Фрагмент кода
List<AutoResetEvent> events = new List<AutoResetEvent>();
// multiple instances of...
var evt = new AutoResetEvent(false);
events.Add(evt);
ThreadPool.QueueUserWorkItem(delegate
{
// do work
evt.Set();
});
...
WaitHandle.WaitAll(events.ToArray());
Временное решение
int threadCount = 0;
ManualResetEvent finished = new ManualResetEvent(false);
...
Interlocked.Increment(ref threadCount);
ThreadPool.QueueUserWorkItem(delegate
{
try
{
// do work
}
finally
{
if (Interlocked.Decrement(ref threadCount) == 0)
{
finished.Set();
}
}
});
...
finished.WaitOne();
8 ответов
Создайте переменную, которая отслеживает количество запущенных задач:
int numberOfTasks = 100;
Создать сигнал:
ManualResetEvent signal = new ManualResetEvent(false);
Уменьшите количество задач, когда задача завершена:
if (Interlocked.Decrement(ref numberOftasks) == 0)
{
Если задачи не осталось, установите сигнал:
signal.Set();
}
Между тем, где-то еще, подождите, пока будет установлен сигнал:
signal.WaitOne();
Начиная с.NET 4.0, у вас есть еще две (и IMO, более чистые) опции, доступные вам.
Во-первых, использовать CountdownEvent
класс Это избавляет от необходимости самостоятельно обрабатывать увеличение и уменьшение:
int tasks = <however many tasks you're performing>;
// Dispose when done.
using (var e = new CountdownEvent(tasks))
{
// Queue work.
ThreadPool.QueueUserWorkItem(() => {
// Do work
...
// Signal when done.
e.Signal();
});
// Wait till the countdown reaches zero.
e.Wait();
}
Тем не менее, есть еще более надежное решение, и это использовать Task
класс, вот так:
// The source of your work items, create a sequence of Task instances.
Task[] tasks = Enumerable.Range(0, 100).Select(i =>
// Create task here.
Task.Factory.StartNew(() => {
// Do work.
}
// No signalling, no anything.
).ToArray();
// Wait on all the tasks.
Task.WaitAll(tasks);
С использованием Task
класс и призыв к WaitAll
намного чище, IMO, так как вы тратите меньше потоковых примитивов по всему коду (обратите внимание, никаких ручек ожидания); вам не нужно настраивать счетчик, обрабатывать увеличивающиеся / уменьшающиеся значения, вы просто настраиваете свои задачи и затем ждете их. Это позволяет коду быть более выразительным в том, что вы хотите делать, а не в примитивах того, как (по крайней мере, с точки зрения управления его распараллеливанием).
.NET 4.5 предлагает еще больше возможностей, вы можете упростить генерацию последовательности Task
экземпляры, вызывая статический Run
метод на Task
класс:
// The source of your work items, create a sequence of Task instances.
Task[] tasks = Enumerable.Range(0, 100).Select(i =>
// Create task here.
Task.Run(() => {
// Do work.
})
// No signalling, no anything.
).ToArray();
// Wait on all the tasks.
Tasks.WaitAll(tasks);
Или вы можете воспользоваться библиотекой TPL DataFlow (она находится в System
пространство имен, так что оно является официальным, хотя оно загружается из NuGet, как Entity Framework) и использует ActionBlock<TInput>
, вот так:
// Create the action block. Since there's not a non-generic
// version, make it object, and pass null to signal, or
// make T the type that takes the input to the action
// and pass that.
var actionBlock = new ActionBlock<object>(o => {
// Do work.
});
// Post 100 times.
foreach (int i in Enumerable.Range(0, 100)) actionBlock.Post(null);
// Signal complete, this doesn't actually stop
// the block, but says that everything is done when the currently
// posted items are completed.
actionBlock.Complete();
// Wait for everything to complete, the Completion property
// exposes a Task which can be waited on.
actionBlock.Completion.Wait();
Обратите внимание, что ActionBlock<TInput>
по умолчанию обрабатывает по одному элементу за раз, поэтому, если вы хотите, чтобы он обрабатывал несколько действий одновременно, вам нужно установить количество одновременных элементов, которые вы хотите обработать в конструкторе, передав ExecutionDataflowBlockOptions
экземпляр и настройка MaxDegreeOfParallelism
свойство:
var actionBlock = new ActionBlock<object>(o => {
// Do work.
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
Если ваше действие действительно потокобезопасно, то вы можете установить MaxDegreeOfParallelsim
собственность на DataFlowBlockOptions.Unbounded
:
var actionBlock = new ActionBlock<object>(o => {
// Do work.
}, new ExecutionDataflowBlockOptions {
MaxDegreeOfParallelism = DataFlowBlockOptions.Unbounded
});
Дело в том, что вы имеете точный контроль над тем, насколько параллельными должны быть ваши параметры.
Конечно, если у вас есть последовательность элементов, которые вы хотите передать в свой ActionBlock<TInput>
Например, вы можете связать ISourceBlock<TOutput>
реализация, чтобы накормить ActionBlock<TInput>
, вот так:
// The buffer block.
var buffer = new BufferBlock<int>();
// Create the action block. Since there's not a non-generic
// version, make it object, and pass null to signal, or
// make T the type that takes the input to the action
// and pass that.
var actionBlock = new ActionBlock<int>(o => {
// Do work.
});
// Link the action block to the buffer block.
// NOTE: An IDisposable is returned here, you might want to dispose
// of it, although not totally necessary if everything works, but
// still, good housekeeping.
using (link = buffer.LinkTo(actionBlock,
// Want to propagate completion state to the action block.
new DataflowLinkOptions {
PropagateCompletion = true,
},
// Can filter on items flowing through if you want.
i => true)
{
// Post 100 times to the *buffer*
foreach (int i in Enumerable.Range(0, 100)) buffer.Post(i);
// Signal complete, this doesn't actually stop
// the block, but says that everything is done when the currently
// posted items are completed.
actionBlock.Complete();
// Wait for everything to complete, the Completion property
// exposes a Task which can be waited on.
actionBlock.Completion.Wait();
}
В зависимости от того, что вам нужно сделать, библиотека потока данных TPL становится гораздо более привлекательной опцией, поскольку она обрабатывает параллелизм для всех задач, связанных вместе, и позволяет вам очень точно определить, насколько параллельной должна быть каждая часть., сохраняя при этом правильное разделение проблем для каждого блока.
Ваш обходной путь не верен. Причина в том, что Set
а также WaitOne
может гоняться, если последний рабочий элемент вызывает threadCount
чтобы обнулиться до того, как поток очередей успел поставить в очередь все рабочие элементы. Исправить это просто. Относитесь к своей очереди как к самому рабочему элементу. инициализировать threadCount
до 1 и сделайте декремент и подайте сигнал о завершении очереди.
int threadCount = 1;
ManualResetEvent finished = new ManualResetEvent(false);
...
Interlocked.Increment(ref threadCount);
ThreadPool.QueueUserWorkItem(delegate
{
try
{
// do work
}
finally
{
if (Interlocked.Decrement(ref threadCount) == 0)
{
finished.Set();
}
}
});
...
if (Interlocked.Decrement(ref threadCount) == 0)
{
finished.Set();
}
finished.WaitOne();
В качестве личного предпочтения мне нравится использовать CountdownEvent
класс, чтобы сделать подсчет для меня.
var finished = new CountdownEvent(1);
...
finished.AddCount();
ThreadPool.QueueUserWorkItem(delegate
{
try
{
// do work
}
finally
{
finished.Signal();
}
});
...
finished.Signal();
finished.Wait();
Добавив ответ dtb, вы можете превратить это в хороший простой класс.
public class Countdown : IDisposable
{
private readonly ManualResetEvent done;
private readonly int total;
private long current;
public Countdown(int total)
{
this.total = total;
current = total;
done = new ManualResetEvent(false);
}
public void Signal()
{
if (Interlocked.Decrement(ref current) == 0)
{
done.Set();
}
}
public void Wait()
{
done.WaitOne();
}
public void Dispose()
{
((IDisposable)done).Dispose();
}
}
Вот еще одно решение. Вот "события" - это список ManualResetEvent. Размер списка может быть больше 64 (MAX_EVENTS_NO).
int len = events.Count;
if (len <= MAX_EVENTS_NO)
{
WaitHandle.WaitAll(events.ToArray());
} else {
int start = 0;
int num = MAX_EVENTS_NO;
while (true)
{
if(start + num > len)
{
num = len - start;
}
List<ManualResetEvent> sublist = events.GetRange(start, num);
WaitHandle.WaitAll(sublist.ToArray());
start += num;
if (start >= len)
break;
}
}
protected void WaitAllExt(WaitHandle[] waitHandles)
{
//workaround for limitation of WaitHandle.WaitAll by <=64 wait handles
const int waitAllArrayLimit = 64;
var prevEndInd = -1;
while (prevEndInd < waitHandles.Length - 1)
{
var stInd = prevEndInd + 1;
var eInd = stInd + waitAllArrayLimit - 1;
if (eInd > waitHandles.Length - 1)
{
eInd = waitHandles.Length - 1;
}
prevEndInd = eInd;
//do wait
var whSubarray = waitHandles.Skip(stInd).Take(eInd - stInd + 1).ToArray();
WaitHandle.WaitAll(whSubarray);
}
}
Я решил эту проблему, просто разбив число ожидающих событий на части без потери производительности, и оно отлично работает в производственной среде. Следует за кодом:
var events = new List<ManualResetEvent>();
// code omited
var newEvent = new ManualResetEvent(false);
events.Add(newEvent);
ThreadPool.QueueUserWorkItem(c => {
//thread code
newEvent.Set();
});
// code omited
var wait = true;
while (wait)
{
WaitHandle.WaitAll(events.Take(60).ToArray());
events.RemoveRange(0, events.Count > 59 ? 60 : events.Count);
wait = events.Any();
}
Добавление к ответу dtb, когда мы хотим иметь обратные вызовы.
using System;
using System.Runtime.Remoting.Messaging;
using System.Threading;
class Program
{
static void Main(string[] args)
{
Main m = new Main();
m.TestMRE();
Console.ReadKey();
}
}
class Main
{
CalHandler handler = new CalHandler();
int numberofTasks =0;
public void TestMRE()
{
for (int j = 0; j <= 3; j++)
{
Console.WriteLine("Outer Loop is :" + j.ToString());
ManualResetEvent signal = new ManualResetEvent(false);
numberofTasks = 4;
for (int i = 0; i <= 3; i++)
{
CalHandler.count caller = new CalHandler.count(handler.messageHandler);
caller.BeginInvoke(i, new AsyncCallback(NumberCallback),signal);
}
signal.WaitOne();
}
}
private void NumberCallback(IAsyncResult result)
{
AsyncResult asyncResult = (AsyncResult)result;
CalHandler.count caller = (CalHandler.count)asyncResult.AsyncDelegate;
int num = caller.EndInvoke(asyncResult);
Console.WriteLine("Number is :"+ num.ToString());
ManualResetEvent mre = (ManualResetEvent)asyncResult.AsyncState;
if (Interlocked.Decrement(ref numberofTasks) == 0)
{
mre.Set();
}
}
}
public class CalHandler
{
public delegate int count(int number);
public int messageHandler ( int number )
{
return number;
}
}
Windows XP SP3 поддерживает максимум два WaitHandles. Для случаев более 2 Приложение WaitHandles преждевременно завершается.