Агенты / субъекты, подобные конструкциям в clojure, которые работают со всеми сообщениями, полученными после последнего обновления

Какой лучший способ в clojure реализовать что-то вроде актера или агента (асинхронно обновляемая, несогласованная ссылка), который выполняет следующее?

  • получает отправленные сообщения / данные
  • выполняет некоторую функцию над этими данными для получения нового состояния; что-то вроде (fn [state new-msgs] ...)
  • продолжает получать сообщения / данные во время этого обновления
  • по завершении этого обновления запускает ту же функцию обновления для всех сообщений, отправленных в промежуточный период.

Агент, кажется, не совсем здесь. Надо одновременно send функция и данные для агентов, которые не оставляют места для функции, которая работает со всеми данными, которые были введены во время последнего обновления. Цель неявно требует разделения функций и данных.

Модель актера, как правило, лучше подходит в том смысле, что существует разделение функций и данных. Тем не менее, все действующие структуры, о которых я знаю, полагают, что каждое отправленное сообщение будет обрабатываться отдельно. Непонятно, как можно было бы перевернуть это, не добавляя лишних машин. Я знаю, что актеры Пульсара принимают :lifecycle-handle функция, которая может быть использована для того, чтобы актеры делали "специальные трюки", но документации по этому вопросу не так много, поэтому неясно, будет ли полезна эта функциональность.

У меня есть решение этой проблемы с помощью агентов, core.async каналы и watch функции, но это немного грязно, и я надеюсь, что есть лучшее решение. Я опубликую это как решение на тот случай, если другие найдут его полезным, но я бы хотел посмотреть, что придут другие.

3 ответа

Вот решение, которое я придумал, используя агенты, каналы core.async и функции наблюдения. Опять же, это немного грязно, но делает то, что мне нужно сейчас. Вот оно, в общих чертах:

(require '[clojure.core.async :as async :refer [>!! <!! >! <! chan go]])

; We'll call this thing a queued-agent
(defprotocol IQueuedAgent
  (enqueue [this message])
  (ping [this]))

(defrecord QueuedAgent [agent queue]
  IQueuedAgent
  (enqueue [_ message]
    (go (>! queue message)))
  (ping [_]
    (send agent identity)))


; Need a function for draining a core async channel of all messages
(defn drain! [c]
  (let [cc (chan 1)]
    (go (>! cc ::queue-empty))
    (letfn
      ; This fn does all the hard work, but closes over cc to avoid reconstruction
      [(drainer! [c]
         (let [[v _] (<!! (go (async/alts! [c cc] :priority true)))]
           (if (= v ::queue-empty)
             (lazy-seq [])
             (lazy-seq (cons v (drainer! c))))))]
      (drainer! c))))

; Constructor function
(defn queued-agent [& {:keys [buffer update-fn init-fn error-handler-builder] :or {:buffer 100}}]
  (let [q                (chan buffer)
        a                (agent (if init-fn (init-fn) {}))
        error-handler-fn (error-handler-builder q a)]
    ; Set up the queue, and watcher which runs the update function when there is new data
    (add-watch
      a
      :update-conv
      (fn [k r o n]
        (let [queued (drain! q)]
          (when-not (empty? queued)
            (send a update-fn queued error-handler-fn)))))
    (QueuedAgent. a q)))

; Now we can use these like this

(def a (queued-agent
         :init-fn   (fn [] {:some "initial value"})
         :update-fn (fn [a queued-data error-handler-fn]
                      (println "Receiving data" queued-data)
                      ; Simulate some work/load on data
                      (Thread/sleep 2000)
                      (println "Done with work; ready to queue more up!"))
         ; This is a little warty at the moment, but closing over the queue and agent lets you requeue work on
         ; failure so you can try again.
         :error-handler-builder
                    (fn [q a] (println "do something with errors"))))

(defn -main []
  (doseq [i (range 10)]
    (enqueue a (str "data" i))
    (Thread/sleep 500) ; simulate things happening
    ; This part stinks... have to manually let the queued agent know that we've queued some things up for it
    (ping a)))

Как вы заметите, необходимость пинговать агента с очередями здесь каждый раз, когда добавляются новые данные, довольно проблематична. Это определенно чувствует, что вещи выворачиваются из типичного использования.

Агенты обратны тому, что вы хотите здесь - они представляют собой значение, которое получает отправленные функции обновления. Это проще всего с очередью и потоком. Для удобства я использую future построить поток.

user> (def q (java.util.concurrent.LinkedBlockingDeque.)) 
#'user/q
user> (defn accumulate
        [summary input]
        (let [{vowels true consonents false}
              (group-by #(contains? (set "aeiouAEIOU") %) input)]
          (-> summary
            (update-in [:vowels] + (count vowels))
            (update-in [:consonents] + (count consonents)))))
#'user/accumulate
user> (def worker
           (future (loop [summary {:vowels 0 :consonents 0} in-string (.take q)]
                         (if (not in-string)
                             summary
                           (recur (accumulate summary in-string)
                                  (.take q))))))
#'user/worker
user> (.add q "hello")
true
user> (.add q "goodbye")
true
user> (.add q false)
true
user> @worker
{:vowels 5, :consonents 7}

Я придумал что-то ближе к актеру, вдохновленный актерами Тима Болдриджа (актеры 16). Я думаю, что это решает проблему намного более чисто.

(defmacro take-all! [c]
  `(loop [acc# []]
     (let [[v# ~c] (alts! [~c] :default nil)]
       (if (not= ~c :default)
         (recur (conj acc# v#))
         acc#))))


(defn eager-actor [f]
  (let [msgbox (chan 1024)]
    (go (loop [f f]
          (let [first-msg (<! msgbox) ; do this so we park efficiently, and only
                                      ; run when there are actually messages
                msgs      (take-all! msgbox)
                msgs      (concat [first-msg] msgs)]
            (recur (f msgs)))))
    msgbox))


(let [a (eager-actor (fn f [ms]
                       (Thread/sleep 1000) ; simulate work
                       (println "doing something with" ms)
                       f))]
  (doseq [i (range 20)]
    (Thread/sleep 300)
    (put! a i)))
;; =>
;; doing something with (0)
;; doing something with (1 2 3)
;; doing something with (4 5 6)
;; doing something with (7 8 9 10)
;; doing something with (11 12 13)
Другие вопросы по тегам