.NET Как сообщить (коллективный) прогресс о параллельных рабочих нагрузках, где каждый объект / задача сообщает о своем собственном прогрессе?
Я работаю над параллельными рабочими нагрузками, где каждый объект или задача сообщает о своем собственном прогрессе, и я хочу сообщить о коллективном прогрессе задачи в целом.
Например, представьте, что у меня есть 10 рабочих объектов, каждый из которых сообщает об индивидуальном прогрессе. Они содержат 0-100 "задач", которые должны быть выполнены.
Если бы нам нужно было линейно выполнять итерации по каждому из объектов Work, мы могли бы легко сообщить о нашем прогрессе и увидеть результат примерно так:
Work item #1 of 10 is currently 1 of 100 tasks completed.
Work item #1 of 10 is currently 2 of 100 tasks completed.
...
Work item #10 of 10 is currently 100 of 100 tasks completed.
Однако при параллельной работе вывод будет выглядеть примерно так:
Work item #1 of 10 is currently 1 of 100 tasks completed.
Work item #4 of 10 is currently 16 of 100 tasks completed.
Work item #7 of 10 is currently 4 of 100 tasks completed.
...
Work item #10 of 10 is currently 100 of 100 tasks completed.
Проблема, которую я пытаюсь решить, состоит в объединении всего прогресса в параллельных циклах, так что вывод для пользователя больше похож на "1/1000" или "10/1000", представляющий общий объем выполненной работы, и обновление числителя как работа продолжается.
Я ожидал бы, что есть решение или шаблон, который подходит независимо от Async/Await или использования Task Asynchronous Pattern - я использую оба - и я надеюсь, что уже есть способы справиться с этим в.NET Framework, которого у меня нет обнаружено.
Используя этот простой (псевдокод) пример из TAP:
Parallel.ForEach(WorkObject, wo =>
{
// Perhaps each WorkObject has a "ProgressChanged" delegate that fires progress notifications.
wo.ProgressChanged += delegate (int currentProgress, int totalProgress)
{
ReportProgress($"Work item #{wo.ID} of {WorkObject.Count} is currently {currentProgress} of {totalProgress} tasks completed.
};
// Or perhaps using IProgress<T> or Progress?
// wo.PerformWork(/*IProgress<T> or Progress<T>, etc.*/);
});
Мы можем выполнять итерации параллельно, и обновления / уведомления о прогрессе будут поступать по мере того, как каждый поток завершает единицу работы.
Как мы можем эффективно объединить прогресс всех WorkObjects таким образом, чтобы мы могли сообщать о более равномерном "1/1000" выполненном?
Проблема в том, что у каждого WorkObject может быть разное количество "заданий" для выполнения, и у нас может быть разное количество WorkObject, которые должны работать. Если просто объединить числитель и знаменатель из всех рабочих объектов при поступлении каждого уведомления о ходе выполнения (при условии, что они обновляются после завершения каждой единицы работы), к концу параллельной рабочей нагрузки уведомление о ходе выполнения будет отображать что-то вроде "1000/100 000". вместо "1000/1000".
Похоже, нам нужен способ отслеживать текущий прогресс, X, а также общий прогресс, Y, чтобы сформировать связное сообщение для пользователя о состоянии общего прогресса (X из Y завершено).
Существует ли существующая модель (в платформе или иным образом) для этого?
Моя текущая мысль состоит в том, чтобы создать структуру данных, записывающую идентификатор потока каждого потока, выполняющегося параллельно, и затем отслеживать прогресс каждого потока в значении этой структуры данных (в виде X/Y) и, наконец, когда каждый поток публикует обновление прогресса, повторяя итерации. по структуре данных для суммирования X / Y из каждого потока для генерации общего "X/Y" для отображения пользователю.
Но, безусловно, разработчики сталкиваются с этой проблемой каждый день, поэтому должен быть другой путь?
1 ответ
Я закончил тем, что создал класс для управления продвижением потоков; вот что я придумал:
// A Parallel Progress Manager is designed to collect progress information from multiple sources and provide a total sum of progress.
// For example, if 3 objects are going to perform some work in parallel, and the first object has 10 tasks, the second has 100, and the last has 1000,
// when executing in parallel, it isn't useful to have each task fire a ProgressChanged() event (or something similar), as it would result in the progress
// being returned something like 0/10, 1/10, 2/10, 0/100, 3/10, 1/100, 0/1000, etc. (As each thread executes independently.)
//
// Instead, this class aggregates progress and provides a total sum of progress: 0/1110, 1/1110, etc.
//
// NOTE: The intention of this class is to manage parallelized workloads across numerous jobs. For example, operating in parallel against 3 different objects
// that all report progress independently, such as Paralle.ForEach(IEnumerable<T>). This is not suggested for parallelized workloads of a single job, such as
// Parallel.For(i, 100)—in this case, it is recommended to update progress using Interlocked.Increment() or a lock() on a synchronization object as one would normally.
// Example:
//
// ParallelProgressManager ppm = new ParallelProgressManager();
//
// Parallel.ForEach(IEnumerable<T>, t =>
// {
// t.ProgressChanged += delegate (long current, long total, bool indeterminate, string message)
// {
// lock(ppm)
// {
// var x = ppm.SetGetProgress(t.GetHashCode(), current, total);
//
// ReportProgress(x.Item1, x.Item2, false, $"Working... {x.Item1} / {x.Item2}");
// }
// }
// });
using System;
using System.Collections.Generic;
namespace Threading
{
/// <summary>
/// A Parallel Progress Manager used to aggregate and sum progress across multiple objects working in parallel.
/// </summary>
public class ParallelProgressManager
{
/// <summary>
/// The progress class contains current and total progress and
/// </summary>
protected class Progress
{
public long Current { get; set; } = 0;
public long Total { get; set; } = 0;
}
/// <summary>
/// The ProgressDictionary associates each working object's Hash Code with it's current progress (via a Progress object.)
/// This way an object can operate in parallel and as progress updates come in, the last update is replaced by the new one.
/// We can then sum the "current" and "total" to produce an overall progress value.
/// </summary>
private Dictionary<int, Progress> ProgressDictionary { get; set; } = new Dictionary<int, Progress>();
/// <summary>
/// Sets an object's progress via it's Hash Code. If the object isn't recognized, a new entry will be made for it. If it is recognized,
/// it's progress will be updated accordingly.
/// </summary>
/// <param name="hashCode">
/// The Hash Code of the object (.GetHashCode()) that's reporting progress. The Hash Code is used to distinguish the objects to manage progress of.
/// </param>
/// <param name="current">
/// The current progress.
/// </param>
/// <param name="total">
/// The total progress.
/// </param>
public void SetProgress(int hashCode, long current, long total)
{
if (!ProgressDictionary.ContainsKey(hashCode))
ProgressDictionary.Add(hashCode, new Progress() { Current = current, Total = total });
else
{
ProgressDictionary[hashCode].Current = current;
ProgressDictionary[hashCode].Total = total;
}
}
/// <summary>
/// Retrieves the total progress of all objects currently being managed.
/// </summary>
/// <returns>
/// A Tuple where the first value represents the summed current progress, and the second value represents the summed total progress.
/// </returns>
public Tuple<long, long> GetProgress()
{
long c = 0;
long t = 0;
foreach (var p in ProgressDictionary)
{
c += p.Value.Current;
t += p.Value.Total;
}
return Tuple.Create(c, t);
}
/// <summary>
/// Sets progress for the provided object and retrieves an updated total progress. This is equivalent to calling SetProgress() and then calling
/// GetProgress() immediately after.
/// </summary>
/// <param name="hashCode"></param>
/// <param name="currentStep"></param>
/// <param name="totalSteps"></param>
/// <returns></returns>
public Tuple<long, long> SetGetProgress(int hashCode, long currentStep, long totalSteps)
{
SetProgress(hashCode, currentStep, totalSteps);
return GetProgress();
}
}
}
Ниже приведен один из возможных подходов. Подобно тому, что я описал выше, за исключением того, что я передал "работу" Задаче и перекачал ReportProgress из начального контекста потока.
Во-первых, пара занятий. Я использую Random, чтобы решить, сколько времени займет каждое задание и сколько заданий в каждом WorkObject. Работа эмулирует высокую загрузку процессора с помощью плотной петли. Вы бы использовали свои собственные объекты (и действительно полезную работу).
public class Job
{
private readonly TimeSpan timeForJobToTake;
public Job(TimeSpan timeForJobToTake)
{
this.timeForJobToTake = timeForJobToTake;
}
public void DoJob()
{
DateTime endTime = DateTime.UtcNow.Add(this.timeForJobToTake);
while (DateTime.UtcNow < endTime)
{
// emulate high CPU load during job
}
}
}
public class WorkObject
{
private readonly List<Job> jobs = new List<Job>();
public WorkObject(Random random)
{
int jobsToCreate = random.Next(1, 10);
for (int i = 0; i < jobsToCreate; i++)
{
Job job = new Job(TimeSpan.FromMilliseconds(random.Next(100, 200)));
this.jobs.Add(job);
}
}
public int JobCount => this.jobs.Count;
public void PerformWork()
{
foreach (Job job in this.jobs)
{
job.DoJob();
}
}
}
Затем вы можете сделать что-то вроде следующего (консольное приложение, но код может работать в других контекстах):
internal class Program
{
private static readonly object syncObj = new object();
private static int lastNumerator;
private static int numerator;
private static int denominator;
private static void ReportProgress()
{
int currentNumerator = numerator;
// Don't emit progress if nothing changed
if (currentNumerator == lastNumerator) return;
Console.WriteLine($"{currentNumerator} of {denominator}");
lastNumerator = currentNumerator;
}
private static void Main(string[] args)
{
MainAsync().Wait();
Console.ReadLine();
}
private static async Task MainAsync()
{
// Setup example objects
Random random = new Random();
List<WorkObject> workObjects = new List<WorkObject>();
int numberOfWorkObjects = random.Next(50, 100);
for (int i = 0; i < numberOfWorkObjects; i++)
{
WorkObject workObject = new WorkObject(random);
denominator += workObject.JobCount;
workObjects.Add(workObject);
}
// The CancellationTokenSource is used to immediately abort the progress reporting once the work is complete
CancellationTokenSource progressReportCancellationTokenSource = new CancellationTokenSource();
Task workTask = Task.Run(() =>
{
Parallel.ForEach(workObjects,
wo =>
{
wo.PerformWork();
lock (syncObj)
{
numerator += wo.JobCount;
}
});
progressReportCancellationTokenSource.Cancel();
});
while (!workTask.IsCompleted)
{
try
{
ReportProgress();
await Task.Delay(250, progressReportCancellationTokenSource.Token);
}
catch (TaskCanceledException)
{
break;
}
}
await workTask;
ReportProgress();
}
}