Нерестовые задания с нестатическим временем жизни в Токио
У меня есть ядро Tokio, основной задачей которого является запуск веб-сокета (клиента). Когда я получаю некоторые сообщения от сервера, я хочу выполнить новую задачу, которая обновит некоторые данные. Ниже приведен минимальный ошибочный пример:
use tokio_core::reactor::{Core, Handle};
use futures::future::Future;
use futures::future;
struct Client {
handle: Handle,
data: usize,
}
impl Client {
fn update_data(&mut self) {
// spawn a new task that updates the data
self.handle.spawn(future::ok(()).and_then(|x| {
self.data += 1; // error here
future::ok(())
}));
}
}
fn main() {
let mut runtime = Core::new().unwrap();
let mut client = Client {
handle: runtime.handle(),
data: 0,
};
let task = future::ok::<(), ()>(()).and_then(|_| {
// under some conditions (omitted), we update the data
client.update_data();
future::ok::<(), ()>(())
});
runtime.run(task).unwrap();
}
Который производит эту ошибку:
error[E0477]: the type `futures::future::and_then::AndThen<futures::future::result_::FutureResult<(), ()>, futures::future::result_::FutureResult<(), ()>, [closure@src/main.rs:13:51: 16:10 self:&mut &mut Client]>` does not fulfill the required lifetime
--> src/main.rs:13:21
|
13 | self.handle.spawn(future::ok(()).and_then(|x| {
| ^^^^^
|
= note: type must satisfy the static lifetime
Проблема в том, что новые задачи, которые создаются через дескриптор, должны быть статическими. Та же проблема описана здесь. К сожалению, мне неясно, как я могу решить эту проблему. Даже некоторые попытки с и Arc
и Mutex
(что действительно не должно быть нужно для однопоточного приложения), я потерпел неудачу.
Поскольку события в ландшафте Токио происходят довольно быстро, мне интересно, каково текущее лучшее решение. У вас есть какие-нибудь предложения?
редактировать
Решение Peter Hall работает для примера выше. К сожалению, когда я построил неудачный пример, я сменил реактор Tokio, думая, что они будут похожи. С помощью tokio::runtime::current_thread
use futures::future;
use futures::future::Future;
use futures::stream::Stream;
use std::cell::Cell;
use std::rc::Rc;
use tokio::runtime::current_thread::{Builder, Handle};
struct Client {
handle: Handle,
data: Rc<Cell<usize>>,
}
impl Client {
fn update_data(&mut self) {
// spawn a new task that updates the data
let mut data = Rc::clone(&self.data);
self.handle.spawn(future::ok(()).and_then(move |_x| {
data.set(data.get() + 1);
future::ok(())
}));
}
}
fn main() {
// let mut runtime = Core::new().unwrap();
let mut runtime = Builder::new().build().unwrap();
let mut client = Client {
handle: runtime.handle(),
data: Rc::new(Cell::new(1)),
};
let task = future::ok::<(), ()>(()).and_then(|_| {
// under some conditions (omitted), we update the data
client.update_data();
future::ok::<(), ()>(())
});
runtime.block_on(task).unwrap();
}
Я получаю:
error[E0277]: `std::rc::Rc<std::cell::Cell<usize>>` cannot be sent between threads safely
--> src/main.rs:17:21
|
17 | self.handle.spawn(future::ok(()).and_then(move |_x| {
| ^^^^^ `std::rc::Rc<std::cell::Cell<usize>>` cannot be sent between threads safely
|
= help: within `futures::future::and_then::AndThen<futures::future::result_::FutureResult<(), ()>, futures::future::result_::FutureResult<(), ()>, [closure@src/main.rs:17:51: 20:10 data:std::rc::Rc<std::cell::Cell<usize>>]>`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<std::cell::Cell<usize>>`
= note: required because it appears within the type `[closure@src/main.rs:17:51: 20:10 data:std::rc::Rc<std::cell::Cell<usize>>]`
= note: required because it appears within the type `futures::future::chain::Chain<futures::future::result_::FutureResult<(), ()>, futures::future::result_::FutureResult<(), ()>, [closure@src/main.rs:17:51: 20:10 data:std::rc::Rc<std::cell::Cell<usize>>]>`
= note: required because it appears within the type `futures::future::and_then::AndThen<futures::future::result_::FutureResult<(), ()>, futures::future::result_::FutureResult<(), ()>, [closure@src/main.rs:17:51: 20:10 data:std::rc::Rc<std::cell::Cell<usize>>]>`
Так что кажется, что в этом случае мне нужно Arc
и Mutex
хотя весь код однопоточный?
1 ответ
В однопоточной программе вам не нужно использовать Arc
; Rc
достаточно:
use std::{rc::Rc, cell::Cell};
struct Client {
handle: Handle,
data: Rc<Cell<usize>>,
}
impl Client {
fn update_data(&mut self) {
let data = Rc::clone(&self.data);
self.handle.spawn(future::ok(()).and_then(move |_x| {
data.set(data.get() + 1);
future::ok(())
}));
}
}
Дело в том, что вам больше не нужно беспокоиться о жизни, потому что каждый клон Rc
действует так, как будто он владеет данными, а не обращается к ним через ссылку на self
, Внутренний Cell
(или же RefCell
для не Copy
типы) необходимо, потому что Rc
не может быть разыменовано, поскольку оно было клонировано.
spawn
метод tokio::runtime::current_thread::Handle
требует, чтобы будущее Send
, что является причиной проблемы в обновлении вашего вопроса. Существует объяснение (своего рода), почему это так в этом выпуске Tokio Github.
Ты можешь использовать tokio::runtime::current_thread::spawn
вместо метода Handle
, который всегда будет запускать будущее в текущем потоке и не требует, чтобы будущее Send
, Вы можете заменить self.handle.spawn
в коде выше, и это будет работать просто отлично.
Если вам нужно использовать метод на Handle
тогда вам также нужно будет прибегнуть к Arc
а также Mutex
(или же RwLock
) для того, чтобы удовлетворить Send
Требование:
use std::sync::{Mutex, Arc};
struct Client {
handle: Handle,
data: Arc<Mutex<usize>>,
}
impl Client {
fn update_data(&mut self) {
let data = Arc::clone(&self.data);
self.handle.spawn(future::ok(()).and_then(move |_x| {
*data.lock().unwrap() += 1;
future::ok(())
}));
}
}
Если ваши данные действительно usize
Вы также можете использовать AtomicUsize
вместо Mutex<usize>
, но я лично считаю, что работать с ним так же неудобно.