Описание тега flink-table-api

Apache Flink - это платформа с открытым исходным кодом для масштабируемой пакетной и потоковой обработки данных. Flink поддерживает пакетную и потоковую аналитику в одной системе. Аналитические программы можно писать краткими и элегантными API-интерфейсами на Java и Scala. Apache Flink предлагает Table API (и SQL API) как унифицированные API для потоковой и пакетной обработки.
2 ответа

Простой SQL-запрос TableAPI не работает на Flink 1.10 и Blink

Я хочу определить коннектор Kafka с помощью TableAPI и запустить SQL для такой описанной таблицы (при поддержке Kafka). К сожалению, кажется, чтоRowtime определение работает не так, как ожидалось. Вот воспроизводимый пример: object DefineSource exte…
01 июн '20 в 18:41
1 ответ

Apache-Flink 1.11 Невозможно использовать Python UDF в SQL Function DDL

Согласно этой странице слияния: https://cwiki.apache.org/confluence/display/FLINK/FLIP-106%3A+Support+Python+UDF+in+SQL+Function+DDL python udf во Flink 1.11 доступны для использования в функциях SQL. Я пошел к документации flink здесь: https://ci.a…
0 ответов

FLINK SQL JOIN с НЕ СУЩЕСТВУЮЩИМ ключевым словом и окном HOP

Я пытаюсь реализовать этот вариант использования с помощью flink sql. Пользователи, которые щелкнули товар 2 раза (ItemBuyPopOpen), но не купили его (ItemBuySuccess) в течение 24 часов Первый подход не сработал из-за следующего исключения.Caused by:…
1 ответ

Какой бэкэнд использует Flink Table API? Требуется ли реляционная БД?

Я новичок во Flink и пытаюсь понять подходящие варианты использования, в которых можно использовать Stream API/ Table API. Как часть этого пытается понять Как и Stream API, есть ли у Table API гибкость для выбора типа серверной части состояния, кото…
1 ответ

Apache-Flink 1.11 Невозможно использовать Python UDF через DDL-функцию SQL в задании потоковой передачи Java Flink

В Flip-106 есть пример того, как вызвать определяемую пользователем функцию python в java-приложении пакетного задания через SQL Function DDL... BatchTableEnvironment tEnv = BatchTableEnvironment.create(env); tEnv.getConfig().getConfiguration().setS…
1 ответ

Интеграция DataStreamAPI и TableAPI

В дополнение к этому вопросу я создал этот пример для интеграции DataStreamAPI и TableAPI и на этот раз у меня нет ошибки, и у меня есть два задания вместо одного, одно создано для DataStreamAPI который работает идеально, а другая работа создана для…
1 ответ

Модульное тестирование Flink SQL: как назначить водяной знак?

