Как вы можете загрузить CSV в PyFlink в качестве источника таблицы потоковой передачи?
Я пытаюсь настроить простую игровую среду для использования Flink Python Table API. Вакансии, которые я в конечном итоге пытаюсь написать, будут подпитываться очередями Kafka или Kenesis, но это очень затрудняет игру с идеями (и тестами).
Я могу с удовольствием загрузить файл из CSV и обработать его в пакетном режиме. Но я не могу заставить его работать в потоковом режиме. Как бы я сделал что-то подобное, но в StreamingExecutionEnvironment (в первую очередь, чтобы я мог поиграть с окнами).
Я понимаю, что мне нужно заставить систему использовать EventTime (потому что ProcTime будет работать сразу), но я не могу найти способ настроить это. В принципе, я должен иметь возможность установить один из столбцов CSV в качестве времени события, но из документации не ясно, как это сделать (или если это возможно).
Чтобы запустить тесты пакетного выполнения, я использовал приведенный ниже код, который читается из input.csv
и выводит на output.csv
.
from pyflink.dataset import ExecutionEnvironment
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import (
TableConfig,
DataTypes,
BatchTableEnvironment,
StreamTableEnvironment,
)
from pyflink.table.descriptors import Schema, Csv, OldCsv, FileSystem
from pathlib import Path
exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)
root = Path(__file__).parent.resolve()
out_path = root / "output.csv"
try:
out_path.unlink()
except:
pass
from pyflink.table.window import Tumble
(
t_env.connect(FileSystem().path(str(root / "input.csv")))
.with_format(Csv())
.with_schema(
Schema().field("time", DataTypes.TIMESTAMP(3)).field("word", DataTypes.STRING())
)
.create_temporary_table("mySource")
)
(
t_env.connect(FileSystem().path(str(out_path)))
.with_format(Csv())
.with_schema(
Schema().field("word", DataTypes.STRING()).field("count", DataTypes.BIGINT())
)
.create_temporary_table("mySink")
)
(
t_env.from_path("mySource")
.group_by("word")
.select("word, count(1) as count")
.filter("count > 1")
.insert_into("mySink")
)
t_env.execute("tutorial_job")
а input.csv - это
2000-01-01 00:00:00.000000000,james
2000-01-01 00:00:00.000000000,james
2002-01-01 00:00:00.000000000,steve
Итак, мой вопрос в том, как я могу настроить его так, чтобы он читал из того же CSV, но использовал первый столбец в качестве времени события и позволял мне писать такой код, как:
(
t_env.from_path("mySource")
.window(Tumble.over("10.minutes").on("time").alias("w"))
.group_by("w, word")
.select("w, word, count(1) as count")
.filter("count > 1")
.insert_into("mySink")
)
Любая помощь будет принята с благодарностью, я не могу решить это из документов. Я используюpython 3.7
а также flink 1.11.1
.
2 ответа
Если вы используете API дескриптора, вы можете указать поле как поле времени события через схему:
.with_schema( # declare the schema of the table
Schema()
.field("rowtime", DataTypes.TIMESTAMP())
.rowtime(
Rowtime()
.timestamps_from_field("time")
.watermarks_periodic_bounded(60000))
.field("a", DataTypes.STRING())
.field("b", DataTypes.STRING())
.field("c", DataTypes.STRING())
)
Но я все же рекомендую вам использовать DDL, с одной стороны, его проще использовать, с другой стороны, есть некоторые ошибки в существующем API дескриптора, сообщество обсуждает рефакторинг API дескриптора.