Невозможно использовать Stream::take_while на mpsc::channel: bool: Будущее не удовлетворено

Я хочу запустить цикл обработки событий в одном потоке и обрабатывать данные из сокета UDP, пока другой поток не прекратит работу.

Это сложная задача для меня, поэтому я хочу начать с более простой задачи: один поток запускает цикл обработки событий и ждет, пока другой поток сообщит об окончании:

use futures::{future, future::Future, stream::Stream, sync::mpsc};
use std::{io, io::BufRead, thread};

fn main() {
    let (mut tx, rx) = mpsc::channel::<bool>(1);

    let thr = thread::spawn(|| {
        let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap();
        runtime.spawn(
            future::lazy(|| {
                println!("event loop started");
                Ok(())
            })
            .and_then(rx.take_while(|x| *x == true).into_future()),
        );

        runtime.run()
    });

    let stdin = io::stdin();
    for line in stdin.lock().lines() {
        let line = line.unwrap();
        println!("{}", line);
        if line == "exit" {
            tx.try_send(false).unwrap();
            break;
        }
    }
    thr.join().unwrap().unwrap();
}

Этот код не компилируется:

error[E0277]: the trait bound `bool: futures::future::Future` is not satisfied
  --> src/main.rs:14:26
   |
14 |             .and_then(rx.take_while(|x| *x == true).into_future()),
   |                          ^^^^^^^^^^ the trait `futures::future::Future` is not implemented for `bool`
   |
   = note: required because of the requirements on the impl of `futures::future::IntoFuture` for `bool`

error[E0599]: no method named `into_future` found for type `futures::stream::take_while::TakeWhile<futures::sync::mpsc::Receiver<bool>, [closure@src/main.rs:14:37: 14:51], bool>` in the current scope
  --> src/main.rs:14:53
   |
14 |             .and_then(rx.take_while(|x| *x == true).into_future()),
   |                                                     ^^^^^^^^^^^
   |
   = note: the method `into_future` exists but the following trait bounds were not satisfied:
           `futures::stream::take_while::TakeWhile<futures::sync::mpsc::Receiver<bool>, [closure@src/main.rs:14:37: 14:51], bool> : futures::stream::Stream`
           `&mut futures::stream::take_while::TakeWhile<futures::sync::mpsc::Receiver<bool>, [closure@src/main.rs:14:37: 14:51], bool> : futures::stream::Stream`

Как исправить ошибку компиляции?

1 ответ

Прочитайте и поймите документацию и подпись функций методов, которые вы пытаетесь использовать:

fn take_while<P, R>(self, pred: P) -> TakeWhile<Self, P, R>
where
    P: FnMut(&Self::Item) -> R,
    R: IntoFuture<Item = bool, Error = Self::Error>,
    Self: Sized, 

take_while принимает замыкание, которое возвращает некоторый тип, который должен быть преобразован в будущее; bool не конвертируется в будущее. Самый простой способ сделать это через future::ok:

let thr = thread::spawn(|| {
    let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap();
    runtime.spawn({
        rx.take_while(|&x| future::ok(x))
            .for_each(|x| {
                println!("{}", x);
                future::ok(())
            })
    });

    runtime.run()
});

Смотрите также:

Но моя проблема также в присоединении future::lazy а также rx.take_while

Это не относится к тому, о чем вы спрашивали. Опять же, мы смотрим на документы, на этот раз для Future::and_then:

fn and_then<F, B>(self, f: F) -> AndThen<Self, B, F>
where
    F: FnOnce(Self::Item) -> B,
    B: IntoFuture<Error = Self::Error>,
    Self: Sized, 

Аналогично take_while требуется закрытие, и закрытие должно возвращать то, что может быть преобразовано в будущее. Ваш код не обеспечивает закрытие.

Тогда посмотрите на Stream::into_future, Это возвращает тип, который реализует Future и возвращает кортеж. Первый элемент в кортеже - это отдельное значение из потока, второй - сам поток, позволяющий получить больше значений.

Чтобы все элементы и типы ошибок были правильными, я свободно использую map(drop) а также map_err(drop) - вы хотите сделать что-то лучше для ваших данных и обработки ошибок.

runtime.spawn({
    future::lazy(|| {
        println!("event loop started");
        Ok(())
    })
    .and_then(|_| {
        rx.take_while(|&x| future::ok(x))
            .into_future()
            .map(drop)
            .map_err(drop)
    })
    .map(drop)
});

На самом деле, вы должны просто использовать канал с одним выстрелом; это намного проще:

use futures::{
    future::{self, Future},
    sync::oneshot,
};
use std::thread;

fn main() {
    let (tx, rx) = oneshot::channel();

    let thr = thread::spawn(|| {
        let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap();

        runtime.spawn({
            future::lazy(|| {
                println!("event loop started");
                Ok(())
            })
            .and_then(|_| rx.map_err(drop))
        });

        runtime.run()
    });

    let lines = ["hello", "goodbye", "exit"];
    for &line in &lines {
        if line == "exit" {
            tx.send(()).unwrap();
            break;
        }
    }

    thr.join().unwrap().unwrap();
}
Другие вопросы по тегам