Не могу подключиться к внешней теме в KSql

Я очень новичок в Confluent KSql, но не новичок в Kafka. У меня есть существующие темы, которые существуют в Kafka как сериализированные данные Avro. У меня есть Confluent схема-реестр и работает и настроить KSql, чтобы указать на реестр.

Когда я пытаюсь создать таблицу на основе одной из моих тем, KSql жалуется, что не может найти поток. Когда я пытаюсь создать поток в KSql, который просто передает мою тему в KSql, кажется, нет никакого способа указать на мою сериализованную тему Avro, на которую есть ссылка в реестре.

Кто-нибудь знает, как атаковать эти две проблемы? Является ли способ, которым я хочу использовать KSql, не подходит для того, что он может сделать?

ОБНОВИТЬ

Вот еще несколько деталей

ksql> show topics;

 Kafka Topic                                                                                 | Registered | Partitions | Partition Replicas | Consumers | Consumer Groups
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 COM_FINDOLOGY_MODEL_REPORTING_OUTGOINGFEEDADVERTISERSEARCHDATA                              | false      | 2          | 2                  | 0         | 0
 COM_FINDOLOGY_MODEL_TRAFFIC_CPATRACKINGCALLBACK                                             | false      | 2          | 2                  | 0         | 0
 COM_FINDOLOGY_MODEL_TRAFFIC_ENTRYPOINTCLICK                                                 | true       | 10         | 3                  | 0         | 0

KSql config

#bootstrap.servers=localhost:9092
bootstrap.servers=host1:9092,host2:9092,host3:9092,host4:9092,host5:9092

#listeners=http://localhost:8088
listeners=http://localhost:59093

ksql.server.ui.enabled=true

ksql.schema.registry.url=http://host1:59092

Конфигурация реестра

# The host name advertised in ZooKeeper. Make sure to set this if running Schema Registry with multiple nodes.
host.name: x.x.x.x
listeners=http://0.0.0.0:59092

# Zookeeper connection string for the Zookeeper cluster used by your Kafka cluster
# (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
#kafkastore.connection.url=localhost:2181

# Alternatively, Schema Registry can now operate without Zookeeper, handling all coordination via
# Kafka brokers. Use this setting to specify the bootstrap servers for your Kafka cluster and it
# will be used both for selecting the master schema registry instance and for storing the data for
# registered schemas.
# (Note that you cannot mix the two modes; use this mode only on new deployments or by shutting down
# all instances, switching to the new configuration, and then starting the schema registry
# instances again.)
kafkastore.bootstrap.servers=PLAINTEXT://host1:9092,PLAINTEXT://host2:9092,PLAINTEXT://host3:9092,PLAINTEXT://host4:9092,PLAINTEXT://host5:9092

# The name of the topic to store schemas in
kafkastore.topic=_schemas

# If true, API requests that fail will include extra debugging information, including stack traces
debug=false

Попытка решить проблему, объявив внешнюю тему

ksql> register  topic xxx with (value_format='avro', kafka_topic='COM_FINDOLOGY_MODEL_REPORTING_OUTGOINGFEEDADVERTISERSEARCHDATA');
You need to provide avro schema file path for topics in avro format.

2 ответа

REGISTER TOPIC устарел синтаксис. Вы должны использовать CREATE STREAM (или же CREATE TABLE, в зависимости от ваших требований к доступу к данным).

Таким образом, ваше заявление будет выглядеть примерно так:

CREATE STREAM MY_STREAM_1 \
  WITH (VALUE_FORMAT='AVRO', \
  KAFKA_TOPIC='COM_FINDOLOGY_MODEL_REPORTING_OUTGOINGFEEDADVERTISERSEARCHDATA');

Обратите внимание, что я использовал \ разбить строки на удобочитаемость; тебе не нужно этого делать.

Я решил проблему, возникшую после изменения информации, которую я использовал в теме Кафки, в отличие от использования всего содержимого темы. В теме содержатся данные в кодировке Avro (хорошо), созданные с помощью ReflectionData, KSql имеет проблемы с нестандартными элементами в потоке, но обрабатывает элементы ReflectionData, пока существует соответствующий тип данных KSql. Я решил эту проблему, создав новый поток в KSql, в котором были выбраны только те элементы, которые мне нужны, которые также совместимы с KSql. Когда это было сделано, я мог обработать то, что мне нужно, из большего потока.

Комментарий Я вижу в KSql недостаток в том, что вам нужно создать новую актуальную тему (и) в Kafka обрабатывать данные. Я думаю, что лучшим решением будет рассматривать промежуточный поток как View в реальный поток. Я понимаю, что для промежуточных тем нужно хранить накопления и обработанные элементы, прежде чем разрешить их как таблицу KTable.

Другие вопросы по тегам