Ошибка усечения данных в задании aws glue при передаче данных из S3 в Aurora

Я пытаюсь перенести свои данные из корзины S3 (address.csv) в AWS Aurora (MySQL) с помощью AWS Glue. Когда я использую следующий сценарий для передачи, в одном из столбцов с именем «po_box_number», который является varchar длиной 10, выдается сообщение об ошибке: «Произошла ошибка при вызове o195.pyWriteDynamicFrame. Усечение данных: данные слишком длинные для столбца« po_box_number » в строке 1 ". Когда я увеличил размер столбца для диагностических целей, я увидел, что данные хранятся в формате json. Предположим, что мне нужно значение "100", оно хранится как {"long": 100, "string": null}, аналогично, если я пытаюсь сохранить "E101", оно сохраняется как {"long": null, "строка": "E101"}

      import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from datetime import datetime
from pyspark.sql.functions import lit
from awsglue.dynamicframe import DynamicFrame

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "db1", table_name = "tb1", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "db1", table_name = "tb1", transformation_ctx = "datasource0")   

#applymapping1 = Map.apply(frame = datasource0, f = AddProcessedTime)

applymapping1 = ApplyMapping.apply(frame = applymapping1, mappings = [("col6", "string", "po_box_number", "string")], transformation_ctx = "applymapping1")

#applymapping1 = ResolveChoice.apply(applymapping1, specs = [("po_box_number", "cast:string")])
datasink5 = glueContext.write_dynamic_frame.from_catalog(frame = applymapping1, database = "db1", table_name = "tb2", transformation_ctx = "datasink5")
job.commit()

1 ответ

Похоже, что в моем ведре S3 есть поврежденные данные, которые были ответственны за преобразование в json. Как только я его удалил, все прошло как положено