confluent_kafka.error.ValueSerializationError: KafkaError{code=_VALUE_SERIALIZATION,val=-161: ValueError
Я новичок в Python и пытаюсь использовать confluent_kafka для создания сообщений avro. Использование confluent_kafka.schema_registry.avro.AvroSerializer для того же (указано: https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/avro_producer.py)
Он работает для простой схемы avro с вводом dict (json, преобразованный в dict), но для схемы ниже образца я получаю сообщение об ошибке:
Схема:
{
"type": "record",
"name": "Envelope",
"namespace": "CoreOLTPEvents.dbo.Event",
"fields": [{
"name": "before",
"type": ["null", {
"type": "record",
"name": "Value",
"fields": [{
"name": "EventId",
"type": "long"
}, {
"name": "CameraId",
"type": ["null", "long"],
"default": null
}],
"connect.name": "CoreOLTPEvents.dbo.Event.Value"
}],
"default": null
}, {
"name": "after",
"type": ["null", "Value"],
"default": null
}, {
"name": "source",
"type": {
"type": "record",
"name": "Source",
"namespace": "io.debezium.connector.sqlserver",
"fields": [{
"name": "version",
"type": "string"
}, {
"name": "connector",
"type": "string"
}],
"connect.name": "io.debezium.connector.sqlserver.Source"
}
}, {
"name": "op",
"type": "string"
}],
"connect.name": "CoreOLTPEvents.dbo.Event.Envelope"
}
Входной Json:
{
"after": null,
"before": {
"CoreOLTPEvents.dbo.Event.Value" : {
"EventId": 1111111111,
"CameraId": 222222222
}
},
"source": {
"version": "InitialLoad",
"connector": "sqlserver"
},
"op": "C"
}
Ошибка :
ValueError: {'CoreOLTPEvents.dbo.Event.Value': {'EventId': 1111111111, 'CameraId': 222222222}} (type <class 'dict'>) do not match ['null', {'connect.name': 'CoreOLTPEvents.dbo.Event.Value', 'type': 'record', 'name': 'CoreOLTPEvents.dbo.Event.Value', 'fields': [{'name': 'EventId', 'type': 'long'}, {'default': None, 'name': 'CameraId', 'type': ['null', 'long']}]}] on field before
Тип поля «до» - union (['null',record]), если я изменю его только на запись (удалите объединение), тогда он будет работать нормально. Но мне нужно настроить ввод так, чтобы он работал для данной схемы.
(Примечание: я читаю ввод json, используя json.load (json_file), поэтому он дает вывод dict)
Любая помощь приветствуется.
Обновление: Фактическая большая схема:
{
"type": "record",
"name": "Envelope",
"namespace": "CoreOLTPEvents.dbo.Event",
"fields": [{
"name": "before",
"type": ["null", {
"type": "record",
"name": "Value",
"fields": [{
"name": "EventId",
"type": "long"
}, {
"name": "CameraId",
"type": ["null", "long"],
"default": null
}, {
"name": "SiteId",
"type": ["null", "long"],
"default": null
}, {
"name": "VehicleId",
"type": ["null", "long"],
"default": null
}, {
"name": "EventReviewStatusID",
"type": "int"
}, {
"name": "EventTypeId",
"type": ["null", "int"],
"default": null
}, {
"name": "EventDateTime",
"type": ["null", {
"type": "string",
"connect.name": "net.smartdrive.converters.SmartdriveEventDateFieldConverter"
}],
"default": null
}, {
"name": "FTPUploadDateTime",
"type": {
"type": "long",
"connect.version": 1,
"connect.name": "io.debezium.time.Timestamp"
}
}, {
"name": "CAMFileName",
"type": "string"
}, {
"name": "KeypadEntryCode",
"type": ["null", "string"],
"default": null
}, {
"name": "IsActive",
"type": {
"type": "boolean",
"connect.default": true
},
"default": true
}, {
"name": "Flagged",
"type": "boolean"
}, {
"name": "EventTitle",
"type": ["null", "string"],
"default": null
}, {
"name": "CreatedBy",
"type": "long"
}, {
"name": "CreatedDate",
"type": {
"type": "long",
"connect.version": 1,
"connect.name": "io.debezium.time.Timestamp"
}
}, {
"name": "ModifiedBy",
"type": "long"
}, {
"name": "ModifiedDate",
"type": {
"type": "long",
"connect.version": 1,
"connect.name": "io.debezium.time.Timestamp"
}
}, {
"name": "ReReviewAnalysis",
"type": ["null", "string"],
"default": null
}, {
"name": "LegacyEventId",
"type": ["null", "long"],
"default": null
}, {
"name": "TripId",
"type": ["null", "long"],
"default": null
}, {
"name": "FileVersion",
"type": ["null", "string"],
"default": null
}, {
"name": "EventNumber",
"type": ["null", "string"],
"default": null
}, {
"name": "Latitude",
"type": ["null", {
"type": "bytes",
"scale": 10,
"precision": 13,
"connect.version": 1,
"connect.parameters": {
"scale": "10",
"connect.decimal.precision": "13"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}],
"default": null
}, {
"name": "Longitude",
"type": ["null", {
"type": "bytes",
"scale": 10,
"precision": 13,
"connect.version": 1,
"connect.parameters": {
"scale": "10",
"connect.decimal.precision": "13"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}],
"default": null
}, {
"name": "GeoAddressId",
"type": ["null", "long"],
"default": null
}, {
"name": "ReviewedEventId",
"type": ["null", "long"],
"default": null
}, {
"name": "VideoStatus",
"type": {
"type": "int",
"connect.default": 0
},
"default": 0
}, {
"name": "PredictionImportance",
"type": ["null", {
"type": "bytes",
"scale": 10,
"precision": 15,
"connect.version": 1,
"connect.parameters": {
"scale": "10",
"connect.decimal.precision": "15"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}],
"default": null
}, {
"name": "FlaggedBy",
"type": ["null", "long"],
"default": null
}, {
"name": "FlaggedDate",
"type": ["null", {
"type": "long",
"connect.version": 1,
"connect.name": "io.debezium.time.Timestamp"
}],
"default": null
}, {
"name": "TriggerTypeId",
"type": ["null", "int"],
"default": null
}, {
"name": "VideoDeleteDate",
"type": ["null", {
"type": "long",
"connect.version": 1,
"connect.name": "io.debezium.time.Timestamp"
}],
"default": null
}, {
"name": "MetadataDeleteDate",
"type": ["null", {
"type": "long",
"connect.version": 1,
"connect.name": "io.debezium.time.Timestamp"
}],
"default": null
}, {
"name": "RetentionStatus",
"type": {
"type": "int",
"connect.default": 0,
"connect.type": "int16"
},
"default": 0
}, {
"name": "PartnerTriggerId",
"type": ["null", "int"],
"default": null
}, {
"name": "CoachingStateId",
"type": {
"type": "int",
"connect.default": 0,
"connect.type": "int16"
},
"default": 0
}, {
"name": "EventKudoHistoryId",
"type": ["null", "int"],
"default": null
}],
"connect.name": "CoreOLTPEvents.dbo.Event.Value"
}],
"default": null
}, {
"name": "after",
"type": ["null", "Value"],
"default": null
}, {
"name": "source",
"type": {
"type": "record",
"name": "Source",
"namespace": "io.debezium.connector.sqlserver",
"fields": [{
"name": "version",
"type": "string"
}, {
"name": "connector",
"type": "string"
}, {
"name": "name",
"type": "string"
}, {
"name": "ts_ms",
"type": "long"
}, {
"name": "snapshot",
"type": [{
"type": "string",
"connect.version": 1,
"connect.parameters": {
"allowed": "true,last,false"
},
"connect.default": "false",
"connect.name": "io.debezium.data.Enum"
}, "null"],
"default": "false"
}, {
"name": "db",
"type": "string"
}, {
"name": "schema",
"type": "string"
}, {
"name": "table",
"type": "string"
}, {
"name": "change_lsn",
"type": ["null", "string"],
"default": null
}, {
"name": "commit_lsn",
"type": ["null", "string"],
"default": null
}, {
"name": "event_serial_no",
"type": ["null", "long"],
"default": null
}],
"connect.name": "io.debezium.connector.sqlserver.Source"
}
}, {
"name": "op",
"type": "string"
}, {
"name": "ts_ms",
"type": ["null", "long"],
"default": null
}, {
"name": "transaction",
"type": ["null", {
"type": "record",
"name": "ConnectDefault",
"namespace": "io.confluent.connect.avro",
"fields": [{
"name": "id",
"type": "string"
}, {
"name": "total_order",
"type": "long"
}, {
"name": "data_collection_order",
"type": "long"
}]
}],
"default": null
}],
"connect.name": "CoreOLTPEvents.dbo.Event.Envelope"
}
Вход для большой схемы:
{
"before": null,
"after": {
"EventId": 1234566,
"CameraId": 2233,
"SiteId": 111,
"VehicleId": 45587,
"EventReviewStatusID": 10,
"EventTypeId": 123,
"EventDateTime": "2015-01-02T01:30:29Z",
"FTPUploadDateTime": 1420193330590,
"CAMFileName": "XYZ",
"KeypadEntryCode": "0",
"IsActive": false,
"Flagged": false,
"EventTitle": null,
"CreatedBy": 1,
"CreatedDate": 1420191120730,
"ModifiedBy": 1,
"ModifiedDate": 1577871185680,
"ReReviewAnalysis": null,
"LegacyEventId": null,
"TripId": 3382,
"FileVersion": "2.2",
"EventNumber": "AAAA-BBBB",
"Latitude": "UU9elrA=",
"Longitude": "/ueZUeFw",
"GeoAddressId": null,
"ReviewedEventId": 129411077,
"VideoStatus": 4,
"PredictionImportance": 0.1402457539,
"FlaggedBy": null,
"FlaggedDate": null,
"TriggerTypeId": 322,
"VideoDeleteDate": 1422783120000,
"MetadataDeleteDate": 1577871120000,
"RetentionStatus": 15,
"PartnerTriggerId": null,
"CoachingStateId": 0,
"EventKudoHistoryId": null
},
"source": {
"version": "Final",
"connector": "sqlserver",
"name": "CoreOLTP",
"ts_ms": 1615813992548,
"snapshot": "false",
"db": "CoreOLTP",
"schema": "dbo",
"table": "xyz",
"change_lsn": null,
"commit_lsn": null,
"event_serial_no": null
},
"op": "C",
"ts_ms": 1615813992548,
"transaction": null
}
Ошибка :
confluent_kafka.error.ValueSerializationError: KafkaError{code=_VALUE_SERIALIZATION,val=-161,str="{'EventId': 129411077, 'CameraId': 46237, 'SiteId': 2148, 'VehicleId': 45587, 'EventReviewStatusID': 10, 'EventTypeId': 247, 'EventDateTime': '2015-01-02T01:30:29Z', 'FTPUploadDateTime': 1420191120590, 'CAMFileName': 'JD2BC02120150102013029ER.SDE', 'KeypadEntryCode': '0', 'IsActive': False, 'Flagged': False, 'EventTitle': None, 'CreatedBy': 1, 'CreatedDate': 1420191120730, 'ModifiedBy': 1, 'ModifiedDate': 1577871185680, 'ReReviewAnalysis': None, 'LegacyEventId': None, 'TripId': 3382, 'FileVersion': '2.2', 'EventNumber': 'WSHX-8QQ2', 'Latitude': 'UU9elrA=', 'Longitude': '/ueZUeFw', 'GeoAddressId': None, 'ReviewedEventId': 129411077, 'VideoStatus': 4, 'PredictionImportance': 0.1402457539, 'FlaggedBy': None, 'FlaggedDate': None, 'TriggerTypeId': 322, 'VideoDeleteDate': 1422783120000, 'MetadataDeleteDate': 1577871120000, 'RetentionStatus': 15, 'PartnerTriggerId': None, 'CoachingStateId': 0, 'EventKudoHistoryId': None} (type <class 'dict'>) do not match ['null', 'CoreOLTPEvents.dbo.Event.Value'] on field after"}
1 ответ
Вам просто нужно изменить ввод, чтобы у поля не было пространства имен. Итак, это должно выглядеть так:
{
"after": null,
"before": {
"EventId": 1111111111,
"CameraId": 222222222
},
"source": {
"version": "InitialLoad",
"connector": "sqlserver"
},
"op": "C"
}
Исходный ввод, который вы выглядели так, как будто он пытался быть в кодировке JSON avro, потому что поле
before
имел
CoreOLTPEvents.dbo.Event.Value
пространство имен. Однако я предполагаю, что это должно быть вручную, потому что
CameraId
должно было быть указано как
{"long": 222222222}
а не просто
222222222
.
Если у вас действительно есть кодированный в Avro JSON (из результата какого-то другого процесса или чего-то еще), вы можете использовать что-то вроде
fastavro.json_reader
для чтения в этом файле, и он создаст правильное представление памяти (которое не включает информацию о типе для полей объединения).
ОБНОВИТЬ:
Чтобы выяснить, в чем проблема с полной схемой и полными данными, я сначала загрузил два объекта, используя
json.load
а затем использовал
fastavro.validate(record, schema)
Результатом этого является трассировка стека, которая заканчивается следующим образом:
fastavro._validate_common.ValidationError: [
"CoreOLTPEvents.dbo.Event.Envelope.after is <{'EventId': 1234566, 'CameraId': 2233, 'SiteId': 111, 'VehicleId': 45587, 'EventReviewStatusID': 10, 'EventTypeId': 123, 'EventDateTime': '2015-01-02T01:30:29Z', 'FTPUploadDateTime': 1420193330590, 'CAMFileName': 'XYZ', 'KeypadEntryCode': '0', 'IsActive': False, 'Flagged': False, 'EventTitle': None, 'CreatedBy': 1, 'CreatedDate': 1420191120730, 'ModifiedBy': 1, 'ModifiedDate': 1577871185680, 'ReReviewAnalysis': None, 'LegacyEventId': None, 'TripId': 3382, 'FileVersion': '2.2', 'EventNumber': 'AAAA-BBBB', 'Latitude': 'UU9elrA=', 'Longitude': '/ueZUeFw', 'GeoAddressId': None, 'ReviewedEventId': 129411077, 'VideoStatus': 4, 'PredictionImportance': 0.1402457539, 'FlaggedBy': None, 'FlaggedDate': None, 'TriggerTypeId': 322, 'VideoDeleteDate': 1422783120000, 'MetadataDeleteDate': 1577871120000, 'RetentionStatus': 15, 'PartnerTriggerId': None, 'CoachingStateId': 0, 'EventKudoHistoryId': None}> of type <class 'dict'> expected null",
"CoreOLTPEvents.dbo.Event.Value.Latitude is <UU9elrA=> of type <class 'str'> expected null",
"CoreOLTPEvents.dbo.Event.Value.Latitude is <UU9elrA=> of type <class 'str'> expected {'scale': 10, 'precision': 13, 'connect.version': 1, 'connect.parameters': {'scale': '10', 'connect.decimal.precision': '13'}, 'connect.name': 'org.apache.kafka.connect.data.Decimal', 'logicalType': 'decimal', 'type': 'bytes'}"
]
Таким образом, мы пытаемся сказать нам, что есть 3 потенциальные проблемы. Во-первых, значение в не совпадает, но мы можем игнорировать это, потому что мы не хотим
after
чтобы соответствовать.
Две последние проблемы и есть настоящая проблема. Это говорит о том, что ценность
Latitude
это строка
UU9elrA=
, но это тоже не совпадает
null
или же
bytes
. Строка здесь выглядит в кодировке base64, поэтому, возможно, у вас есть код, который декодирует это в байты, и если да, то, возможно, настоящая проблема заключается в другом, но если да, то я думаю, вы сможете использовать
fastavro.validate
чтобы выяснить, в чем проблема.