Как прервать или приостановить поток Rust из другого потока?
Я создал поток с бесконечной петлей и таймером внутри.
thread::spawn(|| {
let mut timer = Timer::new().unwrap();
let periodic = timer.periodic(Duration::milliseconds(200));
loop {
periodic.recv();
// Do my work here
}
});
Обратите внимание, что хотя этот пример был создан до Rust 1.0 и с тех пор определенные типы были изменены или удалены, общий вопрос и концепция остаются в силе
Через некоторое время, основанное на некоторых условиях, мне нужно прекратить этот поток из другой части моей программы. Другими словами, я просто хочу выйти из бесконечного цикла. Как я могу сделать это правильно? Кроме того, как я могу приостановить эту тему и возобновить ее позже?
Я пытался использовать глобальный небезопасный флаг, чтобы разорвать цикл, но я думаю, что это решение выглядит не очень хорошо.
2 ответа
Редактировать: обновлено для Rust 1.x
Для обеих этих задач (завершить и приостановить поток) вы можете использовать каналы.
Вот как поток может быть завершен извне:
use std::thread;
use std::time::Duration;
use std::sync::mpsc::{self, TryRecvError};
use std::io::{self, BufRead};
fn main() {
println!("Press enter to terminate the child thread");
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
loop {
println!("Working...");
thread::sleep(Duration::from_millis(500));
match rx.try_recv() {
Ok(_) | Err(TryRecvError::Disconnected) => {
println!("Terminating.");
break;
}
Err(TryRecvError::Empty) => {}
}
}
});
let mut line = String::new();
let stdin = io::stdin();
let _ = stdin.lock().read_line(&mut line);
let _ = tx.send(());
}
То есть на каждой итерации рабочего цикла мы проверяем, уведомил ли кто-нибудь нас через канал. Если да или если другой конец канала вышел из области видимости, мы просто разрываем цикл.
Вот как поток может быть "приостановлен" и "возобновлен":
use std::time::Duration;
use std::thread;
use std::sync::mpsc;
use std::io::{self, BufRead};
fn main() {
println!("Press enter to wake up the child thread");
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
loop {
println!("Suspending...");
match rx.recv() {
Ok(_) => {
println!("Working...");
thread::sleep(Duration::from_millis(500));
}
Err(_) => {
println!("Terminating.");
break;
}
}
}
});
let mut line = String::new();
let stdin = io::stdin();
for _ in 0..4 {
let _ = stdin.lock().read_line(&mut line);
let _ = tx.send(());
}
}
Здесь мы используем recv()
метод, который приостанавливает поток до тех пор, пока что-то не поступит на канал, поэтому для возобновления потока вам просто нужно что-то отправить (значение единицы ()
в данном случае) через канал. Если передающая сторона канала отброшена, recv()
вернусь Err(())
- мы используем это, чтобы выйти из цикла.
Каналы - это самый простой и естественный (IMO) способ решения этих задач, но не самый эффективный. Есть и другие примитивы параллелизма, которые вы можете найти в std::sync
модуль. Они принадлежат более низкому уровню, чем каналы, но могут быть более эффективными в определенных задачах.
Идеальным решением был бы Кондвар. Вы могли бы пойти на wait_timeout
в модуле std::sync, как правильно указал @Vladimir -
use std::sync::{Arc, Mutex, Condvar};
use std::thread;
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair2 = pair.clone();
thread::spawn(move|| {
let &(ref lock, ref cvar) = &*pair2;
let mut started = lock.lock().unwrap();
*started = true;
// We notify the condvar that the value has changed.
cvar.notify_one();
});
// Wait for the thread to start up.
let &(ref lock, ref cvar) = &*pair;
let mut started = lock.lock().unwrap();
// As long as the value inside the `Mutex` is false, we wait.
loop {
let result = cvar.wait_timeout_ms(started, 10).unwrap();
// 10 milliseconds have passed, or maybe the value changed!
started = result.0;
if *started == true {
// We received the notification and the value has been updated, we can leave.
break
}
}
Приведенный выше пример размещен на странице документации
Я сам несколько раз возвращался к этому вопросу, и вот что, как я думаю, касается намерений OP и других лучших практик по остановке потока. Основываясь на принятом ответе, Crossbeam - хорошее обновление до mpsc, позволяющее клонировать и перемещать конечные точки сообщений. Также есть удобная функция галочки. На самом деле здесь есть функция try_recv(), которая не блокирует работу.
Я не уверен, насколько универсально полезно было бы поместить средство проверки сообщений в середину рабочего цикла, подобного этому. Я не обнаружил, что Actix (или ранее Akka) действительно мог остановить поток без, как указано выше, без того, чтобы поток делал это сам. Итак, это то, что я использую сейчас (здесь широко открыт для исправлений, все еще учусь).
// Cargo.toml:
// [dependencies]
// crossbeam-channel = "0.4.4"
use crossbeam_channel::{Sender, Receiver, unbounded, tick};
use std::time::{Duration, Instant};
fn main() {
let (tx, rx):(Sender<String>, Receiver<String>) = unbounded();
let rx2 = rx.clone();
// crossbeam allows clone and move of receiver
std::thread::spawn(move || {
// OP:
// let mut timer = Timer::new().unwrap();
// let periodic = timer.periodic(Duration::milliseconds(200));
let ticker: Receiver<Instant> = tick(std::time::Duration::from_millis(500));
loop {
// OP:
// periodic.recv();
crossbeam_channel::select! {
recv(ticker) -> _ => {
// OP: Do my work here
println!("Hello, work.");
// Comms Check: keep doing work?
// try_recv is non-blocking
// rx, the single consumer is clone-able in crossbeam
let try_result = rx2.try_recv();
match try_result {
Err(_e) => {},
Ok(msg) => {
match msg.as_str() {
"END_THE_WORLD" => {
println!("Ending the world.");
break;
},
_ => {},
}
},
_ => {}
}
}
}
}
});
// let work continue for 10 seconds then tell that thread to end.
std::thread::sleep(std::time::Duration::from_secs(10));
println!("Goodbye, world.");
tx.send("END_THE_WORLD".to_string());
}
Использование строк в качестве средства сообщения - для меня неприятно. Можно было бы приостановить и перезапустить другие вещи в перечислении.