Невозможно использовать Stream::take_while на mpsc::channel: bool: Будущее не удовлетворено
Я хочу запустить цикл обработки событий в одном потоке и обрабатывать данные из сокета UDP, пока другой поток не прекратит работу.
Это сложная задача для меня, поэтому я хочу начать с более простой задачи: один поток запускает цикл обработки событий и ждет, пока другой поток сообщит об окончании:
use futures::{future, future::Future, stream::Stream, sync::mpsc};
use std::{io, io::BufRead, thread};
fn main() {
let (mut tx, rx) = mpsc::channel::<bool>(1);
let thr = thread::spawn(|| {
let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap();
runtime.spawn(
future::lazy(|| {
println!("event loop started");
Ok(())
})
.and_then(rx.take_while(|x| *x == true).into_future()),
);
runtime.run()
});
let stdin = io::stdin();
for line in stdin.lock().lines() {
let line = line.unwrap();
println!("{}", line);
if line == "exit" {
tx.try_send(false).unwrap();
break;
}
}
thr.join().unwrap().unwrap();
}
Этот код не компилируется:
error[E0277]: the trait bound `bool: futures::future::Future` is not satisfied
--> src/main.rs:14:26
|
14 | .and_then(rx.take_while(|x| *x == true).into_future()),
| ^^^^^^^^^^ the trait `futures::future::Future` is not implemented for `bool`
|
= note: required because of the requirements on the impl of `futures::future::IntoFuture` for `bool`
error[E0599]: no method named `into_future` found for type `futures::stream::take_while::TakeWhile<futures::sync::mpsc::Receiver<bool>, [closure@src/main.rs:14:37: 14:51], bool>` in the current scope
--> src/main.rs:14:53
|
14 | .and_then(rx.take_while(|x| *x == true).into_future()),
| ^^^^^^^^^^^
|
= note: the method `into_future` exists but the following trait bounds were not satisfied:
`futures::stream::take_while::TakeWhile<futures::sync::mpsc::Receiver<bool>, [closure@src/main.rs:14:37: 14:51], bool> : futures::stream::Stream`
`&mut futures::stream::take_while::TakeWhile<futures::sync::mpsc::Receiver<bool>, [closure@src/main.rs:14:37: 14:51], bool> : futures::stream::Stream`
Как исправить ошибку компиляции?
1 ответ
Прочитайте и поймите документацию и подпись функций методов, которые вы пытаетесь использовать:
fn take_while<P, R>(self, pred: P) -> TakeWhile<Self, P, R>
where
P: FnMut(&Self::Item) -> R,
R: IntoFuture<Item = bool, Error = Self::Error>,
Self: Sized,
take_while
принимает замыкание, которое возвращает некоторый тип, который должен быть преобразован в будущее; bool
не конвертируется в будущее. Самый простой способ сделать это через future::ok
:
let thr = thread::spawn(|| {
let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap();
runtime.spawn({
rx.take_while(|&x| future::ok(x))
.for_each(|x| {
println!("{}", x);
future::ok(())
})
});
runtime.run()
});
Смотрите также:
Но моя проблема также в присоединении
future::lazy
а такжеrx.take_while
Это не относится к тому, о чем вы спрашивали. Опять же, мы смотрим на документы, на этот раз для Future::and_then
:
fn and_then<F, B>(self, f: F) -> AndThen<Self, B, F>
where
F: FnOnce(Self::Item) -> B,
B: IntoFuture<Error = Self::Error>,
Self: Sized,
Аналогично take_while
требуется закрытие, и закрытие должно возвращать то, что может быть преобразовано в будущее. Ваш код не обеспечивает закрытие.
Тогда посмотрите на Stream::into_future
, Это возвращает тип, который реализует Future
и возвращает кортеж. Первый элемент в кортеже - это отдельное значение из потока, второй - сам поток, позволяющий получить больше значений.
Чтобы все элементы и типы ошибок были правильными, я свободно использую map(drop)
а также map_err(drop)
- вы хотите сделать что-то лучше для ваших данных и обработки ошибок.
runtime.spawn({
future::lazy(|| {
println!("event loop started");
Ok(())
})
.and_then(|_| {
rx.take_while(|&x| future::ok(x))
.into_future()
.map(drop)
.map_err(drop)
})
.map(drop)
});
На самом деле, вы должны просто использовать канал с одним выстрелом; это намного проще:
use futures::{
future::{self, Future},
sync::oneshot,
};
use std::thread;
fn main() {
let (tx, rx) = oneshot::channel();
let thr = thread::spawn(|| {
let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap();
runtime.spawn({
future::lazy(|| {
println!("event loop started");
Ok(())
})
.and_then(|_| rx.map_err(drop))
});
runtime.run()
});
let lines = ["hello", "goodbye", "exit"];
for &line in &lines {
if line == "exit" {
tx.send(()).unwrap();
break;
}
}
thr.join().unwrap().unwrap();
}