Py4JJavaError в API таблицы pyflink
Этот код преобразует pandas в таблицу flink, выполняет преобразование, чем снова конвертируется в pandas. Он отлично работает, когда я используюfilter
filter
чем select
но выдает ошибку, когда я добавляю group_by
а также order_by
.
import pandas as pd
import numpy as np
f_s_env = StreamExecutionEnvironment.get_execution_environment()
f_s_settings = EnvironmentSettings.new_instance().use_old_planner().in_streaming_mode().build()
table_env = StreamTableEnvironment.create(f_s_env, environment_settings=f_s_settings)
df = pd.read_csv("dataBase/New/candidate.csv")
col = ['candidate_id', 'candidate_source_id', 'candidate_first_name',
'candidate_middle_name', 'candidate_last_name', 'candidate_email',
'created_date', 'last_modified_date', 'last_modified_by']
table = table_env.from_pandas(df,col)
table.filter("candidate_id > 322445")\
.filter("candidate_first_name === 'Libby'")\
.group_by("candidate_id, candidate_source_id")\
.select("candidate_id, candidate_source_id")\
.order_by("candidate_id").to_pandas()
Моя ошибка
Py4JJavaError: An error occurred while calling o3164.orderBy.
: org.apache.flink.table.api.ValidationException: A limit operation on unbounded tables is currently not supported.
at org.apache.flink.table.operations.utils.SortOperationFactory.failIfStreaming(SortOperationFactory.java:131)
at org.apache.flink.table.operations.utils.SortOperationFactory.createSort(SortOperationFactory.java:63)
at org.apache.flink.table.operations.utils.OperationTreeBuilder.sort(OperationTreeBuilder.java:409)
at org.apache.flink.table.api.internal.TableImpl.orderBy(TableImpl.java:401)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:745)
1 ответ
Решение
Если вы посмотрите документацию, вы увидите, что в API таблиц ORDER BY поддерживается только для пакетных запросов. Если вы переключитесь на SQL, у вас могут быть потоковые запросы, которые сортируются по возрастающему атрибуту времени.
Сортировка по чему-либо еще в запросе неограниченной потоковой передачи просто невозможна, поскольку для сортировки требуется полное знание входных данных.