Kafka Streams: PAPI и DSL KTable смешивать и сочетать, не разделяя

У меня есть смешанная и подходящая топология Scala, в которой основным рабочим является процессор PAPI, а другие части связаны через DSL.

EventsProcessor: 
INPUT: eventsTopic
OUTPUT: visitorsTopic (and others)

Данные по темам (включая оригинал eventsTopic) разделена через, давайте назовем это DoubleKey это имеет два поля. Посетители отправляются на visitorsTopic через раковину:

.addSink(VISITOR_SINK_NAME, visitorTopicName,
    DoubleKey.getSerializer(), Visitor.getSerializer(), visitorSinkPartitioner, EVENT_PROCESSOR_NAME)

В DSL я создаю KV KTable поверх этой темы:

val visitorTable = builder.table(
  visitorTopicName,
  Consumed.`with`(DoubleKey.getKafkaSerde(),
  Visitor.getKafkaSerde()),
  Materialized.as(visitorStoreName))

который я позже подключу к EventProcessor:

topology.connectProcessorAndStateStores(EVENT_PROCESSOR_NAME, visitorStoreName)

Все разделено (через DoubleKey). visitorSinkPartitioner выполняет типичную операцию по модулю:

Math.abs(partitionKey.hashCode % numPartitions)

В обработчике событий PAPI EventsProcessor я запрашиваю эту таблицу, чтобы узнать, есть ли уже существующие посетители.

Однако в моих тестах (с использованием EmbeddedKafka, но это не должно иметь значения), если я запускаю их с одним разделом, все в порядке (EventsProcessor проверяет KTable на двух событиях на одном и том же DoubleKeyи на втором событии - с некоторой задержкой - он может видеть существующее Visitor в магазине), но если я запускаю его с большим номером, EventProcessor никогда не увидит значение в магазине.

Однако, если я проверю магазин через API (итерации store.all()), запись есть. Итак, я понимаю, что это должно идти в другой раздел.

Поскольку KTable должен работать с данными в своем разделе, и все отправляется в один и тот же раздел (с использованием явных разделителей, вызывающих один и тот же код), KTable должен получать эти данные в одном и том же разделе.

Верны ли мои предположения? Что может происходить?

KafkaStreams 1.0.0, Scala 2.12.4.

PS. Конечно, это будет работать, делая putна PAPI, создавая магазин через PAPI вместо StreamsBuilder.table(), поскольку это определенно будет использовать тот же раздел, где выполняется код, но об этом не может быть и речи.

1 ответ

Решение

Да, предположения были правильными.

На случай, если это кому-нибудь поможет:

У меня возникла проблема при передаче Partitioner в библиотеку Scala EmbeddedKafka. В одном из тестов это было сделано неправильно. Теперь, после очень полезной практики рефакторинга, я использовал этот метод во всех комплектах этой топологии.

def getEmbeddedKafkaTestConfig(zkPort: Int, kafkaPort: Int) : 
    EmbeddedKafkaConfig = {
    val producerProperties = Map(ProducerConfig.PARTITIONER_CLASS_CONFIG ->
        classOf[DoubleKeyPartitioner].getCanonicalName)
    EmbeddedKafkaConfig(kafkaPort = kafkaPort, zooKeeperPort = zkPort, 
        customProducerProperties = producerProperties)
}
Другие вопросы по тегам