Кафка государственный магазин возвращает ноль при использовании Avro

У нас есть таблица kTable, в которой мы отображаем значения и материализуем их в KeyValueStore, используя следующий код:

    @Bean
    public KTable<String, CancelEvent> kTable(StreamsBuilder kStreamBuilder,
                                          ValueMapper<CancelEvent, CancelEvent> mapper,
                                          SpecificAvroSerde<CancelEvent> serde) {

      return kStreamBuilder
            .table(properties.getProperty("topics.cancel"), Consumed.with(Serdes.String(), serde))
            .mapValues(mapper, Materialized.as("cancel-event-store"));

    }

И код картографа:

    @Override
    public CancelEvent apply(CancelEvent value) {

      if (value.getData().getCancelled()) {
        return null;
      }

      return value;

    }

Мы используем Avro для сериализации и десериализации. Когда мы пытаемся запросить state store с ключом всегда возвращается null,

Код для запроса государственного магазина:

    ReadOnlyKeyValueStore<String, CancelEvent> store = streamsFactory.getKafkaStreams()
            .store("cancel-event-store", QueryableStoreTypes.keyValueStore());

    CancelEvent event = store.get(queryEvent.getMeta().getId().toString());

event всегда null, Во время отладки я перебрал keyValuestoreи понял, что к ключу добавлено что-то (может быть магический байт!). Пожалуйста, обратитесь к изображению ниже

введите описание изображения здесь

Итак, вопрос в том, как сделать запрос с ключом?

Обновление 1

    private Map<String, Object> kStreamsConfigsProperties() {
    final Map<String, Object> config = new HashMap<>();

    config.put(APPLICATION_ID_CONFIG, "app.name" + new Date().getTime());

    config.put(BOOTSTRAP_SERVERS_CONFIG, properties.getProperty("spring.kafka.bootstrap-servers"));
    config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, properties.getProperty("spring.kafka.schema-registry"));

    config.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    config.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class.getName());
    config.put(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());

    return config;
}

0 ответов

Другие вопросы по тегам