Только разветвление (и забыть) в длительных функциях

У меня есть приложение-функция с двумя функциями и очередь хранения. F1 вызывается сообщением в теме служебной шины. Для каждого полученного сообщения F1 вычисляет некоторые подзадачи (T1,T2,...), которые должны выполняться с различной задержкой. Ex - T1 срабатывает через 3 минуты, T2 - через 5 минут и т. Д. F1 отправляет сообщения в очередь хранения с соответствующими тайм-аутами видимости (для имитации задержки), и F2 запускается всякий раз, когда сообщение видно в очереди. Все работает отлично.

Теперь я хочу перенести это приложение в "Durable Functions". F1 теперь только запускает оркестратор. Код оркестровщика выглядит примерно так:

    public static async Task Orchestrator([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log)
    {
        var results = await context.CallActivityAsync<List<TaskInfo>>("CalculateTasks", "someinput");
        List<Task> tasks = new List<Task>();
        foreach (var value in results)
        {
            var pnTask = context.CallActivityAsync("PerformSubTask", value);
            tasks.Add(pnTask);
        }

        //dont't await as we want to fire and forget. No fan-in!
        //await Task.WhenAll(tasks);
    }

    [FunctionName("PerformSubTask")]
    public async static Task Run([ActivityTrigger]TaskInfo info, TraceWriter log)
    {
         TimeSpan timeDifference = DateTime.UtcNow - info.Origin.ToUniversalTime();
         TimeSpan delay = TimeSpan.FromSeconds(info.DelayInSeconds);
         var actualDelay = timeDifference > delay ? TimeSpan.Zero : delay - timeDifference;

         //will still keep the activity function running and incur costs??
         await Task.Delay(actualDelay);

         //perform subtask work after delay! 
    }

Я хотел бы только разойтись (без разборки, чтобы собрать результаты) и запустить подзадачи. Оркестратор запускает все задачи и избегает вызова "await Task.WhenAll". Функция действия вызывает Task.Delay, чтобы подождать указанное количество времени, а затем выполняет свою работу.

Мои вопросы

  • Имеет ли смысл использовать Durable Functions для этого рабочего процесса?
  • Является ли это правильным подходом для организации рабочего процесса "разветвления"?
  • Мне не нравится тот факт, что функция активности работает в течение определенного времени (3 или 5 минут), ничего не делая. Это будет нести расходы, или?
  • Кроме того, если требуется задержка более 10 минут, функция активности не сможет добиться успеха с этим подходом!
  • Моя ранняя попытка избежать этого состояла в том, чтобы использовать "CreateTimer" в оркестраторе, а затем добавить действие в качестве продолжения, но я вижу только записи таймера в таблице "История". Продолжение не стреляет! Нарушаю ли я ограничение для кода оркестратора - "Код оркестратора никогда не должен инициировать какую-либо асинхронную операцию"?

    foreach (var value in results)
    {
            //calculate time to start
            var timeToStart = ;
            var pnTask = context.CreateTimer(timeToStart , CancellationToken.None).ContinueWith(t => context.CallActivityAsync("PerformSubTask", value));
            tasks.Add(pnTask);
    }
    

    ОБНОВЛЕНИЕ: использование подхода, предложенного Крисом

    Деятельность, которая вычисляет подзадачи и задержки

    [FunctionName("CalculateTasks")]
    public static List<TaskInfo> CalculateTasks([ActivityTrigger]string input,TraceWriter log)
    {
        //in reality time is obtained by calling an endpoint 
        DateTime currentTime = DateTime.UtcNow;
        return new List<TaskInfo> {
            new TaskInfo{ DelayInSeconds = 10, Origin = currentTime },
            new TaskInfo{ DelayInSeconds = 20, Origin = currentTime },
            new TaskInfo{ DelayInSeconds = 30, Origin = currentTime },
        };
    }
    
    public static async Task Orchestrator([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log)
    {
        var results = await context.CallActivityAsync<List<TaskInfo>>("CalculateTasks", "someinput");
        var currentTime = context.CurrentUtcDateTime;
        List<Task> tasks = new List<Task>();
        foreach (var value in results)
        {
            TimeSpan timeDifference = currentTime - value.Origin;
            TimeSpan delay = TimeSpan.FromSeconds(value.DelayInSeconds);
            var actualDelay = timeDifference > delay ? TimeSpan.Zero : delay - timeDifference;
    
            var timeToStart = currentTime.Add(actualDelay);
    
            Task delayedActivityCall = context
                 .CreateTimer(timeToStart, CancellationToken.None)
                 .ContinueWith(t => context.CallActivityAsync("PerformSubtask", value));
            tasks.Add(delayedActivityCall);
        }
    
        await Task.WhenAll(tasks);
    }
    

Кажется, что просто планирование задач изнутри оркестратора работает. В моем случае я вычисляю задачи и задержки в другой операции (CalculateTasks) перед циклом. Я хочу, чтобы задержки вычислялись с использованием "текущего времени", когда выполнялась операция. Я использую DateTime.UtcNow в деятельности. Это как-то не очень хорошо работает при использовании в оркестраторе. Действия, указанные в "ContinueWith", просто не выполняются, и оркестратор всегда находится в состоянии "Выполнено".

Могу ли я не использовать время, записанное действием из оркестратора?

ОБНОВЛЕНИЕ 2

Так что обходной путь, предложенный Крисом, работает!

Так как я не хочу собирать результаты деятельности, я избегаю звонить await Tasks.WhenAll(tasks) после планирования всех мероприятий. Я делаю это для того, чтобы уменьшить конкуренцию в очереди управления, т. Е. Иметь возможность запустить другую оркестровку, если требуется. Тем не менее, статус "оркестратор" по-прежнему " работает ", пока все действия не закончат работу. Я предполагаю, что он перемещается в " Выполнено " только после того, как последнее действие отправляет сообщение "Готово" в очередь управления.

Я прав? Есть ли способ освободить оркестратора раньше, то есть сразу после планирования всех действий?

4 ответа

ContinueWith подход работал нормально для меня. Мне удалось смоделировать версию вашего сценария, используя следующий код оркестратора:

[FunctionName("Orchestrator")]
public static async Task Orchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context,
    TraceWriter log)
{
    var tasks = new List<Task>(10);
    for (int i = 0; i < 10; i++)
    {
        int j = i;
        DateTime timeToStart = context.CurrentUtcDateTime.AddSeconds(10 * j);
        Task delayedActivityCall = context
            .CreateTimer(timeToStart, CancellationToken.None)
            .ContinueWith(t => context.CallActivityAsync("PerformSubtask", j));
        tasks.Add(delayedActivityCall);
    }

    await Task.WhenAll(tasks);
}

И для чего это стоит, вот код функции деятельности.

[FunctionName("PerformSubtask")]
public static void Activity([ActivityTrigger] int j, TraceWriter log)
{
    log.Warning($"{DateTime.Now:o}: {j:00}");
}

Из выходных данных журнала я увидел, что все вызовы активности выполнялись на расстоянии 10 секунд друг от друга.

Другой подход заключается в том, чтобы развернуться к нескольким суб-оркестрациям (как предложено @jeffhollan), которые представляют собой простую короткую последовательность длительной задержки таймера и вызова вашей активности.

ОБНОВЛЕНИЕ Я попытался использовать ваш обновленный образец и смог воспроизвести вашу проблему! Если вы запускаете локально в Visual Studio и настраиваете параметры исключений так, чтобы они всегда нарушали исключения, то вы должны увидеть следующее:

System.InvalidOperationException: 'Обнаружено многопоточное выполнение. Это может произойти, если код функции оркестратора ожидает выполнения задачи, которая не была создана методом DurableOrchestrationContext. Более подробную информацию можно найти в этой статье https://docs.microsoft.com/en-us/azure/azure-functions/durable-functions-checkpointing-and-replay.'

Это означает поток, который называется context.CallActivityAsync("PerformSubtask", j) не был тем же самым потоком, который вызвал функцию оркестратора. Я не знаю, почему мой первоначальный пример не ударил это, или почему ваша версия сделала. Это как-то связано с тем, как TPL решает, какой поток использовать для запуска ContinueWith делегат - кое-что, что я должен изучить больше.

Хорошей новостью является то, что существует простой обходной путь, который заключается в задании TaskContinuationOptions.ExecuteSynchronously, например:

Task delayedActivityCall = context
    .CreateTimer(timeToStart, CancellationToken.None)
    .ContinueWith(
        t => context.CallActivityAsync("PerformSubtask", j),
        TaskContinuationOptions.ExecuteSynchronously);

Пожалуйста, попробуйте и дайте мне знать, если это решит проблему, которую вы наблюдаете.

В идеале вам не нужно делать этот обходной путь при использовании Task.ContinueWith, Я открыл вопрос в GitHub для отслеживания этого: https://github.com/Azure/azure-functions-durable-extension/issues/317

Поскольку я не хочу собирать результаты деятельности, я избегаю звонить await Tasks.WhenAll(tasks) после планирования всех мероприятий. Я делаю это для того, чтобы уменьшить конкуренцию в очереди управления, т. Е. Иметь возможность запустить другую оркестровку, если требуется. Тем не менее, статус "оркестратор" по-прежнему "работает", пока все действия не закончат работу. Я предполагаю, что он перемещается в "Выполнено" только после того, как последнее действие отправляет сообщение "Готово" в очередь управления.

Это ожидается. Функции Orchestrator фактически никогда не завершаются, пока не будут выполнены все нерешенные долговременные задачи. Там нет никакого способа обойти это. Обратите внимание, что вы по-прежнему можете запускать другие экземпляры оркестратора, возможно, возникнет некоторая конкуренция, если они окажутся в одном и том же разделе (по умолчанию 4 раздела).

await Task.Delay это определенно не лучший вариант: вы заплатите за это время, пока ваша функция не сделает ничего полезного. Максимальная задержка также ограничена 10 минутами в плане потребления.

На мой взгляд, необработанные сообщения очереди - лучший вариант для сценариев "забей и забудь". Установите правильные тайм-ауты видимости, и ваш сценарий будет работать надежно и эффективно.

Особенностью убийцы Durable Functions являются: awaits, которые делают свою магию приостановки и возобновления, сохраняя при этом прицел. Таким образом, это отличный способ реализовать фан-ин, но вам это не нужно.

Я думаю, что долговечность определенно имеет смысл для этого рабочего процесса. Я думаю, что лучшим вариантом было бы использовать функцию задержки / таймера, как вы сказали, но, основываясь на синхронном характере выполнения, я не думаю, что добавлю все в список задач, который действительно ожидает .WhenAll() или же .WhenAny() к которому вы не стремитесь. Я думаю, что лично я бы просто сделал последовательный цикл foreach с задержками таймера для каждой задачи. Итак, псевдокод из:

for(int x = 0; x < results.Length; x++) { await context.CreateTimer(TimeSpan.FromMinutes(1), ...); await context.CallActivityAsync("PerformTaskAsync", results[x]); }

Вам нужны те, кто ждет там, независимо от того, просто избегая await Task.WhenAll(...) вероятно, вызывает некоторые проблемы в примере кода выше. надеюсь, это поможет

Вы должны быть в состоянии использовать IDurableOrchestrationContext.StartNewOrchestration()метод, который был добавлен в 2019 году для поддержки этого сценария. См. https://github.com/Azure/azure-functions-durable-extension/issues/715 для контекста .

Другие вопросы по тегам