Apache Beam - Python - Потоковая передача в BigQuery не записывает данные в таблицу
Apache Beam - Python - Потоковая передача в BigQuery не записывает данные в таблицу
Я спроектировал простой Apache Beam Pipeline с использованием Python SDK, хотя я знаю, что возможности потоковой передачи Python SDK все еще разрабатываются. Я наткнулся на контрольно-пропускной пункт, который, похоже, не могу обойти: все в Pipeline работает нормально до момента где я пытаюсь попасть в таблицу BigQuery. Я не получаю ошибок, исключений или предупреждений, данные просто не отображаются в BigQuery.
Я пробовал оба с 2.1.0 и 2.2.0, и это дает те же результаты.
Тема PubSub, из которой я получаю данные, просто содержит твиты типа "Я люблю Apache Beam #apachebeam #dataflow #beam #datascience".
Это трубопровод:
import datetime
import logging
import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
# Configuration
# I set all the constant before creating the Pipeline Options
options = PipelineOptions()
options.view_as(StandardOptions).streaming = True
options.view_as(StandardOptions).runner = 'DirectRunner'
# options.view_as(SetupOptions).save_main_session = True
# options.view_as(StandardOptions).runner = 'DataflowRunner'
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = PROJECT_ID
# DoFn
class ExtractHashtagsDoFn(beam.DoFn):
def process(self, element):
"""
Takes a tweet as input, search and return its content
and hashtag as a dictionary.
"""
hastags = [w for w in element.split() if w.startswith('#')]
result = {'tweet': element, 'hashtags': hastags}
logging.info('{}'.format(result))
yield result
class SentimentAnalysisDoFn(beam.DoFn):
def process(self, element):
from google.cloud import language
from google.gax.errors import RetryError
client = language.LanguageServiceClient()
document = language.types.Document(
content=element['tweet'].encode('utf-8'),
type='PLAIN_TEXT')
try:
response = client.analyze_sentiment(
document=document,
encoding_type='UTF8')
sentiment = response.document_sentiment
element['sentiment_score'] = sentiment.score
element['sentiment_magnitude'] = sentiment.magnitude
element['analyzed'] = 1
except RetryError:
element['sentiment_score'] = None
element['sentiment_magnitude'] = None
element['analyzed'] = 0
# logging.info('{}'.format(element))
yield element
# Define Composite Transforms
class TextAnalysisTransform(beam.PTransform):
def expand(self, pcoll):
return(
pcoll
| 'Decode' >> beam.Map(lambda string: string.decode('utf8', 'ignore'))
| 'ExtractHashtags' >> beam.ParDo(ExtractHashtagsDoFn())
| 'SentimentAnalysis' >> beam.ParDo(SentimentAnalysisDoFn())
)
class WindowingForOutputTransform(beam.PTransform):
def expand(self, pcoll):
import json
return(
pcoll
| 'Pack' >> beam.Map(lambda x: (x, 1))
| 'Windowing' >> beam.WindowInto(window.FixedWindows(5, 0))
| 'GroupByKey' >> beam.GroupByKey()
| 'Unpack' >> beam.Map(lambda x: x[0])
)
class SaveToBigQueryTransform(beam.PTransform):
def expand(self, pcoll):
# Define the Schema.
from apache_beam.io.gcp.internal.clients import bigquery
table_schema = bigquery.TableSchema()
# Fields that use standard types.
alpha_schema = bigquery.TableFieldSchema()
alpha_schema.name = 'tweet'
alpha_schema.type = 'string'
alpha_schema.mode = 'nullable'
table_schema.fields.append(alpha_schema)
beta_schema = bigquery.TableFieldSchema()
beta_schema.name = 'sentiment_score'
beta_schema.type = 'float'
beta_schema.mode = 'nullable'
table_schema.fields.append(beta_schema)
gamma_schema = bigquery.TableFieldSchema()
gamma_schema.name = 'sentiment_magnitude'
gamma_schema.type = 'float'
gamma_schema.mode = 'nullable'
table_schema.fields.append(gamma_schema)
delta_schema = bigquery.TableFieldSchema()
delta_schema.name = 'analyzed'
delta_schema.type = 'integer'
delta_schema.mode = 'required'
table_schema.fields.append(delta_schema)
# A repeated field.
omega_schema = bigquery.TableFieldSchema()
omega_schema.name = 'hashtags'
omega_schema.type = 'string'
omega_schema.mode = 'repeated'
table_schema.fields.append(omega_schema)
# Saving the output to BigQuery.
return (
pcoll
| 'PrepareForOutput' >> WindowingForOutputTransform()
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
table='twitter_analysis',
dataset=DATASET_ID,
project=PROJECT_ID,
schema=table_schema, # Pass the defined table_schema
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
def run(argv=None):
"""
Define and run the pipeline.
"""
with beam.Pipeline(options=options) as p:
plumbing = (
p
| 'LoadTestData' >> beam.io.ReadStringsFromPubSub(subscription=FULL_SUBSCRIPTION)
| 'TextAnalysis' >> TextAnalysisTransform()
| 'StreamToBigQuery' >> SaveToBigQueryTransform()
)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
Вывод: /home/ubik/Documents/github/twitter-streaming/beam/.venv/lib/python2.7/site-packages/apache_beam/coders/typecoders.py:135: UserWarning: использование запасного кодера для typehint: Any. warnings.warn('Использование резервного кода для typehint: %r.' % typehint) INFO:root: Запуск конвейера с DirectRunner.
РЕДАКТИРОВАТЬ: я попытался запустить трубопровод 4 сентября, все работало нормально. Вероятно, полтергейст BigQuery прекратил преследовать меня.