SnappyData - Ошибка при создании таблицы потоков Kafka
Я вижу проблему при создании таблицы потоковой передачи с использованием kafka из оболочки snappy.
"Исключение" Неверный ввод "C", ожидаемая операция dmlOperation, insert, withIdentifier, select или put (строка 1, столбец 1):'
Ссылка: http://snappydatainc.github.io/snappydata/streamingWithSQL/
Вот мой sql:
CREATE STREAM TABLE if not exists sensor_data_stream
(sensor_id string, metric string)
using kafka_stream
options (
storagelevel 'MEMORY_AND_DISK_SER_2',
rowConverter 'io.snappydata.app.streaming.KafkaStreamToRowsConverter',
zkQuorum 'localhost:2181',
groupId 'streamConsumer',
topics 'test:01');
Похоже, оболочке не нравится сценарий с первым символом "C". Я пытаюсь выполнить скрипт, используя следующую команду:
snappy> run '/scripts/my_test_sensor_script.sql';
любая помощь приветствуется!
2 ответа
Существует некоторое несоответствие в документации и фактическом синтаксисе. Правильный синтаксис:
CREATE STREAM TABLE sensor_data_stream if not exists (sensor_id string,
metric string) using kafka_stream
options (storagelevel 'MEMORY_AND_DISK_SER_2',
rowConverter 'io.snappydata.app.streaming.KafkaStreamToRowsConverter',
zkQuorum 'localhost:2181',
groupId 'streamConsumer', topics 'test:01');
Еще одна вещь, которую вам нужно сделать, это написать конвертер строк для ваших данных
Майк, тебе нужно создать свой собственный класс rowConverter, реализовав следующую черту:
trait StreamToRowsConverter extends Serializable {
def toRows(message: Any): Seq[Row]
}
а затем укажите полное имя класса rowConverter в DDL. RowConverter является специфическим для схемы. "io.snappydata.app.streaming.KafkaStreamToRowsConverter" - это просто имя класса-заполнителя, которое должно быть заменено вашим собственным классом rowConverter.