Только разветвление (и забыть) в длительных функциях
У меня есть приложение-функция с двумя функциями и очередь хранения. F1 вызывается сообщением в теме служебной шины. Для каждого полученного сообщения F1 вычисляет некоторые подзадачи (T1,T2,...), которые должны выполняться с различной задержкой. Ex - T1 срабатывает через 3 минуты, T2 - через 5 минут и т. Д. F1 отправляет сообщения в очередь хранения с соответствующими тайм-аутами видимости (для имитации задержки), и F2 запускается всякий раз, когда сообщение видно в очереди. Все работает отлично.
Теперь я хочу перенести это приложение в "Durable Functions". F1 теперь только запускает оркестратор. Код оркестровщика выглядит примерно так:
public static async Task Orchestrator([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log)
{
var results = await context.CallActivityAsync<List<TaskInfo>>("CalculateTasks", "someinput");
List<Task> tasks = new List<Task>();
foreach (var value in results)
{
var pnTask = context.CallActivityAsync("PerformSubTask", value);
tasks.Add(pnTask);
}
//dont't await as we want to fire and forget. No fan-in!
//await Task.WhenAll(tasks);
}
[FunctionName("PerformSubTask")]
public async static Task Run([ActivityTrigger]TaskInfo info, TraceWriter log)
{
TimeSpan timeDifference = DateTime.UtcNow - info.Origin.ToUniversalTime();
TimeSpan delay = TimeSpan.FromSeconds(info.DelayInSeconds);
var actualDelay = timeDifference > delay ? TimeSpan.Zero : delay - timeDifference;
//will still keep the activity function running and incur costs??
await Task.Delay(actualDelay);
//perform subtask work after delay!
}
Я хотел бы только разойтись (без разборки, чтобы собрать результаты) и запустить подзадачи. Оркестратор запускает все задачи и избегает вызова "await Task.WhenAll". Функция действия вызывает Task.Delay, чтобы подождать указанное количество времени, а затем выполняет свою работу.
Мои вопросы
- Имеет ли смысл использовать Durable Functions для этого рабочего процесса?
- Является ли это правильным подходом для организации рабочего процесса "разветвления"?
- Мне не нравится тот факт, что функция активности работает в течение определенного времени (3 или 5 минут), ничего не делая. Это будет нести расходы, или?
- Кроме того, если требуется задержка более 10 минут, функция активности не сможет добиться успеха с этим подходом!
Моя ранняя попытка избежать этого состояла в том, чтобы использовать "CreateTimer" в оркестраторе, а затем добавить действие в качестве продолжения, но я вижу только записи таймера в таблице "История". Продолжение не стреляет! Нарушаю ли я ограничение для кода оркестратора - "Код оркестратора никогда не должен инициировать какую-либо асинхронную операцию"?
foreach (var value in results) { //calculate time to start var timeToStart = ; var pnTask = context.CreateTimer(timeToStart , CancellationToken.None).ContinueWith(t => context.CallActivityAsync("PerformSubTask", value)); tasks.Add(pnTask); }
ОБНОВЛЕНИЕ: использование подхода, предложенного Крисом
Деятельность, которая вычисляет подзадачи и задержки
[FunctionName("CalculateTasks")] public static List<TaskInfo> CalculateTasks([ActivityTrigger]string input,TraceWriter log) { //in reality time is obtained by calling an endpoint DateTime currentTime = DateTime.UtcNow; return new List<TaskInfo> { new TaskInfo{ DelayInSeconds = 10, Origin = currentTime }, new TaskInfo{ DelayInSeconds = 20, Origin = currentTime }, new TaskInfo{ DelayInSeconds = 30, Origin = currentTime }, }; } public static async Task Orchestrator([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log) { var results = await context.CallActivityAsync<List<TaskInfo>>("CalculateTasks", "someinput"); var currentTime = context.CurrentUtcDateTime; List<Task> tasks = new List<Task>(); foreach (var value in results) { TimeSpan timeDifference = currentTime - value.Origin; TimeSpan delay = TimeSpan.FromSeconds(value.DelayInSeconds); var actualDelay = timeDifference > delay ? TimeSpan.Zero : delay - timeDifference; var timeToStart = currentTime.Add(actualDelay); Task delayedActivityCall = context .CreateTimer(timeToStart, CancellationToken.None) .ContinueWith(t => context.CallActivityAsync("PerformSubtask", value)); tasks.Add(delayedActivityCall); } await Task.WhenAll(tasks); }
Кажется, что просто планирование задач изнутри оркестратора работает. В моем случае я вычисляю задачи и задержки в другой операции (CalculateTasks) перед циклом. Я хочу, чтобы задержки вычислялись с использованием "текущего времени", когда выполнялась операция. Я использую DateTime.UtcNow в деятельности. Это как-то не очень хорошо работает при использовании в оркестраторе. Действия, указанные в "ContinueWith", просто не выполняются, и оркестратор всегда находится в состоянии "Выполнено".
Могу ли я не использовать время, записанное действием из оркестратора?
ОБНОВЛЕНИЕ 2
Так что обходной путь, предложенный Крисом, работает!
Так как я не хочу собирать результаты деятельности, я избегаю звонить await Tasks.WhenAll(tasks)
после планирования всех мероприятий. Я делаю это для того, чтобы уменьшить конкуренцию в очереди управления, т. Е. Иметь возможность запустить другую оркестровку, если требуется. Тем не менее, статус "оркестратор" по-прежнему " работает ", пока все действия не закончат работу. Я предполагаю, что он перемещается в " Выполнено " только после того, как последнее действие отправляет сообщение "Готово" в очередь управления.
Я прав? Есть ли способ освободить оркестратора раньше, то есть сразу после планирования всех действий?
4 ответа
ContinueWith
подход работал нормально для меня. Мне удалось смоделировать версию вашего сценария, используя следующий код оркестратора:
[FunctionName("Orchestrator")]
public static async Task Orchestrator(
[OrchestrationTrigger] DurableOrchestrationContext context,
TraceWriter log)
{
var tasks = new List<Task>(10);
for (int i = 0; i < 10; i++)
{
int j = i;
DateTime timeToStart = context.CurrentUtcDateTime.AddSeconds(10 * j);
Task delayedActivityCall = context
.CreateTimer(timeToStart, CancellationToken.None)
.ContinueWith(t => context.CallActivityAsync("PerformSubtask", j));
tasks.Add(delayedActivityCall);
}
await Task.WhenAll(tasks);
}
И для чего это стоит, вот код функции деятельности.
[FunctionName("PerformSubtask")]
public static void Activity([ActivityTrigger] int j, TraceWriter log)
{
log.Warning($"{DateTime.Now:o}: {j:00}");
}
Из выходных данных журнала я увидел, что все вызовы активности выполнялись на расстоянии 10 секунд друг от друга.
Другой подход заключается в том, чтобы развернуться к нескольким суб-оркестрациям (как предложено @jeffhollan), которые представляют собой простую короткую последовательность длительной задержки таймера и вызова вашей активности.
ОБНОВЛЕНИЕ Я попытался использовать ваш обновленный образец и смог воспроизвести вашу проблему! Если вы запускаете локально в Visual Studio и настраиваете параметры исключений так, чтобы они всегда нарушали исключения, то вы должны увидеть следующее:
System.InvalidOperationException: 'Обнаружено многопоточное выполнение. Это может произойти, если код функции оркестратора ожидает выполнения задачи, которая не была создана методом DurableOrchestrationContext. Более подробную информацию можно найти в этой статье https://docs.microsoft.com/en-us/azure/azure-functions/durable-functions-checkpointing-and-replay.'
Это означает поток, который называется context.CallActivityAsync("PerformSubtask", j)
не был тем же самым потоком, который вызвал функцию оркестратора. Я не знаю, почему мой первоначальный пример не ударил это, или почему ваша версия сделала. Это как-то связано с тем, как TPL решает, какой поток использовать для запуска ContinueWith
делегат - кое-что, что я должен изучить больше.
Хорошей новостью является то, что существует простой обходной путь, который заключается в задании TaskContinuationOptions.ExecuteSynchronously, например:
Task delayedActivityCall = context
.CreateTimer(timeToStart, CancellationToken.None)
.ContinueWith(
t => context.CallActivityAsync("PerformSubtask", j),
TaskContinuationOptions.ExecuteSynchronously);
Пожалуйста, попробуйте и дайте мне знать, если это решит проблему, которую вы наблюдаете.
В идеале вам не нужно делать этот обходной путь при использовании Task.ContinueWith
, Я открыл вопрос в GitHub для отслеживания этого: https://github.com/Azure/azure-functions-durable-extension/issues/317
Поскольку я не хочу собирать результаты деятельности, я избегаю звонить
await Tasks.WhenAll(tasks)
после планирования всех мероприятий. Я делаю это для того, чтобы уменьшить конкуренцию в очереди управления, т. Е. Иметь возможность запустить другую оркестровку, если требуется. Тем не менее, статус "оркестратор" по-прежнему "работает", пока все действия не закончат работу. Я предполагаю, что он перемещается в "Выполнено" только после того, как последнее действие отправляет сообщение "Готово" в очередь управления.
Это ожидается. Функции Orchestrator фактически никогда не завершаются, пока не будут выполнены все нерешенные долговременные задачи. Там нет никакого способа обойти это. Обратите внимание, что вы по-прежнему можете запускать другие экземпляры оркестратора, возможно, возникнет некоторая конкуренция, если они окажутся в одном и том же разделе (по умолчанию 4 раздела).
await Task.Delay
это определенно не лучший вариант: вы заплатите за это время, пока ваша функция не сделает ничего полезного. Максимальная задержка также ограничена 10 минутами в плане потребления.
На мой взгляд, необработанные сообщения очереди - лучший вариант для сценариев "забей и забудь". Установите правильные тайм-ауты видимости, и ваш сценарий будет работать надежно и эффективно.
Особенностью убийцы Durable Functions являются: await
s, которые делают свою магию приостановки и возобновления, сохраняя при этом прицел. Таким образом, это отличный способ реализовать фан-ин, но вам это не нужно.
Я думаю, что долговечность определенно имеет смысл для этого рабочего процесса. Я думаю, что лучшим вариантом было бы использовать функцию задержки / таймера, как вы сказали, но, основываясь на синхронном характере выполнения, я не думаю, что добавлю все в список задач, который действительно ожидает .WhenAll()
или же .WhenAny()
к которому вы не стремитесь. Я думаю, что лично я бы просто сделал последовательный цикл foreach с задержками таймера для каждой задачи. Итак, псевдокод из:
for(int x = 0; x < results.Length; x++) {
await context.CreateTimer(TimeSpan.FromMinutes(1), ...);
await context.CallActivityAsync("PerformTaskAsync", results[x]);
}
Вам нужны те, кто ждет там, независимо от того, просто избегая await Task.WhenAll(...)
вероятно, вызывает некоторые проблемы в примере кода выше. надеюсь, это поможет
Вы должны быть в состоянии использовать
IDurableOrchestrationContext.StartNewOrchestration()
метод, который был добавлен в 2019 году для поддержки этого сценария. См. https://github.com/Azure/azure-functions-durable-extension/issues/715 для контекста .