Невозможно перехватить исключения при загрузке BigQuery из данных JSON
У меня есть файл с подключенными данными JSON
{
"magic": "atMSG",
"type": "DT",
"headers": null,
"messageschemaid": null,
"messageschema": null,
"message": {
"data": {
"ID": "123456",
"NAME": "ABCXYZ",
"SALARY": "10"
},
"beforeData": null,
"headers": {
"operation": "INSERT",
"changeSequence": "20200822230048000000000017887787417",
"timestamp": "2020-08-22T23:00:48.000",
"transactionId": "some_id"
}
}
}
{
"magic": "atMSG",
"type": "DT",
"headers": null,
"messageschemaid": null,
"messageschema": null,
"message": {
"data": {
"ID": "QWERTYT",
"NAME": "ABCXYZ",
"SALARY": "10"
},
"beforeData": null,
"headers": {
"operation": "INSERT",
"changeSequence": "20200822230048000000000017887787417",
"timestamp": "2020-08-22T23:00:48.000",
"transactionId": "some_id"
}
}
}
У меня есть приведенный ниже код, который я использую для получения данных, проанализированных из JSON, в тип словаря, который можно загрузить в BigQuery. Но если у моих данных JSON есть какие-либо проблемы, я хочу их поймать и загрузить в другой BQ_Table/File.
class custom_json_parser(beam.DoFn):
def process(self, element):
norm = json_normalize(element, max_level=1)
l = norm["message.data"]
return l
class Audit(beam.DoFn):
def process(self, element):
yield element
table_schema = 'ID:INTEGER, NAME:STRING, SALARY:FLOAT'
audit_table_schema = 'ID:INTEGER, NAME:STRING, SALARY:FLOAT'
p = beam.Pipeline(options = pipeline_options)
data_from_source = (p
| "READ FROM JSON" >> ReadFromText("gs://ti-project-1/input/file_with_json_data")
| "PARSE JSON" >> beam.Map(json.loads)
| "CUSTOM JOSN PARSE" >> beam.ParDo(custom_json_parser())
)
Load_col = data_from_source
Audit_col = (data_from_source
| beam.Map(lambda x: (x['ID'], x['NAME'], datetime.now()))
| beam.Map(lambda x:{'ID':x[0],'NAME':x[1], 'LAST_UPDT':x[2]})
)
try:
#Load_col |"WriteToBigQuery" >> beam.ParDo(Printer())
load_data = (Load_col
|"WriteDataToBigQuery" >> beam.io.WriteToBigQuery(
"{0}:{1}.trial_data".format(PROJECT_ID, datasetId_Staging),
schema=table_schema,
#write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
)
audit_data = (Audit_col
|"WriteAuditDataToBigQuery" >> beam.io.WriteToBigQuery(
"{0}:{1}.audit_data".format(PROJECT_ID, datasetId_Staging),
schema=audit_table_schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
#write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
)
except:
(load_data[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS]
| "Bad lines" >> beam.io.textio.WriteToText("gs://ti-project-1/error-files/data_error_log.txt")
)
results = p.run()
В блоке Except я в основном хочу записать данные с ошибками (ID": "QWERTYT","NAME": "ABCXYZ","SALARY": "10") в файл, также я хочу записать их в BigQuery.
Я также хочу записать ошибочные данные в таблицу аудита с именем файла. Но это другой вопрос.