Ошибка при создании файла шаблона Google Dataflow

Я пытаюсь запланировать поток данных, который завершится через определенное время с использованием шаблона. Я могу успешно сделать это, используя командную строку, но когда я пытаюсь сделать это с помощью Google Cloud Scheduler, я получаю ошибку при создании шаблона.

Ошибка

         File "pipelin_stream.py", line 37, in <module>
    main()
  File "pipelin_stream.py", line 34, in main
    result.cancel()
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 1638, in cancel
    raise IOError('Failed to get the Dataflow job id.')
IOError: Failed to get the Dataflow job id.

Команда, которую я использую для создания шаблона,

         python pipelin_stream.py \
--runner Dataflowrunner \
--project $PROJECT \
--temp_location $BUCKET/tmp \
--staging_location $BUCKET/staging \
--template_location $BUCKET/templates/time_template_test \
--streaming

И файл конвейера, который у меня есть, это

         from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys


PROJECT = 'projectID'
schema = 'ex1:DATE, ex2:STRING'
TOPIC = "projects/topic-name/topics/scraping-test"

def main(argv=None):

    parser = argparse.ArgumentParser()
    parser.add_argument("--input_topic")
    parser.add_argument("--output")
    known_args = parser.parse_known_args(argv)

    p = beam.Pipeline(options=PipelineOptions(region='us-central1', service_account_email='email'))

    (p
        | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
        | 'Decode' >> beam.Map(lambda x:x.decode('utf-8'))
        | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('tablename'.format(PROJECT), schema=schema, write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )
    result = p.run()
    result.wait_until_finish(duration=3000)
    result.cancel()   # If the pipeline has not finished, you can cancel it

if __name__ == '__main__':
    logger = logging.getLogger().setLevel(logging.INFO)
    main()

Кто-нибудь знает, почему я могу получить эту ошибку?

1 ответ

Решение

Ошибка вызывается функцией отмены по истечении времени ожидания и кажется безвредной.

Чтобы доказать это, мне удалось воспроизвести вашу точную проблему с моей виртуальной машины с помощью python 3.5. Шаблон создается по указанному пути --template_locationи может использоваться для выполнения заданий. Обратите внимание, что мне нужно было внести некоторые изменения в ваш код, чтобы он действительно работал в Dataflow.

На случай, если он вам пригодится, в итоге я использовал этот код конвейера

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import datetime

# Fill this values in order to have them by default
# Note that the table in BQ needs to have the column names message_body and publish_time

Table = 'projectid:datasetid.tableid'
schema = 'ex1:STRING, ex2:TIMESTAMP'
TOPIC = "projects/<projectid>/topics/<topicname>"

class AddTimestamps(beam.DoFn):
    def process(self, element, publish_time=beam.DoFn.TimestampParam):
        """Processes each incoming element by extracting the Pub/Sub
        message and its publish timestamp into a dictionary. `publish_time`
        defaults to the publish timestamp returned by the Pub/Sub server. It
        is bound to each element by Beam at runtime.
        """

        yield {
            "message_body": element.decode("utf-8"),
            "publish_time": datetime.datetime.utcfromtimestamp(
                float(publish_time)
            ).strftime("%Y-%m-%d %H:%M:%S.%f"),
        }


def main(argv=None):

    parser = argparse.ArgumentParser()
    parser.add_argument("--input_topic", default=TOPIC)
    parser.add_argument("--output_table", default=Table)
    args, beam_args = parser.parse_known_args(argv)
    # save_main_session needs to be set to true due to modules being used among the code (mostly datetime)
    # Uncomment the service account email to specify a custom service account
    p = beam.Pipeline(argv=beam_args,options=PipelineOptions(save_main_session=True,
region='us-central1'))#, service_account_email='email'))

    (p
        | 'ReadData' >> beam.io.ReadFromPubSub(topic=args.input_topic).with_output_types(bytes)
        | "Add timestamps to messages" >> beam.ParDo(AddTimestamps())
        | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(args.output_table, schema=schema, write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )
    result = p.run()
    #Warning: Cancel does not work properly in a template
    result.wait_until_finish(duration=3000)
    result.cancel()   # Cancel the streaming pipeline after a while to avoid consuming more resources

if __name__ == '__main__':
    logger = logging.getLogger().setLevel(logging.INFO)
    main()

После этого я запустил команды:

# Fill accordingly
PROJECT="MYPROJECT-ID"
BUCKET="MYBUCKET"
TEMPLATE_NAME="TRIAL"

# create the template
python3 -m templates.template-pubsub-bigquery \
  --runner DataflowRunner \
  --project $PROJECT \
  --staging_location gs://$BUCKET/staging \
  --temp_location gs://$BUCKET/temp \
  --template_location gs://$BUCKET/templates/$TEMPLATE_NAME \
  --streaming

для создания конвейера (который дает указанную вами ошибку, но все же создает шаблон). И

# Fill job-name and gcs location accordingly
# Uncomment and fill the parameters should you want to use your own

gcloud dataflow jobs run <job-name> \
        --gcs-location "gs://<MYBUCKET>/dataflow/templates/mytemplate" 
   #     --parameters input_topic="", output_table=""

Запустить трубопровод.

Как я уже сказал, шаблон был правильно создан и конвейер работал правильно.


редактировать

Действительно, функция отмены не работает в шаблоне должным образом. Кажется, проблема заключается в том, что ему нужен идентификатор задания при создании шаблона, который, конечно, не существует, и в результате он пропускает функцию.

Я нашел этот другой пост, который обрабатывает извлечение идентификатора задания из конвейера. Я пробовал некоторые настройки, чтобы заставить его работать в самом коде шаблона, но я думаю, что в этом нет необходимости. Если вы хотите запланировать их выполнение, я бы выбрал более простой вариант и выполнил бы шаблон конвейера потоковой передачи в определенное время (например, 9:01 по Гринвичу) и отменил конвейер с помощью скрипта.

import logging, re,os
from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials

def retrieve_job_id():
  #Fill as needed
  project = '<project-id>'
  job_prefix = "<job-name>"
  location = '<location>'

  logging.info("Looking for jobs with prefix {} in region {}...".format(job_prefix, location))

  try:
    credentials = GoogleCredentials.get_application_default()
    dataflow = build('dataflow', 'v1b3', credentials=credentials)

    result = dataflow.projects().locations().jobs().list(
      projectId=project,
      location=location,
    ).execute()

    job_id = "none"

    for job in result['jobs']:
      if re.findall(r'' + re.escape(job_prefix) + '', job['name']):
        job_id = job['id']
        break

    logging.info("Job ID: {}".format(job_id))
    return job_id

  except Exception as e:
    logging.info("Error retrieving Job ID")
    raise KeyError(e)


os.system('gcloud dataflow jobs cancel {}'.format(retrieve_job_id()))

в другое время (например, 9:05 по Гринвичу). Этот сценарий предполагает, что вы запускаете сценарий с тем же именем каждый раз, и отменяет его, используя последнее появление имени. Я пробовал несколько раз, работает нормально.

Другие вопросы по тегам