Экспоненциально убывающая скользящая средняя по скачкообразному окну в Flink SQL: время приведения

Теперь у нас есть SQL с причудливым управлением окнами во Flink, я пытаюсь, чтобы затухающая скользящая средняя называлась "что будет возможно в будущих выпусках Flink как для Table API, так и для SQL". из их дорожной карты SQL / превью 2017-03 пост:

table
  .window(Slide over 1.hour every 1.second as 'w)
  .groupBy('productId, 'w)
  .select(
    'w.end,
    'productId,
    ('unitPrice * ('rowtime - 'w.start).exp() / 1.hour).sum / (('rowtime - 'w.start).exp() / 1.hour).sum)

Вот моя попытка (также вдохновленная примером распада кальцита):

SELECT                                                                              
  lb_index one_key,                                                           
  HOP_START(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND) start_time,  
  SUM(Y * 
      EXP(
        proctime - 
        HOP_START(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND)
      ))                                                             
FROM write_position                                                                
GROUP BY lb_index, HOP(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND)

Время - это время обработки, которое мы получаем как proctime при создании write_position из таблицы AppendStream как:

tEnv.registerTable(
    "write_position", 
    tEnv.fromDataStream(appendStream, "lb_index, Y, proctime.proctime"))

Я получаю эту ошибку:

Cannot apply '-' to arguments of type '<TIME ATTRIBUTE(PROCTIME)> - <TIME ATTRIBUTE(PROCTIME)>'. 
Supported form(s): '<NUMERIC> - <NUMERIC>' '<DATETIME_INTERVAL> - <DATETIME_INTERVAL>' '<DATETIME> - <DATETIME_INTERVAL>'

Я пробовал приводить proctime ко всем известным мне типам (в попытке достичь ЦУМНОЙ земли обетованной), и я просто не могу найти, как заставить это работать.

Я что-то пропустил? Является ли proctime каким-то особым видом времени изменения системы, которое вы не можете преобразовать? Если так, то все равно должен быть какой-то способ сравнить его со значением HOP_START(proctime,...).

1 ответ

Решение

Вы можете использовать timestampDiff для вычитания двух временных точек (см. Документы). Вы используете это так

TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2)

где timepointunit может быть ВТОРОМ, МИНУТОМ, ЧАСОМ, ДНЕМ, МЕСЯЦЕМ или ГОДОМ.

Я не пробовал это с временем обработки, но он работает с полями времени события, так что, надеюсь, так и будет.

Другие вопросы по тегам