Как я могу использовать hyper::client из другого потока?
У меня есть несколько потоков, выполняющих некоторые тяжелые операции, и мне нужно использовать клиента в середине работы. Я использую Hyper v0.11 в качестве HTTP-клиента, и я хотел бы повторно использовать подключения, поэтому мне нужно поделиться тем же hyper::Client
для того, чтобы держать открытыми соединения (под keep-alive
Режим).
Клиент не может делиться между потоками (он не реализует Sync
или же Send
). Вот небольшой фрагмент кода, который я пытался сделать:
let mut core = Core::new().expect("Create Client Event Loop");
let handle = core.handle();
let remote = core.remote();
let client = Client::new(&handle.clone());
thread::spawn(move || {
// intensive operations...
let response = &client.get("http://google.com".parse().unwrap()).and_then(|res| {
println!("Response: {}", res.status());
Ok(())
});
remote.clone().spawn(|_| {
response.map(|_| { () }).map_err(|_| { () })
});
// more intensive operations...
});
core.run(futures::future::empty::<(), ()>()).unwrap();
Этот код не компилируется:
thread::spawn(move || {
^^^^^^^^^^^^^ within `[closure@src/load-balancer.rs:46:19: 56:6 client:hyper::Client<hyper::client::HttpConnector>, remote:std::sync::Arc<tokio_core::reactor::Remote>]`, the trait `std::marker::Send` is not implemented for `std::rc::Weak<std::cell::RefCell<tokio_core::reactor::Inner>>`
thread::spawn(move || {
^^^^^^^^^^^^^ within `[closure@src/load-balancer.rs:46:19: 56:6 client:hyper::Client<hyper::client::HttpConnector>, remote:std::sync::Arc<tokio_core::reactor::Remote>]`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<std::cell::RefCell<hyper::client::pool::PoolInner<tokio_proto::util::client_proxy::ClientProxy<tokio_proto::streaming::message::Message<hyper::http::MessageHead<hyper::http::RequestLine>, hyper::Body>, tokio_proto::streaming::message::Message<hyper::http::MessageHead<hyper::http::RawStatus>, tokio_proto::streaming::body::Body<hyper::Chunk, hyper::Error>>, hyper::Error>>>>`
...
remote.clone().spawn(|_| {
^^^^^ the trait `std::marker::Sync` is not implemented for `futures::Future<Error=hyper::Error, Item=hyper::Response> + 'static`
Есть ли способ повторно использовать один и тот же клиент из разных потоков или какой-то другой подход?
1 ответ
Короткий ответ - нет, но так лучше.
каждый Client
Объект содержит пул соединений. Вот как Гипер Pool
определено в версии 0.11.0:
pub struct Pool<T> {
inner: Rc<RefCell<PoolInner<T>>>,
}
Как inner
подсчитывается с помощью Rc
и заимствовано во время выполнения с RefCell
, бассейн, конечно, не потокобезопасный. Когда вы пытались переместить это Client
для нового потока этот объект будет содержать пул, который находится в другом потоке, который был бы источником гонок данных.
Эта реализация понятна. Попытка повторно использовать HTTP-соединение в нескольких потоках не очень обычна, поскольку требует синхронизированного доступа к ресурсу, который в основном требует интенсивного ввода-вывода. Это очень хорошо сочетается с асинхронной природой Токио. На самом деле более разумно выполнять несколько запросов в одном потоке и позволить ядру Tokio позаботиться об отправке сообщений и получении их асинхронно, не ожидая каждого ответа в последовательности. Более того, вычислительные задачи могут выполняться пулом ЦП из futures_cpupool
, Имея это в виду, код ниже работает нормально:
extern crate tokio_core;
extern crate hyper;
extern crate futures;
extern crate futures_cpupool;
use tokio_core::reactor::Core;
use hyper::client::Client;
use futures::Future;
use futures_cpupool::CpuPool;
fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let client = Client::new(&handle.clone());
let pool = CpuPool::new(1);
println!("Begin!");
let req = client.get("http://google.com".parse().unwrap())
.and_then(|res| {
println!("Response: {}", res.status());
Ok(())
});
let intensive = pool.spawn_fn(|| {
println!("I'm working hard!!!");
std::thread::sleep(std::time::Duration::from_secs(1));
println!("Phew!");
Ok(())
});
let task = req.join(intensive)
.map(|_|{
println!("End!");
});
core.run(task).unwrap();
}
Если ответ не получен слишком поздно, вывод будет:
Begin!
I'm working hard!!!
Response: 302 Found
Phew!
End!
Если у вас есть несколько задач, запущенных в разных потоках, проблема становится открытой, так как возможно несколько архитектур. Одним из них является делегирование всех сообщений одному действующему субъекту, что требует от всех остальных рабочих потоков отправлять на него свои данные. Кроме того, вы можете иметь один клиентский объект для каждого работника, таким образом, также иметь отдельные пулы соединений.