Невозможно перехватить исключения при загрузке BigQuery из данных JSON

У меня есть файл с подключенными данными JSON

{
    "magic": "atMSG",
    "type": "DT",
    "headers": null,
    "messageschemaid": null,
    "messageschema": null,
    "message": {
        "data": {
            "ID": "123456",
            "NAME": "ABCXYZ",
            "SALARY": "10"
        },
        "beforeData": null,
        "headers": {
            "operation": "INSERT",
            "changeSequence": "20200822230048000000000017887787417",
            "timestamp": "2020-08-22T23:00:48.000",
            "transactionId": "some_id"
        }
    }
}

{
    "magic": "atMSG",
    "type": "DT",
    "headers": null,
    "messageschemaid": null,
    "messageschema": null,
    "message": {
        "data": {
            "ID": "QWERTYT",
            "NAME": "ABCXYZ",
            "SALARY": "10"
        },
        "beforeData": null,
        "headers": {
            "operation": "INSERT",
            "changeSequence": "20200822230048000000000017887787417",
            "timestamp": "2020-08-22T23:00:48.000",
            "transactionId": "some_id"
        }
    }
}

У меня есть приведенный ниже код, который я использую для получения данных, проанализированных из JSON, в тип словаря, который можно загрузить в BigQuery. Но если у моих данных JSON есть какие-либо проблемы, я хочу их поймать и загрузить в другой BQ_Table/File.

class custom_json_parser(beam.DoFn):
    def process(self, element):
        norm = json_normalize(element, max_level=1)
        l = norm["message.data"]
        return l

class Audit(beam.DoFn):
    
    def process(self, element):
        yield element
table_schema = 'ID:INTEGER, NAME:STRING, SALARY:FLOAT'
audit_table_schema = 'ID:INTEGER, NAME:STRING, SALARY:FLOAT'

p = beam.Pipeline(options = pipeline_options)

data_from_source = (p
                    | "READ FROM JSON" >> ReadFromText("gs://ti-project-1/input/file_with_json_data")
                    | "PARSE JSON" >> beam.Map(json.loads)
                    | "CUSTOM JOSN PARSE" >> beam.ParDo(custom_json_parser())
                   )

Load_col = data_from_source
Audit_col = (data_from_source 
             | beam.Map(lambda x: (x['ID'], x['NAME'], datetime.now())) 
            | beam.Map(lambda x:{'ID':x[0],'NAME':x[1], 'LAST_UPDT':x[2]})
           )
try:
    #Load_col |"WriteToBigQuery" >> beam.ParDo(Printer())
    load_data = (Load_col 
                 |"WriteDataToBigQuery" >> beam.io.WriteToBigQuery(
                     "{0}:{1}.trial_data".format(PROJECT_ID, datasetId_Staging),
                     schema=table_schema,
                     #write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                     write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                     create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
                )
    
    audit_data = (Audit_col 
                  |"WriteAuditDataToBigQuery" >> beam.io.WriteToBigQuery(
                      "{0}:{1}.audit_data".format(PROJECT_ID, datasetId_Staging),
                      schema=audit_table_schema,
                      write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                      #write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                      create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
                 )
except:
    (load_data[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS]
     | "Bad lines" >> beam.io.textio.WriteToText("gs://ti-project-1/error-files/data_error_log.txt")
    )

results = p.run()

В блоке Except я в основном хочу записать данные с ошибками (ID": "QWERTYT","NAME": "ABCXYZ","SALARY": "10") в файл, также я хочу записать их в BigQuery.

Я также хочу записать ошибочные данные в таблицу аудита с именем файла. Но это другой вопрос.

0 ответов

Другие вопросы по тегам