Py4JJavaError в API таблицы pyflink

Этот код преобразует pandas в таблицу flink, выполняет преобразование, чем снова конвертируется в pandas. Он отлично работает, когда я используюfilter filter чем select но выдает ошибку, когда я добавляю group_by а также order_by.

import pandas as pd
import numpy as np

f_s_env = StreamExecutionEnvironment.get_execution_environment()
f_s_settings = EnvironmentSettings.new_instance().use_old_planner().in_streaming_mode().build()
table_env = StreamTableEnvironment.create(f_s_env, environment_settings=f_s_settings)

df = pd.read_csv("dataBase/New/candidate.csv")

col = ['candidate_id', 'candidate_source_id', 'candidate_first_name',
       'candidate_middle_name', 'candidate_last_name', 'candidate_email',
       'created_date', 'last_modified_date', 'last_modified_by']

table = table_env.from_pandas(df,col)
table.filter("candidate_id > 322445")\
    .filter("candidate_first_name === 'Libby'")\
    .group_by("candidate_id, candidate_source_id")\
    .select("candidate_id, candidate_source_id")\
    .order_by("candidate_id").to_pandas()

Моя ошибка

Py4JJavaError: An error occurred while calling o3164.orderBy.
: org.apache.flink.table.api.ValidationException: A limit operation on unbounded tables is currently not supported.
    at org.apache.flink.table.operations.utils.SortOperationFactory.failIfStreaming(SortOperationFactory.java:131)
    at org.apache.flink.table.operations.utils.SortOperationFactory.createSort(SortOperationFactory.java:63)
    at org.apache.flink.table.operations.utils.OperationTreeBuilder.sort(OperationTreeBuilder.java:409)
    at org.apache.flink.table.api.internal.TableImpl.orderBy(TableImpl.java:401)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:745)

1 ответ

Решение

Если вы посмотрите документацию, вы увидите, что в API таблиц ORDER BY поддерживается только для пакетных запросов. Если вы переключитесь на SQL, у вас могут быть потоковые запросы, которые сортируются по возрастающему атрибуту времени.

Сортировка по чему-либо еще в запросе неограниченной потоковой передачи просто невозможна, поскольку для сортировки требуется полное знание входных данных.

Другие вопросы по тегам