Оникс: Не удается подобрать триггер / испустить результаты в следующей задаче
Я пытаюсь начать работу с Onyx, платформой распределенных вычислений в Clojure. В частности, я пытаюсь понять, как агрегировать данные. Если я правильно понимаю документацию, сочетание окна и :trigger/emit
Функция должна позволить мне сделать это.
Итак, я изменил пример агрегации (Onyx 0.13.0) тремя способами (см. Gist с полным кодом):
- в
-main
яprintln
любые сегменты, помещенные в выходной канал; это работает, как и ожидалось, с исходным кодом, так как он собирает все сегменты и выводит их на стандартный вывод. Я добавляю функцию emit следующим образом:
(defn make-ds [event window trigger {:keys [lower-bound upper-bound event-type] :as state-event} extent-state] (println "make-ds called") {:ds window})
Я добавляю конфигурацию триггера (оригинал
dump-words
триггер испускается для краткости):(def triggers [{:trigger/window-id :word-counter :trigger/id :make-ds :trigger/on :onyx.triggers/segment :trigger/fire-all-extents? true :trigger/threshold [5 :elements] :trigger/emit ::make-ds}])
Я меняю
:count-words
Задача от вызоваidentity
функция кreduce
введите, чтобы он не передавал все входные сегменты на выход (и добавил опции конфигурации, которые оникс должен решать как пакет):{:onyx/name :count-words ;:onyx/fn :clojure.core/identity :onyx/type :reduce ; :function :onyx/group-by-key :word :onyx/flux-policy :kill :onyx/min-peers 1 :onyx/max-peers 1 :onyx/batch-size 1000 :onyx/batch-fn? true}
Когда я запускаю это сейчас, я вижу в выводе, что функция emit (т.е. make-ds
) вызывается для каждого входного сегмента (первый выход поступает из dump-words
триггер оригинального кода):
> lein run
[....]
Om -> 1
name -> 1
My -> 2
a -> 1
gone -> 1
Coffee -> 1
to -> 1
get -> 1
Time -> 1
make-ds called
make-ds called
make-ds called
make-ds called
[....]
Однако сборка сегмента от make-ds не доходит до выходного канала, они никогда не печатаются. Если я вернусь :count-words
задача к identity
функция, это работает просто отлично. Кроме того, это выглядит так, как будто функция emit вызывается для каждого входного сегмента, тогда как я ожидаю, что она будет вызываться только тогда, когда пороговое условие истинно (т. Е. Всякий раз, когда 5 элементов были агрегированы в окне).
В качестве теста для этой функциональности в базе кода Onyx (onyx.windowing.emit-aggregate-test
) проходит очень хорошо, я предполагаю, что где-то совершаю глупую ошибку, но я не могу понять, что.
1 ответ
Я наконец-то увидел, что в журнале было предупреждение onxy.log
как это:
[clojure.lang.ExceptionInfo: Windows cannot be checkpointed with ZooKeeper unless
:onyx.peer/storage.zk.insanely-allow-windowing? is set to true in the peer config.
This should only be turned on as a development convenience.
[clojure.lang.ExceptionInfo: Handling uncaught exception thrown inside task
lifecycle :lifecycle/checkpoint-state. Killing the job. -> Exception type:
clojure.lang.ExceptionInfo. Exception message: Windows cannot be checkpointed with
ZooKeeper unless :onyx.peer/storage.zk.insanely-allow-windowing? is set to true in
the peer config. This should only be turned on as a development convenience.
Как только я установил это, я наконец получил некоторые сегменты для следующей задачи. Т.е. мне пришлось изменить одноранговый конфиг на:
(def peer-config
{:zookeeper/address "127.0.0.1:2189"
:onyx/tenancy-id id
:onyx.peer/job-scheduler :onyx.job-scheduler/balanced
:onyx.peer/storage.zk.insanely-allow-windowing? true
:onyx.messaging/impl :aeron
:onyx.messaging/peer-port 40200
:onyx.messaging/bind-addr "localhost"})
Сейчас, :onyx.peer/storage.zk.insanely-allow-windowing?
не похоже на хорошую вещь, чтобы сделать. Лукас Брэдстрит рекомендовал при переключении канала Clojurians Slack на контрольную точку S3.