Ошибка потока данных: "Клиенты имеют нетривиальное состояние, которое является локальным и недоступным для выбора"

У меня есть конвейер, который я могу выполнить локально без каких-либо ошибок. Я использовал, чтобы получить эту ошибку в моем локально запущенном конвейере

    'Clients have non-trivial state that is local and unpickleable.'
     PicklingError: Pickling client objects is explicitly not supported.

Я полагаю, что я исправил это путем понижения до apache-beam=2.3.0. Тогда локально он работал бы отлично.

Теперь я использую DataflowRunner и в файле needs.txt у меня есть следующие зависимости

    apache-beam==2.3.0
    google-cloud-bigquery==1.1.0
    google-cloud-core==0.28.1
    google-cloud-datastore==1.6.0
    google-cloud-storage==1.10.0
    protobuf==3.5.2.post1
    pytz==2013.7

но я снова получаю эту ужасную ошибку

    'Clients have non-trivial state that is local and unpickleable.'
     PicklingError: Pickling client objects is explicitly not supported.

Почему это дает мне ошибку с DataflowRunner, но не DirectRunner? не должны ли они использовать одни и те же зависимости / среду? Любая помощь будет оценена.

Я читал, что это способ решить эту проблему, но когда я пытаюсь это сделать, я все равно получаю ту же ошибку

    class MyDoFn(beam.DoFn):

        def start_bundle(self, process_context):
            self._dsclient = datastore.Client()

        def process(self, context, *args, **kwargs):
        # do stuff with self._dsclient

со https://github.com/GoogleCloudPlatform/google-cloud-python/issues/3191

Мой предыдущий справочный пост, где я исправил это локально:

Использование start_bundle() в работе Apache-Beam не работает. Непередаваемое хранилище. Клиент ()

Заранее спасибо!

0 ответов

Инициализация клиентов, которые невозможно выбрать, в start_bundleМетод - правильный подход, и операторы ввода-вывода Beam часто следуют ему, см. в качестве примера datastoreio.py. Вот конвейер, который выполняет простую операцию с клиентом Python GCS в DoFn. Я запустил его на Apache Beam 2.16.0 без проблем. Если вы все же можете воспроизвести проблему, предоставьте дополнительные сведения.

файл gcs_client.py:

import argparse
import logging
import time

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import storage

class MyDoFn(beam.DoFn):
  def start_bundle(self):
    self.storage_client = storage.Client()

  def process(self, element):
    bucket = self.storage_client.get_bucket("existing-gcs-bucket")
    blob = bucket.blob(str(int(time.time())))
    blob.upload_from_string("payload")
    return element

logging.getLogger().setLevel(logging.INFO)
_, options = argparse.ArgumentParser().parse_known_args()

pipeline_options = PipelineOptions(options)
p = beam.Pipeline(options=pipeline_options)
_ = p | beam.Create([None]) | beam.ParDo(MyDoFn())

p.run().wait_until_finish()

файл requirements.txt:

google-cloud-storage==1.23.0

командная строка:

python -m gcs_client \
    --project=insert_your_project \
    --runner=DataflowRunner \
    --temp_location gs://existing-gcs-bucket/temp/ \
    --requirements_file=requirements.txt \
    --save_main_session

У меня была аналогичная проблема, когда Dataflow записывал кучу строк в Bigtable. Настройка--save-main-session к False похоже решил это.

Другие вопросы по тегам