Google Dataflow: глобальное имя не определено - apache beam

В местном я имею это:

from shapely.geometry import Point
<...>
class GeoDataIngestion:
    def parse_method(self, string_input):
       Place = Point(float(values[2]), float(values[3]))
       <...>

Я запускаю это, с Python 2.7 и все идет хорошо

После этого я пытаюсь проверить это с помощью обработчика потока данных, но во время работы я получил эту ошибку:

NameError: global name 'Point' is not defined

Трубопровод:

geo_data = (raw_data
                    | 'Geo data transform' >> beam.Map(lambda s: geo_ingestion.parse_method(s))

Я прочитал другой пост, и я думаю, что это должно работать, но я не уверен, есть ли что-то особенное с Google Dataflow в этом

Я также попробовал:

import shapely.geometry
<...>
Place = shapely.geometry.Point(float(values[2]), float(values[3]))

С тем же результатом

NameError: global name 'shapely' is not defined

Любая идея?


В Google Cloud, если я попробовал в моей виртуальной среде, я могу сделать это без каких-либо проблем:

(env) ...@cloudshell:~ ()$ python
Python 2.7.13 (default, Sep 26 2018, 18:42:22)
[GCC 6.3.0 20170516] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from shapely.geometry import Point
Var = Point(-5.020751953125, 39.92237576385941)

EXTRA:


Ошибка при использовании require.txt

Collecting Shapely==1.6.4.post1 (from -r req.txt (line 2))
  Using cached https://files.pythonhosted.org/packages/7d/3c/0f09841db07aabf9cc387662be646f181d07ed196e6f60ce8be5f4a8f0bd/Shapely-1.6.4.post1.tar.gz
  Saved c:\<...>\shapely-1.6.4.post1.tar.gz
    Complete output from command python setup.py egg_info:
    Traceback (most recent call last):
      File "<string>", line 1, in <module>
      File "c:\<...>\temp\pip-download-kpg5ca\Shapely\setup.py", line 80, in <module>
        from shapely._buildcfg import geos_version_string, geos_version, \
      File "shapely\_buildcfg.py", line 200, in <module>
        lgeos = CDLL("geos_c.dll")
      File "C:\Python27\Lib\ctypes\__init__.py", line 366, in __init__
        self._handle = _dlopen(self._name, mode)
    WindowsError: [Error 126] No se puede encontrar el m¢dulo especificado

Ошибка при использовании setup.py

Setup.py, как это меняется:

CUSTOM_COMMANDS = [
    ['apt-get', 'update'],
    ['apt-get', '--assume-yes', 'install', 'libgeos-dev'],
    ['pip', 'install', 'Shapely'],
    ['echo', 'Custom command worked!']
]

В результате пакет не будет установлен, потому что я получаю сообщение об ошибке с самого начала:

NameError: global name 'Point' is not defined

файл setup.py:

from __future__ import absolute_import
from __future__ import print_function
import subprocess
from distutils.command.build import build as _build
import setuptools


class build(_build):  # pylint: disable=invalid-name
  sub_commands = _build.sub_commands + [('CustomCommands', None)]
CUSTOM_COMMANDS = [
    ['apt-get', 'update'],
    ['apt-get', '--assume-yes', 'install', 'libgeos-dev'],
    ['pip', 'install', 'Shapely']]


class CustomCommands(setuptools.Command):  
  def initialize_options(self):
    pass

  def finalize_options(self):
    pass

  def RunCustomCommand(self, command_list):
    print('Running command: %s' % command_list)
    p = subprocess.Popen(
        command_list,
        stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    # Can use communicate(input='y\n'.encode()) if the command run requires
    # some confirmation.
    stdout_data, _ = p.communicate()
    print('Command output: %s' % stdout_data)
    if p.returncode != 0:
      raise RuntimeError(
          'Command %s failed: exit code: %s' % (command_list, p.returncode))

  def run(self):
    for command in CUSTOM_COMMANDS:
      self.RunCustomCommand(command)

REQUIRED_PACKAGES = ['Shapely']

setuptools.setup(
    name='dataflow',
    version='0.0.1',
    description='Dataflow set workflow package.',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    cmdclass={
        'build': build,
        'CustomCommands': CustomCommands,
        }
    )

Варианты трубопровода:

 pipeline_options = PipelineOptions()
    pipeline_options.view_as(StandardOptions).streaming = True
    pipeline_options.view_as(SetupOptions).save_main_session = True
    pipeline_options.view_as(SetupOptions).setup_file = 'C:\<...>\setup.py'

    with beam.Pipeline(options=pipeline_options) as p:

Вызов:

python -m dataflow --project XXX --temp_location gs://YYY --runner DataflowRunner --region europe-west1 --setup_file C:\<...>\setup.py

Начало журнала: (до потока данных ждать данных)

INFO:root:Defaulting to the temp_location as staging_location: gs://iotbucketdetector/test/prueba
C:\Users\<...>~1\Desktop\PROYEC~2\env\lib\site-packages\apache_beam\runners\dataflow\dataflow_runner.py:816: DeprecationWarning: options is deprecated since First stable release.. References to <pipeline>.options will
 not be supported
  transform_node.inputs[0].pipeline.options.view_as(StandardOptions))
INFO:root:Starting GCS upload to gs://<...>-1120074505-586000.1542699905.588000/pipeline.pb...
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
INFO:root:Completed GCS upload to gs://<...>-1120074505-586000.1542699905.588000/pipeline.pb
INFO:root:Executing command: ['C:\\Users\\<...>~1\\Desktop\\PROYEC~2\\env\\Scripts\\python.exe', 'setup.py', 'sdist', '--dist-dir', 'c:\\users\\<...>~1\\appdata\\local\\temp\\tmpakq8bs']
running sdist
running egg_info
writing requirements to dataflow.egg-info\requires.txt
writing dataflow.egg-info\PKG-INFO
writing top-level names to dataflow.egg-info\top_level.txt
writing dependency_links to dataflow.egg-info\dependency_links.txt
reading manifest file 'dataflow.egg-info\SOURCES.txt'
writing manifest file 'dataflow.egg-info\SOURCES.txt'
warning: sdist: standard file not found: should have one of README, README.rst, README.txt, README.md

running check
warning: check: missing required meta-data: url

warning: check: missing meta-data: either (author and author_email) or (maintainer and maintainer_email) must be supplied

creating dataflow-0.0.1
creating dataflow-0.0.1\dataflow.egg-info
copying files to dataflow-0.0.1...
copying setup.py -> dataflow-0.0.1
copying dataflow.egg-info\PKG-INFO -> dataflow-0.0.1\dataflow.egg-info
copying dataflow.egg-info\SOURCES.txt -> dataflow-0.0.1\dataflow.egg-info
copying dataflow.egg-info\dependency_links.txt -> dataflow-0.0.1\dataflow.egg-info
copying dataflow.egg-info\requires.txt -> dataflow-0.0.1\dataflow.egg-info
copying dataflow.egg-info\top_level.txt -> dataflow-0.0.1\dataflow.egg-info
Writing dataflow-0.0.1\setup.cfg
Creating tar archive
removing 'dataflow-0.0.1' (and everything under it)
INFO:root:Starting GCS upload to gs://<...>-1120074505-586000.1542699905.588000/workflow.tar.gz...
INFO:root:Completed GCS upload to gs://<...>-1120074505-586000.1542699905.588000/workflow.tar.gz
INFO:root:Starting GCS upload to gs://<...>-1120074505-586000.1542699905.588000/pickled_main_session...
INFO:root:Completed GCS upload to gs://<...>-1120074505-586000.1542699905.588000/pickled_main_session
INFO:root:Downloading source distribtution of the SDK from PyPi
INFO:root:Executing command: ['C:\\Users\\<...>~1\\Desktop\\PROYEC~2\\env\\Scripts\\python.exe', '-m', 'pip', 'download', '--dest', 'c:\\users\\<...>~1\\appdata\\local\\temp\\tmpakq8bs', 'apache-beam==2.5.0', '--no-d
eps', '--no-binary', ':all:']
Collecting apache-beam==2.5.0
  Using cached https://files.pythonhosted.org/packages/c6/96/56469c57cb043f36bfdd3786c463fbaeade1e8fcf0593ec7bc7f99e56d38/apache-beam-2.5.0.zip
  Saved c:\users\<...>~1\appdata\local\temp\tmpakq8bs\apache-beam-2.5.0.zip
Successfully downloaded apache-beam
INFO:root:Staging SDK sources from PyPI to gs://<...>-1120074505-586000.1542699905.588000/dataflow_python_sdk.tar
INFO:root:Starting GCS upload to gs://<...>-1120074505-586000.1542699905.588000/dataflow_python_sdk.tar...
INFO:root:Completed GCS upload to gs://<...>-1120074505-586000.1542699905.588000/dataflow_python_sdk.tar
INFO:root:Downloading binary distribtution of the SDK from PyPi
INFO:root:Executing command: ['C:\\Users\\<...>~1\\Desktop\\PROYEC~2\\env\\Scripts\\python.exe', '-m', 'pip', 'download', '--dest', 'c:\\users\\<...>~1\\appdata\\local\\temp\\tmpakq8bs', 'apache-beam==2.5.0', '--no-d
eps', '--only-binary', ':all:', '--python-version', '27', '--implementation', 'cp', '--abi', 'cp27mu', '--platform', 'manylinux1_x86_64']
Collecting apache-beam==2.5.0
  Using cached https://files.pythonhosted.org/packages/ff/10/a59ba412f71fb65412ec7a322de6331e19ec8e75ca45eba7a0708daae31a/apache_beam-2.5.0-cp27-cp27mu-manylinux1_x86_64.whl
  Saved c:\users\<...>~1\appdata\local\temp\tmpakq8bs\apache_beam-2.5.0-cp27-cp27mu-manylinux1_x86_64.whl
Successfully downloaded apache-beam
INFO:root:Staging binary distribution of the SDK from PyPI to gs://<...>-1120074505-586000.1542699905.588000/apache_beam-2.5.0-cp27-cp27mu-manylinux1_x86_64.whl
INFO:root:Starting GCS upload to gs://<...>-1120074505-586000.1542699905.588000/apache_beam-2.5.0-cp27-cp27mu-manylinux1_x86_64.whl...
INFO:root:Completed GCS upload to gs://<...>-1120074505-586000.1542699905.588000/apache_beam-2.5.0-cp27-cp27mu-manylinux1_x86_64.whl
INFO:root:Create job: <Job
 createTime: u'2018-11-20T07:45:28.050865Z'
 currentStateTime: u'1970-01-01T00:00:00Z'
 id: u'2018-11-19_23_45_27-14221834310382472741'
 location: u'europe-west1'
 name: u'beamapp-<...>-1120074505-586000'
 projectId: u'poc-cloud-209212'
 stageStates: []
 steps: []
 tempFiles: []
 type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)>

2 ответа

Решение

Это потому, что вам нужно указать поток данных для установки желаемого пакета.

Кратко документация здесь.

Проще говоря, для пакета PyPi вроде shapely вы можете сделать следующее, чтобы убедиться, что все зависимости установлены.

  1. pip freeze > requirements.txt
  2. Удалить все несвязанные пакеты в requirements.txt
  3. Запустите свой трубопровод с --requirements_file requirements.txt

Или даже больше, если вы хотите сделать что-то вроде установки пакета Linux apt-get или используя свой собственный модуль Python. Посмотрите на этот официальный пример. Вам необходимо настроить setup.py для этого и измените команду конвейера с--setup_file setup.py,

Для модуля PyPi используйте REQUIRED_PACKAGES в примере.

REQUIRED_PACKAGES = [
   'numpy','shapely'
]

Если вы используете параметры конвейера, то добавьте setup.py как

pipeline_options = {
        'project': PROJECT,
        'staging_location': 'gs://' + BUCKET + '/staging',
        'runner': 'DataflowRunner',
        'job_name': 'test',
        'temp_location': 'gs://' + BUCKET + '/temp',
        'save_main_session': True,
        'setup_file':'.\setup.py'
    }
options = PipelineOptions.from_dictionary(pipeline_options)
p = beam.Pipeline(options=options)

Импорт внутри функции + setup.py:

class GeoDataIngestion:
    def parse_method(self, string_input):
        from shapely.geometry import Point
        place = Point(float(values[2]), float(values[3]))

setup.py с:

REQUIRED_PACKAGES = ['shapely']
Другие вопросы по тегам