ConcurrentQueue один элемент занят двумя потоками
Я хочу, чтобы два потока работали с одной очередью. Первый поток должен вызываться каждые 2 секунды, а второй поток - каждые 3 секунды. Оба потока должны начаться одновременно. У меня проблема при доступе к первому элементу очереди. Оба потока получают элемент с индексом 0. Иногда это происходит с другими элементами очереди, а не только с первым элементом. У меня есть такой вывод на консоли:
- Элемент 0 обработан 1 раз: 3:27:8
- Элемент 0 обработан 2 Время: 3:27:8
- Пункт 2 обработан 1 Время: 3:27:10
- Пункт 3 обработан 2 Время: 3:27:11
- Пункт 4 обработан 1 Время: 3:27:12
и так далее..
Вот код, который я использую:
ConcurrentQueue<int> sharedQueue = new ConcurrentQueue<int>();
for (int i = 0; i < 10; i++)
{
sharedQueue.Enqueue(i);
}
int itemCount= 0;
Task[] tasks = new Task[2];
for (int i = 0; i < tasks.Length; i++)
{
// create the new task
tasks[i] = new Task(() =>
{
while (sharedQueue.Count > 0)
{
// define a variable for the dequeue requests
int queueElement;
// take an item from the queue
bool gotElement = sharedQueue.TryDequeue(out queueElement);
// increment the count of items processed
if (gotElement)
{
DateTime dt = DateTime.Now;
Console.WriteLine("Item " + itemCount + "processed by "
+ Task.CurrentId + " Time: " + dt.Hour + ":" + dt.Minute + ":" + dt.Second);
Interlocked.Increment(ref itemCount);
if (Task.CurrentId == 1)
Thread.Sleep(2000);
else
Thread.Sleep(3000);
}
}
});
// start the new task
tasks[i].Start();
}
// wait for the tasks to complete
Task.WaitAll(tasks);
// report on the number of items processed
Console.WriteLine("Items processed: {0}", itemCount);
// wait for input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();
}
2 ответа
Заменить следующую строку:
Console.WriteLine("Item " + itemCount + "processed by " ...);
С этой строкой:
Console.WriteLine("Item " + queueElement + "processed by " ...);
Вероятно, проблема, которую вы видите, связана с выполнением задач Console.WriteLine
почти в одно и то же время, и оба видят одно и то же значение itemCount
потому что они чередуются таким образом, что Interlocked.Increment
звонки еще не произошли. Вероятно, имеет смысл распечатать queueElement
во всяком случае, так как это более значимым.
Смотрите отличный ответ Брайана Гидеона относительно вашего itemCount
проблема.
Вы можете переписать свой код, чтобы использовать BlockingCollection, а не ConcurrentQueue<T>
, С ним намного проще работать. BlockingCollection
является оберткой для одновременных коллекций В конфигурации по умолчанию резервное хранилище является ConcurrentQueue
, Таким образом, вы получаете ту же функциональность параллельной очереди, но с гораздо более приятным интерфейсом.
BlockingCollection<int> sharedQueue = new BlockingCollection<int>();
for (int i = 0; i < 10; i++)
{
sharedQueue.Add(i);
}
// CompleteAdding marks the queue as "complete for adding,"
// meaning that no more items will be added.
sharedQueue.CompleteAdding();
int itemCount= 0;
Task[] tasks = new Task[2];
for (int i = 0; i < tasks.Length; i++)
{
// create the new task
tasks[i] = new Task(() =>
{
foreach (var queueElement in sharedQueue.GetConsumingEnumerable())
{
DateTime dt = DateTime.Now;
Console.WriteLine("Item " + itemCount + "processed by "
+ Task.CurrentId + " Time: " + dt.Hour + ":" + dt.Minute + ":" + dt.Second);
Interlocked.Increment(ref itemCount);
if (Task.CurrentId == 1)
Thread.Sleep(2000);
else
Thread.Sleep(3000);
}
});
// start the new task
tasks[i].Start();
}
GetConsumingEnumerable возвращает перечислитель, который будет получать следующий элемент из очереди до тех пор, пока очередь не станет пустой. Это также хорошо обрабатывает отмену, что немного сложнее с ConcurrentQueue
,
В общем, каждый раз, когда вы думаете об использовании ConcurrentQueue<T>
вы, вероятно, хотите BlockingCollection<T>
,