Как запомнить функцию, которая использует core.async и неблокирующее чтение канала?

Я хотел бы использовать memoize для функции, которая использует core.async а также <! например

(defn foo [x]
  (go
    (<! (timeout 2000))
    (* 2 x)))

(В реальной жизни это может быть полезно для кеширования результатов вызовов сервера)

Я смог добиться этого, написав версию memoize для core.async (почти такой же код, как и для memoize):

(defn memoize-async [f]
  (let [mem (atom {})]
    (fn [& args]
      (go
        (if-let [e (find @mem args)]
          (val e)
         (let [ret (<! (apply f args))]; this line differs from memoize [ret (apply f args)]
            (swap! mem assoc args ret)
            ret))))))

Пример использования:

(def foo-memo (memoize-async foo))
(go (println (<! (foo-memo 3)))); delay because of (<! (timeout 2000))

(go (println (<! (foo-memo 3)))); subsequent calls are memoized => no delay

Мне интересно, есть ли более простые способы достичь того же результата.

** Примечание: мне нужно решение, которое работает с <!, За <!!, посмотрите на этот вопрос: Как запоминать функцию, которая использует core.async и блокирует чтение канала? **

2 ответа

Вы можете использовать встроенную функцию памятки для этого. Начнем с определения метода, который читает канал и возвращает значение:

 (defn wait-for [ch]
      (<!! ch))

Обратите внимание, что мы будем использовать <!! и не <! потому что мы хотим этот функциональный блок, пока не будет данных на канале во всех случаях. <! это поведение проявляется только при использовании в форме внутри блока go.

Затем вы можете создать свою запомненную функцию, составив эту функцию с fooвот так:

(def foo-memo (memoize (comp wait-for foo)))

foo возвращает канал, так wait-for будет блокироваться, пока этот канал не будет иметь значение (то есть, пока операция внутри foo законченный).

foo-memo можно использовать аналогично вашему примеру выше, за исключением того, что вам не нужен вызов <! так как wait-for заблокирует для вас:

(go (println (foo-memo 3))

Вы также можете вызвать это вне блока go, и он будет вести себя так, как вы ожидаете (т.е. блокировать вызывающий поток до тех пор, пока foo не вернется).

Это было немного сложнее, чем я ожидал. Ваше решение не является правильным, потому что, когда вы снова вызываете свою запомненную функцию с теми же аргументами, раньше, чем первый запуск завершит выполнение блока go, вы снова вызовете ее и получите ошибку. Это часто бывает, когда вы обрабатываете списки с помощью core.async.

В приведенном ниже примере для решения этой проблемы используется pub/sub из core.async (протестировано только в CLJS):

(def lookup-sentinel  #?(:clj ::not-found :cljs (js-obj))
(def pending-sentinel #?(:clj ::pending   :cljs (js-obj))

(defn memoize-async
  [f]
  (let [>in (chan)
        pending (pub >in :args)
        mem (atom {})]
    (letfn
        [(memoized [& args]
           (go
             (let [v (get @mem args lookup-sentinel)]
               (condp identical? v
                 lookup-sentinel
                 (do
                   (swap! mem assoc args pending-sentinel)
                   (go
                     (let [ret (<! (apply f args))]
                       (swap! mem assoc args ret)
                       (put! >in {:args args :ret ret})))
                   (<! (apply memoized args)))
                 pending-sentinel
                 (let [<out (chan 1)]
                   (sub pending args <out)
                   (:ret (<! <out)))
                 v))))]
        memoized)))

ПРИМЕЧАНИЕ: это, вероятно, утечка памяти, подписок и <out каналы не закрыты

Я использовал эту функцию в одном из своих проектов для кеширования HTTP-вызовов. Функция кэширует результаты в течение заданного времени и использует барьер для предотвращения выполнения функции несколько раз, когда кеш "холодный" (из-за переключения контекста внутри блока go).

(defn memoize-af-until
  [af ms clock]
  (let [barrier (async/chan 1)
        last-return (volatile! nil)
        last-return-ms (volatile! nil)]
    (fn [& args]
      (async/go
        (>! barrier :token)
        (let [now-ms (.now clock)]
          (when (or (not @last-return-ms) (< @last-return-ms (- now-ms ms)))
            (vreset! last-return (<! (apply af args)))
            (vreset! last-return-ms now-ms))
          (<! barrier)
          @last-return)))))

Вы можете проверить, что он работает правильно, установив время кеширования на 0 и заметив, что два вызова функций занимают примерно 10 секунд. Без барьера два вызова завершились бы одновременно:

(def memo (memoize-af-until #(async/timeout 5000) 0 js/Date))
(async/take! (memo) #(println "[:a] Finished"))
(async/take! (memo) #(println "[:b] Finished"))
Другие вопросы по тегам