Запись в секционированную таблицу в bigquerry из задания Python Dataflow
Когда я пишу в секционированную таблицу в bigquerry из потока данных, я получаю следующую ошибку
Может ли кто-нибудь помочь мне в этом
"message": "Invalid table ID \"test$20181126\". Table IDs must be alphanumeric (plus underscores) and must be at most 1024 characters long. Also, Table decorators cannot be used."
это фрагмент кода Python, который я использую для написания
import apache_beam as beam
class bqwriter(beam.PTransform):
def __init__(self, table, schema):
super(BQWriter, self).__init__()
self.table = table
self.schema = schema
def expand(self, pcoll):
pcoll | beam.io.Write(beam.io.BigQuerySink(
self.table,
schema=self.schema,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
))
Я создаю табе, как показано ниже
a | 'BQWrite' >> BQWriter("test-123:test.test$20181126", table_schema)
2 ответа
У меня такая же проблема. Мое решение было:
Либо добавьте столбец даты к данным, а затем установите таблицу BQ для разделения на нее
Или установите разделение по умолчанию на _PARTITIONTIME в BQ.
Обе эти опции означают, что вы вставляете только в test-123:test.test
Что касается того, сможем ли мы сделать то, что вы пытались сделать, кажется, да. JIRA Beam утверждает, что они исправили это для Java, но я не смог найти статус для Python.
Лучший способ сделать это - передать функцию родному
beam.io.WriteToBigQuery
класс :
def table_fn(element):
current_date = date.fromtimestamp(element['timestamp']).strftime("%Y%m%d")
return f"{bq_output_table}${current_date}"
user_parent_user_watchever_pcol | "Write to BigQuery" >>
beam.io.Write(
beam.io.WriteToBigQuery(
table_fn,
schema=schemas.VIDEOCATALOG_SCHEMA,
method="STREAMING_INSERTS",
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
)
)