Описание тега flink-table-api
Apache Flink - это платформа с открытым исходным кодом для масштабируемой пакетной и потоковой обработки данных. Flink поддерживает пакетную и потоковую аналитику в одной системе. Аналитические программы можно писать краткими и элегантными API-интерфейсами на Java и Scala. Apache Flink предлагает Table API (и SQL API) как унифицированные API для потоковой и пакетной обработки.
2
ответа
Простой SQL-запрос TableAPI не работает на Flink 1.10 и Blink
Я хочу определить коннектор Kafka с помощью TableAPI и запустить SQL для такой описанной таблицы (при поддержке Kafka). К сожалению, кажется, чтоRowtime определение работает не так, как ожидалось. Вот воспроизводимый пример: object DefineSource exte…
01 июн '20 в 18:41
1
ответ
Apache-Flink 1.11 Невозможно использовать Python UDF в SQL Function DDL
Согласно этой странице слияния: https://cwiki.apache.org/confluence/display/FLINK/FLIP-106%3A+Support+Python+UDF+in+SQL+Function+DDL python udf во Flink 1.11 доступны для использования в функциях SQL. Я пошел к документации flink здесь: https://ci.a…
09 июл '20 в 22:39
0
ответов
FLINK SQL JOIN с НЕ СУЩЕСТВУЮЩИМ ключевым словом и окном HOP
Я пытаюсь реализовать этот вариант использования с помощью flink sql. Пользователи, которые щелкнули товар 2 раза (ItemBuyPopOpen), но не купили его (ItemBuySuccess) в течение 24 часов Первый подход не сработал из-за следующего исключения.Caused by:…
19 июл '20 в 15:52
1
ответ
Какой бэкэнд использует Flink Table API? Требуется ли реляционная БД?
Я новичок во Flink и пытаюсь понять подходящие варианты использования, в которых можно использовать Stream API/ Table API. Как часть этого пытается понять Как и Stream API, есть ли у Table API гибкость для выбора типа серверной части состояния, кото…
29 июл '20 в 08:07
1
ответ
Apache-Flink 1.11 Невозможно использовать Python UDF через DDL-функцию SQL в задании потоковой передачи Java Flink
В Flip-106 есть пример того, как вызвать определяемую пользователем функцию python в java-приложении пакетного задания через SQL Function DDL... BatchTableEnvironment tEnv = BatchTableEnvironment.create(env); tEnv.getConfig().getConfiguration().setS…
04 авг '20 в 00:49
1
ответ
Интеграция DataStreamAPI и TableAPI
В дополнение к этому вопросу я создал этот пример для интеграции DataStreamAPI и TableAPI и на этот раз у меня нет ошибки, и у меня есть два задания вместо одного, одно создано для DataStreamAPI который работает идеально, а другая работа создана для…
08 сен '20 в 01:18
1
ответ
Модульное тестирование Flink SQL: как назначить водяной знак?
Я пишу модульный тест для SQL-оператора Flink, который использует match_recognize. Я настраиваю тестовые данные вот так Table data = tEnv.fromValues(DataTypes.ROW( DataTypes.FIELD("event_time", DataTypes.TIMESTAMP(3)), DataTypes.FIELD(&quo…
24 сен '20 в 00:40
2
ответа
pyflink JDBC Postgresql Каталог выдаёт ошибки для типа данных UUID, как обрабатывать тип данных uuid в API таблиц Flink?
Каталог API таблиц Python Apache Flink 1.11.0: postgresql Чтение и запись данных из таблиц каталога postgresql, которые содержат столбцы с типом данных UUID через API таблиц, выбрасывая тип данных UUID unsupportedOperatorException. Как обрабатывать …
18 окт '20 в 07:33
1
ответ
Список всех источников и приемников в задании Flink SQL
Я создаю своего рода оболочку вокруг Flink SQL. Я создаю задание с набором задаваемых пользователем операторов SQL с StreamTableEnvironment.sqlUpdate. Некоторые INSERTs, некоторые из них CREATEс. Я также делаю некоторые sqlQueryс. Прежде чем я позво…
10 ноя '20 в 17:48
1
ответ
API таблицы Flink: GROUP BY при выполнении SQL вызывает исключение org.apache.flink.table.api.TableException
У меня очень упрощенный вариант использования: я хочу использовать Apache Flink (1.11) для чтения данных из темы Kafka (назовем ее source_topic), подсчета атрибута в ней (называемого b) и записи результата в другую тему Kafka (result_topic). Пока у …
16 ноя '20 в 20:23
0
ответов
Выполнение удаленного задания Flink с запросом к Hive на кластере Flink
Я использую Flink 1.11.2, Hive 2.1.1, Java 8. Попытка выполнить удаленный запрос к Hive, упаковать его в jar и запустить с помощью RestClient Flink: private static String jar = "/path/Job.jar"; Configuration config = RemoteConfiguration.ge…
26 ноя '20 в 11:39
1
ответ
Хранилище Flink Table и Hive Catalog
У меня есть тема кафки и Hive Metastore. Я хочу присоединиться к входящим событиям из темы kafka с записями метастора. Я увидел возможность с Flink использовать каталог для запроса Hive Metastore. Итак, я вижу два способа справиться с этим: использо…
22 фев '21 в 16:52
2
ответа
Исключение таблицы Flink: совокупность окон может быть определена только для столбца атрибутов времени, но обнаружено TIMESTAMP(6)
Я использую flink 1.12.0. Пытаюсь преобразовать поток данных в таблицу A и запустить запрос sql в таблице A для агрегирования по окну, как показано ниже. Я использую столбец f2 в качестве поля типа данных временной метки. EnvironmentSettings fsSetti…
15 фев '21 в 10:31
0
ответов
Как присвоить уникальный идентификатор каждой строке в таблице в API таблиц Flink?
Я использую Flink для вычисления серии операций. Каждая операция создает таблицу, которая используется для следующей операции, а также сохраняется в S3. Это позволяет просматривать данные на каждом промежуточном этапе расчета и видеть эффект каждой …
15 фев '21 в 18:47
0
ответов
Ошибка нехватки памяти - куча при сохранении паркетных файлов с помощью Flink Table API (Flink версия-1.12.0) в Google Cloud Storage
Надеюсь у тебя все хорошо. В настоящее время мы используем Flink Table API (Flink Version-1.12.0) для потоковой передачи данных из Kafka и хранения их в Google Cloud Storage. Формат файла, который мы используем для хранения данных, - Parquet. Первон…
26 мар '21 в 02:07
0
ответов
Последующая оконная группировка в Flink TableAPI приводит к RuntimeException
Я работаю над проектом по группировке / суммированию потоков графов с использованием API таблиц Apache Flink (1.12.0). В нашем алгоритме мы сначала обрабатываем вершины, т. Е. Группируем их и объединяем некоторые свойства. Вот фрагмент моей заявки: …
29 апр '21 в 11:50
1
ответ
Приемник flink DataStream с использованием соединителя jdbc для приемника mysql с перезаписью
Мой вариант использования Получение данных из потока данных AWS Kinesis и фильтрация / отображение с помощью api потока данных flink Используйте StreamTable Environment для группировки и агрегирования данных Используйте SQLTableEnvironment для запис…
04 май '21 в 09:30
1
ответ
неправильный результат в Apache flink full external join
У меня есть 2 потока данных, которые были созданы из 2 таблиц, например: Table orderRes1 = ste.sqlQuery( "SELECT orderId, userId, SUM(bidPrice) as q FROM " + tble + " Group by orderId, userId"); Table orderRes2 = ste.sqlQuery( "SELECT orderId, userI…
08 май '21 в 09:20
2
ответа
Ошибка преобразования вложенных классов в DataStream
Я использую flink 1.13. Я пытаюсь преобразовать результаты таблицы в поток данных следующим образом, но продолжаю получать ошибку. public class HybridTrial { public static class Address { public String street; public String houseNumber; public Addre…
14 май '21 в 03:25
1
ответ
Flink Table,Create table Ошибка типа массива «ValidationException»
Я создал таблицу flink, которая содержит поля типа данных, и тип ошибки не совпадает。 Я хочу знать, как создать временную таблицу, содержащую тип массива в таблице flink. public class FlinkConnectorClickhouse { public static void main(String[] args)…
24 май '21 в 15:17