flink SourceFunction<> заменяется в StreamExecutionEnvironment.addSource()?
Я столкнулся с этой проблемой, когда пытался создать собственный источник события. Который содержит очередь, которая позволяет моему другому процессу добавлять элементы в нее. Затем ожидайте, что мой шаблон CEP будет печатать некоторые сообщения отладки, когда есть совпадение.
Но совпадений нет, что бы я ни добавил в очередь. Затем я замечаю, что очередь внутри mySource.run() всегда пуста. Это означает, что очередь, которую я использовал для создания экземпляра mySource, отличается от очереди внутри StreamExecutionEnvironment
, Если я изменяю очередь на статическую, заставляю все экземпляры делить одну и ту же очередь, все работает как положено.
DummySource.java
public class DummySource implements SourceFunction<String> {
private static final long serialVersionUID = 3978123556403297086L;
// private static Queue<String> queue = new LinkedBlockingQueue<String>();
private Queue<String> queue;
private boolean cancel = false;
public void setQueue(Queue<String> q){
queue = q;
}
@Override
public void run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext<String> ctx)
throws Exception {
System.out.println("run");
synchronized (queue) {
while (!cancel) {
if (queue.peek() != null) {
String e = queue.poll();
if (e.equals("exit")) {
cancel();
}
System.out.println("collect "+e);
ctx.collectWithTimestamp(e, System.currentTimeMillis());
}
}
}
}
@Override
public void cancel() {
System.out.println("canceled");
cancel = true;
}
}
Поэтому я копаюсь в исходном коде StreamExecutionEnvironment
, Внутри метода addSource(). Существует метод clean(), который выглядит так, как будто он заменяет экземпляр на новый.
Возвращает очищенную от замыкания версию данной функции.
Это почему? и почему его нужно сериализовать? Я также пытаюсь отключить чистое закрытие, используя getConfig(). Результат все тот же. Мой экземпляр очереди не тот, который использует env.
Как мне решить эту проблему?
1 ответ
clean()
Метод, используемый в функциях Flink, предназначен главным образом для Function
(как SourceFunction, MapFunction) сериализуемый. Flink будет сериализовать эти функции и распределить их по узлам задач для их выполнения.
Для простых переменных в вашем основном коде Flink, таких как int, вы можете просто ссылаться на них в своей функции. Но для больших или не сериализуемых лучше использовать широковещательную и богатую функцию источника. Пожалуйста, обратитесь к https://cwiki.apache.org/confluence/display/FLINK/Variables+Closures+vs.+Broadcast+Variables