Как выполнить сложную обработку событий, используя Esper с потоками, поступающими из данных CSV?
Я непрерывно отправляю строки CSV в темы Kafka, используя производителя Kafka, и генерирую поток строк CSV следующим образом
температура =23,3, давление =1015, влажность =63,4, скорость ветра =7,4.....
Теперь я хочу выполнить сложную обработку событий в этом потоке, используя ESPER, но esper использует предопределенный класс POJO и использует оператор EPL для фильтрации событий.
Как мне преобразовать потоки CSV в потоки событий во время выполнения так, чтобы Esper мог извлекать или фильтровать такие параметры, как температура или давление, основываясь на некоторой логике (например, такие как параметры извлечения, которые изменяются в течение 5 минут)?
1 ответ
Пример кода приведен ниже. В этом примере предполагается, что в теме уже есть некоторые сообщения. Это не зацикливание и ожидание большего количества сообщений.
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ip);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "mygroup");
KafkaConsumer consumer = new KafkaConsumer<>(consumerProps);
ConsumerRecords<String, String> rows = consumer.poll(1000);
Iterator<ConsumerRecord<String, String>> it = rows.iterator();
while (it.hasNext()) {
ConsumerRecord<String, String> row = it.next();
MyEvent event = new MyEvent(row.value()); // transform string to event
// process event
runtime.sendEvent(event);
}