Как выполнить сложную обработку событий, используя Esper с потоками, поступающими из данных CSV?

Я непрерывно отправляю строки CSV в темы Kafka, используя производителя Kafka, и генерирую поток строк CSV следующим образом

температура =23,3, давление =1015, влажность =63,4, скорость ветра =7,4.....

Теперь я хочу выполнить сложную обработку событий в этом потоке, используя ESPER, но esper использует предопределенный класс POJO и использует оператор EPL для фильтрации событий.

Как мне преобразовать потоки CSV в потоки событий во время выполнения так, чтобы Esper мог извлекать или фильтровать такие параметры, как температура или давление, основываясь на некоторой логике (например, такие как параметры извлечения, которые изменяются в течение 5 минут)?

1 ответ

Пример кода приведен ниже. В этом примере предполагается, что в теме уже есть некоторые сообщения. Это не зацикливание и ожидание большего количества сообщений.

    Properties consumerProps = new Properties();
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ip);
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class.getName());
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class.getName());
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "mygroup");
    KafkaConsumer consumer = new KafkaConsumer<>(consumerProps);
    ConsumerRecords<String, String> rows = consumer.poll(1000);
    Iterator<ConsumerRecord<String, String>> it = rows.iterator();
    while (it.hasNext()) {
        ConsumerRecord<String, String> row = it.next();
        MyEvent event = new MyEvent(row.value()); // transform string to event

        // process event
        runtime.sendEvent(event);
    }
Другие вопросы по тегам