Как вы можете загрузить CSV в PyFlink в качестве источника таблицы потоковой передачи?

Я пытаюсь настроить простую игровую среду для использования Flink Python Table API. Вакансии, которые я в конечном итоге пытаюсь написать, будут подпитываться очередями Kafka или Kenesis, но это очень затрудняет игру с идеями (и тестами).

Я могу с удовольствием загрузить файл из CSV и обработать его в пакетном режиме. Но я не могу заставить его работать в потоковом режиме. Как бы я сделал что-то подобное, но в StreamingExecutionEnvironment (в первую очередь, чтобы я мог поиграть с окнами).

Я понимаю, что мне нужно заставить систему использовать EventTime (потому что ProcTime будет работать сразу), но я не могу найти способ настроить это. В принципе, я должен иметь возможность установить один из столбцов CSV в качестве времени события, но из документации не ясно, как это сделать (или если это возможно).

Чтобы запустить тесты пакетного выполнения, я использовал приведенный ниже код, который читается из input.csv и выводит на output.csv.

from pyflink.dataset import ExecutionEnvironment
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import (
    TableConfig,
    DataTypes,
    BatchTableEnvironment,
    StreamTableEnvironment,
)
from pyflink.table.descriptors import Schema, Csv, OldCsv, FileSystem
from pathlib import Path

exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)
root = Path(__file__).parent.resolve()
out_path = root / "output.csv"

try:
    out_path.unlink()
except:
    pass

from pyflink.table.window import Tumble

(
    t_env.connect(FileSystem().path(str(root / "input.csv")))
    .with_format(Csv())
    .with_schema(
        Schema().field("time", DataTypes.TIMESTAMP(3)).field("word", DataTypes.STRING())
    )
    .create_temporary_table("mySource")
)

(
    t_env.connect(FileSystem().path(str(out_path)))
    .with_format(Csv())
    .with_schema(
        Schema().field("word", DataTypes.STRING()).field("count", DataTypes.BIGINT())
    )
    .create_temporary_table("mySink")
)

(
    t_env.from_path("mySource")
    .group_by("word")
    .select("word, count(1) as count")
    .filter("count > 1")
    .insert_into("mySink")
)

t_env.execute("tutorial_job")

а input.csv - это

2000-01-01 00:00:00.000000000,james
2000-01-01 00:00:00.000000000,james
2002-01-01 00:00:00.000000000,steve

Итак, мой вопрос в том, как я могу настроить его так, чтобы он читал из того же CSV, но использовал первый столбец в качестве времени события и позволял мне писать такой код, как:

(
    t_env.from_path("mySource")
    .window(Tumble.over("10.minutes").on("time").alias("w"))
    .group_by("w, word")
    .select("w, word, count(1) as count")
    .filter("count > 1")
    .insert_into("mySink")
)

Любая помощь будет принята с благодарностью, я не могу решить это из документов. Я используюpython 3.7 а также flink 1.11.1.

2 ответа

Решение

Если вы используете API дескриптора, вы можете указать поле как поле времени события через схему:

.with_schema(  # declare the schema of the table
             Schema()
             .field("rowtime", DataTypes.TIMESTAMP())
             .rowtime(
                Rowtime()
                .timestamps_from_field("time")
                .watermarks_periodic_bounded(60000))
             .field("a", DataTypes.STRING())
             .field("b", DataTypes.STRING())
             .field("c", DataTypes.STRING())
         )

Но я все же рекомендую вам использовать DDL, с одной стороны, его проще использовать, с другой стороны, есть некоторые ошибки в существующем API дескриптора, сообщество обсуждает рефакторинг API дескриптора.

Вы пробовали использовать стратегии водяных знаков? Как упоминалось здесь, вам необходимо использовать стратегии водяных знаков, чтобы использовать время события. Для случая pyflink лично я думаю, что его проще объявить в формате ddl, как это.

Другие вопросы по тегам