Azkaban: передать параметры в базовый код задания
Можно ли передать параметры из рабочего процесса азкабана в базовый код задания?
У меня есть что-то вроде этого, это вроде работает для жестко закодированных / заранее известных дат, но я хотел бы иметь возможность указать дату, когда я выполняю поток:
from azkaban import Job, Project
import datetime
import os
from datetime import datetime, timezone, timedelta
options = {
'start.date' : today.strftime('%Y-%m-%d'), # Can we pass this as an argument to the underlying code?
'day.offset' : 1
}
project = Project('my_project',root=__file__)
project.add_file('my_shell_script.sh', 'my_shell_script.sh')
project.add_job('my_job', Job(options, {'type' : 'command' : 'bash my_shell_script <pass date here?>'}))
project.add_job('my_job', Job(options, {'type' : 'command' : 'java -jar test.jar <pass date here?>'}))
Спасибо, Шарат
3 ответа
Большая картина: сделайте параметр постоянным, записав его на диск
Одним из способов передачи параметров между несмежными заданиями в потоке Azkaban является использование JOB_OUTPUT_PROP_FILE непосредственно перед тем, как вам понадобится параметр. Это необходимо сделать с помощью сценария оболочки, поскольку переменная JOB_OUTPUT_PROP_FILE не доступна напрямую для данного задания. Этот подход записывает соответствующую информацию в файл и читает ее непосредственно перед тем, как это потребуется, используя вспомогательный скрипт. Параметры могут быть переданы в соседние задания путем записи в JOB_OUTPUT_PROP_FILE на каждом шаге.
концепция
- записать на диск файл, содержащий информацию, которую вы хотите передать
- создайте сценарий оболочки помощника / подготовки, который запускается непосредственно перед заданием, для которого требуется параметр
- используйте параметр в своей работе
пример
В сценарии, когда дата, когда было выполнено первое задание в потоке, должна использоваться последним заданием, сначала запишите соответствующие данные в файл. В этом примере текущая дата в формате ГГГГ-ММ-ДД записывается в локальный файл с именем rundate.text.
#step_1.job
type=command
dependencies=initialize_jobs
command=whoami
command.1=/bin/sh -c "date '+%Y-%m-%d' > rundate.text"
Затем, перед тем, как параметр потребуется, запустите подготовительный сценарий, чтобы сделать параметр доступным.
#step_4_preparation.job
type=command
dependencies=step_3
command=whoami
command.1=/bin/bash rd.sh
Шаг 4 Подготовка выполняет следующий сценарий оболочки (rd.sh)
#!/bin/sh
# this script takes the run_date value from the text file and passes it to Azkaban
# Now, the variable can be used in the next step of the job
RD=$(cat rundate.text)
echo "Now setting Job Output Property File with RD (run date) variable"
echo $RD
#This is the point where the parameter is written in JSON format
#to the JOB_OUTPUT_PROP_FILE, which allows it to be used in the next step
echo '{"RD" : "'"$RD"'"}' > "${JOB_OUTPUT_PROP_FILE}"
Затем на следующем шаге можно использовать параметр, который в этом примере равен ${RD}.
# step_4.job
type=command
dependencies=step_4_preparation
command=whoami
command.1=bash -c "echo ${RD} is the run date"
Что ж,
Согласно Azkaban Doc, только глобальные свойства потока могут быть переопределены. В python мы можем установить глобальные свойства следующим образом:
project = Project('emr-cluster-creation', root=__file__)
project.properties = {
'emr.cluster.name' : 'default-clustername',
'emr.master.instance.type' : 'r3.xlarge',
'emr.core.instance.type' : 'r3.xlarge',
'emr.task.instance.type' : 'r3.xlarge',
'emr.instance.count' : 11,
'emr.task.instance.count' : 5,
'emr.hadoop.conf.local.path' : 's3n://bucket/hadoop-configuration.json',
'emr.hive.site.s3.path' : 's3n://bucket/hive-site.xml',
'emr.spark.version' : '1.4.0.b',
# 'emr.service.role' : 'some-role', #amazon IAM role.
'emr.service.role' : '\'\'', #amazon IAM role.
'emr.cluster.output' : '\'\''
}
# do something...
Эти параметры могут быть переданы в основное приложение / сценарии как ${emr.cluster.name}. Это будет поддерживать как значения свойств по умолчанию, которые будут передаваться, так и переопределение параметров потока либо на веб-интерфейсе сервера Azkaban, либо с помощью Azjaban ajax API.
Как правильно сказал, правильный путь - это использовать JOB_OUTPUT_PROP_FILE
но вместо того, чтобы сохранять его в файловой системе, я считаю, что лучше использовать признак того, что он передается всем его зависимостям ( Создание потоков> Конфигурация задания> Вывод параметров> "Свойства можно экспортировать для передачи их зависимостям"),
Чтобы использовать эту функцию, просто сделайте задания, которым нужны экспортированные параметры, зависимости от задания, которое их экспортирует. В случае с Востоком откажитесь от промежуточного step 4 preparation
и просто сделать step 4
зависит от step 1
,