ConcurrentQueue и Parallel.ForEach

У меня есть ConcurrentQueue со списком URL-адресов, которые мне нужны, чтобы получить источник. При использовании Parallel.ForEach с объектом ConcurrentQueue в качестве входного параметра метод Pop не будет работать ничего (должен возвращать строку).

Я использую Parallel с MaxDegreeOfParallelism, установленным в четыре. Мне действительно нужно заблокировать количество одновременных потоков. Является ли использование очереди с параллелизмом избыточным?

Заранее спасибо.

// On the main class
var items = await engine.FetchPageWithNumberItems(result);
// Enqueue List of items
itemQueue.EnqueueList(items);
var crawl = Task.Run(() => { engine.CrawlItems(itemQueue); });

// On the Engine class
public void CrawlItems(ItemQueue itemQueue)
{
Parallel.ForEach(
            itemQueue,
            new ParallelOptions {MaxDegreeOfParallelism = 4},
            item =>
            {

                var worker = new Worker();
                // Pop doesn't return anything
                worker.Url = itemQueue.Pop();
                /* Some work */
             });
 }

// Item Queue
class ItemQueue : ConcurrentQueue<string>
    {
        private ConcurrentQueue<string> queue = new ConcurrentQueue<string>();

        public string Pop()
        {
            string value = String.Empty;
            if(this.queue.Count == 0)
                throw new Exception();
            this.queue.TryDequeue(out value);
            return value;
        }

        public void Push(string item)
        {
            this.queue.Enqueue(item);
        }

        public void EnqueueList(List<string> list)
        {
            list.ForEach(this.queue.Enqueue);
        }
    }

2 ответа

Вам не нужно ConcurrentQueue<T> если все, что вы собираетесь сделать, это сначала добавить элементы к нему из одного потока, а затем выполнить итерацию в Parallel.ForEach(), Нормальный List<T> было бы достаточно для этого.

Кроме того, ваша реализация ItemQueue очень подозрительно

  • Наследуется от ConcurrentQueue<string> а также содержит другой ConcurrentQueue<string>, Это не имеет большого смысла, сбивает с толку и неэффективно.

  • Методы на ConcurrentQueue<T> были разработаны очень тщательно, чтобы быть потокобезопасными. Ваш Pop() не потокобезопасен. Что может случиться, что вы проверяете Count, заметьте, что это 1, затем позвоните TryDequeue() и не получить никакого значения (т.е. value будет null), потому что другой поток удалил элемент из очереди за время между двумя вызовами.

Проблема связана с методом CrawlItems, так как не следует вызывать Pop в действии, предоставленном для метода ForEach. Причина в том, что действие вызывается для каждого извлеченного элемента, следовательно, элемент уже извлечен. Это причина того, что у действия есть аргумент 'item'.

Я предполагаю, что вы получаете нулевое значение, так как все элементы уже извлечены другими потоками методом ForEach.

Поэтому ваш код должен выглядеть так:

public void CrawlItems(ItemQueue itemQueue)
{
    Parallel.ForEach(
        itemQueue,
        new ParallelOptions {MaxDegreeOfParallelism = 4},
        item =>
        {
            worker.Url = item;
            /* Some work */
         });
}
Другие вопросы по тегам