Я пишу модульный тест для SQL-оператора Flink, который использует match_recognize. Я настраиваю тестовые данные вот так Table data = tEnv.fromValues(DataTypes.ROW( DataTypes.FIELD("event_time", DataTypes.TIMESTAMP(3)), DataTypes.FIELD(&quo…
2 ответа

pyflink JDBC Postgresql Каталог выдаёт ошибки для типа данных UUID, как обрабатывать тип данных uuid в API таблиц Flink?

Каталог API таблиц Python Apache Flink 1.11.0: postgresql Чтение и запись данных из таблиц каталога postgresql, которые содержат столбцы с типом данных UUID через API таблиц, выбрасывая тип данных UUID unsupportedOperatorException. Как обрабатывать …
1 ответ

Список всех источников и приемников в задании Flink SQL

Я создаю своего рода оболочку вокруг Flink SQL. Я создаю задание с набором задаваемых пользователем операторов SQL с StreamTableEnvironment.sqlUpdate. Некоторые INSERTs, некоторые из них CREATEс. Я также делаю некоторые sqlQueryс. Прежде чем я позво…
1 ответ

API таблицы Flink: GROUP BY при выполнении SQL вызывает исключение org.apache.flink.table.api.TableException

У меня очень упрощенный вариант использования: я хочу использовать Apache Flink (1.11) для чтения данных из темы Kafka (назовем ее source_topic), подсчета атрибута в ней (называемого b) и записи результата в другую тему Kafka (result_topic). Пока у …
0 ответов

Выполнение удаленного задания Flink с запросом к Hive на кластере Flink

Я использую Flink 1.11.2, Hive 2.1.1, Java 8. Попытка выполнить удаленный запрос к Hive, упаковать его в jar и запустить с помощью RestClient Flink: private static String jar = "/path/Job.jar"; Configuration config = RemoteConfiguration.ge…
26 ноя '20 в 11:39
1 ответ

Хранилище Flink Table и Hive Catalog

У меня есть тема кафки и Hive Metastore. Я хочу присоединиться к входящим событиям из темы kafka с записями метастора. Я увидел возможность с Flink использовать каталог для запроса Hive Metastore. Итак, я вижу два способа справиться с этим: использо…
2 ответа

Исключение таблицы Flink: совокупность окон может быть определена только для столбца атрибутов времени, но обнаружено TIMESTAMP(6)

Я использую flink 1.12.0. Пытаюсь преобразовать поток данных в таблицу A и запустить запрос sql в таблице A для агрегирования по окну, как показано ниже. Я использую столбец f2 в качестве поля типа данных временной метки. EnvironmentSettings fsSetti…
0 ответов

Как присвоить уникальный идентификатор каждой строке в таблице в API таблиц Flink?

Я использую Flink для вычисления серии операций. Каждая операция создает таблицу, которая используется для следующей операции, а также сохраняется в S3. Это позволяет просматривать данные на каждом промежуточном этапе расчета и видеть эффект каждой …
0 ответов

Ошибка нехватки памяти - куча при сохранении паркетных файлов с помощью Flink Table API (Flink версия-1.12.0) в Google Cloud Storage

Надеюсь у тебя все хорошо. В настоящее время мы используем Flink Table API (Flink Version-1.12.0) для потоковой передачи данных из Kafka и хранения их в Google Cloud Storage. Формат файла, который мы используем для хранения данных, - Parquet. Первон…
26 мар '21 в 02:07
0 ответов

Последующая оконная группировка в Flink TableAPI приводит к RuntimeException

Я работаю над проектом по группировке / суммированию потоков графов с использованием API таблиц Apache Flink (1.12.0). В нашем алгоритме мы сначала обрабатываем вершины, т. Е. Группируем их и объединяем некоторые свойства. Вот фрагмент моей заявки: …
1 ответ

Приемник flink DataStream с использованием соединителя jdbc для приемника mysql с перезаписью

Мой вариант использования Получение данных из потока данных AWS Kinesis и фильтрация / отображение с помощью api потока данных flink Используйте StreamTable Environment для группировки и агрегирования данных Используйте SQLTableEnvironment для запис…
1 ответ

неправильный результат в Apache flink full external join

У меня есть 2 потока данных, которые были созданы из 2 таблиц, например: Table orderRes1 = ste.sqlQuery( "SELECT orderId, userId, SUM(bidPrice) as q FROM " + tble + " Group by orderId, userId"); Table orderRes2 = ste.sqlQuery( "SELECT orderId, userI…
08 май '21 в 09:20
2 ответа

Ошибка преобразования вложенных классов в DataStream

Я использую flink 1.13. Я пытаюсь преобразовать результаты таблицы в поток данных следующим образом, но продолжаю получать ошибку. public class HybridTrial { public static class Address { public String street; public String houseNumber; public Addre…
1 ответ

Flink Table,Create table Ошибка типа массива «ValidationException»

Я создал таблицу flink, которая содержит поля типа данных, и тип ошибки не совпадает。 Я хочу знать, как создать временную таблицу, содержащую тип массива в таблице flink. public class FlinkConnectorClickhouse { public static void main(String[] args)…