Самый простой способ использовать обратный вызов A / O в экземплярах http-kit/get
Я запускаю несколько сотен одновременно http-kit.client/get
запросы снабжены обратным вызовом для записи результатов в один файл.
Что было бы хорошим способом борьбы с безопасностью потоков? С помощью chan
а также <!!
от core.asyc
?
Вот код, который я хотел бы рассмотреть:
(defn launch-async [channel url]
(http/get url {:timeout 5000
:user-agent "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.12; rv:10.0) Gecko/20100101 Firefox/10.0"}
(fn [{:keys [status headers body error]}]
(if error
(put! channel (json/generate-string {:url url :headers headers :status status}))
(put! channel (json/generate-string body))))))
(defn process-async [channel func]
(when-let [response (<!! channel)]
(func response)))
(defn http-gets-async [func urls]
(let [channel (chan)]
(doall (map #(launch-async channel %) urls))
(process-async channel func)))
Спасибо за ваши идеи.
2 ответа
Поскольку вы уже используете core.async в своем примере, я подумал, что укажу несколько проблем и как вы можете их решить. В другом ответе упоминается более базовый подход, и я искренне согласен с тем, что более простой подход вполне подходит. Однако, с каналами, у вас есть простой способ потребления данных, который не включает отображение по вектору, который также будет увеличиваться со временем, если у вас будет много ответов. Рассмотрим следующие проблемы и как мы можем их исправить:
(1) Ваша текущая версия потерпит крах, если ваш список URL содержит более 1024 элементов. Есть внутренний буфер для операций ввода-вывода, которые являются асинхронными (т.е. put!
а также take!
не блокируйте, но всегда возвращайте немедленно), и ограничение составляет 1024. Это необходимо для предотвращения неограниченного асинхронного использования канала. Чтобы убедиться в этом, позвоните (http-gets-async println (repeat 1025 "http://blah-blah-asdf-fakedomain.com"))
,
То, что вы хотите сделать, это размещать что-то на канале, только когда есть место для этого. Это называется противодавлением. Взяв страницу из превосходного wiki on go, заблокируйте лучшие практики, один умный способ сделать это с помощью обратного вызова http-kit - это использовать put!
опция обратного вызова для запуска вашего следующего http get; это произойдет только тогда, когда put!
сразу же успешно, так что у вас никогда не будет ситуации, когда вы можете выйти за пределы буфера канала:
(defn launch-async
[channel [url & urls]]
(when url
(http/get url {:timeout 5000
:user-agent "Mozilla"}
(fn [{:keys [status headers body error]}]
(let [put-on-chan (if error
(json/generate-string {:url url :headers headers :status status})
(json/generate-string body))]
(put! channel put-on-chan (fn [_] (launch-async channel urls))))))))
(2) Затем вы, похоже, обрабатываете только один ответ. Вместо этого используйте go-loop:
(defn process-async
[channel func]
(go-loop []
(when-let [response (<! channel)]
(func response)
(recur))))
(3) Вот твой http-gets-async
функция. Я не вижу никакого вреда в добавлении буфера здесь, так как это должно помочь вам запустить хороший пакет запросов в начале:
(defn http-gets-async
[func urls]
(let [channel (chan 1000)]
(launch-async channel urls)
(process-async channel func)))
Теперь у вас есть возможность обрабатывать бесконечное количество URL с обратным давлением. Чтобы проверить это, определите счетчик, а затем заставьте свою функцию обработки увеличивать этот счетчик, чтобы увидеть ваш прогресс. Используя локальный URL-адрес, на который легко наброситься (не рекомендуется запускать сотни тысяч запросов, скажем, в Google и т. Д.):
(def responses (atom 0))
(http-gets-async (fn [_] (swap! responses inc))
(repeat 1000000 "http://localhost:8000"))
Поскольку все это асинхронно, ваша функция вернется немедленно, и вы можете посмотреть на @responses
расти.
Еще одна интересная вещь, которую вы можете сделать, это вместо запуска вашей функции обработки в process-async
Вы можете при желании применить его в качестве датчика на самом канале.
(defn process-async
[channel]
(go-loop []
(when-let [_ (<! channel)]
(recur))))
(defn http-gets-async
[func urls]
(let [channel (chan 10000 (map func))] ;; <-- transducer on channel
(launch-async channel urls)
(process-async channel)))
Есть много способов сделать это, включая построение так, чтобы канал закрывался (обратите внимание, что выше, он остается открытым). У тебя есть java.util.concurrent
Примитивы помогут в этом отношении, если вам нравится, и ими довольно легко пользоваться. Возможностей очень много.
Это достаточно просто, чтобы я не использовал для этого core.async. Вы можете сделать это с хранением атома, используя вектор ответов, а затем создать отдельный поток, читающий содержимое атома, пока не будут просмотрены все ответы. Затем, в вашем обратном вызове http-kit, вы можете просто swap!
ответ в атом прямо.
Если вы хотите использовать core.async, я бы рекомендовал буферизованный канал, чтобы не блокировать пул потоков http-kit.