Запись в секционированную таблицу в bigquerry из задания Python Dataflow

Когда я пишу в секционированную таблицу в bigquerry из потока данных, я получаю следующую ошибку

Может ли кто-нибудь помочь мне в этом

"message": "Invalid table ID \"test$20181126\". Table IDs must be alphanumeric (plus underscores) and must be at most 1024 characters long. Also, Table decorators cannot be used."

это фрагмент кода Python, который я использую для написания

import apache_beam as beam


class bqwriter(beam.PTransform):
    def __init__(self, table, schema):
        super(BQWriter, self).__init__()
        self.table = table
        self.schema = schema

    def expand(self, pcoll):
        pcoll | beam.io.Write(beam.io.BigQuerySink(
            self.table,
            schema=self.schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
        ))

Я создаю табе, как показано ниже

a | 'BQWrite' >> BQWriter("test-123:test.test$20181126", table_schema)

2 ответа

У меня такая же проблема. Мое решение было:

  • Либо добавьте столбец даты к данным, а затем установите таблицу BQ для разделения на нее

  • Или установите разделение по умолчанию на _PARTITIONTIME в BQ.

Обе эти опции означают, что вы вставляете только в test-123:test.test

Что касается того, сможем ли мы сделать то, что вы пытались сделать, кажется, да. JIRA Beam утверждает, что они исправили это для Java, но я не смог найти статус для Python.

Лучший способ сделать это - передать функцию родному beam.io.WriteToBigQuery класс :

      def table_fn(element):
    current_date = date.fromtimestamp(element['timestamp']).strftime("%Y%m%d")
    return f"{bq_output_table}${current_date}"

user_parent_user_watchever_pcol | "Write to BigQuery" >> 
beam.io.Write(
    beam.io.WriteToBigQuery(
        table_fn,
        schema=schemas.VIDEOCATALOG_SCHEMA,
        method="STREAMING_INSERTS",
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
    )
)
Другие вопросы по тегам