Azkaban: передать параметры в базовый код задания

Можно ли передать параметры из рабочего процесса азкабана в базовый код задания?

У меня есть что-то вроде этого, это вроде работает для жестко закодированных / заранее известных дат, но я хотел бы иметь возможность указать дату, когда я выполняю поток:

from azkaban import Job, Project
import datetime
import os
from datetime import datetime, timezone, timedelta




options = {
            'start.date' : today.strftime('%Y-%m-%d'), # Can we pass this as an argument to the underlying code?
            'day.offset' : 1
            }

project = Project('my_project',root=__file__)
project.add_file('my_shell_script.sh', 'my_shell_script.sh')
project.add_job('my_job', Job(options, {'type' : 'command' : 'bash my_shell_script <pass date here?>'}))
project.add_job('my_job', Job(options, {'type' : 'command' : 'java -jar test.jar <pass date here?>'}))

Спасибо, Шарат

3 ответа

Большая картина: сделайте параметр постоянным, записав его на диск

Одним из способов передачи параметров между несмежными заданиями в потоке Azkaban является использование JOB_OUTPUT_PROP_FILE непосредственно перед тем, как вам понадобится параметр. Это необходимо сделать с помощью сценария оболочки, поскольку переменная JOB_OUTPUT_PROP_FILE не доступна напрямую для данного задания. Этот подход записывает соответствующую информацию в файл и читает ее непосредственно перед тем, как это потребуется, используя вспомогательный скрипт. Параметры могут быть переданы в соседние задания путем записи в JOB_OUTPUT_PROP_FILE на каждом шаге.

Передать параметры между диаграммой заданий


концепция

  1. записать на диск файл, содержащий информацию, которую вы хотите передать
  2. создайте сценарий оболочки помощника / подготовки, который запускается непосредственно перед заданием, для которого требуется параметр
  3. используйте параметр в своей работе

пример

В сценарии, когда дата, когда было выполнено первое задание в потоке, должна использоваться последним заданием, сначала запишите соответствующие данные в файл. В этом примере текущая дата в формате ГГГГ-ММ-ДД записывается в локальный файл с именем rundate.text.

#step_1.job
type=command
dependencies=initialize_jobs
command=whoami
command.1=/bin/sh -c "date '+%Y-%m-%d' > rundate.text"

Затем, перед тем, как параметр потребуется, запустите подготовительный сценарий, чтобы сделать параметр доступным.

#step_4_preparation.job
type=command
dependencies=step_3
command=whoami
command.1=/bin/bash rd.sh

Шаг 4 Подготовка выполняет следующий сценарий оболочки (rd.sh)

#!/bin/sh
# this script takes the run_date value from the text file and passes it to Azkaban
# Now, the variable can be used in the next step of the job

RD=$(cat rundate.text)

echo "Now setting Job Output Property File with RD (run date) variable"
echo $RD


#This is the point where the parameter is written in JSON format
#to the JOB_OUTPUT_PROP_FILE, which allows it to be used in the next step
echo '{"RD" : "'"$RD"'"}' > "${JOB_OUTPUT_PROP_FILE}"

Затем на следующем шаге можно использовать параметр, который в этом примере равен ${RD}.

# step_4.job
type=command
dependencies=step_4_preparation
command=whoami
command.1=bash -c "echo ${RD} is the run date"

Что ж,

Согласно Azkaban Doc, только глобальные свойства потока могут быть переопределены. В python мы можем установить глобальные свойства следующим образом:

    project = Project('emr-cluster-creation', root=__file__)

project.properties = {
                                                    'emr.cluster.name' : 'default-clustername',
                                                    'emr.master.instance.type' : 'r3.xlarge',
                                                    'emr.core.instance.type' : 'r3.xlarge',
                                                    'emr.task.instance.type' : 'r3.xlarge',
                                                    'emr.instance.count' : 11,
                                                    'emr.task.instance.count' : 5,
                                                    'emr.hadoop.conf.local.path' : 's3n://bucket/hadoop-configuration.json',
                                                    'emr.hive.site.s3.path' : 's3n://bucket/hive-site.xml',
                                                    'emr.spark.version' : '1.4.0.b',
#                                                   'emr.service.role' : 'some-role', #amazon IAM role.
                                                    'emr.service.role' : '\'\'', #amazon IAM role.
                                                    'emr.cluster.output' : '\'\''
    }

# do something...

Эти параметры могут быть переданы в основное приложение / сценарии как ${emr.cluster.name}. Это будет поддерживать как значения свойств по умолчанию, которые будут передаваться, так и переопределение параметров потока либо на веб-интерфейсе сервера Azkaban, либо с помощью Azjaban ajax API.

Как правильно сказал, правильный путь - это использовать JOB_OUTPUT_PROP_FILE но вместо того, чтобы сохранять его в файловой системе, я считаю, что лучше использовать признак того, что он передается всем его зависимостям ( Создание потоков> Конфигурация задания> Вывод параметров> "Свойства можно экспортировать для передачи их зависимостям"),

Чтобы использовать эту функцию, просто сделайте задания, которым нужны экспортированные параметры, зависимости от задания, которое их экспортирует. В случае с Востоком откажитесь от промежуточного step 4 preparation и просто сделать step 4 зависит от step 1,

Другие вопросы по тегам