Как прервать или приостановить поток Rust из другого потока?

Я создал поток с бесконечной петлей и таймером внутри.

thread::spawn(|| {
    let mut timer = Timer::new().unwrap();
    let periodic = timer.periodic(Duration::milliseconds(200));
    loop {
        periodic.recv();

        // Do my work here
    }
});

Обратите внимание, что хотя этот пример был создан до Rust 1.0 и с тех пор определенные типы были изменены или удалены, общий вопрос и концепция остаются в силе

Через некоторое время, основанное на некоторых условиях, мне нужно прекратить этот поток из другой части моей программы. Другими словами, я просто хочу выйти из бесконечного цикла. Как я могу сделать это правильно? Кроме того, как я могу приостановить эту тему и возобновить ее позже?

Я пытался использовать глобальный небезопасный флаг, чтобы разорвать цикл, но я думаю, что это решение выглядит не очень хорошо.

2 ответа

Решение

Редактировать: обновлено для Rust 1.x

Для обеих этих задач (завершить и приостановить поток) вы можете использовать каналы.

Вот как поток может быть завершен извне:

use std::thread;
use std::time::Duration;
use std::sync::mpsc::{self, TryRecvError};
use std::io::{self, BufRead};

fn main() {
    println!("Press enter to terminate the child thread");
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        loop {
            println!("Working...");
            thread::sleep(Duration::from_millis(500));
            match rx.try_recv() {
                Ok(_) | Err(TryRecvError::Disconnected) => {
                    println!("Terminating.");
                    break;
                }
                Err(TryRecvError::Empty) => {}
            }
        }
    });

    let mut line = String::new();
    let stdin = io::stdin();
    let _ = stdin.lock().read_line(&mut line);

    let _ = tx.send(());
}

То есть на каждой итерации рабочего цикла мы проверяем, уведомил ли кто-нибудь нас через канал. Если да или если другой конец канала вышел из области видимости, мы просто разрываем цикл.

Вот как поток может быть "приостановлен" и "возобновлен":

use std::time::Duration;
use std::thread;
use std::sync::mpsc;
use std::io::{self, BufRead};

fn main() {
    println!("Press enter to wake up the child thread");
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        loop {
            println!("Suspending...");
            match rx.recv() {
                Ok(_) => {
                    println!("Working...");
                    thread::sleep(Duration::from_millis(500));
                }
                Err(_) => {
                    println!("Terminating.");
                    break;
                }
            }
        }
    });

    let mut line = String::new();
    let stdin = io::stdin();
    for _ in 0..4 {
        let _ = stdin.lock().read_line(&mut line);
        let _ = tx.send(());
    }
}

Здесь мы используем recv() метод, который приостанавливает поток до тех пор, пока что-то не поступит на канал, поэтому для возобновления потока вам просто нужно что-то отправить (значение единицы () в данном случае) через канал. Если передающая сторона канала отброшена, recv() вернусь Err(()) - мы используем это, чтобы выйти из цикла.

Каналы - это самый простой и естественный (IMO) способ решения этих задач, но не самый эффективный. Есть и другие примитивы параллелизма, которые вы можете найти в std::sync модуль. Они принадлежат более низкому уровню, чем каналы, но могут быть более эффективными в определенных задачах.

Идеальным решением был бы Кондвар. Вы могли бы пойти на wait_timeout в модуле std::sync, как правильно указал @Vladimir -

use std::sync::{Arc, Mutex, Condvar};
use std::thread;

let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair2 = pair.clone();

thread::spawn(move|| {
    let &(ref lock, ref cvar) = &*pair2;
    let mut started = lock.lock().unwrap();
    *started = true;
    // We notify the condvar that the value has changed.
    cvar.notify_one();
});

// Wait for the thread to start up.
let &(ref lock, ref cvar) = &*pair;
let mut started = lock.lock().unwrap();
// As long as the value inside the `Mutex` is false, we wait.
loop {
    let result = cvar.wait_timeout_ms(started, 10).unwrap();
    // 10 milliseconds have passed, or maybe the value changed!
    started = result.0;
    if *started == true {
        // We received the notification and the value has been updated, we can leave.
        break
    }
}

Приведенный выше пример размещен на странице документации

Я сам несколько раз возвращался к этому вопросу, и вот что, как я думаю, касается намерений OP и других лучших практик по остановке потока. Основываясь на принятом ответе, Crossbeam - хорошее обновление до mpsc, позволяющее клонировать и перемещать конечные точки сообщений. Также есть удобная функция галочки. На самом деле здесь есть функция try_recv(), которая не блокирует работу.

Я не уверен, насколько универсально полезно было бы поместить средство проверки сообщений в середину рабочего цикла, подобного этому. Я не обнаружил, что Actix (или ранее Akka) действительно мог остановить поток без, как указано выше, без того, чтобы поток делал это сам. Итак, это то, что я использую сейчас (здесь широко открыт для исправлений, все еще учусь).

// Cargo.toml:
// [dependencies]
// crossbeam-channel = "0.4.4"

use crossbeam_channel::{Sender, Receiver, unbounded, tick};
use std::time::{Duration, Instant};

fn main() {
    let (tx, rx):(Sender<String>, Receiver<String>) = unbounded();
    let rx2 = rx.clone();

    // crossbeam allows clone and move of receiver
    std::thread::spawn(move || {
        // OP:
        // let mut timer = Timer::new().unwrap();
        // let periodic = timer.periodic(Duration::milliseconds(200));

        let ticker: Receiver<Instant> = tick(std::time::Duration::from_millis(500));

        loop {
            // OP:
            // periodic.recv();
            crossbeam_channel::select! {
                recv(ticker) -> _ => {

                    // OP: Do my work here
                    println!("Hello, work.");

                    // Comms Check: keep doing work?
                    // try_recv is non-blocking
                    // rx, the single consumer is clone-able in crossbeam
                    let try_result = rx2.try_recv();
                    match try_result {
                        Err(_e) => {},
                        Ok(msg) => {
                            match msg.as_str() {
                                "END_THE_WORLD" => {
                                    println!("Ending the world.");
                                    break;
                                },
                                _ => {},
                            }
                        },
                        _ => {}
                    }
                }
            }
        }
    });

    // let work continue for 10 seconds then tell that thread to end.
    std::thread::sleep(std::time::Duration::from_secs(10));
    println!("Goodbye, world.");
    tx.send("END_THE_WORLD".to_string());
}

Использование строк в качестве средства сообщения - для меня неприятно. Можно было бы приостановить и перезапустить другие вещи в перечислении.

Другие вопросы по тегам