ConcurrentQueue один элемент занят двумя потоками

Я хочу, чтобы два потока работали с одной очередью. Первый поток должен вызываться каждые 2 секунды, а второй поток - каждые 3 секунды. Оба потока должны начаться одновременно. У меня проблема при доступе к первому элементу очереди. Оба потока получают элемент с индексом 0. Иногда это происходит с другими элементами очереди, а не только с первым элементом. У меня есть такой вывод на консоли:

  • Элемент 0 обработан 1 раз: 3:27:8
  • Элемент 0 обработан 2 Время: 3:27:8
  • Пункт 2 обработан 1 Время: 3:27:10
  • Пункт 3 обработан 2 Время: 3:27:11
  • Пункт 4 обработан 1 Время: 3:27:12

и так далее..

Вот код, который я использую:

    ConcurrentQueue<int> sharedQueue = new ConcurrentQueue<int>();

    for (int i = 0; i < 10; i++)
    {
        sharedQueue.Enqueue(i);
    }


    int itemCount= 0;


    Task[] tasks = new Task[2];
    for (int i = 0; i < tasks.Length; i++)
    {
        // create the new task
        tasks[i] = new Task(() =>
        {
            while (sharedQueue.Count > 0)
            {
                // define a variable for the dequeue requests
                int queueElement;
                // take an item from the queue
                bool gotElement = sharedQueue.TryDequeue(out queueElement);
                // increment the count of items processed
                if (gotElement)
                {
                    DateTime dt = DateTime.Now;
                    Console.WriteLine("Item " + itemCount + "processed by " 
                        + Task.CurrentId + " Time: " + dt.Hour + ":" + dt.Minute + ":" + dt.Second);
                    Interlocked.Increment(ref itemCount);   
               if (Task.CurrentId == 1) 
                    Thread.Sleep(2000);
                else 
                    Thread.Sleep(3000);                       
                }

            }
        });
        // start the new task
        tasks[i].Start();


    }
    // wait for the tasks to complete
    Task.WaitAll(tasks);
    // report on the number of items processed
    Console.WriteLine("Items processed: {0}", itemCount);
    // wait for input before exiting
    Console.WriteLine("Press enter to finish");
    Console.ReadLine();
}

2 ответа

Решение

Заменить следующую строку:

Console.WriteLine("Item " + itemCount + "processed by " ...);

С этой строкой:

Console.WriteLine("Item " + queueElement + "processed by " ...);

Вероятно, проблема, которую вы видите, связана с выполнением задач Console.WriteLine почти в одно и то же время, и оба видят одно и то же значение itemCount потому что они чередуются таким образом, что Interlocked.Increment звонки еще не произошли. Вероятно, имеет смысл распечатать queueElement во всяком случае, так как это более значимым.

Смотрите отличный ответ Брайана Гидеона относительно вашего itemCount проблема.

Вы можете переписать свой код, чтобы использовать BlockingCollection, а не ConcurrentQueue<T>, С ним намного проще работать. BlockingCollection является оберткой для одновременных коллекций В конфигурации по умолчанию резервное хранилище является ConcurrentQueue, Таким образом, вы получаете ту же функциональность параллельной очереди, но с гораздо более приятным интерфейсом.

BlockingCollection<int> sharedQueue = new BlockingCollection<int>();

for (int i = 0; i < 10; i++)
{
    sharedQueue.Add(i);
}

// CompleteAdding marks the queue as "complete for adding,"
// meaning that no more items will be added.
sharedQueue.CompleteAdding();

int itemCount= 0;

Task[] tasks = new Task[2];
for (int i = 0; i < tasks.Length; i++)
{
    // create the new task
    tasks[i] = new Task(() =>
    {
        foreach (var queueElement in sharedQueue.GetConsumingEnumerable())
        {
            DateTime dt = DateTime.Now;
            Console.WriteLine("Item " + itemCount + "processed by " 
                + Task.CurrentId + " Time: " + dt.Hour + ":" + dt.Minute + ":" + dt.Second);
            Interlocked.Increment(ref itemCount);   
            if (Task.CurrentId == 1) 
                Thread.Sleep(2000);
            else 
                Thread.Sleep(3000);                       
        }
    });

    // start the new task
    tasks[i].Start();
}

GetConsumingEnumerable возвращает перечислитель, который будет получать следующий элемент из очереди до тех пор, пока очередь не станет пустой. Это также хорошо обрабатывает отмену, что немного сложнее с ConcurrentQueue,

В общем, каждый раз, когда вы думаете об использовании ConcurrentQueue<T>вы, вероятно, хотите BlockingCollection<T>,

Другие вопросы по тегам