Как обеспечить постоянную генерацию схемы Avro и избежать исключения "Слишком много объектов схемы создано для x"?
Я испытываю воспроизводимую ошибку при создании сообщений Avro с реактивной kafka и avro4s. Однажды identityMapCapacity
клиента (CachedSchemaRegistryClient
), сериализация завершается неудачно с
java.lang.IllegalStateException: Too many schema objects created for <myTopic>-value
Это неожиданно, поскольку все сообщения должны иметь одну и ту же схему - они являются сериализацией одного и того же класса дел.
val avroProducerSettings: ProducerSettings[String, GenericRecord] =
ProducerSettings(system, Serdes.String().serializer(),
avroSerde.serializer())
.withBootstrapServers(settings.bootstrapServer)
val avroProdFlow: Flow[ProducerMessage.Message[String, GenericRecord, String],
ProducerMessage.Result[String, GenericRecord, String],
NotUsed] = Producer.flow(avroProducerSettings)
val avroQueue: SourceQueueWithComplete[Message[String, GenericRecord, String]] =
Source.queue(bufferSize, overflowStrategy)
.via(avroProdFlow)
.map(logResult)
.to(Sink.ignore)
.run()
...
queue.offer(msg)
Сериализатор представляет собой KafkaAvroSerializer
, созданный с new CachedSchemaRegistryClient(settings.schemaRegistry, 1000)
Генерация GenericRecord
:
def toAvro[A](a: A)(implicit recordFormat: RecordFormat[A]): GenericRecord =
recordFormat.to(a)
val makeEdgeMessage: (Edge, String) => Message[String, GenericRecord, String] = { (edge, topic) =>
val edgeAvro: GenericRecord = toAvro(edge)
val record = new ProducerRecord[String, GenericRecord](topic, edge.id, edgeAvro)
ProducerMessage.Message(record, edge.id)
}
Схема создается глубоко в коде (io.confluent.kafka.serializers.AbstractKafkaAvroSerDe#getSchema
, вызванный io.confluent.kafka.serializers.AbstractKafkaAvroSerializer#serializeImpl
), где я не имею на это никакого влияния, поэтому я не знаю, как исправить утечку. Похоже, два слитых проекта плохо работают вместе.
Проблемы, которые я нашел здесь, здесь и здесь, похоже, не касаются моего варианта использования.
Два обходных пути для меня в настоящее время:
- не использовать схему реестра - не долгосрочное решение, очевидно
- создать кастом
SchemaRegistryClient
не полагаясь на идентичность объекта - выполнимо, но хотел бы избежать создания большего количества проблем, которые я бы решил
Есть ли способ генерировать или кэшировать согласованную схему в зависимости от типа сообщения / записи и использовать ее с моими настройками?
1 ответ
редактировать 2017.11.20
Проблема в моем случае заключалась в том, что каждый экземпляр GenericRecord
несущий мое сообщение было сериализовано другим экземпляром RecordFormat
, содержащий другой экземпляр Schema
, Здесь неявное разрешение каждый раз генерирует новый экземпляр.
def toAvro[A](a: A)(implicit recordFormat: RecordFormat[A]): GenericRecord = recordFormat.to(a)
Решение было закрепить RecordFormat
экземпляр для val
и использовать его явно. Большое спасибо https://github.com/heliocentrist за разъяснения деталей.
оригинальный ответ:
Подождав некоторое время (также нет ответа на вопрос о github), мне пришлось реализовать свой собственный SchemaRegistryClient
, Более 90% копируется с оригинала CachedSchemaRegistryClient
, только что перевел в скалу. Использование скалы mutable.Map
исправлена утечка памяти. Я не проводил всесторонних тестов, поэтому пользуйтесь на свой страх и риск.
import java.util
import io.confluent.kafka.schemaregistry.client.rest.entities.{ Config, SchemaString }
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.ConfigUpdateRequest
import io.confluent.kafka.schemaregistry.client.rest.{ RestService, entities }
import io.confluent.kafka.schemaregistry.client.{ SchemaMetadata, SchemaRegistryClient }
import org.apache.avro.Schema
import scala.collection.mutable
class CachingSchemaRegistryClient(val restService: RestService, val identityMapCapacity: Int)
extends SchemaRegistryClient {
val schemaCache: mutable.Map[String, mutable.Map[Schema, Integer]] = mutable.Map()
val idCache: mutable.Map[String, mutable.Map[Integer, Schema]] =
mutable.Map(null.asInstanceOf[String] -> mutable.Map())
val versionCache: mutable.Map[String, mutable.Map[Schema, Integer]] = mutable.Map()
def this(baseUrl: String, identityMapCapacity: Int) {
this(new RestService(baseUrl), identityMapCapacity)
}
def this(baseUrls: util.List[String], identityMapCapacity: Int) {
this(new RestService(baseUrls), identityMapCapacity)
}
def registerAndGetId(subject: String, schema: Schema): Int =
restService.registerSchema(schema.toString, subject)
def getSchemaByIdFromRegistry(id: Int): Schema = {
val restSchema: SchemaString = restService.getId(id)
(new Schema.Parser).parse(restSchema.getSchemaString)
}
def getVersionFromRegistry(subject: String, schema: Schema): Int = {
val response: entities.Schema = restService.lookUpSubjectVersion(schema.toString, subject)
response.getVersion.intValue
}
override def getVersion(subject: String, schema: Schema): Int = synchronized {
val schemaVersionMap: mutable.Map[Schema, Integer] =
versionCache.getOrElseUpdate(subject, mutable.Map())
val version: Integer = schemaVersionMap.getOrElse(
schema, {
if (schemaVersionMap.size >= identityMapCapacity) {
throw new IllegalStateException(s"Too many schema objects created for $subject!")
}
val version = new Integer(getVersionFromRegistry(subject, schema))
schemaVersionMap.put(schema, version)
version
}
)
version.intValue()
}
override def getAllSubjects: util.List[String] = restService.getAllSubjects()
override def getByID(id: Int): Schema = synchronized { getBySubjectAndID(null, id) }
override def getBySubjectAndID(subject: String, id: Int): Schema = synchronized {
val idSchemaMap: mutable.Map[Integer, Schema] = idCache.getOrElseUpdate(subject, mutable.Map())
idSchemaMap.getOrElseUpdate(id, getSchemaByIdFromRegistry(id))
}
override def getSchemaMetadata(subject: String, version: Int): SchemaMetadata = {
val response = restService.getVersion(subject, version)
val id = response.getId.intValue
val schema = response.getSchema
new SchemaMetadata(id, version, schema)
}
override def getLatestSchemaMetadata(subject: String): SchemaMetadata = synchronized {
val response = restService.getLatestVersion(subject)
val id = response.getId.intValue
val version = response.getVersion.intValue
val schema = response.getSchema
new SchemaMetadata(id, version, schema)
}
override def updateCompatibility(subject: String, compatibility: String): String = {
val response: ConfigUpdateRequest = restService.updateCompatibility(compatibility, subject)
response.getCompatibilityLevel
}
override def getCompatibility(subject: String): String = {
val response: Config = restService.getConfig(subject)
response.getCompatibilityLevel
}
override def testCompatibility(subject: String, schema: Schema): Boolean =
restService.testCompatibility(schema.toString(), subject, "latest")
override def register(subject: String, schema: Schema): Int = synchronized {
val schemaIdMap: mutable.Map[Schema, Integer] =
schemaCache.getOrElseUpdate(subject, mutable.Map())
val id = schemaIdMap.getOrElse(
schema, {
if (schemaIdMap.size >= identityMapCapacity)
throw new IllegalStateException(s"Too many schema objects created for $subject!")
val id: Integer = new Integer(registerAndGetId(subject, schema))
schemaIdMap.put(schema, id)
idCache(null).put(id, schema)
id
}
)
id.intValue()
}
}