ConcurrentQueue с многопоточностью

Я новичок в многопоточности концепций. Мне нужно добавить определенное количество строк в очередь и обрабатывать их с несколькими потоками. С помощью ConcurrentQueue что потокобезопасно.

Это то, что я пытался. Но все элементы, добавленные в параллельную очередь, не обрабатываются. обрабатываются только первые 4 элемента.

class Program
{
    ConcurrentQueue<string> iQ = new ConcurrentQueue<string>();
    static void Main(string[] args)
    {
        new Program().run();
    }

    void run()
    {
        int threadCount = 4;
        Task[] workers = new Task[threadCount];

        for (int i = 0; i < threadCount; ++i)
        {
            int workerId = i;
            Task task = new Task(() => worker(workerId));
            workers[i] = task;
            task.Start();
        }

        for (int i = 0; i < 100; i++)
        {
            iQ.Enqueue("Item" + i);
        }

        Task.WaitAll(workers);
        Console.WriteLine("Done.");

        Console.ReadLine();
    }

    void worker(int workerId)
    {
        Console.WriteLine("Worker {0} is starting.", workerId);
        string op;
        if(iQ.TryDequeue(out op))
        {
            Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
        }

        Console.WriteLine("Worker {0} is stopping.", workerId);
    }


}

3 ответа

Решение

Есть несколько проблем с вашей реализацией. Первое и очевидное, что worker Метод удаляет только ноль или один элемент, а затем останавливается:

    if(iQ.TryDequeue(out op))
    {
        Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
    }

Так должно быть:

    while(iQ.TryDequeue(out op))
    {
        Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
    }

Однако этого будет недостаточно для правильной работы вашей программы. Если ваши рабочие удаляют очереди быстрее, чем ставит в очередь основной поток, они остановятся, пока основная задача все еще ставится в очередь. Вы должны дать сигнал рабочим, что они могут остановиться. Вы можете определить логическую переменную, которая будет установлена ​​в true после постановки в очередь:

for (int i = 0; i < 100; i++)
{
    iQ.Enqueue("Item" + i);
}
Volatile.Write(ref doneEnqueueing, true);

Рабочие проверят значение:

void worker(int workerId)
{
    Console.WriteLine("Worker {0} is starting.", workerId);
    do {
        string op;
        while(iQ.TryDequeue(out op))
        {
            Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
        }
        SpinWait.SpinUntil(() => Volatile.Read(ref doneEnqueueing) || (iQ.Count > 0));
    }
    while (!Volatile.Read(ref doneEnqueueing) || (iQ.Count > 0))
    Console.WriteLine("Worker {0} is stopping.", workerId);
}  

Ваши работники берут один предмет из queue а затем закончите работу, просто дайте им работать до queue пустой.

замещать if в рабочей функции с while

void worker(int workerId)
{
    Console.WriteLine("Worker {0} is starting.", workerId);
    string op;
    while (iQ.TryDequeue(out op))
    {
        Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
    }

    Console.WriteLine("Worker {0} is stopping.", workerId);
}

Как вы запустите его, вы увидите, что около всех предметов будут обрабатываться два рабочих. Причина: ваш процессор имеет два ядра, оба работают, и нет "свободного слота" для создания нового задания. Если вы хотите, чтобы все ваши 4 задачи обрабатывали элементы, вы можете добавить задержку, чтобы дать вашему процессору время для создания других задач, например:

while (iQ.TryDequeue(out op))
{
    Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
    Task.Delay(TimeSpan.FromMilliseconds(1)).Wait();
}

это дает вам вывод, что вы хотите:

...
Worker 0 is processing item Item8
Worker 1 is processing item Item9
Worker 2 is processing item Item10
Worker 3 is processing item Item11
Worker 3 is processing item Item13
Worker 1 is processing item Item12
...

Я на самом деле работал с ConcurrentQueue совсем недавно и думал, что поделюсь этим. Я создал кастом ConcurrentQueue называется CQItems у этого есть методы, чтобы построить себя с данными параметрами. Внутренне, когда вы говорите ему построить х количество у элементов, он делает Parallel.For вызов конструкторов элементов. Преимущество здесь в том, что когда вызывается метод или функция CQItems myCQ = CQItems.Generate(x, y) этот вызов поступает из основного потока приложения, что означает, что ничто не может смотреть в очередь, пока она не закончит сборку. Но внутри класса очереди он строится с потоками и значительно быстрее, чем просто использование List<> или же Queue<>, В основном он генерирует вещи из воздуха, но иногда (на основе параметров) создает элементы из SQL - в основном генерирует объекты на основе существующих данных. Во всяком случае, эти два метода в CQItems класс, который может помочь с этим:

public void Generate(int numberOfItems = 1, ItemOptions options = ItemOptions.NONE)
    {
        try
        {
            Type t = typeof(T);

            if (t == typeof(Item))
                throw new ArgumentException("Type " + t + " is not valid for generation.  Please contact QA.");

            else
                Parallel.For(0, numberOfItems, (i, loopState) =>
                {
                    try
                    {
                        GenerateItemByType(typeof(T), options);
                    }

                    catch
                    {
                        loopState.Stop();
                        throw;
                    }

                });
        }

        catch (AggregateException ae)
        {
            ae.Handle((x) =>
            {
                if (x is SQLNullResultsException)
                {
                    throw x;
                }
                else if (x is ImageNotTIFFException)
                {
                    throw x;
                }
                else
                {
                    throw x;
                }

                return true;
            });
        }

        catch
        {
            throw;
        }

        finally
        {
            ItemManager.Instance.Clear();
        }
    }

    private void GenerateItemByType(Type t, ItemOptions options = ItemOptions.NONE)
    {
        try
        {
            if (t == typeof(ItemA))
            {
                if ((options & ItemOptions.DUPLICATE) != 0)
                {
                    this.Enqueue(new ItemB(options));
                }
                else
                {
                    this.Enqueue(new ItemA(options));
                }
            }
            else if (t == typeof(ItemC))
            {
                this.Enqueue(new ItemC(options));
            }
        }

        catch
        {
            throw;
        }

        finally { }
    }

Некоторые полезные заметки:

Поставка loopState переменная в Parallel.For() позволяет нам установить состояние, чтобы остановить, если исключение поймано. Это хорошо, потому что если ваш цикл запрашивается 1000 вещей, а 5-я итерация выдает исключение, он будет продолжать цикл. Вы можете этого захотеть, но в моем случае исключение должно выйти из многопоточного цикла. Вы все равно будете в конечном итоге с AggregateException исходя из этого (очевидно, именно это и происходит, когда потоки выдают исключение). Разбор тех и только отправка первого может сэкономить МНОГО времени и головных болей, пытающихся прополоть гигантскую группу исключений, где более поздние исключения могут (или не могут) быть вызваны первыми в любом случае.

Что касается повторных бросков, я пытаюсь добавить оператор catch для большинства ожидаемых типов исключений, даже если я все равно планирую просто выбросить их в стек. Часть этого предназначена для устранения неполадок (возможность разбить на определенные исключения может быть удобной). Частично это связано с тем, что иногда я хочу иметь возможность делать другие вещи, такие как остановка цикла, изменение или добавление к сообщению об исключении, или в случае разрыва AggregateException, отправьте только одно исключение в стек, а не весь агрегат. Просто пояснение для тех, кто может смотреть на это.

Наконец, если это сбивает с толку, Type(T) ценность исходит от моего CQItems сам класс:

     public class CQItems<T> : ConcurrentQueue<Item>, IDisposable
Другие вопросы по тегам