Интеграция DataStreamAPI и TableAPI
В дополнение к этому вопросу я создал этот пример для интеграции
DataStreamAPI
и
TableAPI
и на этот раз у меня нет ошибки, и у меня есть два задания вместо одного, одно создано для
DataStreamAPI
который работает идеально, а другая работа создана для
TableAPI
который тоже работает идеально, но единственная проблема заключается в том, что никогда не получает никакого значения от
DataStreamAPI
, пример:
/*FILTERING NULL IDs*/
final SingleOutputStreamOperator<Event> stream_filtered = eventsStream
.filter(new NullidEventsFilterFunction())
.uid("id_filter_operator")
.name("Event Filter");
final StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(TableEnvironmentConfiguration.getEnv(), fsSettings);
SingleOutputStreamOperator<String> toTable = stream_filtered.map(x -> x.id).name("Map for table");
Table source = fsTableEnv.fromDataStream(toTable);
source.execute(); /*without this line the TableAPI job is not started, but nothing happens if is not there either*/
DataStream<String> finalRes = fsTableEnv.toAppendStream(source, String.class);
finalRes.map((MapFunction<String, String>) value -> value)
.name("Mapping after table")
.addSink(new SinkFunction<String>() {
@Override
public void invoke(String value) {
LOG.info("Record from table: " + value);
}
}).name("Sink after map from table");
/*STARTING TRANSFORMATIONS*/
Init.init(stream_filtered);
env.execute(job_name);
делая это, я вижу эту строку в регистраторе:
INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Event Mapper -> Watermarks Added -> Event Filter -> Map for table -> SourceConversion(table=[Unregistered_DataStream_5], fields=[f0]) -> SinkConversionToRow -> Sink: Select table sink (1/1) (0d3cd78d35480c44f09603786bf775e7) switched from DEPLOYING to RUNNING.
но запись не получена и не отправлена.
См. Изображение для
DataStream
работа
и посмотрите изображение для
TableAPI
работа
Есть идеи? Заранее спасибо. С уважением!
1 ответ
Если вы хотите написать одно задание, которое начинается и заканчивается API DataStream и использует API таблиц посередине, то вот простой пример, который вы можете использовать.
Обратите внимание, что задействованные детали менялись от выпуска к выпуску, и этот конкретный пример работает так, как написано с Flink 1.11. FLIP-136: В настоящее время ведется работа по улучшению взаимодействия между DataStream и Table API, чтобы сделать это еще проще.
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.$;
public class BackAndForth {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
SingleOutputStreamOperator<Tuple2<String, Long>> rawInput = env.fromElements(
new Tuple2<>("u2", 0L),
new Tuple2<>("u1", 5L),
new Tuple2<>("u2", 1L),
new Tuple2<>("u3", 1L),
new Tuple2<>("u1", 0L),
new Tuple2<>("u1", 3L),
new Tuple2<>("u2", 2L));
Table events = tableEnv.fromDataStream(rawInput, $("userId"), $("value"));
Table results = events
.select($("userId"), $("value"))
.where($("value").isGreater(0));
tableEnv
.toAppendStream(results, Row.class)
.print();
env.execute();
}
}
Вы можете быть обеспокоены тем, что в веб-интерфейсе отображается "Отправлено записей: 0" и "Получено записей: 0". Это очень заблуждение. Эти метрики Flink измеряют только записи и байты, проходящие внутри Flink, и не сообщают о каких-либо операциях ввода-вывода с внешними системами. Эти метрики также не сообщают о записях и байтах, передаваемых между связанными друг с другом операторами. Все в этих двух заданиях связано, поэтому отправленные / полученные записи / байты в этом случае всегда будут нулевыми.