Функция шага AWS - дождаться события

У меня есть случай использования, когда у меня есть функция шага AWS, которая запускается при загрузке файла на S3, оттуда первый шаг запускает ffprobe для получения длительности файла от внешнего сервиса, такого как transloadit, в который записывается вывод вернуться к S3.

Я могу создать новую функцию шага из этого события, но я бродил, возможно ли получить обещание Await внутри исходной функции шага, а затем перейти к следующей - учитывая, что возвращение ffprobe может занять больше времени.

Любой совет высоко ценится, как справиться с этим.

7 ответов

AWS Step Functions теперь поддерживает асинхронные обратные вызовы для длительных шагов как первоклассный.

Это похоже на ответ @ixja выше, но упрощено. Одно состояние в вашем рабочем процессе может напрямую вызывать Lambda, SNS, SQS или ECS и ждать вызоваSendTaskSuccess.

Для SQS задокументирован хороший пример, в котором пошаговая функция отправляет сообщение и приостанавливает выполнение рабочего процесса, пока что-то не предоставит обратный вызов. Лямбда будет эквивалентна (при условии, что основная обработка, такая как transloadit, происходит вне самой лямбды)

Определение вашей ступенчатой ​​функции будет выглядеть так

"Invoke transloadit": {
  "Type": "Task",
  "Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken",
  "Parameters": {
    "FunctionName": "InvokeTransloadit",
    "Payload": {
        "some_other_param": "...",
        "token.$": "$$.Task.Token"
     }
  },
  "Next": "NEXT_STATE"
}

Затем в своей лямбде вы должны сделать что-то вроде

def lambda_handler(event, context):
    token = event['token']

    # invoke transloadit via SSM, ECS, passing token along

то в вашем основном длительном процессе вы должны выполнить обратный вызов с токеном, например aws stepfunctions send-task-success --task-token $token из сценария оболочки / CLI или аналогичного с вызовами API.

Когда вы отправляете запрос на transloadit, сохраняйте taskToken для шага в s3 под предсказуемым ключом, основанным на ключе загруженного файла. Например, если файл мультимедиа находится по адресу "s3://my-media-bucket/foobar/media-001.mp3", вы можете создать файл JSON, содержащий токен задачи текущего шага, и сохранить его с тем же ключом. в другом сегменте, например 's3://ffprobe-tasks/foobar/media-001.mp3.json'. В конце вашего шага, который отправляет носитель на transloadit , не вызывайте успех или неудачу на этом шаге - оставьте его запущенным.

Затем, когда вы получите уведомление s3 о том, что результат transloadit готов, вы можете определить ключ s3 для получения токена задачи ('s3://ffprobe-tasks/foobar/media-001.mp3'), загрузить JSON (и удалить это из s3) и отправь успех для этой задачи. Шаговая функция перейдет к следующему состоянию выполнения.

Не могу предложить простое решение, только несколько направлений для изучения.

Во-первых, у пошаговых функций есть особый способ обработки длительной фоновой работы: действия. https://docs.aws.amazon.com/step-functions/latest/dg/concepts-activities.html это в основном очередь.

Если вы хотите 100% без сервера, это будет сложно или безобразно.

  • либо, как вы сказали, создайте новую пошаговую функцию для каждого файла
  • или цикл опроса S3 в конечном автомате с использованием пользовательского кода ошибки и Retry пункт

Если вы можете выделить экземпляр "1/8 микро" для фонового работника, это не элегантно, но легко и может быть реализовано с мгновенной реакцией. Низкие требования к оборудованию намекают на то, что мы будем использовать машину только для синхронизации.

Определите действие StepFunction, названное, например, video-duration, Определите очередь SQS для мгновенной реакции или опрос S3 для результатов продолжительности.

Псевдокод функции состояния:

{
  StartAt: ffprobe
  ffprobe: {
    Type: Task
    Resource: arn:...lambda:launch-ffprobe
    Next: wait-duration
  }
  wait-duration: {
    Type: Task
    Resource: arn...activity:video-duration
    End: true
  }
}

Фоновый рабочий псевдокод:

statemap = dict/map filename to result

thread1:
  loop:
    taskToken, input = SF.GetActivityTask('video-duration')  # long poll
    sync(key=input.filename, waiter=taskToken)
