Kafka Streams: PAPI и DSL KTable смешивать и сочетать, не разделяя
У меня есть смешанная и подходящая топология Scala, в которой основным рабочим является процессор PAPI, а другие части связаны через DSL.
EventsProcessor:
INPUT: eventsTopic
OUTPUT: visitorsTopic (and others)
Данные по темам (включая оригинал eventsTopic
) разделена через, давайте назовем это DoubleKey
это имеет два поля. Посетители отправляются на visitorsTopic
через раковину:
.addSink(VISITOR_SINK_NAME, visitorTopicName,
DoubleKey.getSerializer(), Visitor.getSerializer(), visitorSinkPartitioner, EVENT_PROCESSOR_NAME)
В DSL я создаю KV KTable поверх этой темы:
val visitorTable = builder.table(
visitorTopicName,
Consumed.`with`(DoubleKey.getKafkaSerde(),
Visitor.getKafkaSerde()),
Materialized.as(visitorStoreName))
который я позже подключу к EventProcessor
:
topology.connectProcessorAndStateStores(EVENT_PROCESSOR_NAME, visitorStoreName)
Все разделено (через DoubleKey). visitorSinkPartitioner
выполняет типичную операцию по модулю:
Math.abs(partitionKey.hashCode % numPartitions)
В обработчике событий PAPI EventsProcessor я запрашиваю эту таблицу, чтобы узнать, есть ли уже существующие посетители.
Однако в моих тестах (с использованием EmbeddedKafka, но это не должно иметь значения), если я запускаю их с одним разделом, все в порядке (EventsProcessor проверяет KTable на двух событиях на одном и том же DoubleKey
и на втором событии - с некоторой задержкой - он может видеть существующее Visitor
в магазине), но если я запускаю его с большим номером, EventProcessor никогда не увидит значение в магазине.
Однако, если я проверю магазин через API (итерации store.all()
), запись есть. Итак, я понимаю, что это должно идти в другой раздел.
Поскольку KTable должен работать с данными в своем разделе, и все отправляется в один и тот же раздел (с использованием явных разделителей, вызывающих один и тот же код), KTable должен получать эти данные в одном и том же разделе.
Верны ли мои предположения? Что может происходить?
KafkaStreams 1.0.0, Scala 2.12.4.
PS. Конечно, это будет работать, делая put
на PAPI, создавая магазин через PAPI вместо StreamsBuilder.table()
, поскольку это определенно будет использовать тот же раздел, где выполняется код, но об этом не может быть и речи.
1 ответ
Да, предположения были правильными.
На случай, если это кому-нибудь поможет:
У меня возникла проблема при передаче Partitioner в библиотеку Scala EmbeddedKafka. В одном из тестов это было сделано неправильно. Теперь, после очень полезной практики рефакторинга, я использовал этот метод во всех комплектах этой топологии.
def getEmbeddedKafkaTestConfig(zkPort: Int, kafkaPort: Int) :
EmbeddedKafkaConfig = {
val producerProperties = Map(ProducerConfig.PARTITIONER_CLASS_CONFIG ->
classOf[DoubleKeyPartitioner].getCanonicalName)
EmbeddedKafkaConfig(kafkaPort = kafkaPort, zooKeeperPort = zkPort,
customProducerProperties = producerProperties)
}