SnappyData - Ошибка при создании таблицы потоков Kafka

Я вижу проблему при создании таблицы потоковой передачи с использованием kafka из оболочки snappy.

"Исключение" Неверный ввод "C", ожидаемая операция dmlOperation, insert, withIdentifier, select или put (строка 1, столбец 1):'

Ссылка: http://snappydatainc.github.io/snappydata/streamingWithSQL/

Вот мой sql:

CREATE STREAM TABLE if not exists sensor_data_stream 
(sensor_id string, metric string)
using kafka_stream 
options (
    storagelevel 'MEMORY_AND_DISK_SER_2',
    rowConverter 'io.snappydata.app.streaming.KafkaStreamToRowsConverter',
    zkQuorum 'localhost:2181',
    groupId 'streamConsumer',
    topics 'test:01');

Похоже, оболочке не нравится сценарий с первым символом "C". Я пытаюсь выполнить скрипт, используя следующую команду:

snappy> run '/scripts/my_test_sensor_script.sql';

любая помощь приветствуется!

2 ответа

Существует некоторое несоответствие в документации и фактическом синтаксисе. Правильный синтаксис:

CREATE STREAM TABLE sensor_data_stream if not exists (sensor_id string, 
metric string) using kafka_stream 
options (storagelevel 'MEMORY_AND_DISK_SER_2', 
rowConverter 'io.snappydata.app.streaming.KafkaStreamToRowsConverter', 
zkQuorum 'localhost:2181',
 groupId 'streamConsumer',  topics 'test:01');

Еще одна вещь, которую вам нужно сделать, это написать конвертер строк для ваших данных

Майк, тебе нужно создать свой собственный класс rowConverter, реализовав следующую черту:

trait StreamToRowsConverter extends Serializable {
  def toRows(message: Any): Seq[Row]
}

а затем укажите полное имя класса rowConverter в DDL. RowConverter является специфическим для схемы. "io.snappydata.app.streaming.KafkaStreamToRowsConverter" - это просто имя класса-заполнителя, которое должно быть заменено вашим собственным классом rowConverter.

Другие вопросы по тегам