Использование ROW() для вложенной структуры данных

Я успешно использовал JsonRowSerializationSchema из артефакта flink-json для создания TableSink<Row> и вывести JSON из SQL, используя ROW. Он отлично работает для передачи плоских данных:

INSERT INTO outputTable 
SELECT 
  ROW(col1, col1)
FROM inputTable
>>>> OK:
{"outCol1":"dasdasdas","outCol2":"dasdasdas"}

Теперь я пробую вложенную схему, и она странным образом разбивается:

INSERT INTO outputTable 
SELECT 
  ROW('ttt', ROW('ppp'))
FROM inputTable
>>>> OK:
{"outCol1":"ttt","outCol2":{"outCol3":"ppp"}}

INSERT INTO outputTable 
SELECT 
  ROW('ttt', ROW(col1))
FROM inputTable
>>>> OK:
{"outCol1":"ttt","outCol2":{"outCol3":"dasdasdas"}}

INSERT INTO outputTable 
SELECT 
  ROW(col1, ROW(col1))
FROM inputTable
>>>> KO

Это проблема разбора, но я озадачен тем, почему это может произойти. col1 и 'ttt' имеют выражения типа String и должны быть заменяемыми; но каким-то образом синтаксический анализатор возмущается следующей строкой, как говорит трассировка стека:

Caused by: org.apache.calcite.sql.parser.impl.ParseException: Encountered ", ROW" at line 3, column 11.
Was expecting one of:
    ")" ...
    "," <IDENTIFIER> ...
    "," <QUOTED_IDENTIFIER> ...
    "," <BACK_QUOTED_IDENTIFIER> ...
    "," <BRACKET_QUOTED_IDENTIFIER> ...
    "," <UNICODE_QUOTED_IDENTIFIER> ...

    at org.apache.calcite.sql.parser.impl.SqlParserImpl.generateParseException(SqlParserImpl.java:23019)
    at org.apache.calcite.sql.parser.impl.SqlParserImpl.jj_consume_token(SqlParserImpl.java:22836)
    at org.apache.calcite.sql.parser.impl.SqlParserImpl.ParenthesizedSimpleIdentifierList(SqlParserImpl.java:4466)
    at org.apache.calcite.sql.parser.impl.SqlParserImpl.Expression3(SqlParserImpl.java:3328)
    at org.apache.calcite.sql.parser.impl.SqlParserImpl.Expression2b(SqlParserImpl.java:3066)
    at org.apache.calcite.sql.parser.impl.SqlParserImpl.Expression2(SqlParserImpl.java:3092)
    at org.apache.calcite.sql.parser.impl.SqlParserImpl.Expression(SqlParserImpl.java:3045)
    at ...

Я что-то упускаю из-за синтаксиса? Что пытается сделать парсер? Должен ли я использовать ROW() по-другому?

Это ошибка?

1 ответ

Решение

После дальнейших раскопок я пришел к следующему результату: вам просто нужно красиво поговорить с ROW().

Это будет работать:

INSERT INTO outputTable
SELECT ROW(col1, col2) 
FROM (
  SELECT 
    col1, 
    ROW(col1, col1) as col2 
  FROM inputTable
) tbl2

Замечания:

  • Вложенность: возможно, SQL допускает только один уровень вложенности. Но вам разрешено несколько табличных выражений. Я полагаю, что в данный момент Flink мало что делает для преобразования семантики SQL, прежде чем передать ее в механизм исполнения. План выполнения создаст слитную строку (col1, ROW(col1, col1)) в одном блоке, так что это не так эффективно.
  • ROW (col1, col1): ROW (col1) во вторичной таблице не будет работать. (это будет работать автономно в первой таблице). Не знаю почему. Но эй, мне действительно нужно это, когда у меня есть только одно значение? Я могу свернуть это одно значение. Если у вас есть некоторая свобода действий в выходной схеме, это не будет проблемой.

Я представил вопрос JIRA здесь:

https://issues.apache.org/jira/projects/FLINK/issues/FLINK-11399

Буду обновлять этот пост соответственно

Другие вопросы по тегам