Параллельные сообщения и сообщения от perfmon

У меня есть приложение, которое читает из текстового файла, который содержит 100-1000 строк.

Каждая строка обрабатывается с использованием параллельного. Foreach с использованием ParallelOptions, чтобы ограничить количество выполняемых задач. Это задача "контроллера".

Внутри этого "контроллера" параллельный. Foreach - это еще один параллельный. Foreach, который будет выполнять реальную работу. Каждая запущенная задача контроллера будет выполнять одну и ту же работу с вводом разных данных, как указано в исходной строке файла. Опять же, эта работа параллельная. Foreach использует параллелепсии для ограничения количества выполняемых задач.

В моем последнем тесте я использовал контроллер foreach: MaxDegreeOfParallelism=4 Рабочий foreach: MaxDegreeOfParallelism:4

Что, согласно моей математике, должно означать, что в каждый момент времени работает максимум 16 задач.

Однако, когда я проверяю perfmon.exe, я вижу свое приложение, используя 700 потоков. Еще через несколько часов это будет более 1000.

Как это может быть? Почему GC не собирает эти законченные темы?

Ранее мой код запускал реальные потоки в Thread[] с той же проблемой. Затем я переместил его в задачу [] и у меня возникла та же проблема. Я предположил, что где-то произошла утечка потока, и ссылка все еще указывала на поток / задачу. Я потратил много часов на поиски этого, но безрезультатно.

Поэтому я перешел к параллельному процессу с мыслью, что, если у меня никогда не будет ссылки на задачу, утечка задачи не может произойти, как это происходит внутри лямды.

Но проблема сохраняется. Есть идеи, почему это так? Или это нормально?

Добавил приведенный ниже код, это немного путаница из-за всех тестов и, пытаясь отладить эту проблему, попытался немного ее исправить.

public static void RunActions(
    List<paramsActionSettings> listActions,
    string[] arrList,
    int numThreads,
    string domain = null,
    delGetParamsActionSettings delGetActionsList = null,
    delProcessString callbackActionsComplete = null
    )
{

    int iCntr= 0;
    int iTotal = arrList.Length;


    ParallelOptions prlOptions = new ParallelOptions
    {
        MaxDegreeOfParallelism = numThreads
    };

    //foreach (string listItemIter in arrList)
    object oLock = new object();
    Parallel.ForEach(arrList, prlOptions,(listItemIter) =>
    {
        lock (oLock)
        {
            Console.WriteLine("starting "+iCntr + " of " + iTotal + " run actions");
            iCntr++;
        }

        string listItemCopySafe = string.Copy(listItemIter);

        bool bCanDo = true;
        List<paramsActionSettings> listActionsUse;
        if (listActions == null)
        {
            listActionsUse = delGetActionsList();
        }
        else
        {
            listActionsUse = listActions;
        }
        foreach (paramsActionSettings prms in listActionsUse)
        {
            if (prms.delCanDo != null && !prms.delCanDo(listItemCopySafe, domain))
            {
                bCanDo = false;
                break;
            }
        }
        if (!bCanDo) return;


        List<paramsFire> listParams = new List<paramsFire>();

        //create a list of paramsfire objects, the object holds the params and the delfunction
        foreach (paramsActionSettings prms in listActionsUse)
        {
            listParams.Add(new paramsFire(prms.delGetDoParams(listItemCopySafe), prms.delDoSomething));
        }


        FireActions(listParams, callbackActionsComplete, listItemCopySafe);
        Console.WriteLine("Finished " + iCntr + " of " + iTotal );
    }); 
}



private static void FireActions(List<paramsFire> list, delProcessString callbackActionsComplete, string itemArr)
{
    int icntr = 0;
    foreach (paramsFire prms in list)
    {
        try
        {
            if (icntr == 0) 
            {
                if (!prms.delDoSomething(prms.oParams))
                {
                    break;
                }
            }
            else
            {
                prms.delDoSomething(prms.oParams); 
            }
            icntr++;

        }
        catch (Exception e)
        {
            ErrorLog.WriteLine("foreach (paramsFire prms in list)");
            UtilException.Dump(e, "foreach (paramsFire prms in list)");
        }
    } 
     if (callbackActionsComplete != null)
    {
        try
        {
            callbackActionsComplete(itemArr);
        }
        catch { }
    }
}

2 ответа

Я немного переписал. Это только получило до 22 потоков на моей машине (8 процедурных боксов). Не стесняйтесь использовать любое из этих изменений:

    public static void RunActions(
        IEnumerable<paramsActionSettings> listActions,
        IEnumerable<string> arrList,
        int numThreads,
        string domain = null,
        delGetParamsActionSettings delGetActionsList = null,
        delProcessString callbackActionsComplete = null)
    {

        var cntr = 0;
        var total = arrList.Count();

        var prlOptions = new ParallelOptions
        {
            MaxDegreeOfParallelism = numThreads
        };

        ////foreach (var listItemIter in arrList)
        Parallel.ForEach(arrList, prlOptions, listItemIter =>
        {
            Interlocked.Increment(ref cntr);
            Console.WriteLine("starting " + cntr + " of " + total + " run actions");

            var listItemCopySafe = string.Copy(listItemIter);

            var listActionsUse = listActions ??
                ((delGetActionsList == null) ? new paramsActionSettings[0] : delGetActionsList());
            var canDo = listActionsUse.All(prms => prms.delCanDo == null
                || prms.delCanDo(listItemCopySafe, domain));

            if (!canDo)
            {
                return;
            }

            var listParams = listActionsUse.Select(prms => new paramsFire(
                prms.delGetDoParams(listItemCopySafe),
                prms.delDoSomething));

            // create a list of paramsfire objects, the object holds the params and the delfunction
            FireActions(listParams, callbackActionsComplete, listItemCopySafe);
            Console.WriteLine("Finished " + cntr + " of " + total);
        });
    }

    private static void FireActions(
        IEnumerable<paramsFire> list,
        delProcessString callbackActionsComplete,
        string itemArr)
    {
        var icntr = 0;

        foreach (var prms in list)
        {
            try
            {
                if (icntr == 0)
                {
                    if (!prms.delDoSomething(prms.oParams))
                    {
                        break;
                    }
                }
                else
                {
                    prms.delDoSomething(prms.oParams);
                }

                icntr++;
            }
            catch (Exception e)
            {
                ErrorLog.WriteLine("foreach (paramsFire prms in list)");
                UtilException.Dump(e, "foreach (paramsFire prms in list)");
            }
        }

        if (callbackActionsComplete == null)
        {
            return;
        }

        try
        {
            callbackActionsComplete(itemArr);
        }
        catch
        {
        }
    }

Ясно, что проблема не в каком-либо конкретном API, как вы сказали, что проблема одинакова для потоков, задач и Parallel.ForEach,

Не спрашивайте, почему фреймворк не выполняет свою работу (потому что это так). Спросите, почему ваш код порождает больше потоков, чем вы предполагали. Не видя больше кода, эта проблема не может быть полностью решена.

Другие вопросы по тегам