flink SourceFunction<> заменяется в StreamExecutionEnvironment.addSource()?

Я столкнулся с этой проблемой, когда пытался создать собственный источник события. Который содержит очередь, которая позволяет моему другому процессу добавлять элементы в нее. Затем ожидайте, что мой шаблон CEP будет печатать некоторые сообщения отладки, когда есть совпадение.

Но совпадений нет, что бы я ни добавил в очередь. Затем я замечаю, что очередь внутри mySource.run() всегда пуста. Это означает, что очередь, которую я использовал для создания экземпляра mySource, отличается от очереди внутри StreamExecutionEnvironment, Если я изменяю очередь на статическую, заставляю все экземпляры делить одну и ту же очередь, все работает как положено.

DummySource.java

    public class DummySource implements SourceFunction<String> {

    private static final long serialVersionUID = 3978123556403297086L;
//  private static Queue<String> queue = new LinkedBlockingQueue<String>();
    private Queue<String> queue;
    private boolean cancel = false;

    public void setQueue(Queue<String> q){
        queue = q;
    }   

    @Override
    public void run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext<String> ctx)
            throws Exception {
        System.out.println("run");
        synchronized (queue) {          
            while (!cancel) {
                if (queue.peek() != null) {
                    String e = queue.poll();
                    if (e.equals("exit")) {
                        cancel();
                    }
                    System.out.println("collect "+e);
                    ctx.collectWithTimestamp(e, System.currentTimeMillis());
                }
            }
        }
    }

    @Override
    public void cancel() {
        System.out.println("canceled");
        cancel = true;
    }
}

Поэтому я копаюсь в исходном коде StreamExecutionEnvironment, Внутри метода addSource(). Существует метод clean(), который выглядит так, как будто он заменяет экземпляр на новый.

Возвращает очищенную от замыкания версию данной функции.

Это почему? и почему его нужно сериализовать? Я также пытаюсь отключить чистое закрытие, используя getConfig(). Результат все тот же. Мой экземпляр очереди не тот, который использует env.

Как мне решить эту проблему?

1 ответ

Решение

clean() Метод, используемый в функциях Flink, предназначен главным образом для Function(как SourceFunction, MapFunction) сериализуемый. Flink будет сериализовать эти функции и распределить их по узлам задач для их выполнения.

Для простых переменных в вашем основном коде Flink, таких как int, вы можете просто ссылаться на них в своей функции. Но для больших или не сериализуемых лучше использовать широковещательную и богатую функцию источника. Пожалуйста, обратитесь к https://cwiki.apache.org/confluence/display/FLINK/Variables+Closures+vs.+Broadcast+Variables

Другие вопросы по тегам