Как программно обновить схему темы и совместимость в реестре схем Confluent

У меня есть схема, уже зарегистрированная в реестре схем, что я смог сделать, используя register() нравится,

from schema_registry.client import SchemaRegistryClient, schema

subject_name = "new-schema"
schema_url = "https://{{ schemaRegistry }}:8081" 
sr = SchemaRegistryClient(schema_url)

schema = schema.AvroSchema({
    "namespace": "example.avro",
    "type": "record",
    "name": "user",
    "fields": [
        {"name": "fname", "type": "string"},
        {"name": "favorite_number",  "type": "int"}
    ]
})

my_schema = sr.register(subject_name, schema)

Теперь мне нужно обновить эту же тему новым полем, поэтому я получу новый идентификатор схемы и version = 2.

updated_schema = schema.AvroSchema({
    "namespace": "example.avro",
    "type": "record",
    "name": "user",
    "fields": [
        {"name": "fname", "type": "string"},
        {"name": "favorite_number",  "type": "int"},
        {"name": "favorite_food",  "type": "string"}
    ]
})

Я пробовал использовать sr.register(subject_name, updated_schema), это вызывает ошибку для того же объекта:

AttributeError: 'ClientError' object has no attribute '_get_object_id'
ClientError: Incompatible Avro schema

Да, эта функция предназначена для регистрации новой схемы, чтобы она не обновлялась. У меня не было никакой функции обновления, и я не знаю, как это сделать. Итак, как я могу обновить схему? Любая помощь будет оценена.

1 ответ

Решение

Реестр схем применяет определенные правила совместимости при регистрации новых схем в теме. Следовательно, вам необходимо убедиться, что режим совместимости субъекта соответствует эволюции схемы, которую вы ищете.


С помощью confluent-kafka-python

from confluent_kafka.schema_registry import SchemaRegistryClient


sr = SchemaRegistryClient("https://schema-registry-host:8081")

# Options are: 
#   - NONE, FULL, BACKWARD, FORWARD, 
#   - BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE, FULL_TRANSITIVE
sr.set_compatibility("yourSubjectName", "NONE")

С помощью python-schema-registry-client

from schema_registry.client import SchemaRegistryClient


sr = SchemaRegistryClient("https://schema-registry-host:8081")
sr.update_compatibility(level="NONE", subject="yourSubjectName")

Полный список типов совместимости см. В документации Confluent.