Загадочное сообщение Google Cloud Dataflow при загрузке файла из gcp в локальную систему
Я пишу конвейер потока данных, который обрабатывает видео из облака Google. Мой конвейер загружает каждый рабочий элемент в локальную систему, а затем повторно загружает результаты обратно в корзину GCP. После предыдущего вопроса.
Конвейер работает на локальном DirectRunner, у меня проблемы с отладкой на DataFlowRunnner.
Ошибка читает
File "run_clouddataflow.py", line 41, in process
File "/usr/local/lib/python2.7/dist-packages/google/cloud/storage/blob.py", line 464, in download_to_file self._do_download(transport, file_obj, download_url, headers)
File "/usr/local/lib/python2.7/dist-packages/google/cloud/storage/blob.py", line 418, in _do_download download.consume(transport) File "/usr/local/lib/python2.7/dist-packages/google/resumable_media/requests/download.py", line 101, in consume self._write_to_stream(result)
File "/usr/local/lib/python2.7/dist-packages/google/resumable_media/requests/download.py", line 62, in _write_to_stream with response: AttributeError: __exit__ [while running 'Run DeepMeerkat']
При попытке выполнить blob.download_to_file(file_obj) в пределах:
storage_client=storage.Client()
bucket = storage_client.get_bucket(parsed.hostname)
blob=storage.Blob(parsed.path[1:],bucket)
#store local path
local_path="/tmp/" + parsed.path.split("/")[-1]
print('local path: ' + local_path)
with open(local_path, 'wb') as file_obj:
blob.download_to_file(file_obj)
print("Downloaded" + local_path)
Я предполагаю, что рабочие не имеют права писать на месте? Или, возможно, в контейнере потока данных нет папки /tmp. Где я должен писать объекты? Его трудно отлаживать без доступа к окружающей среде. Возможно ли получить доступ к стандартному выводу от рабочих для целей отладки (последовательная консоль?)
РЕДАКТИРОВАТЬ #1
Я попытался явно передать учетные данные:
try:
credentials, project = google.auth.default()
except:
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = known_args.authtoken
credentials, project = google.auth.default()
а также запись в cwd() вместо /tmp/
local_path=parsed.path.split("/")[-1]
print('local path: ' + local_path)
with open(local_path, 'wb') as file_obj:
blob.download_to_file(file_obj)
Все еще получаю загадочную ошибку при загрузке BLOB-объектов из gcp.
Полный скрипт конвейера находится ниже, setup.py здесь.
import logging
import argparse
import json
import logging
import os
import csv
import apache_beam as beam
from urlparse import urlparse
from google.cloud import storage
##The namespaces inside of clouddataflow workers is not inherited ,
##Please see https://cloud.google.com/dataflow/faq#how-do-i-handle-nameerrors, better to write ugly import statements then to miss a namespace
class PredictDoFn(beam.DoFn):
def process(self,element):
import csv
from google.cloud import storage
from DeepMeerkat import DeepMeerkat
from urlparse import urlparse
import os
import google.auth
DM=DeepMeerkat.DeepMeerkat()
print(os.getcwd())
print(element)
#try adding credentials?
#set credentials, inherent from worker
credentials, project = google.auth.default()
#download element locally
parsed = urlparse(element[0])
#parse gcp path
storage_client=storage.Client(credentials=credentials)
bucket = storage_client.get_bucket(parsed.hostname)
blob=storage.Blob(parsed.path[1:],bucket)
#store local path
local_path=parsed.path.split("/")[-1]
print('local path: ' + local_path)
with open(local_path, 'wb') as file_obj:
blob.download_to_file(file_obj)
print("Downloaded" + local_path)
#Assign input from DataFlow/manifest
DM.process_args(video=local_path)
DM.args.output="Frames"
#Run DeepMeerkat
DM.run()
#upload back to GCS
found_frames=[]
for (root, dirs, files) in os.walk("Frames/"):
for files in files:
fileupper=files.upper()
if fileupper.endswith((".JPG")):
found_frames.append(os.path.join(root, files))
for frame in found_frames:
#create GCS path
path="DeepMeerkat/" + parsed.path.split("/")[-1] + "/" + frame.split("/")[-1]
blob=storage.Blob(path,bucket)
blob.upload_from_filename(frame)
def run():
import argparse
import os
import apache_beam as beam
import csv
import logging
import google.auth
parser = argparse.ArgumentParser()
parser.add_argument('--input', dest='input', default="gs://api-project-773889352370-testing/DataFlow/manifest.csv",
help='Input file to process.')
parser.add_argument('--authtoken', default="/Users/Ben/Dropbox/Google/MeerkatReader-9fbf10d1e30c.json",
help='Input file to process.')
known_args, pipeline_args = parser.parse_known_args()
#set credentials, inherent from worker
try:
credentials, project = google.auth.default()
except:
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = known_args.authtoken
credentials, project = google.auth.default()
p = beam.Pipeline(argv=pipeline_args)
vids = (p|'Read input' >> beam.io.ReadFromText(known_args.input)
| 'Parse input' >> beam.Map(lambda line: csv.reader([line]).next())
| 'Run DeepMeerkat' >> beam.ParDo(PredictDoFn()))
logging.getLogger().setLevel(logging.INFO)
p.run()
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
1 ответ
Я говорил с менеджером пакета google-cloud-storage, это была известная проблема. Обновление конкретной версии в моем файле setup.py для
REQUIRED_PACKAGES = ["google-cloud-storage==1.3.2","google-auth","requests>=2.18.0"]
исправил проблему.
https://github.com/GoogleCloudPlatform/google-cloud-python/issues/3836