thread2:
  loop:
    msg = SQS.ReceiveMessage(...)  # or poll S3
    sync(key=msg.filename, duration=msg.result)

function sync(key, waiter, duration):
  state = statemap[key]
  if waiter:
    state.waiter = waiter
  if duration:
    state.duration = duration
  if state.waiter and state.duration:
    SF.SendTaskSuccess(state.waiter, state.duration)

S3 запускает псевдокод:

if filename is video:
  SF.StartExecution(...)
else if filename is duration:
  content = S3.GetObject(filename)
  SQS.SendMessage(queue, content)

Если вы знаете, куда transloadit поместит файл в S3, как только это будет сделано, вы можете опросить S3 в цикле. Для опроса вы можете использовать HeadObject, а затем проверить код состояния ответа.

Такой цикл опроса описан в одном из примеров проектов в документации AWS Step Function . Вместо того, чтобы использовать Lambdas, за выполнение которых нужно платить, вы можете напрямую запросить S3 API, как описано здесь . Без Lambdas вам придется платить только за переходы между состояниями в стандартном рабочем процессе .

Как правило, вы хотите инициировать асинхронную задачу как действие Step Function. Ключевое слово здесь - " инициировать" - другими словами, когда ваша активность имеет ожидающее действие, именно тогда вы запускаете асинхронное действие. Причина этого в том, что вам нужен токен задачи, связанный с ожидающим действием - тогда, пока ваше "будущее" может каким-либо образом включить этот токен (например, вы можете установить его как ссылку или идентификатор запроса), тогда вы можете "завершить" действие с успехом или неудачей с использованием вызова SendTaskSuccess или SendTaskFailure.

Есть два подхода к инициированию задачи:

  1. Опрос для нового занятия. Вы должны настроить запланированное событие CloudWatch, чтобы вызывать вызов GetActivityTask каждые n минут.

  2. Запустите новую задачу "инициатор" параллельно с вашей деятельностью в рамках функции шага. Этот инициатор выполняет то же, что и #1, и выполняет вызов GetActivityTask, единственное отличие состоит в том, что он запускается немедленно и не требует механизма опроса. Вызов GetActivityTask блокируется до тех пор, пока не станет доступна новая задача активности, поэтому проблем с условиями гонки не возникает. Обратите внимание, что есть вероятность, что вы можете выбрать действие из другого выполнения, поэтому этот инициатор должен учитывать только входные данные действия, а не входные данные, которые получает сам инициатор.

Вот как выглядит #2 в функции Step:

Инициирование деятельности

И основной пример кода, связанный с задачей InitiateManualApprovalActivity:

import boto3
import time

client = boto3.client('stepfunctions')
activity = "arn:aws:states:us-east-1:123456789012:activity:ManualStep"

def lambda_handler(event, context):
    print(event)
    # This will block until an activity task becomes available
    task = client.get_activity_task(activityArn=activity, workerName="test")
    print(task)
    # Perform your task here
    # In this example we continue on in the same function,
    # but the continuation could be a separate event, 
    # just as long as you can retrieve the task token
    time.sleep(60)
    response = client.send_task_success(taskToken=task['taskToken'], output=task['input'])
    print(response)
    return "done"

Я также перехожу к этой проблеме, когда пытался объединить SFN для организации пакетных заданий AWS. практика, предложенная выше, проблематична, так как вы должны передать taskToken, поэтому вам нужно из лямбды внутри конечного автомата опросить TaskToken из очереди и передать его в S3 или куда-нибудь, чтобы другая лямбда представила статус активности,

Проблема в том, что при опросе taskToken вы не можете знать, принадлежит ли он вашему экземпляру конечного автомата. вместо этого вы можете получить токен на другом экземпляре той же самой машины. лично я думаю, что было бы здорово, если бы AWS поддерживал эту функциональность, что они легко могут сделать...

Ну, я бы вдохновился с https://aws.amazon.com/blogs/compute/implementing-serverless-manual-approval-steps-in-aws-step-functions-and-amazon-api-gateway/

В этом случае вы можете заменить шлюз API лямбда-функцией AWS, запускаемой, например, событием S3 (Документация: http://docs.aws.amazon.com/lambda/latest/dg/with-s3.html). Просто убедитесь, что ваша задача имеет соответствующее время ожидания.

Другие вопросы по тегам