Интеграция Кафки с Apache Calcite
Я пытаюсь интегрировать кальцит с Kafka, я ссылался на CsvStreamableTable.
Каждый ConsumerRecord конвертируется в Object[] с использованием кода fowlloing:
static class ArrayRowConverter extends RowConverter<Object[]> {
private List<Schema.Field> fields;
public ArrayRowConverter(List<Schema.Field> fields) {
this.fields = fields;
}
@Override
Object[] convertRow(ConsumerRecord<String, GenericRecord> consumerRecord) {
Object[] objects = new Object[fields.size()+1];
int i = 0 ;
objects[i++] = consumerRecord.timestamp();
for(Schema.Field field : this.fields) {
Object obj = consumerRecord.value().get(field.name());
if( obj instanceof Utf8 ){
objects[i ++] = obj.toString();
}else {
objects[i ++] = obj;
}
}
return objects;
}
}
Перечислитель реализован следующим образом: один поток постоянно опрашивает записи из kafka и помещает их в очередь, опрос метода getRecord() из этой очереди:
public E current() {
return current;
}
public boolean moveNext() {
for(;;) {
if(cancelFlag.get()) {
return false;
}
ConsumerRecord<String, GenericRecord> record = getRecord();
if(record == null) {
try {
Thread.sleep(200L);
} catch (InterruptedException e) {
e.printStackTrace();
}
continue;
}
current = rowConvert.convertRow(record);
return true;
}
}
Я проверял SELECT STREAM * FROM Kafka.clicks
работает нормально.
rowtime - это первый явно добавленный столбец, а значением является метка времени записи Kafka.
Но когда я попробовал
SELECT STREAM FLOOR(rowtime TO HOUR)
AS rowtime,ip,COUNT(*) AS c FROM KAFKA.clicks GROUP BY FLOOR(rowtime TO HOUR), ip
Это бросило исключение
java.sql.SQLException: Error while executing SQL "SELECT STREAM FLOOR(rowtime TO HOUR) AS rowtime,ip,COUNT(*) AS c FROM KAFKA.clicks GROUP BY FLOOR(rowtime TO HOUR), ip": From line 1, column 85 to line 1, column 119: Streaming aggregation requires at least one monotonic expression in GROUP BY clause
at org.apache.calcite.avatica.Helper.createException(Helper.java:56)
at org.apache.calcite.avatica.Helper.createException(Helper.java:41)
1 ответ
Вы должны объявить, что столбец "ROWTIME" является монотонным. В MockCatalogReader
Обратите внимание, как "ROWTIME" объявляется монотонным в потоках "ORDERS" и "SHIPMENTS". Вот почему некоторые запросы в SqlValidatorTest.testStreamGroupBy()
действительны, а другие нет. Ключевой метод, проверенный валидатором: SqlValidatorTable.getMonotonicity(String columnName)
,