Apache Beam - Python - Потоковая передача в BigQuery не записывает данные в таблицу

Apache Beam - Python - Потоковая передача в BigQuery не записывает данные в таблицу

Я спроектировал простой Apache Beam Pipeline с использованием Python SDK, хотя я знаю, что возможности потоковой передачи Python SDK все еще разрабатываются. Я наткнулся на контрольно-пропускной пункт, который, похоже, не могу обойти: все в Pipeline работает нормально до момента где я пытаюсь попасть в таблицу BigQuery. Я не получаю ошибок, исключений или предупреждений, данные просто не отображаются в BigQuery.

  • Я пробовал оба с 2.1.0 и 2.2.0, и это дает те же результаты.

  • Тема PubSub, из которой я получаю данные, просто содержит твиты типа "Я люблю Apache Beam #apachebeam #dataflow #beam #datascience".

Это трубопровод:

import datetime
import logging

import apache_beam as beam
import apache_beam.transforms.window as window

from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions

# Configuration
# I set all the constant before creating the Pipeline Options
options = PipelineOptions()
options.view_as(StandardOptions).streaming = True
options.view_as(StandardOptions).runner = 'DirectRunner'
# options.view_as(SetupOptions).save_main_session = True
# options.view_as(StandardOptions).runner = 'DataflowRunner'
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = PROJECT_ID


# DoFn
class ExtractHashtagsDoFn(beam.DoFn):
    def process(self, element):
        """
        Takes a tweet as input, search and return its content 
        and hashtag as a dictionary.
        """
        hastags = [w for w in element.split() if w.startswith('#')]
        result = {'tweet': element, 'hashtags': hastags}
        logging.info('{}'.format(result))
        yield result


class SentimentAnalysisDoFn(beam.DoFn):
    def process(self, element):
        from google.cloud import language
        from google.gax.errors import RetryError

        client = language.LanguageServiceClient()
        document = language.types.Document(
            content=element['tweet'].encode('utf-8'),
            type='PLAIN_TEXT')

        try:
            response = client.analyze_sentiment(
                document=document,
                encoding_type='UTF8')
            sentiment = response.document_sentiment
            element['sentiment_score'] = sentiment.score
            element['sentiment_magnitude'] = sentiment.magnitude
            element['analyzed'] = 1

        except RetryError:
            element['sentiment_score'] = None
            element['sentiment_magnitude'] = None
            element['analyzed'] = 0

#         logging.info('{}'.format(element))
        yield element


# Define Composite Transforms
class TextAnalysisTransform(beam.PTransform):
    def expand(self, pcoll):
        return(
            pcoll
            | 'Decode' >> beam.Map(lambda string: string.decode('utf8', 'ignore'))
            | 'ExtractHashtags' >> beam.ParDo(ExtractHashtagsDoFn())
            | 'SentimentAnalysis' >> beam.ParDo(SentimentAnalysisDoFn())
        )


class WindowingForOutputTransform(beam.PTransform):
    def expand(self, pcoll):
        import json
        return(
            pcoll
            | 'Pack' >> beam.Map(lambda x: (x, 1))
            | 'Windowing' >> beam.WindowInto(window.FixedWindows(5, 0))
            | 'GroupByKey' >> beam.GroupByKey()
            | 'Unpack' >> beam.Map(lambda x: x[0])
        )


class SaveToBigQueryTransform(beam.PTransform):
    def expand(self, pcoll):
        # Define the Schema.
        from apache_beam.io.gcp.internal.clients import bigquery

        table_schema = bigquery.TableSchema()

        # Fields that use standard types.
        alpha_schema = bigquery.TableFieldSchema()
        alpha_schema.name = 'tweet'
        alpha_schema.type = 'string'
        alpha_schema.mode = 'nullable'
        table_schema.fields.append(alpha_schema)

        beta_schema = bigquery.TableFieldSchema()
        beta_schema.name = 'sentiment_score'
        beta_schema.type = 'float'
        beta_schema.mode = 'nullable'
        table_schema.fields.append(beta_schema)

        gamma_schema = bigquery.TableFieldSchema()
        gamma_schema.name = 'sentiment_magnitude'
        gamma_schema.type = 'float'
        gamma_schema.mode = 'nullable'
        table_schema.fields.append(gamma_schema)

        delta_schema = bigquery.TableFieldSchema()
        delta_schema.name = 'analyzed'
        delta_schema.type = 'integer'
        delta_schema.mode = 'required'
        table_schema.fields.append(delta_schema)

        # A repeated field.
        omega_schema = bigquery.TableFieldSchema()
        omega_schema.name = 'hashtags'
        omega_schema.type = 'string'
        omega_schema.mode = 'repeated'
        table_schema.fields.append(omega_schema)

        # Saving the output to BigQuery.
        return (
            pcoll
            | 'PrepareForOutput' >> WindowingForOutputTransform()
            | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
                table='twitter_analysis',
                dataset=DATASET_ID,
                project=PROJECT_ID,
                schema=table_schema,  # Pass the defined table_schema
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))


def run(argv=None):
    """
    Define and run the pipeline.
    """
    with beam.Pipeline(options=options) as p:
        plumbing = (
            p
            | 'LoadTestData' >> beam.io.ReadStringsFromPubSub(subscription=FULL_SUBSCRIPTION)
            | 'TextAnalysis' >> TextAnalysisTransform()
            | 'StreamToBigQuery' >> SaveToBigQueryTransform()
        )

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

Вывод: /home/ubik/Documents/github/twitter-streaming/beam/.venv/lib/python2.7/site-packages/apache_beam/coders/typecoders.py:135: UserWarning: использование запасного кодера для typehint: Any. warnings.warn('Использование резервного кода для typehint: %r.' % typehint) INFO:root: Запуск конвейера с DirectRunner.

РЕДАКТИРОВАТЬ: я попытался запустить трубопровод 4 сентября, все работало нормально. Вероятно, полтергейст BigQuery прекратил преследовать меня.

0 ответов

Другие вопросы по тегам