Агенты / субъекты, подобные конструкциям в clojure, которые работают со всеми сообщениями, полученными после последнего обновления
Какой лучший способ в clojure реализовать что-то вроде актера или агента (асинхронно обновляемая, несогласованная ссылка), который выполняет следующее?
- получает отправленные сообщения / данные
- выполняет некоторую функцию над этими данными для получения нового состояния; что-то вроде
(fn [state new-msgs] ...)
- продолжает получать сообщения / данные во время этого обновления
- по завершении этого обновления запускает ту же функцию обновления для всех сообщений, отправленных в промежуточный период.
Агент, кажется, не совсем здесь. Надо одновременно send
функция и данные для агентов, которые не оставляют места для функции, которая работает со всеми данными, которые были введены во время последнего обновления. Цель неявно требует разделения функций и данных.
Модель актера, как правило, лучше подходит в том смысле, что существует разделение функций и данных. Тем не менее, все действующие структуры, о которых я знаю, полагают, что каждое отправленное сообщение будет обрабатываться отдельно. Непонятно, как можно было бы перевернуть это, не добавляя лишних машин. Я знаю, что актеры Пульсара принимают :lifecycle-handle
функция, которая может быть использована для того, чтобы актеры делали "специальные трюки", но документации по этому вопросу не так много, поэтому неясно, будет ли полезна эта функциональность.
У меня есть решение этой проблемы с помощью агентов, core.async
каналы и watch
функции, но это немного грязно, и я надеюсь, что есть лучшее решение. Я опубликую это как решение на тот случай, если другие найдут его полезным, но я бы хотел посмотреть, что придут другие.
3 ответа
Вот решение, которое я придумал, используя агенты, каналы core.async и функции наблюдения. Опять же, это немного грязно, но делает то, что мне нужно сейчас. Вот оно, в общих чертах:
(require '[clojure.core.async :as async :refer [>!! <!! >! <! chan go]])
; We'll call this thing a queued-agent
(defprotocol IQueuedAgent
(enqueue [this message])
(ping [this]))
(defrecord QueuedAgent [agent queue]
IQueuedAgent
(enqueue [_ message]
(go (>! queue message)))
(ping [_]
(send agent identity)))
; Need a function for draining a core async channel of all messages
(defn drain! [c]
(let [cc (chan 1)]
(go (>! cc ::queue-empty))
(letfn
; This fn does all the hard work, but closes over cc to avoid reconstruction
[(drainer! [c]
(let [[v _] (<!! (go (async/alts! [c cc] :priority true)))]
(if (= v ::queue-empty)
(lazy-seq [])
(lazy-seq (cons v (drainer! c))))))]
(drainer! c))))
; Constructor function
(defn queued-agent [& {:keys [buffer update-fn init-fn error-handler-builder] :or {:buffer 100}}]
(let [q (chan buffer)
a (agent (if init-fn (init-fn) {}))
error-handler-fn (error-handler-builder q a)]
; Set up the queue, and watcher which runs the update function when there is new data
(add-watch
a
:update-conv
(fn [k r o n]
(let [queued (drain! q)]
(when-not (empty? queued)
(send a update-fn queued error-handler-fn)))))
(QueuedAgent. a q)))
; Now we can use these like this
(def a (queued-agent
:init-fn (fn [] {:some "initial value"})
:update-fn (fn [a queued-data error-handler-fn]
(println "Receiving data" queued-data)
; Simulate some work/load on data
(Thread/sleep 2000)
(println "Done with work; ready to queue more up!"))
; This is a little warty at the moment, but closing over the queue and agent lets you requeue work on
; failure so you can try again.
:error-handler-builder
(fn [q a] (println "do something with errors"))))
(defn -main []
(doseq [i (range 10)]
(enqueue a (str "data" i))
(Thread/sleep 500) ; simulate things happening
; This part stinks... have to manually let the queued agent know that we've queued some things up for it
(ping a)))
Как вы заметите, необходимость пинговать агента с очередями здесь каждый раз, когда добавляются новые данные, довольно проблематична. Это определенно чувствует, что вещи выворачиваются из типичного использования.
Агенты обратны тому, что вы хотите здесь - они представляют собой значение, которое получает отправленные функции обновления. Это проще всего с очередью и потоком. Для удобства я использую future
построить поток.
user> (def q (java.util.concurrent.LinkedBlockingDeque.))
#'user/q
user> (defn accumulate
[summary input]
(let [{vowels true consonents false}
(group-by #(contains? (set "aeiouAEIOU") %) input)]
(-> summary
(update-in [:vowels] + (count vowels))
(update-in [:consonents] + (count consonents)))))
#'user/accumulate
user> (def worker
(future (loop [summary {:vowels 0 :consonents 0} in-string (.take q)]
(if (not in-string)
summary
(recur (accumulate summary in-string)
(.take q))))))
#'user/worker
user> (.add q "hello")
true
user> (.add q "goodbye")
true
user> (.add q false)
true
user> @worker
{:vowels 5, :consonents 7}
Я придумал что-то ближе к актеру, вдохновленный актерами Тима Болдриджа (актеры 16). Я думаю, что это решает проблему намного более чисто.
(defmacro take-all! [c]
`(loop [acc# []]
(let [[v# ~c] (alts! [~c] :default nil)]
(if (not= ~c :default)
(recur (conj acc# v#))
acc#))))
(defn eager-actor [f]
(let [msgbox (chan 1024)]
(go (loop [f f]
(let [first-msg (<! msgbox) ; do this so we park efficiently, and only
; run when there are actually messages
msgs (take-all! msgbox)
msgs (concat [first-msg] msgs)]
(recur (f msgs)))))
msgbox))
(let [a (eager-actor (fn f [ms]
(Thread/sleep 1000) ; simulate work
(println "doing something with" ms)
f))]
(doseq [i (range 20)]
(Thread/sleep 300)
(put! a i)))
;; =>
;; doing something with (0)
;; doing something with (1 2 3)
;; doing something with (4 5 6)
;; doing something with (7 8 9 10)
;; doing something with (11 12 13)