Как устранить ошибку засолки в классе apache_beam.internal.clients.dataflow.dataflow_v1b3_messages.TypeValueValuesEnum?
При удаленном запуске конвейера данных возникает ошибка PicklingError: конвейер данных был написан с использованием Beam SDK для Python, и я запускаю его поверх облачного потока данных Google. Трубопровод работает нормально, когда я запускаю его локально.
Следующий код генерирует PicklingError: это должно воспроизвести проблему
import apache_beam as beam
from apache_beam.transforms import pvalue
from apache_beam.io.fileio import _CompressionType
from apache_beam.utils.options import PipelineOptions
from apache_beam.utils.options import GoogleCloudOptions
from apache_beam.utils.options import SetupOptions
from apache_beam.utils.options import StandardOptions
if __name__ == "__main__":
pipeline_options = PipelineOptions()
pipeline_options.view_as(StandardOptions).runner = 'BlockingDataflowPipelineRunner'
pipeline_options.view_as(SetupOptions).save_main_session = True
google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
google_cloud_options.project = "project-name"
google_cloud_options.job_name = "job-name"
google_cloud_options.staging_location = 'gs://path/to/bucket/staging'
google_cloud_options.temp_location = 'gs://path/to/bucket/temp'
p = beam.Pipeline(options=pipeline_options)
p.run()
Ниже приведен пример с начала и конца трассировки:
WARNING: Could not acquire lock C:\Users\ghousains\AppData\Roaming\gcloud\credentials.lock in 0 seconds
WARNING: The credentials file (C:\Users\ghousains\AppData\Roaming\gcloud\credentials) is not writable. Opening in read-only mode. Any refreshed credentials will only be valid for this run.
Traceback (most recent call last):
File "formatter_debug.py", line 133, in <module>
p.run()
File "C:\Miniconda3\envs\beam\lib\site-packages\apache_beam\pipeline.py", line 159, in run
return self.runner.run(self)
....
....
....
File "C:\Miniconda3\envs\beam\lib\sitepackages\apache_beam\runners\dataflow_runner.py", line 172, in run
self.dataflow_client.create_job(self.job))
StockPickler.save_global(pickler, obj)
File "C:\Miniconda3\envs\beam\lib\pickle.py", line 754, in save_global (obj, module, name))
pickle.PicklingError: Can't pickle <class 'apache_beam.internal.clients.dataflow.dataflow_v1b3_messages.TypeValueValuesEnum'>: it's not found as apache_beam.internal.clients.dataflow.dataflow_v1b3_messages.TypeValueValuesEnum
3 ответа
Я обнаружил, что ваша ошибка возникает, когда объект Pipeline включается в контекст, который обрабатывается и отправляется в облако:
pickle.PicklingError: Can't pickle <class 'apache_beam.internal.clients.dataflow.dataflow_v1b3_messages.TypeValueValuesEnum'>: it's not found as apache_beam.internal.clients.dataflow.dataflow_v1b3_messages.TypeValueValuesEnum
Естественно, вы можете спросить:
- Что делает объект конвейера непроницаемым, когда он отправляется в облако, поскольку обычно он может быть рассортирован?
- Если бы это было действительно проблемой, то я бы не получал эту ошибку постоянно - разве объект Pipeline обычно не включается в контекст, отправляемый в облако?
- Если объект Pipeline обычно не включается в контекст, отправляемый в облако, то почему объект Pipeline включается в мой случай?
(1)
Когда вы звоните p.run()
на трубопроводе с cloud=True
одна из первых вещей, которая происходит в том, что p.runner.job=apiclient.Job(pipeline.options)
установлен в apache_beam.runners.dataflow_runner.DataflowPipelineRunner.run
,
Без этого установленного атрибута конвейер может быть выбран. Но как только это установлено, конвейер больше не может быть засолен, так как p.runner.job.proto._Message__tags[17]
это TypeValueValuesEnum
, который определяется как вложенный класс в apache_beam.internal.clients.dataflow.dataflow_v1b3_messages
, Вложенные классы AFAIK не могут быть выбраны (даже укропом - см. Как я могу выбрать вложенный класс в Python?).
(2) - (3)
Противоположным образом объект Pipeline обычно не включается в контекст, отправляемый в облако. Когда вы звоните p.run()
на трубопроводе с cloud=True
только следующие объекты травятся (и обратите внимание, что травление происходит после p.runner.job
устанавливается):
- Если
save_main_session=True
тогда все глобальные объекты в модуле обозначены__main__
маринованные (__main__
это скрипт, который вы запустили из командной строки). - Каждое преобразование, определенное в конвейере, обрабатывается индивидуально
В вашем случае вы столкнулись с #1, поэтому ваше решение сработало. Я на самом деле столкнулся с № 2, где я определил beam.Map
лямбда-функция как метод составного PTransform
, (Когда применяются составные преобразования, конвейер добавляется как атрибут преобразования...) Мое решение состояло в том, чтобы вместо этого определить эти лямбда-функции в модуле.
Более долгосрочное решение было бы для нас, чтобы исправить это в проекте Apache Beam. TBD!
Это должно быть исправлено в gd-dataflow 0.4.4 sdk release с https://github.com/apache/incubator-beam/pull/1485
Я решил эту проблему, заключив тело main в метод run() и вызвав run().