Оникс: Не удается подобрать триггер / испустить результаты в следующей задаче

Я пытаюсь начать работу с Onyx, платформой распределенных вычислений в Clojure. В частности, я пытаюсь понять, как агрегировать данные. Если я правильно понимаю документацию, сочетание окна и :trigger/emit Функция должна позволить мне сделать это.

Итак, я изменил пример агрегации (Onyx 0.13.0) тремя способами (см. Gist с полным кодом):

  • в -main я println любые сегменты, помещенные в выходной канал; это работает, как и ожидалось, с исходным кодом, так как он собирает все сегменты и выводит их на стандартный вывод.
  • Я добавляю функцию emit следующим образом:

    (defn make-ds
       [event window trigger {:keys [lower-bound upper-bound event-type] :as state-event} extent-state]
       (println "make-ds called")
       {:ds window})
    
  • Я добавляю конфигурацию триггера (оригинал dump-words триггер испускается для краткости):

    (def triggers
     [{:trigger/window-id :word-counter
       :trigger/id :make-ds
       :trigger/on :onyx.triggers/segment
       :trigger/fire-all-extents? true
       :trigger/threshold [5 :elements]
       :trigger/emit ::make-ds}])
    
  • Я меняю :count-words Задача от вызова identity функция к reduce введите, чтобы он не передавал все входные сегменты на выход (и добавил опции конфигурации, которые оникс должен решать как пакет):

        {:onyx/name :count-words
         ;:onyx/fn :clojure.core/identity
         :onyx/type :reduce ; :function
         :onyx/group-by-key :word
         :onyx/flux-policy :kill
         :onyx/min-peers 1
         :onyx/max-peers 1
         :onyx/batch-size 1000
         :onyx/batch-fn? true}  
    

Когда я запускаю это сейчас, я вижу в выводе, что функция emit (т.е. make-ds) вызывается для каждого входного сегмента (первый выход поступает из dump-words триггер оригинального кода):

     > lein run
     [....]
     Om -> 1
     name -> 1
     My -> 2
     a -> 1
     gone -> 1
     Coffee -> 1
     to -> 1
     get -> 1
     Time -> 1
     make-ds called
     make-ds called
     make-ds called
     make-ds called
     [....]

Однако сборка сегмента от make-ds не доходит до выходного канала, они никогда не печатаются. Если я вернусь :count-words задача к identity функция, это работает просто отлично. Кроме того, это выглядит так, как будто функция emit вызывается для каждого входного сегмента, тогда как я ожидаю, что она будет вызываться только тогда, когда пороговое условие истинно (т. Е. Всякий раз, когда 5 элементов были агрегированы в окне).

В качестве теста для этой функциональности в базе кода Onyx (onyx.windowing.emit-aggregate-test) проходит очень хорошо, я предполагаю, что где-то совершаю глупую ошибку, но я не могу понять, что.

1 ответ

Решение

Я наконец-то увидел, что в журнале было предупреждение onxy.log как это:

[clojure.lang.ExceptionInfo: Windows cannot be checkpointed with ZooKeeper unless 
  :onyx.peer/storage.zk.insanely-allow-windowing? is set to true in the peer config.
  This should only be turned on as a development convenience.
[clojure.lang.ExceptionInfo: Handling uncaught exception thrown inside task 
  lifecycle :lifecycle/checkpoint-state. Killing the job. -> Exception type: 
  clojure.lang.ExceptionInfo. Exception message: Windows cannot be checkpointed with
  ZooKeeper unless :onyx.peer/storage.zk.insanely-allow-windowing? is set to true   in
  the peer config. This should only be turned on as a development convenience.  

Как только я установил это, я наконец получил некоторые сегменты для следующей задачи. Т.е. мне пришлось изменить одноранговый конфиг на:

(def peer-config
  {:zookeeper/address "127.0.0.1:2189"
   :onyx/tenancy-id id
   :onyx.peer/job-scheduler :onyx.job-scheduler/balanced
   :onyx.peer/storage.zk.insanely-allow-windowing? true
   :onyx.messaging/impl :aeron
   :onyx.messaging/peer-port 40200
   :onyx.messaging/bind-addr "localhost"})

Сейчас, :onyx.peer/storage.zk.insanely-allow-windowing? не похоже на хорошую вещь, чтобы сделать. Лукас Брэдстрит рекомендовал при переключении канала Clojurians Slack на контрольную точку S3.

Другие вопросы по тегам