Как создать канал из другого с преобразователями?

Я хочу создать канал clojure.core.async от другого, который просто фильтрует конкретные сообщения. Поэтому я нашел функцию под названием фильтр<.

=> (def c1 (chan))
=> (def c2 (filter< even? c1))
=> (put! c1 1)
=> (put! c1 2)
=> (<!! c2)
2

Но функция и ее друзья помечены как устаревшие:

Устаревший - эта функция будет удалена. Вместо этого используйте преобразователь

Есть несколько способов использования каналов с преобразователем, таких как chan с xform параметр. Как я могу построить новый канал из существующего, используя преобразователи?

1 ответ

Решение

Я провел некоторое исследование по этому вопросу, нашел пару интересных статей ( первую и вторую), а затем получил кое-что, используя pipeline

(require '[clojure.core.async :as async :refer [chan <!! pipeline put!]])
(def c1 (chan))
(def c2 (chan))

(pipeline 4 c2 (filter even?) c1)

(put! c1 1)
(put! c1 2)
(<!! c2)
;;=> 2

Вторая статья, на которую я ссылаюсь, делает это немного чище с некоторыми вспомогательными функциями вокруг функции конвейера:

(defn ncpus []
  (.availableProcessors (Runtime/getRuntime)))

(defn parallelism []
  (+ (ncpus) 1))

(defn add-transducer
  [in xf]
  (let [out (chan (buffer 16))]
    (pipeline (parallelism) out xf in)
    out))

Тогда вы можете просто связать каналы вместе с

(def c1 (chan))
(def c2 (add-transducer c1 (filter even?))

Чтобы завершить ответ, вы сами можете использовать pipe аналогичным образом:

(defn pipe-trans
  [ci xf]
  (let [co (chan 1 xf)]
    (pipe ci co)
    co))
(def c1 (chan))
(def c2 (pipe-trans c1 (filter even?)))
Другие вопросы по тегам