Реестр Spring Embedded Kafka + Mock Schema: Государственный магазин ChangeLog Schema не зарегистрирован

Я создаю интеграционный тест для нашей системы kafka с помощью Spring Embedded Kafka Broker с MockSchemaRegistryClient. Я создаю тест для одной из наших топологий Stream, созданный с использованием API Streams (KStreamBuilder). Эта конкретная топология имеет KStream (stream1), подающий в KTable (таблица1).

Я сталкиваюсь с ошибкой при подаче ввода в stream1, происходящий из KTableProcessor таблицы1:

Исключение в потоке "ипотека-кафка-потребители-it-c1dd9185-ce16-415c-ad82-293c1281c897-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: исключение, обнаруженное в процессе. taskId=0_0, процессор =KSTREAM-SOURCE-0000000001, тема =streaming.mortgage.application_party, раздел =0, смещение = 0 в org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:202) в org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:342) в org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:4.) apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:334) в org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:624) в org.apache.kaf streams.processor.internals.StreamThread.runOnce(StreamThread.java:513) в org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) в org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) Причина: org.apache.kafka.common.errors.SerializationException: Ошибка десериализации сообщения Avro для идентификатора 6 Причина: java.io. IOException: Невозможно получить схему из реестра схем! на io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getSchemaBySubjectAndIdFromRegistry(MockSchemaRegistryClient.java:106) в io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getBySubjectAndID(MockSchemaRegistryClient.java:149) в io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:121) при io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:92) при io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:54) в com.sofi.kafka.serialization.AvroDeserializer.deserialize(AvroDeserializer.java:35) в org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:163) в org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:151) в org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:135) в org.apache.kafka.streams.kstream.tern. ls.KTableSource$KTableSourceProcessor.process(KTableSource.java:62) в org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:45) в org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201) в org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:131) в org.apache.kafka.streams.processor.inforteI (ProcessorContextImpl.java:82) в org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80) в org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:188) в org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:342) в org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks (AssignedT4ks) в org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:334) в org.apache.kafka.streams.processor.internals.StreamThre ad.processAndPunctuate (StreamThread.java:624) в org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:513) в org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) в org.apache.kafka.streams.processor.internals.StreamThread.run (StreamThread.java:457)

KTableProcessor пытается десериализовать запись из хранилища состояний RocksDB, однако схема не существует в реестре фиктивной схемы. Тема, схема которой запрашивается: appname-KTABLE-SOURCE-STATE-STORE-0000000000-changelog

Как указывает исключение, схема не была зарегистрирована. Однако раздел appname-KTABLE-SOURCE-STATE-STORE-0000000000-changelog-key имеет зарегистрированную схему (регистрируется, когда ключ записи сериализуется для запроса).

Поскольку это внутренняя тема, я не ожидаю, что мне придется регистрировать эту схему самостоятельно, однако у меня не получается из-за отсутствия схемы в реестре. Есть ли способ зарегистрировать схемы журнала изменений до приема данных? Есть ли способ отключить журнал изменений в хранилище состояний с помощью KStreamBuilder?

Заранее спасибо!

1 ответ

Решил проблему, которую я сейчас смущенно расскажу: при использовании KTable (через Streams API) со встроенным брокером kafka вы захотите настроить объект KafkaStreams с каталогом State Store, уникальным для каждого запуска встроенного брокера kafka (в моем случае каждый прогон теста).

Вы управляете каталогом State Store через StreamsConfig.STATE_DIR_CONFIG конфигурации. Я сделал его уникальным, добавив отметку времени в каталог хранилища состояний по умолчанию

properties.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kraken-streams/" + LocalDateTime.now().toString());

Проблема заключалась в том, что старое хранилище состояний существовало в одном и том же месте каждый раз при инициализации встроенного брокера kafka. Когда самая первая запись была введена в тему KTable, хранилище состояний смогло вернуть предыдущее значение. В результате была предпринята попытка десериализации записи хранилища состояний, которая еще не была сериализована (с точки зрения экземпляра реестра схемы). Схемы регистрируются только при сериализации, поэтому попытка десериализации не удалась из-за отсутствия зарегистрированной схемы.