Вызов конвейера из конвейера в Amazon Data Pipeline
Моя рабочая команда в настоящее время ищет замену довольно дорогому инструменту ETL, который на данный момент мы используем в качестве прославленного планировщика. Любую интеграцию, предлагаемую инструментом ETL, мы улучшили с помощью нашего собственного кода на Python, поэтому мне просто нужна его способность планирования. Одним из вариантов, который мы рассматриваем, является Data Pipeline, который я сейчас пилотирую.
Моя проблема заключается в следующем: представьте, что нам нужно загрузить два набора данных - продукты и продажи. Каждый из этих наборов данных требует нескольких шагов для загрузки (получение исходных данных, вызов сценария Python для преобразования, загрузка в Redshift). Однако продукт должен быть загружен до начала продаж, так как нам нужна стоимость продукта и т. Д. Для расчета маржи. Возможно ли иметь "главный" конвейер в конвейере данных, который сначала вызывает продукты, ожидает их успешного завершения, а затем вызывает продажи? Если так, то как? Я также открыт для предложений по другим продуктам, если конвейер данных не подходит для такого типа рабочего процесса. Ценим помощь
2 ответа
Я думаю, что я могу относиться к этому варианту использования. В любом случае, Data Pipeline не выполняет такого рода управление зависимостями самостоятельно. Однако его можно смоделировать с помощью предварительных условий файла.
В этом примере ваши дочерние конвейеры могут зависеть от наличия файла (в качестве предварительного условия) перед запуском. Главный конвейер будет создавать файлы триггеров на основе некоторой логики, выполняемой в его действиях. Дочерний конвейер может создавать другие файлы триггеров, которые будут запускать последующий конвейер вниз по течению.
Другое решение - использовать продукт Simple Workflow. Это имеет функции, которые вы ищете, но потребует пользовательского кодирования с использованием Flow SDK.
Это базовый вариант использования datapipeline, и он определенно должен быть возможным. Вы можете использовать их графический редактор конвейеров для создания этого конвейера. Решение проблемы:
Есть два набора данных:
- Товар
- Продажи
Шаги для загрузки этих наборов данных:
- Получить исходные данные: скажем, из S3. Для этого используйте S3DataNode
- Вызовите сценарий Python для преобразования: используйте ShellCommandActivity с подготовкой. Конвейер данных выполняет неявное размещение данных для узлов S3DataNode, подключенных к ShellCommandActivity. Вы можете использовать их, используя специальные переменные env: Подробности
- Загрузить вывод в Redshift: используйте RedshiftDatabase
Вам нужно будет добавить вышеупомянутые компоненты для каждого набора данных, с которым вам нужно работать (в данном случае это продукт и продажи). Для простоты управления вы можете запустить их на экземпляре EC2.
Условие: "продукт" должен быть загружен до "продаж"
- Добавить зависит от отношения. Добавьте это поле в ShellCommandActivity of Sales, которое ссылается на ShellCommandActivity продукта. См. Поле зависящий от в документации. В нем говорится: "Одна или несколько ссылок на другие действия, которые должны перейти в состояние FINISHED, прежде чем начнется это действие".
Совет: В большинстве случаев вы не хотели бы, чтобы выполнение вашего следующего дня началось, в то время как выполнение предыдущего дня все еще активно, то есть РАБОТАЕТ. Чтобы избежать такого сценария, используйте поле "maxActiveInstances" и установите его в "1".