Как программно обновить схему темы и совместимость в реестре схем Confluent
У меня есть схема, уже зарегистрированная в реестре схем, что я смог сделать, используя register()
нравится,
from schema_registry.client import SchemaRegistryClient, schema
subject_name = "new-schema"
schema_url = "https://{{ schemaRegistry }}:8081"
sr = SchemaRegistryClient(schema_url)
schema = schema.AvroSchema({
"namespace": "example.avro",
"type": "record",
"name": "user",
"fields": [
{"name": "fname", "type": "string"},
{"name": "favorite_number", "type": "int"}
]
})
my_schema = sr.register(subject_name, schema)
Теперь мне нужно обновить эту же тему новым полем, поэтому я получу новый идентификатор схемы и version = 2
.
updated_schema = schema.AvroSchema({
"namespace": "example.avro",
"type": "record",
"name": "user",
"fields": [
{"name": "fname", "type": "string"},
{"name": "favorite_number", "type": "int"},
{"name": "favorite_food", "type": "string"}
]
})
Я пробовал использовать sr.register(subject_name, updated_schema)
, это вызывает ошибку для того же объекта:
AttributeError: 'ClientError' object has no attribute '_get_object_id'
ClientError: Incompatible Avro schema
Да, эта функция предназначена для регистрации новой схемы, чтобы она не обновлялась. У меня не было никакой функции обновления, и я не знаю, как это сделать. Итак, как я могу обновить схему? Любая помощь будет оценена.
1 ответ
Реестр схем применяет определенные правила совместимости при регистрации новых схем в теме. Следовательно, вам необходимо убедиться, что режим совместимости субъекта соответствует эволюции схемы, которую вы ищете.
С помощью confluent-kafka-python
from confluent_kafka.schema_registry import SchemaRegistryClient
sr = SchemaRegistryClient("https://schema-registry-host:8081")
# Options are:
# - NONE, FULL, BACKWARD, FORWARD,
# - BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE, FULL_TRANSITIVE
sr.set_compatibility("yourSubjectName", "NONE")
С помощью python-schema-registry-client
from schema_registry.client import SchemaRegistryClient
sr = SchemaRegistryClient("https://schema-registry-host:8081")
sr.update_compatibility(level="NONE", subject="yourSubjectName")
Полный список типов совместимости см. В документации Confluent.