Как запомнить функцию, которая использует core.async и неблокирующее чтение канала?
Я хотел бы использовать memoize
для функции, которая использует core.async
а также <!
например
(defn foo [x]
(go
(<! (timeout 2000))
(* 2 x)))
(В реальной жизни это может быть полезно для кеширования результатов вызовов сервера)
Я смог добиться этого, написав версию memoize для core.async (почти такой же код, как и для memoize):
(defn memoize-async [f]
(let [mem (atom {})]
(fn [& args]
(go
(if-let [e (find @mem args)]
(val e)
(let [ret (<! (apply f args))]; this line differs from memoize [ret (apply f args)]
(swap! mem assoc args ret)
ret))))))
Пример использования:
(def foo-memo (memoize-async foo))
(go (println (<! (foo-memo 3)))); delay because of (<! (timeout 2000))
(go (println (<! (foo-memo 3)))); subsequent calls are memoized => no delay
Мне интересно, есть ли более простые способы достичь того же результата.
** Примечание: мне нужно решение, которое работает с <!
, За <!!
, посмотрите на этот вопрос: Как запоминать функцию, которая использует core.async и блокирует чтение канала? **
2 ответа
Вы можете использовать встроенную функцию памятки для этого. Начнем с определения метода, который читает канал и возвращает значение:
(defn wait-for [ch]
(<!! ch))
Обратите внимание, что мы будем использовать <!!
и не <!
потому что мы хотим этот функциональный блок, пока не будет данных на канале во всех случаях. <!
это поведение проявляется только при использовании в форме внутри блока go.
Затем вы можете создать свою запомненную функцию, составив эту функцию с foo
вот так:
(def foo-memo (memoize (comp wait-for foo)))
foo
возвращает канал, так wait-for
будет блокироваться, пока этот канал не будет иметь значение (то есть, пока операция внутри foo
законченный).
foo-memo
можно использовать аналогично вашему примеру выше, за исключением того, что вам не нужен вызов <!
так как wait-for
заблокирует для вас:
(go (println (foo-memo 3))
Вы также можете вызвать это вне блока go, и он будет вести себя так, как вы ожидаете (т.е. блокировать вызывающий поток до тех пор, пока foo не вернется).
Это было немного сложнее, чем я ожидал. Ваше решение не является правильным, потому что, когда вы снова вызываете свою запомненную функцию с теми же аргументами, раньше, чем первый запуск завершит выполнение блока go, вы снова вызовете ее и получите ошибку. Это часто бывает, когда вы обрабатываете списки с помощью core.async.
В приведенном ниже примере для решения этой проблемы используется pub/sub из core.async (протестировано только в CLJS):
(def lookup-sentinel #?(:clj ::not-found :cljs (js-obj))
(def pending-sentinel #?(:clj ::pending :cljs (js-obj))
(defn memoize-async
[f]
(let [>in (chan)
pending (pub >in :args)
mem (atom {})]
(letfn
[(memoized [& args]
(go
(let [v (get @mem args lookup-sentinel)]
(condp identical? v
lookup-sentinel
(do
(swap! mem assoc args pending-sentinel)
(go
(let [ret (<! (apply f args))]
(swap! mem assoc args ret)
(put! >in {:args args :ret ret})))
(<! (apply memoized args)))
pending-sentinel
(let [<out (chan 1)]
(sub pending args <out)
(:ret (<! <out)))
v))))]
memoized)))
ПРИМЕЧАНИЕ: это, вероятно, утечка памяти, подписок и <out
каналы не закрыты
Я использовал эту функцию в одном из своих проектов для кеширования HTTP-вызовов. Функция кэширует результаты в течение заданного времени и использует барьер для предотвращения выполнения функции несколько раз, когда кеш "холодный" (из-за переключения контекста внутри блока go).
(defn memoize-af-until
[af ms clock]
(let [barrier (async/chan 1)
last-return (volatile! nil)
last-return-ms (volatile! nil)]
(fn [& args]
(async/go
(>! barrier :token)
(let [now-ms (.now clock)]
(when (or (not @last-return-ms) (< @last-return-ms (- now-ms ms)))
(vreset! last-return (<! (apply af args)))
(vreset! last-return-ms now-ms))
(<! barrier)
@last-return)))))
Вы можете проверить, что он работает правильно, установив время кеширования на 0 и заметив, что два вызова функций занимают примерно 10 секунд. Без барьера два вызова завершились бы одновременно:
(def memo (memoize-af-until #(async/timeout 5000) 0 js/Date))
(async/take! (memo) #(println "[:a] Finished"))
(async/take! (memo) #(println "[:b] Finished"))