как называется ОПЕРАЦИЯ в gcloud ai-platform operations wait OPERATION?
Мне нужно дождаться завершения работы по обучению машинному обучению, прежде чем приступить к остальной части рабочего процесса.
Я использую Composer/Airflow для оркестровки своих задач. Моя первая задача - запустить обучение машинному обучению на платформе ИИ, затем мне нужно дождаться завершения этого обучения, прежде чем перейти к следующей задаче.
У меня проблемы с пониманием документации здесь, в которой объясняется, как можно дождаться завершения операции ML.
В документации указано:gcloud ai-platform operations wait OPERATION
В настоящее время мой код:
gcloud ai-platform operations wait {{ params.JOB_NAME }}
и я получаю сообщение об ошибке:
Running command: gcloud ai-platform operations wait ai_composer_20191119_201848
[2019-11-19 20:18:48,648] {base_task_runner.py:98} INFO - Subtask: [2019-11-19 20:18:48,647] {bash_operator.py:97} INFO - Output:
[2019-11-19 20:18:49,811] {base_task_runner.py:98} INFO - Subtask: [2019-11-19 20:18:49,810] {bash_operator.py:101} INFO - ERROR: (gcloud.ai-platform.operations.wait) NOT_FOUND: Field: name Error: The specified job was not found.
[2019-11-19 20:18:49,812] {base_task_runner.py:98} INFO - Subtask: [2019-11-19 20:18:49,812] {bash_operator.py:101} INFO - - '@type': type.googleapis.com/google.rpc.BadRequest
[2019-11-19 20:18:49,812] {base_task_runner.py:98} INFO - Subtask: [2019-11-19 20:18:49,812] {bash_operator.py:101} INFO - fieldViolations:
[2019-11-19 20:18:49,812] {base_task_runner.py:98} INFO - Subtask: [2019-11-19 20:18:49,812] {bash_operator.py:101} INFO - - description: The specified job was not found.
[2019-11-19 20:18:49,813] {base_task_runner.py:98} INFO - Subtask: [2019-11-19 20:18:49,812] {bash_operator.py:101} INFO - field: name
[2019-11-19 20:18:49,853] {base_task_runner.py:98} INFO - Subtask: [2019-11-19 20:18:49,852] {bash_operator.py:105} INFO - Command exited with return code 1
[2019-11-19 20:18:49,883] {models.py:1595} ERROR - Bash command failed
Что должно быть в поле OPERATION?
Для большего контекста мой даг выглядит так:
with dag:
env = {}
env['BUCKET'] = models.Variable.get('bucket_name')
env['JOB_NAME'] = "ai_composer_{}".format(datetime.now().strftime("%Y%m%d_%H%M%S"))
env['JOB_DIR'] = "gs://{bucket}/jobs/{job_name}".format(bucket=env['BUCKET'], job_name=env['JOB_NAME'])
env['REGION'] = models.Variable.get('ai_region')
env['PACKAGE_PATH'] = models.Variable.get('ai_training_package')
env['CONFIG'] = models.Variable.get('train_config_path', deserialize_json=True)
env['OUTPUT_FOLDER'] = "{job_dir}/model/".format(job_dir=env['JOB_DIR'])
env['DUMMY_TRAINING_FILE'] = models.Variable.get('dummy_training_file')
test_training = BashOperator(
task_id='test_training',
xcom_push=True,
bash_command='gcloud ai-platform jobs submit training {{ params.JOB_NAME }} \
--region {{ params.REGION }} \
--scale-tier=CUSTOM \
--python-version 3.5 \
--runtime-version 1.13 \
--master-machine-type n1-highcpu-16 \
--staging-bucket gs://{{ params.BUCKET }} \
--job-dir {{ params.JOB_DIR }} \
--module-name trainer.task \
--packages {{ params.PACKAGE_PATH }} \
-- \
--gcs-bucket gs://{{ params.BUCKET }} \
--train-file {{ params.DUMMY_TRAINING_FILE }}\
--verbose-logging \
--data-type web_views \
--delimiter , \
&& echo "{{ params.JOB_NAME }}"',
params=env
)
get_ml_status = BashOperator(
task_id='get_ml_status',
xcom_push=True,
bash_command='gcloud ai-platform operations wait {{ params.JOB_NAME }}',
params=env
)
test_training >> get_ml_status
Здесь test_training успешна, поэтому задание на обучение запускается до запуска задачи get_ml_status.
1 ответ
В итоге я нашел обходной путь, проверив статус работы с описанием вакансий.
output = sh.gcloud("ai-platform", "jobs", "describe", job_name)
Я выполняю цикл while, пока состояние не станет SUCCEEDED
или FAILED
.