Получение NullPointerException при попытке создать RMQSink на Flink

Я пытаюсь заставить Sink for Flink отправлять сообщения из Flink в RabbitMQ. Когда я создаю Sink в main, все работает нормально - я получаю сообщение в специальной очереди на RabbitMQ. К сожалению, когда я пытаюсь создать Sink вне основного метода - он заканчивается NPEx (Flink показывает, что StreamExecutionEnvironment имеет значение null).

У работающего кода есть один недостаток - он может отправить сообщение только один раз при запуске.jar на Flink. Я хочу иметь функцию, которая позволит мне отправлять сообщение в RabbitMQ при вызове этой функции.

Вот код, который работает:

public static void main(String[] args) throws Exception{

RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
        .setHost(RABBITMQ_HOST_NAME)
        .setPort(RABBITMQ_PORT_NUMBER)
        .setUserName(RABBITMQ_USERNAME)
        .setPassword(RABBITMQ_PASSWORD)
        .setVirtualHost(RABBITMQ_VIRTUAL_HOST)
        .setConnectionTimeout(5000)
        .build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

SimpleStringSchema simpleStringSchema = new SimpleStringSchema();

DataStream<String> dataStreamSource = env.addSource(new RMQSource<String>(
        connectionConfig,
        KURA_QUEUE_NAME,
        simpleStringSchema));

        env.fromElements("ALERT")
        .addSink(new IotRMQSink(connectionConfig,
                ANDROID_QUEUE_NAME,
                simpleStringSchema));

dataStreamSource.filter(new TemperatureFluctuationDetector());

env.execute();
}

И вот код, который я хочу выглядеть (но бросает NPEx):

public static RMQConnectionConfig connectionConfig;
public static StreamExecutionEnvironment env;
public static SimpleStringSchema simpleStringSchema;

public static void main(String[] args) throws Exception{

connectionConfig = new RMQConnectionConfig.Builder()
        .setHost(RABBITMQ_HOST_NAME)
        .setPort(RABBITMQ_PORT_NUMBER)
        .setUserName(RABBITMQ_USERNAME)
        .setPassword(RABBITMQ_PASSWORD)
        .setVirtualHost(RABBITMQ_VIRTUAL_HOST)
        .setConnectionTimeout(5000)
        .build();

env = StreamExecutionEnvironment.getExecutionEnvironment();

simpleStringSchema = new SimpleStringSchema();

DataStream<String> dataStreamSource = env.addSource(new RMQSource<String>(
        connectionConfig,
        KURA_QUEUE_NAME,
        simpleStringSchema));

publish();

dataStreamSource.filter(new TemperatureFluctuationDetector());

env.execute();
}

public static void publish(){
    env.fromElements("ALERT")
        .addSink(new IotRMQSink(connectionConfig,
                ANDROID_QUEUE_NAME,
                simpleStringSchema));
}

Это выглядит как StreamExecutionEnvironment является нулевым, но я установил его в основной функции. У кого-нибудь есть идеи, как этого избежать? NullPointerException?

0 ответов

Другие вопросы по тегам