Использование start_bundle() в работе Apache-Beam не работает. Непередаваемое хранилище. Клиент ()
Я получаю эту ошибку
pickle.PicklingError: Выборка клиентских объектов явно не поддерживается. Клиенты имеют нетривиальное состояние, которое является локальным и неприметным.
При попытке использовать beam.ParDo для вызова функции, которая выглядит следующим образом
class ExtractBlobs(beam.DoFn):
def start_bundle(self):
self.storageClient = storage.Client()
def process(self, element):
client = self.storageClient
bucket = client.get_bucket(element)
blobs = list(bucket.list_blobs(max_results=100))
return blobs
Я думал, что весь смысл start_bundle состоял в том, чтобы инициализировать self.someProperty, а затем использовать это self.someProperty в методе 'process', чтобы избавиться от проблемы травления (из источников ниже). Может ли кто-нибудь указать мне правильное направление, как решить это?
[+] Что я прочитал:
https://github.com/GoogleCloudPlatform/google-cloud-python/issues/3191
1 ответ
ОБНОВЛЕНО: проблема была фактически проблемой библиотеки. У меня должна была быть правильная версия apache-beam SDK с правильными версиями Google-Cloud:
gapic-Google облако-PubSub-v1==0.15.4
GAX-Google-каротаж v2==0.8.3
GAX-Google-PubSub-v1 == 0.8.3
google-api-core == 1.1.2 google-api-python-client == 1.6.7
Google-apitools==0.5.10
Google-аутентификации ==1.4.1
Google-Auth-httplib2==0.0.3
Google облако-BigQuery == 1.1.0
Google облако-ядро ==0.28.1
Google облако-хранилищу ==1.6.0
Google облако-PubSub==0.26.0
Google-облако хранения == 1.10.0
Google-GAX ==0.12.5
апаша-лучевое ==2.3.0
Я смог решить эту проблему с помощью комбинации вещей: во-первых, я ничего не сериализовал (некрасивый вкладыш в выход), а во-вторых, использовал threading.local().
class ExtractBlobs(beam.DoFn):
def start_bundle(self):
self.threadLocal = threading.local()
self.threadLocal.client = storage.Client()
def process(self, element):
yield list(storage.Client().get_bucket(element).list_blobs(max_results=100))