Интеграция Кафки с Apache Calcite

Я пытаюсь интегрировать кальцит с Kafka, я ссылался на CsvStreamableTable.

Каждый ConsumerRecord конвертируется в Object[] с использованием кода fowlloing:

static class ArrayRowConverter extends RowConverter<Object[]> {
    private List<Schema.Field> fields;

    public ArrayRowConverter(List<Schema.Field> fields) {
        this.fields = fields;
    }

    @Override
    Object[] convertRow(ConsumerRecord<String, GenericRecord> consumerRecord) {
        Object[] objects = new Object[fields.size()+1];
        int i = 0 ;
        objects[i++] = consumerRecord.timestamp();
        for(Schema.Field field : this.fields) {
            Object obj = consumerRecord.value().get(field.name());
            if( obj instanceof Utf8 ){
                objects[i ++] = obj.toString();
            }else {
                objects[i ++] = obj;
            }
        }
        return objects;
    }
}

Перечислитель реализован следующим образом: один поток постоянно опрашивает записи из kafka и помещает их в очередь, опрос метода getRecord() из этой очереди:

public E current() {
    return current;
}

public boolean moveNext() {
for(;;) {
    if(cancelFlag.get()) {
        return false;
    }
    ConsumerRecord<String, GenericRecord> record = getRecord();
    if(record ==  null) {
        try {
            Thread.sleep(200L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        continue;
    }
    current = rowConvert.convertRow(record);
    return true;
    }
}

Я проверял SELECT STREAM * FROM Kafka.clicksработает нормально. rowtime - это первый явно добавленный столбец, а значением является метка времени записи Kafka.

Но когда я попробовал

SELECT STREAM FLOOR(rowtime TO HOUR) 
AS rowtime,ip,COUNT(*) AS c FROM KAFKA.clicks  GROUP BY FLOOR(rowtime TO HOUR), ip

Это бросило исключение

java.sql.SQLException: Error while executing SQL "SELECT STREAM FLOOR(rowtime TO HOUR) AS rowtime,ip,COUNT(*) AS c FROM KAFKA.clicks  GROUP BY FLOOR(rowtime TO HOUR), ip": From line 1, column 85 to line 1, column 119: Streaming aggregation requires at least one monotonic expression in GROUP BY clause
    at org.apache.calcite.avatica.Helper.createException(Helper.java:56)
    at org.apache.calcite.avatica.Helper.createException(Helper.java:41)

1 ответ

Вы должны объявить, что столбец "ROWTIME" является монотонным. В MockCatalogReaderОбратите внимание, как "ROWTIME" объявляется монотонным в потоках "ORDERS" и "SHIPMENTS". Вот почему некоторые запросы в SqlValidatorTest.testStreamGroupBy() действительны, а другие нет. Ключевой метод, проверенный валидатором: SqlValidatorTable.getMonotonicity(String columnName),

Другие вопросы по тегам