ConcurrentQueue и Parallel.ForEach
У меня есть ConcurrentQueue со списком URL-адресов, которые мне нужны, чтобы получить источник. При использовании Parallel.ForEach с объектом ConcurrentQueue в качестве входного параметра метод Pop не будет работать ничего (должен возвращать строку).
Я использую Parallel с MaxDegreeOfParallelism, установленным в четыре. Мне действительно нужно заблокировать количество одновременных потоков. Является ли использование очереди с параллелизмом избыточным?
Заранее спасибо.
// On the main class
var items = await engine.FetchPageWithNumberItems(result);
// Enqueue List of items
itemQueue.EnqueueList(items);
var crawl = Task.Run(() => { engine.CrawlItems(itemQueue); });
// On the Engine class
public void CrawlItems(ItemQueue itemQueue)
{
Parallel.ForEach(
itemQueue,
new ParallelOptions {MaxDegreeOfParallelism = 4},
item =>
{
var worker = new Worker();
// Pop doesn't return anything
worker.Url = itemQueue.Pop();
/* Some work */
});
}
// Item Queue
class ItemQueue : ConcurrentQueue<string>
{
private ConcurrentQueue<string> queue = new ConcurrentQueue<string>();
public string Pop()
{
string value = String.Empty;
if(this.queue.Count == 0)
throw new Exception();
this.queue.TryDequeue(out value);
return value;
}
public void Push(string item)
{
this.queue.Enqueue(item);
}
public void EnqueueList(List<string> list)
{
list.ForEach(this.queue.Enqueue);
}
}
2 ответа
Вам не нужно ConcurrentQueue<T>
если все, что вы собираетесь сделать, это сначала добавить элементы к нему из одного потока, а затем выполнить итерацию в Parallel.ForEach()
, Нормальный List<T>
было бы достаточно для этого.
Кроме того, ваша реализация ItemQueue
очень подозрительно
Наследуется от
ConcurrentQueue<string>
а также содержит другойConcurrentQueue<string>
, Это не имеет большого смысла, сбивает с толку и неэффективно.Методы на
ConcurrentQueue<T>
были разработаны очень тщательно, чтобы быть потокобезопасными. ВашPop()
не потокобезопасен. Что может случиться, что вы проверяетеCount
, заметьте, что это 1, затем позвонитеTryDequeue()
и не получить никакого значения (т.е.value
будетnull
), потому что другой поток удалил элемент из очереди за время между двумя вызовами.
Проблема связана с методом CrawlItems, так как не следует вызывать Pop в действии, предоставленном для метода ForEach. Причина в том, что действие вызывается для каждого извлеченного элемента, следовательно, элемент уже извлечен. Это причина того, что у действия есть аргумент 'item'.
Я предполагаю, что вы получаете нулевое значение, так как все элементы уже извлечены другими потоками методом ForEach.
Поэтому ваш код должен выглядеть так:
public void CrawlItems(ItemQueue itemQueue)
{
Parallel.ForEach(
itemQueue,
new ParallelOptions {MaxDegreeOfParallelism = 4},
item =>
{
worker.Url = item;
/* Some work */
});
}