Flink Kafka Consumer генерирует исключение нулевого указателя при использовании ключа DataStream
Я использую этот пример Flink CEP, где я разделяю данные, так как я создал одно приложение, которое отправляет приложение в Kafka, а другое приложение читает из Kafka... Я сгенерировал производителя для класса TemperatureWarning, т.е. в Kafka, я отправлял данные Связанный с TemperatureWarning Ниже приведен мой код, который потребляет данные от Кафки...
StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(5000);
Properties properties=new Properties();
properties.setProperty("bootstrap.servers", "PUBLICDNS:9092");
properties.setProperty("zookeeper.connect", "PUBLICDNS:2181");
properties.setProperty("group.id", "test");
DataStream<TemperatureWarning> dstream=env.addSource(new FlinkKafkaConsumer09<TemperatureWarning>("MonitoringEvent", new MonitoringEventSchema(), properties));
Pattern<TemperatureWarning, ?> alertPattern = Pattern.<TemperatureWarning>begin("first")
.next("second")
.within(Time.seconds(20));
PatternStream<TemperatureWarning> alertPatternStream = CEP.pattern(
dstream.keyBy("rackID"),
alertPattern);
DataStream<TemperatureAlert> alerts = alertPatternStream.flatSelect(
(Map<String, TemperatureWarning> pattern, Collector<TemperatureAlert> out) -> {
TemperatureWarning first = pattern.get("first");
TemperatureWarning second = pattern.get("second");
if (first.getAverageTemperature() < second.getAverageTemperature()) {
out.collect(new TemperatureAlert(second.getRackID(),second.getAverageTemperature(),second.getTimeStamp()));
}
});
dstream.print();
alerts.print();
env.execute("Flink Kafka Consumer");
Но когда я выполняю это приложение, оно выдает следующее исключение:
Exception in thread "main" java.lang.NullPointerException
at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:329)
at org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:274)
at com.yash.consumer.KafkaFlinkConsumer.main(KafkaFlinkConsumer.java:49)
Следующее - мой класс TemperatureWarning:
public class TemperatureWarning {
private int rackID;
private double averageTemperature;
private long timeStamp;
public TemperatureWarning(int rackID, double averageTemperature,long timeStamp) {
this.rackID = rackID;
this.averageTemperature = averageTemperature;
this.timeStamp=timeStamp;
}
public TemperatureWarning() {
this(-1, -1,-1);
}
public int getRackID() {
return rackID;
}
public void setRackID(int rackID) {
this.rackID = rackID;
}
public double getAverageTemperature() {
return averageTemperature;
}
public void setAverageTemperature(double averageTemperature) {
this.averageTemperature = averageTemperature;
}
public long getTimeStamp() {
return timeStamp;
}
public void setTimeStamp(long timeStamp) {
this.timeStamp = timeStamp;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof TemperatureWarning) {
TemperatureWarning other = (TemperatureWarning) obj;
return rackID == other.rackID && averageTemperature == other.averageTemperature;
} else {
return false;
}
}
@Override
public int hashCode() {
return 41 * rackID + Double.hashCode(averageTemperature);
}
@Override
public String toString() {
//return "TemperatureWarning(" + getRackID() + ", " + averageTemperature + ")";
return "TemperatureWarning(" + getRackID() +","+averageTemperature + ") "+ "," + getTimeStamp();
}
}
Ниже приводится мой класс MonitoringEventSchema:
public class MonitoringEventSchema implements DeserializationSchema<TemperatureWarning>,SerializationSchema<TemperatureWarning>
{
@Override
public TypeInformation<TemperatureWarning> getProducedType() {
// TODO Auto-generated method stub
return null;
}
@Override
public byte[] serialize(TemperatureWarning element) {
// TODO Auto-generated method stub
return element.toString().getBytes();
}
@Override
public TemperatureWarning deserialize(byte[] message) throws IOException {
// TODO Auto-generated method stub
if(message!=null)
{
String str=new String(message,"UTF-8");
String []val=str.split(",");
TemperatureWarning warning=new TemperatureWarning(Integer.parseInt(val[0]),Double.parseDouble(val[1]),Long.parseLong(val[2]));
return warning;
}
return null;
}
@Override
public boolean isEndOfStream(TemperatureWarning nextElement) {
// TODO Auto-generated method stub
return false;
}
}
Теперь, что требуется для выполнения операции keyBy, поскольку я упомянул ключ, необходимый для разделения потока? Что нужно сделать здесь, чтобы решить эту ошибку?
1 ответ
Решение
Проблема в этой функции:
@Override
public TypeInformation<TemperatureWarning> getProducedType() {
// TODO Auto-generated method stub
return null;
}
you cannot return null here.