Асинхронные / многопоточные запросы работают не так, как ожидалось

Во-первых, я пишу свой код на Clojure, но я не думаю, что этот вопрос имеет непосредственное отношение к Clojure, потому что я использовал библиотеку Java, которая делала запросы для меня (вместо библиотеки Clojure, которую я использую сейчас), и проблема все еще была. Вот почему я не установил тег Clojure для этого поста.

Кроме того, у меня слишком много кода и я не могу опубликовать все это здесь, потому что это займет слишком много времени, чтобы понять все это. Поэтому я попытаюсь объяснить свой код словами, лог-сообщениями и небольшим псевдо-замыканием.


У меня есть система, которая должна отправлять запросы на сервер. В любое время может быть создан новый Request, Request это просто структура данных, которая содержит необходимые данные для отправки запроса на сервер.

Кроме того, сервер имеет API call-limit, поэтому я не могу спамить с запросами и мне нужно ждать около 200 миллисекунд после каждого звонка. Некоторые запросы могут занимать 500 мс, поэтому для повышения производительности я не хочу ждать окончания каждого вызова и хочу отправить еще один запрос через 200 мс после того, как был отправлен предыдущий. Из-за всего этого я создал RequestMaster,

RequestMaster имеет queue где он получает все ожидающие Requests, Есть один поток, где он выполняет свою работу:

1) ждет 100 мс после предыдущего Request было отправлено
2) Подбирает наиболее приоритетные Request от queue
3) принимает параметр func от Request и называет это
3.1) func является закрытием, которое подготавливает необходимые данные запроса для вызова. Это включает в себя nonce параметр.
3.2) Наконец, func отправить подготовленный запрос на сервер
4) RequestMaster создает обратный вызов для доступа к результату только что отправленного запроса.
5) Пишет запрос времени звонка
6) петля

Капсульная версия RequestMaster цикл (На самом деле это агент. Неважно).

(fn [queue*]
  (wait-for-call-available)
  ;; Find the most prior request
  (let [req (get-most-prior-req queue*)
    ;; Do req
    (let [resp* ((:func req))]
      (dosync
        (ref-set (:last-call* rm-state) (current-millis)))
      (Thread/sleep 10)
      (future (handle-response req resp*)))
  queue*)]

Затем для демонстрации я создаю клиента и вызываю 10 запросов одновременно.

(def callbacks [(get-balances client)
                (get-balances client)
                (get-balances client)
                (get-balances client)
                (get-balances client)
                (get-balances client)
                (get-balances client)
                (get-balances client)
                (get-balances client)
                (get-balances client)])

После этого кода 10 запросов добавляются в queue и ждут своей очереди. Сейчас я использую clj-http для отправки запросов. И прямо перед вызовом функции clj-http (client/post) Я использую ведение журнала, чтобы увидеть, есть ли ошибка с ожиданием после каждого запроса:

"doreq: " 1509365666674
"doreq: " 1509365666881
"doreq: " 1509365667089
"doreq: " 1509365667347
"doreq: " 1509365667552
"doreq: " 1509365667762
"doreq: " 1509365667966
"doreq: " 1509365668178
"doreq: " 1509365668383
"doreq: " 1509365668588

Вы можете видеть, что каждый запрос ожидает около 200 мс после предыдущего. Все отлично. Но я также вижу:

An exception was thrown:
{:error "Nonce must be greater than 1509365668178. You provided 1509365667966.", 
 :params {"command" "returnCompleteBalances", 
          "nonce"   1509365667967}}

Понятно, что запрос, который нужно было отправить в...7966ms, почему-то ждал>200мс и пытался отправить его после того, как следующий запрос уже был отправлен.

Я пытался использовать clj-http без {:async? true}, Только что перезвонил с (promise) и выполнял блокирующие звонки внутри фьючерсов. Результат тот же.

Я пытался использовать библиотеку XChange, и эта библиотека выполняла запросы, а не clj-http. Все тот же.

Я совершенно ничего не понимаю. Кажется, что поток, который должен отправить запрос, готовит все данные, готов отправить запрос, но внезапно говорит: "О, я так устал, даем ждать 200 мс!!" и позволяет отправить следующий запрос.

Кто-нибудь видит что-то знакомое? Может быть, я не понимаю что-то фундаментальное в JVM? Потому что все должно быть хорошо, если верить журналам.

Я предоставлю больше информации, если вы скажете мне, что это необходимо

(Кстати, извините за мой дерьмовый английский. Надеюсь, вы поняли, что отправка была отправлена, что была отправлена ​​...)

0 ответов

Другие вопросы по тегам