TaskFactory, запуск новой задачи после ее завершения
Я нашел много способов использования TaskFactory
но я не мог найти ничего о запуске большего количества задач и наблюдении, когда один заканчивается и запуск другого.
Я всегда хочу, чтобы 10 задач работали.
Я хочу что то подобное
int nTotalTasks=10;
int nCurrentTask=0;
Task<bool>[] tasks=new Task<bool>[nThreadsNum];
for (int i=0; i<1000; i++)
{
string param1="test";
string param2="test";
if (nCurrentTask<10) // if there are less than 10 tasks then start another one
tasks[nCurrentThread++] = Task.Factory.StartNew<bool>(() =>
{
MyClass cls = new MyClass();
bool bRet = cls.Method1(param1, param2, i); // takes up to 2 minutes to finish
return bRet;
});
// How can I stop the for loop until a new task is finished and start a new one?
}
5 ответов
Проверьте метод Task.WaitAny:
Ожидает завершения выполнения любого из предоставленных объектов Task.
Пример из документации:
var t1 = Task.Factory.StartNew(() => DoOperation1());
var t2 = Task.Factory.StartNew(() => DoOperation2());
Task.WaitAny(t1, t2)
Для этого я бы использовал комбинацию Microsoft Reactive Framework (NuGet "Rx-Main") и TPL. Это становится очень просто.
Вот код:
int nTotalTasks=10;
string param1="test";
string param2="test";
IDisposable subscription =
Observable
.Range(0, 1000)
.Select(i => Observable.FromAsync(() => Task.Factory.StartNew<bool>(() =>
{
MyClass cls = new MyClass();
bool bRet = cls.Method1(param1, param2, i); // takes up to 2 minutes to finish
return bRet;
})))
.Merge(nTotalTasks)
.ToArray()
.Subscribe((bool[] results) =>
{
/* Do something with the results. */
});
Ключевой частью здесь является .Merge(nTotalTasks)
что ограничивает количество одновременных задач.
Если вам нужно остановить процесс обработки, просто позвоните subscription.Dispose()
и все очищается для вас.
Если вы хотите обрабатывать каждый результат по мере его появления, вы можете изменить код из .Merge(...)
как это:
.Merge(nTotalTasks)
.Subscribe((bool result) =>
{
/* Do something with each result. */
});
Ответ зависит от того, связаны ли задачи, которые будут запланированы, с процессором или с I/O.
Для работы с интенсивным использованием процессора я бы использовал Parallel.For()
API устанавливает количество потоков / задач через MaxDegreeOfParallelism
собственностью ParallelOptions
Для работы, связанной с вводом / выводом, количество одновременно выполняемых задач может быть значительно больше, чем количество доступных процессоров, поэтому стратегия состоит в том, чтобы максимально полагаться на асинхронные методы, что уменьшает общее количество потоков, ожидающих завершения.
Как я могу остановить цикл for, пока новая задача не будет завершена, и начать новую?
Цикл можно регулировать с помощью await:
static void Main(string[] args)
{
var task = DoWorkAsync();
task.Wait();
// handle results
// task.Result;
Console.WriteLine("Done.");
}
async static Task<bool> DoWorkAsync()
{
const int NUMBER_OF_SLOTS = 10;
string param1="test";
string param2="test";
var results = new bool[NUMBER_OF_SLOTS];
AsyncWorkScheduler ws = new AsyncWorkScheduler(NUMBER_OF_SLOTS);
for (int i = 0; i < 1000; ++i)
{
await ws.ScheduleAsync((slotNumber) => DoWorkAsync(i, slotNumber, param1, param2, results));
}
ws.Complete();
await ws.Completion;
}
async static Task DoWorkAsync(int index, int slotNumber, string param1, string param2, bool[] results)
{
results[slotNumber] = results[slotNumber} && await Task.Factory.StartNew<bool>(() =>
{
MyClass cls = new MyClass();
bool bRet = cls.Method1(param1, param2, i); // takes up to 2 minutes to finish
return bRet;
}));
}
Вспомогательный класс AsyncWorkScheduler
использует компоненты TPL.DataFlow, а также Task.WhenAll()
:
class AsyncWorkScheduler
{
public AsyncWorkScheduler(int numberOfSlots)
{
m_slots = new Task[numberOfSlots];
m_availableSlots = new BufferBlock<int>();
m_errors = new List<Exception>();
m_tcs = new TaskCompletionSource<bool>();
m_completionPending = 0;
// Initial state: all slots are available
for(int i = 0; i < m_slots.Length; ++i)
{
m_slots[i] = Task.FromResult(false);
m_availableSlots.Post(i);
}
}
public async Task ScheduleAsync(Func<int, Task> action)
{
if (Volatile.Read(ref m_completionPending) != 0)
{
throw new InvalidOperationException("Unable to schedule new items.");
}
// Acquire a slot
int slotNumber = await m_availableSlots.ReceiveAsync().ConfigureAwait(false);
// Schedule a new task for a given slot
var task = action(slotNumber);
// Store a continuation on the task to handle completion events
m_slots[slotNumber] = task.ContinueWith(t => HandleCompletedTask(t, slotNumber), TaskContinuationOptions.ExecuteSynchronously);
}
public async void Complete()
{
if (Interlocked.CompareExchange(ref m_completionPending, 1, 0) != 0)
{
return;
}
// Signal the queue's completion
m_availableSlots.Complete();
await Task.WhenAll(m_slots).ConfigureAwait(false);
// Set completion
if (m_errors.Count != 0)
{
m_tcs.TrySetException(m_errors);
}
else
{
m_tcs.TrySetResult(true);
}
}
public Task Completion
{
get
{
return m_tcs.Task;
}
}
void SetFailed(Exception error)
{
lock(m_errors)
{
m_errors.Add(error);
}
}
void HandleCompletedTask(Task task, int slotNumber)
{
if (task.IsFaulted || task.IsCanceled)
{
SetFailed(task.Exception);
return;
}
if (Volatile.Read(ref m_completionPending) == 1)
{
return;
}
// Release a slot
m_availableSlots.Post(slotNumber);
}
int m_completionPending;
List<Exception> m_errors;
BufferBlock<int> m_availableSlots;
TaskCompletionSource<bool> m_tcs;
Task[] m_slots;
}
Это должно быть все, что вам нужно, не завершено, но все, что вам нужно сделать, это дождаться завершения первого и затем запустить второе.
Task.WaitAny(task to wait on);
Task.Factory.StartNew()
Вы видели класс BlockingCollection? Это позволяет вам иметь несколько потоков, работающих параллельно, и вы можете ждать результатов одной задачи для выполнения другой. Смотрите больше информации здесь.