Правильный способ убедиться, что менеджер соединений clj-http закрыт после выполнения всех запросов
У меня есть код, который представляет собой комбинацию clj-http
, core.async
объекты и atom
, Он создает несколько потоков для извлечения и анализа нескольких страниц:
(defn fetch-page
([url] (fetch-page url nil))
([url conn-manager]
(-> (http.client/get url {:connection-manager conn-manager})
:body hickory/parse hickory/as-hickory)))
(defn- create-worker
[url-chan result conn-manager]
(async/thread
(loop [url (async/<!! url-chan)]
(when url
(swap! result assoc url (fetch-page url conn-manager))
(recur (async/<!! url-chan))))))
(defn fetch-pages
[urls]
(let [url-chan (async/to-chan urls)
pages (atom (reduce (fn [m u] (assoc m u nil)) {} urls))
conn-manager (http.conn-mgr/make-reusable-conn-manager {})
workers (mapv (fn [_] (create-worker url-chan pages conn-manager))
(range n-cpus))]
; wait for workers to finish and shut conn-manager down
(dotimes [_ n-cpus] (async/alts!! workers))
(http.conn-mgr/shutdown-manager conn-manager)
(mapv #(get @pages %) urls)))
Идея состоит в том, чтобы использовать несколько потоков, чтобы сократить время выборки и анализа страниц, но я бы не хотел перегружать сервер, отправляя сразу много запросов - вот почему использовался диспетчер соединений. Я не знаю, если мой подход правильный, предложения приветствуются. В настоящее время проблема заключается в том, что последние запросы не выполняются, потому что диспетчер соединений отключается, прежде чем они завершаются: Exception in thread "async-thread-macro-15" java.lang.IllegalStateException: Connection pool shut down
,
Основные вопросы: как мне закрыть диспетчер соединений в нужный момент (и почему мой текущий код не справляется с этим)? Побочный квест: мой подход правильный? Если нет, что я могу сделать, чтобы получить и проанализировать несколько страниц одновременно, не перегружая сервер?
Спасибо!
2 ответа
Проблема в том, что async/alts!!
возвращается на первый результат (и будет продолжать делать это, так как workers
никогда не меняется). Я думаю, используя async/merge
построить канал, а затем повторно считывать его должно работать.
(defn fetch-pages
[urls]
(let [url-chan (async/to-chan urls)
pages (atom (reduce (fn [m u] (assoc m u nil)) {} urls))
conn-manager (http.conn-mgr/make-reusable-conn-manager {})
workers (mapv (fn [_] (create-worker url-chan pages conn-manager))
(range n-cpus))
all-workers (async/merge workers)]
; wait for workers to finish and shut conn-manager down
(dotimes [_ n-cpus] (async/<!! all-workers))
(http.conn-mgr/shutdown-manager conn-manager)
(mapv #(get @pages %) urls)))
В качестве альтернативы, вы можете повторить и продолжать сокращаться workers
вместо этого, чтобы вы ожидали только незавершенных рабочих.
(defn fetch-pages
[urls]
(let [url-chan (async/to-chan urls)
pages (atom (reduce (fn [m u] (assoc m u nil)) {} urls))
conn-manager (http.conn-mgr/make-reusable-conn-manager {})
workers (mapv (fn [_] (create-worker url-chan pages conn-manager))
(range n-cpus))]
; wait for workers to finish and shut conn-manager down
(loop [workers workers]
(when (seq workers)
(let [[_ finished-worker] (async/alts!! workers)]
(recur (filterv #(not= finished-worker %) workers)))))
(http.conn-mgr/shutdown-manager conn-manager)
(mapv #(get @pages %) urls)))
Я считаю, что Алехандро прав в отношении причины вашей ошибки, и это логично, поскольку ваша ошибка указывает на то, что вы завершили работу диспетчера подключений до того, как все запросы были выполнены, поэтому вполне вероятно, что все работники не завершили работу после его закрытия. вниз.
Другое решение, которое я предлагаю, связано с тем, что вы на самом деле ничего не делаете в своем create-worker
поток, который требует, чтобы это был канал, который неявно создается async/thread
, Таким образом, вы можете заменить его на future
, вот так:
(defn- create-worker
[url-chan result conn-manager]
(future
(loop [url (a/<!! url-chan)]
(when url
(swap! result assoc url (fetch-page url conn-manager))
(recur (a/<!! url-chan))))))
И в твоем fetch-pages
функция "join" путем разыменования:
(doseq [worker workers]
@worker) ; alternatively, use deref to specify timeout
Это устраняет значительное вмешательство core.async в то, что изначально не является проблемой core.async. Это, конечно, зависит от того, сохраняете ли вы метод сбора данных как есть, то есть swap!
на атоме, чтобы отслеживать данные страницы. Если вы должны были отправить результат fetch-page
на обратный канал, или что-то подобное, то вы хотите сохранить свой текущий thread
подход.
Что касается вашей озабоченности по поводу перегрузки сервера - вы еще не определили, что значит "перегрузить" сервер. Это имеет два измерения: одно - это скорость запросов (например, количество запросов в секунду), а другое - количество одновременных запросов. Ваше текущее приложение имеет n
рабочие потоки, и это эффективный параллелизм (вместе с настройками в диспетчере соединений). Но это никак не влияет на скорость запросов в секунду.
Это немного сложнее, чем может показаться, хотя это возможно. Вы должны учитывать сумму всех запросов, выполненных всеми потоками за единицу времени, и управление этим не является чем-то, что можно решить в одном ответе здесь. Я предлагаю вам провести некоторое исследование о троттлинге и ограничении скорости, и попробуйте, а затем перейдите оттуда к вопросам.