Получение NullPointerException при попытке создать RMQSink на Flink
Я пытаюсь заставить Sink for Flink отправлять сообщения из Flink в RabbitMQ. Когда я создаю Sink в main, все работает нормально - я получаю сообщение в специальной очереди на RabbitMQ. К сожалению, когда я пытаюсь создать Sink вне основного метода - он заканчивается NPEx (Flink показывает, что StreamExecutionEnvironment имеет значение null).
У работающего кода есть один недостаток - он может отправить сообщение только один раз при запуске.jar на Flink. Я хочу иметь функцию, которая позволит мне отправлять сообщение в RabbitMQ при вызове этой функции.
Вот код, который работает:
public static void main(String[] args) throws Exception{
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost(RABBITMQ_HOST_NAME)
.setPort(RABBITMQ_PORT_NUMBER)
.setUserName(RABBITMQ_USERNAME)
.setPassword(RABBITMQ_PASSWORD)
.setVirtualHost(RABBITMQ_VIRTUAL_HOST)
.setConnectionTimeout(5000)
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SimpleStringSchema simpleStringSchema = new SimpleStringSchema();
DataStream<String> dataStreamSource = env.addSource(new RMQSource<String>(
connectionConfig,
KURA_QUEUE_NAME,
simpleStringSchema));
env.fromElements("ALERT")
.addSink(new IotRMQSink(connectionConfig,
ANDROID_QUEUE_NAME,
simpleStringSchema));
dataStreamSource.filter(new TemperatureFluctuationDetector());
env.execute();
}
И вот код, который я хочу выглядеть (но бросает NPEx):
public static RMQConnectionConfig connectionConfig;
public static StreamExecutionEnvironment env;
public static SimpleStringSchema simpleStringSchema;
public static void main(String[] args) throws Exception{
connectionConfig = new RMQConnectionConfig.Builder()
.setHost(RABBITMQ_HOST_NAME)
.setPort(RABBITMQ_PORT_NUMBER)
.setUserName(RABBITMQ_USERNAME)
.setPassword(RABBITMQ_PASSWORD)
.setVirtualHost(RABBITMQ_VIRTUAL_HOST)
.setConnectionTimeout(5000)
.build();
env = StreamExecutionEnvironment.getExecutionEnvironment();
simpleStringSchema = new SimpleStringSchema();
DataStream<String> dataStreamSource = env.addSource(new RMQSource<String>(
connectionConfig,
KURA_QUEUE_NAME,
simpleStringSchema));
publish();
dataStreamSource.filter(new TemperatureFluctuationDetector());
env.execute();
}
public static void publish(){
env.fromElements("ALERT")
.addSink(new IotRMQSink(connectionConfig,
ANDROID_QUEUE_NAME,
simpleStringSchema));
}
Это выглядит как StreamExecutionEnvironment
является нулевым, но я установил его в основной функции. У кого-нибудь есть идеи, как этого избежать? NullPointerException
?