Использование start_bundle() в работе Apache-Beam не работает. Непередаваемое хранилище. Клиент ()

Я получаю эту ошибку

pickle.PicklingError: Выборка клиентских объектов явно не поддерживается. Клиенты имеют нетривиальное состояние, которое является локальным и неприметным.

При попытке использовать beam.ParDo для вызова функции, которая выглядит следующим образом

class ExtractBlobs(beam.DoFn):
    def start_bundle(self):
        self.storageClient = storage.Client()

    def process(self, element):
        client = self.storageClient
        bucket = client.get_bucket(element)
        blobs = list(bucket.list_blobs(max_results=100))
        return blobs

Я думал, что весь смысл start_bundle состоял в том, чтобы инициализировать self.someProperty, а затем использовать это self.someProperty в методе 'process', чтобы избавиться от проблемы травления (из источников ниже). Может ли кто-нибудь указать мне правильное направление, как решить это?

[+] Что я прочитал:

https://github.com/GoogleCloudPlatform/google-cloud-python/issues/3191

Как устранить ошибку засолки в классе apache_beam.internal.clients.dataflow.dataflow_v1b3_messages.TypeValueValuesEnum?

1 ответ

ОБНОВЛЕНО: проблема была фактически проблемой библиотеки. У меня должна была быть правильная версия apache-beam SDK с правильными версиями Google-Cloud:

gapic-Google облако-PubSub-v1==0.15.4

GAX-Google-каротаж v2==0.8.3

GAX-Google-PubSub-v1 == 0.8.3

google-api-core == 1.1.2 google-api-python-client == 1.6.7

Google-apitools==0.5.10

Google-аутентификации ==1.4.1

Google-Auth-httplib2==0.0.3

Google облако-BigQuery == 1.1.0

Google облако-ядро ==0.28.1

Google облако-хранилищу ==1.6.0

Google облако-PubSub==0.26.0

Google-облако хранения == 1.10.0

Google-GAX ==0.12.5

апаша-лучевое ==2.3.0

Я смог решить эту проблему с помощью комбинации вещей: во-первых, я ничего не сериализовал (некрасивый вкладыш в выход), а во-вторых, использовал threading.local().

class ExtractBlobs(beam.DoFn):
    def start_bundle(self):
        self.threadLocal = threading.local()
        self.threadLocal.client = storage.Client()

    def process(self, element):
        yield list(storage.Client().get_bucket(element).list_blobs(max_results=100))
Другие вопросы по тегам