Загадочное сообщение Google Cloud Dataflow при загрузке файла из gcp в локальную систему

Я пишу конвейер потока данных, который обрабатывает видео из облака Google. Мой конвейер загружает каждый рабочий элемент в локальную систему, а затем повторно загружает результаты обратно в корзину GCP. После предыдущего вопроса.

Конвейер работает на локальном DirectRunner, у меня проблемы с отладкой на DataFlowRunnner.

Ошибка читает

File "run_clouddataflow.py", line 41, in process 
File "/usr/local/lib/python2.7/dist-packages/google/cloud/storage/blob.py", line 464, in download_to_file self._do_download(transport, file_obj, download_url, headers) 
File "/usr/local/lib/python2.7/dist-packages/google/cloud/storage/blob.py", line 418, in _do_download download.consume(transport) File "/usr/local/lib/python2.7/dist-packages/google/resumable_media/requests/download.py", line 101, in consume self._write_to_stream(result) 
File "/usr/local/lib/python2.7/dist-packages/google/resumable_media/requests/download.py", line 62, in _write_to_stream with response: AttributeError: __exit__ [while running 'Run DeepMeerkat']

При попытке выполнить blob.download_to_file(file_obj) в пределах:

storage_client=storage.Client()
bucket = storage_client.get_bucket(parsed.hostname)
blob=storage.Blob(parsed.path[1:],bucket)

#store local path
local_path="/tmp/" + parsed.path.split("/")[-1]

print('local path: ' + local_path)
with open(local_path, 'wb') as file_obj:
  blob.download_to_file(file_obj)

print("Downloaded" + local_path)

Я предполагаю, что рабочие не имеют права писать на месте? Или, возможно, в контейнере потока данных нет папки /tmp. Где я должен писать объекты? Его трудно отлаживать без доступа к окружающей среде. Возможно ли получить доступ к стандартному выводу от рабочих для целей отладки (последовательная консоль?)

РЕДАКТИРОВАТЬ #1

Я попытался явно передать учетные данные:

  try:
      credentials, project = google.auth.default()
  except:
      os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = known_args.authtoken
      credentials, project = google.auth.default()

а также запись в cwd() вместо /tmp/

local_path=parsed.path.split("/")[-1]

print('local path: ' + local_path)
with open(local_path, 'wb') as file_obj:
  blob.download_to_file(file_obj)

Все еще получаю загадочную ошибку при загрузке BLOB-объектов из gcp.

Полный скрипт конвейера находится ниже, setup.py здесь.

import logging
import argparse
import json
import logging
import os
import csv
import apache_beam as beam
from urlparse import urlparse
from google.cloud import storage

##The namespaces inside of clouddataflow workers is not inherited ,
##Please see https://cloud.google.com/dataflow/faq#how-do-i-handle-nameerrors, better to write ugly import statements then to miss a namespace

class PredictDoFn(beam.DoFn):
  def process(self,element):

    import csv
    from google.cloud import storage
    from DeepMeerkat import DeepMeerkat
    from urlparse import urlparse
    import os
    import google.auth


    DM=DeepMeerkat.DeepMeerkat()

    print(os.getcwd())
    print(element)

    #try adding credentials?
    #set credentials, inherent from worker
    credentials, project = google.auth.default()

    #download element locally
    parsed = urlparse(element[0])

    #parse gcp path
    storage_client=storage.Client(credentials=credentials)
    bucket = storage_client.get_bucket(parsed.hostname)
    blob=storage.Blob(parsed.path[1:],bucket)

    #store local path
    local_path=parsed.path.split("/")[-1]

    print('local path: ' + local_path)
    with open(local_path, 'wb') as file_obj:
      blob.download_to_file(file_obj)

    print("Downloaded" + local_path)

    #Assign input from DataFlow/manifest
    DM.process_args(video=local_path)
    DM.args.output="Frames"

    #Run DeepMeerkat
    DM.run()

    #upload back to GCS
    found_frames=[]
    for (root, dirs, files) in os.walk("Frames/"):
      for files in files:
        fileupper=files.upper()
        if fileupper.endswith((".JPG")):
          found_frames.append(os.path.join(root, files))

    for frame in found_frames:

      #create GCS path
      path="DeepMeerkat/" + parsed.path.split("/")[-1] + "/" + frame.split("/")[-1]
      blob=storage.Blob(path,bucket)
      blob.upload_from_filename(frame)

def run():
  import argparse
  import os
  import apache_beam as beam
  import csv
  import logging
  import google.auth

  parser = argparse.ArgumentParser()
  parser.add_argument('--input', dest='input', default="gs://api-project-773889352370-testing/DataFlow/manifest.csv",
                      help='Input file to process.')
  parser.add_argument('--authtoken', default="/Users/Ben/Dropbox/Google/MeerkatReader-9fbf10d1e30c.json",
                      help='Input file to process.')
  known_args, pipeline_args = parser.parse_known_args()

  #set credentials, inherent from worker
  try:
      credentials, project = google.auth.default()
  except:
      os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = known_args.authtoken
      credentials, project = google.auth.default()

  p = beam.Pipeline(argv=pipeline_args)

  vids = (p|'Read input' >> beam.io.ReadFromText(known_args.input)
       | 'Parse input' >> beam.Map(lambda line: csv.reader([line]).next())
       | 'Run DeepMeerkat' >> beam.ParDo(PredictDoFn()))

  logging.getLogger().setLevel(logging.INFO)
  p.run()

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

1 ответ

Решение

Я говорил с менеджером пакета google-cloud-storage, это была известная проблема. Обновление конкретной версии в моем файле setup.py для

REQUIRED_PACKAGES = ["google-cloud-storage==1.3.2","google-auth","requests>=2.18.0"]

исправил проблему.

https://github.com/GoogleCloudPlatform/google-cloud-python/issues/3836

Другие вопросы по тегам