Google Cloud Dataflow Python, получение идентификатора задания

В настоящее время я работаю над шаблоном потока данных в Python, и я хотел бы получить доступ к идентификатору задания и использовать его для сохранения в конкретном документе Firestore.

Можно ли получить доступ к идентификатору работы?

Я не могу найти ничего относительно этого в документации.

5 ответов

Решение

Вы можете сделать это, позвонив dataflow.projects().locations().jobs().list изнутри конвейера (см. полный код ниже). Одна возможность состоит в том, чтобы всегда вызывать шаблон с одним и тем же именем задания, что имело бы смысл, в противном случае префикс задания можно было бы передать в качестве параметра времени выполнения. Список заданий анализируется с использованием регулярного выражения, чтобы определить, содержит ли задание префикс имени и, если да, возвращает идентификатор задания. Если их больше одного, он вернет только последний (который работает в данный момент).

Шаблон ставится после определения PROJECT а также BUCKET переменные, с:

python script.py \
    --runner DataflowRunner \
    --project $PROJECT \
    --staging_location gs://$BUCKET/staging \
    --temp_location gs://$BUCKET/temp \
    --template_location gs://$BUCKET/templates/retrieve_job_id

Затем укажите желаемое название работы (myjobprefix в моем случае) при выполнении шаблонной работы:

gcloud dataflow jobs run myjobprefix \
   --gcs-location gs://$BUCKET/templates/retrieve_job_id

retrieve_job_id функция вернет идентификатор задания из задания, измените job_prefix соответствовать названию

import argparse, logging, re
from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


def retrieve_job_id(element):
  project = 'PROJECT_ID'
  job_prefix = "myjobprefix"
  location = 'us-central1'

  logging.info("Looking for jobs with prefix {} in region {}...".format(job_prefix, location))

  try:
    credentials = GoogleCredentials.get_application_default()
    dataflow = build('dataflow', 'v1b3', credentials=credentials)

    result = dataflow.projects().locations().jobs().list(
      projectId=project,
      location=location,
    ).execute()

    job_id = "none"

    for job in result['jobs']:
      if re.findall(r'' + re.escape(job_prefix) + '', job['name']):
        job_id = job['id']
        break

    logging.info("Job ID: {}".format(job_id))
    return job_id

  except Exception as e:
    logging.info("Error retrieving Job ID")
    raise KeyError(e)


def run(argv=None):
  parser = argparse.ArgumentParser()
  known_args, pipeline_args = parser.parse_known_args(argv)

  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = True

  p = beam.Pipeline(options=pipeline_options)

  init_data = (p
               | 'Start' >> beam.Create(["Init pipeline"])
               | 'Retrieve Job ID' >> beam.FlatMap(retrieve_job_id))

  p.run()


if __name__ == '__main__':
  run()

Вы можете использовать Google Dataflow API. Используйте метод projects.jobs.list для получения идентификаторов заданий потока данных.

При просмотре документации отклик, который вы должны получить при запуске задания, должен содержать тело json со свойством "job", которое является экземпляром Job.

Вы должны быть в состоянии использовать это, чтобы получить идентификатор, который вам нужен.

Если вы используете Google Cloud SDK для потока данных, вы можете получить другой объект при вызове метода создания на templates(),

Следующий фрагмент кода запускает шаблон потока данных, хранящийся в корзине GCS, получает идентификатор задания из тела ответа API шаблона запуска и, наконец, опрашивает конечное состояние задания потока данных, например, каждые 10 секунд.

Официальная документация Google Cloud для тела ответа находится здесь.

До сих пор я видел только шесть состояний задания потока данных. Пожалуйста, дайте мне знать, если я пропустил другие.

      def launch_dataflow_template(project_id, location, credentials, template_path):
    dataflow = googleapiclient.discovery.build('dataflow', 'v1b3', credentials=credentials)
    logger.info(f"Template path: {template_path}")
    result = dataflow.projects().locations().templates().launch(
            projectId=project_id,
            location=location,
            body={
                ...
            },
            gcsPath=template_path  # dataflow template path
    ).execute()
    return result.get('job', {}).get('id')

def poll_dataflow_job_status(project_id, location, credentials, job_id):
    dataflow = googleapiclient.discovery.build('dataflow', 'v1b3', credentials=credentials)
    # executing states are not the final states of a Dataflow job, they show that the Job is transitioning into another upcoming state
    executing_states = ['JOB_STATE_PENDING', 'JOB_STATE_RUNNING', 'JOB_STATE_CANCELLING']
    # final states do not change further
    final_states = ['JOB_STATE_DONE', 'JOB_STATE_FAILED', 'JOB_STATE_CANCELLED']
    while True:
        job_desc =_get_dataflow_job_status(dataflow, project_id, location, job_id)
        if job_desc['currentState'] in executing_states:
            pass
        elif job_desc['currentState'] in final_states:
            break
        sleep(10)
    return job_id, job_desc['currentState']

Вы можете получить метаданные gcp, используя эти функции луча в версии 2.35.0. Вы можете посетить документацию https://beam.apache.org/releases/pydoc/2.35.0/_modules/apache_beam/io/gcp/gce_metadata_util.html#fetch_dataflow_job_id

      beam.io.gcp.gce_metadata_util._fetch_custom_gce_metadata("job_name")
beam.io.gcp.gce_metadata_util._fetch_custom_gce_metadata("job_id")
Другие вопросы по тегам