Триггер водяного знака в Onyx не срабатывает

У меня есть поток сегментов Оникса, которые являются сообщениями с отметкой времени (поступающими в хронологическом порядке). Скажем, они выглядят так:

{:id 1 :timestamp "2018-09-04 13:15:42" :msg "Hello, World!"}
{:id 2 :timestamp "2018-09-04 21:32:03" :msg "Lorem ipsum"}
{:id 3 :timestamp "2018-09-05 03:01:52" :msg "Dolor sit amet"}
{:id 4 :timestamp "2018-09-05 09:28:16" :msg "Consetetur sadipscing"}
{:id 5 :timestamp "2018-09-05 12:45:33" :msg "Elitr sed diam"}
{:id 6 :timestamp "2018-09-06 08:14:29" :msg "Nonumy eirmod"}
...

Для каждого временного окна (одного дня) в данных я хочу выполнить вычисление на множестве всех его сегментов. Т.е. в этом примере я хотел бы работать с сегментами с идентификаторами 1 и 2 (4 сентября), затем с идентификаторами 3, 4 и 5 (5 сентября) и так далее.

Оникс предлагает окна и триггеры, и они должны делать то, что я хочу из коробки. Если я использую окно :window/type :fixed и агрегировать по :window/range [1 :day] в отношении :window/window-key :timestamp Я буду агрегировать все сегменты каждого дня.

Чтобы запускать мои вычисления только по прибытии всех сегментов дня, Onyx предлагает поведение триггера. :onyx.triggers/watermark, Согласно документации, он должен стрелять

если значение :window/window-key в сегменте превышает верхнюю границу в пределах активного окна

Однако триггер не срабатывает, хотя я вижу, что более поздние сегменты уже поступают, и несколько окон должны быть заполнены. В качестве проверки работоспособности я попробовал простую :onyx.triggers/segment триггер, который работал как положено.


Моя неудачная попытка создать минимальный пример:

Я изменил игрушку с фиксированными окнами, чтобы проверить срабатывание водяных знаков, и это сработало там.

Тем не менее, я обнаружил, что в этой игрушечной работе причина срабатывания триггера водяного знака может быть:

Это закрыло входной канал? Может быть, работа только что завершена, что может вызвать водяной знак тоже.


Другим аспектом, который взаимодействует с запуском водяных знаков, является распределенная работа над задачами между партнерами.

Комментарии к выпуску № 839 (:trigger/emit не работает с :onyx.triggers/watermark) в репозитории Onyx указал мне на проблему # 840 (Watermark не работает с темой Kafka, имеющей> 1 раздел), где я нашел эту подсказку (выделение мое):

Проблема заключается в том, что все ваши данные оказываются в одном разделе, а водяные знаки всегда принимают минимальный водяной знак по всем входным одноранговым узлам (и, если используются нативные водяные знаки kafka, минимальный водяной знак для данного однорангового узла).

Когда вы вызываете g/send с небольшими объемами данных и автоматическим назначением раздела, все ваши данные попадают в один раздел, что означает, что одноранговый узел другого раздела продолжает испускать водяной знак 0.


Я узнал, что:

Его нельзя использовать с текущим триггером водяного знака, который зависит от источника входного сигнала. Вы можете попытаться вытащить предыдущую реализацию водяного знака [...]

Однако в моем графе задач сегменты, которые я хочу объединить в окнах, создаются только в какой-то промежуточной задаче, а не в исходной задаче как таковой. Сегменты ввода предоставляют только информацию о том, как создавать / извлекать содержимое сегментов для этой промежуточной задачи.

Опять же, эта конструкция прекрасно работает в вышеупомянутой игрушечной работе. Причина в том, что входной канал в какой-то момент закрыт, что завершает задание, что, в свою очередь, вызывает появление водяного знака. Так что мой игрушечный пример на самом деле не очень хорошая модель, потому что это не открытый поток.

Если задание получает рассматриваемые сегменты из фактического источника ввода, но без временных отметок, Оникс, кажется, предоставляет пространство для указания assign-watermark-fn, который является необязательным атрибутом входной задачи. Эта функция устанавливает водяной знак при каждом прибытии нового сегмента. В моем случае это не помогает, поскольку сегменты не происходят из входной задачи.

1 ответ

Теперь я нашел способ обойти себя. Документация в основном дает подсказку, как это можно сделать:

Это функция быстрого вызова триггера пунктуации, который срабатывает, когда какой-либо фрагмент данных имеет основанный на времени ключ окна, превышающий другой экстент, фактически объявляя, что больше данных для более ранних окон поступать не будет.

Таким образом, я изменил задачу, которая генерирует сегменты, так что для каждого сегмента будет также генерироваться другой сегмент, похожий на "дозорный":

[{:id 1 :timestamp "2018-09-04 13:15:42" :msg "Hello, World!"}
{:timestamp "2018-09-03 13:15:42" :over :out}]

Обратите внимание, что :timestamp предшествует диапазону окна (здесь, 1 день). Так что оно будет отправлено в предыдущее окно. Поскольку мои данные поступают в хронологическом порядке, :punctuation По наличию сегмента "страж" (с ключевым словом:over) триггер может сказать, что окно можно закрыть. Не забудьте выселить (т.е. :trigger/post-evictor [:all]) и выбросить сегмент "страж" из последнего окна. Добавление :onyx/max-peers 1 в карте задач следит за тем, чтобы дозорный всегда появлялся в конечном итоге, особенно при использовании группировки.

Обратите внимание, что в это временное решение входят два предположения:

  1. Данные поступают в хронологическом
  2. Там нет окон без сегментов
Другие вопросы по тегам