Самый простой способ использовать обратный вызов A / O в экземплярах http-kit/get

Я запускаю несколько сотен одновременно http-kit.client/get запросы снабжены обратным вызовом для записи результатов в один файл.

Что было бы хорошим способом борьбы с безопасностью потоков? С помощью chanа также <!! от core.asyc?

Вот код, который я хотел бы рассмотреть:

(defn launch-async [channel url]                                                                                                                                
  (http/get url {:timeout 5000                                                                                                                                  
                 :user-agent "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.12; rv:10.0) Gecko/20100101 Firefox/10.0"}                                              
          (fn [{:keys [status headers body error]}]                                                                                                             
            (if error                                                                                                                                           
              (put! channel (json/generate-string {:url url :headers headers :status status}))                                                                  
              (put! channel (json/generate-string body))))))                                                                                                    

(defn process-async [channel func]                                                                                                                              
  (when-let [response (<!! channel)]                                                                                                                            
    (func response)))                                                                                                                                           

(defn http-gets-async [func urls]                                                                                                                               
  (let [channel (chan)]                                                                                                                                         
    (doall (map #(launch-async channel %) urls))                                                                                                                
    (process-async channel func)))    

Спасибо за ваши идеи.

2 ответа

Решение

Поскольку вы уже используете core.async в своем примере, я подумал, что укажу несколько проблем и как вы можете их решить. В другом ответе упоминается более базовый подход, и я искренне согласен с тем, что более простой подход вполне подходит. Однако, с каналами, у вас есть простой способ потребления данных, который не включает отображение по вектору, который также будет увеличиваться со временем, если у вас будет много ответов. Рассмотрим следующие проблемы и как мы можем их исправить:

(1) Ваша текущая версия потерпит крах, если ваш список URL содержит более 1024 элементов. Есть внутренний буфер для операций ввода-вывода, которые являются асинхронными (т.е. put! а также take! не блокируйте, но всегда возвращайте немедленно), и ограничение составляет 1024. Это необходимо для предотвращения неограниченного асинхронного использования канала. Чтобы убедиться в этом, позвоните (http-gets-async println (repeat 1025 "http://blah-blah-asdf-fakedomain.com")),

То, что вы хотите сделать, это размещать что-то на канале, только когда есть место для этого. Это называется противодавлением. Взяв страницу из превосходного wiki on go, заблокируйте лучшие практики, один умный способ сделать это с помощью обратного вызова http-kit - это использовать put! опция обратного вызова для запуска вашего следующего http get; это произойдет только тогда, когда put! сразу же успешно, так что у вас никогда не будет ситуации, когда вы можете выйти за пределы буфера канала:

(defn launch-async
  [channel [url & urls]]
  (when url
    (http/get url {:timeout 5000
                   :user-agent "Mozilla"}
              (fn [{:keys [status headers body error]}]
                (let [put-on-chan (if error
                                    (json/generate-string {:url url :headers headers :status status})
                                    (json/generate-string body))]
                  (put! channel put-on-chan (fn [_] (launch-async channel urls))))))))

(2) Затем вы, похоже, обрабатываете только один ответ. Вместо этого используйте go-loop:

(defn process-async
  [channel func]
  (go-loop []
    (when-let [response (<! channel)]
      (func response)
      (recur))))

(3) Вот твой http-gets-async функция. Я не вижу никакого вреда в добавлении буфера здесь, так как это должно помочь вам запустить хороший пакет запросов в начале:

(defn http-gets-async
  [func urls]
  (let [channel (chan 1000)]
    (launch-async channel urls)
    (process-async channel func)))

Теперь у вас есть возможность обрабатывать бесконечное количество URL с обратным давлением. Чтобы проверить это, определите счетчик, а затем заставьте свою функцию обработки увеличивать этот счетчик, чтобы увидеть ваш прогресс. Используя локальный URL-адрес, на который легко наброситься (не рекомендуется запускать сотни тысяч запросов, скажем, в Google и т. Д.):

(def responses (atom 0))
(http-gets-async (fn [_] (swap! responses inc))
                 (repeat 1000000 "http://localhost:8000"))

Поскольку все это асинхронно, ваша функция вернется немедленно, и вы можете посмотреть на @responses расти.

Еще одна интересная вещь, которую вы можете сделать, это вместо запуска вашей функции обработки в process-asyncВы можете при желании применить его в качестве датчика на самом канале.

(defn process-async
  [channel]
  (go-loop []
    (when-let [_ (<! channel)]
      (recur))))

(defn http-gets-async
  [func urls]
  (let [channel (chan 10000 (map func))] ;; <-- transducer on channel
    (launch-async channel urls)
    (process-async channel)))

Есть много способов сделать это, включая построение так, чтобы канал закрывался (обратите внимание, что выше, он остается открытым). У тебя есть java.util.concurrent Примитивы помогут в этом отношении, если вам нравится, и ими довольно легко пользоваться. Возможностей очень много.

Это достаточно просто, чтобы я не использовал для этого core.async. Вы можете сделать это с хранением атома, используя вектор ответов, а затем создать отдельный поток, читающий содержимое атома, пока не будут просмотрены все ответы. Затем, в вашем обратном вызове http-kit, вы можете просто swap! ответ в атом прямо.

Если вы хотите использовать core.async, я бы рекомендовал буферизованный канал, чтобы не блокировать пул потоков http-kit.

Другие вопросы по тегам