время жизни вокруг async и stream
Я пытаюсь сделать функцию, которая потребляет Stream
и обрезать его, когда есть max_consecutive_fails
последовательные неудачи. Однако все пошло не так (E0495). Я изменилсяStream
с к Iterator
s (и удален async
s) и это просто сработало. Почему так происходит? Как я могу реорганизовать этот код (чтобы он работал)?
use futures::stream::Stream;
pub fn max_fail<'a, T>(stream : impl Stream<Item = Option<T>> +'a , max_consecutive_fails: usize) -> impl Stream +'a where T : 'a
{
use futures::stream::StreamExt;
let mut consecutive_fails = 0;
stream.take_while(move |x| async {
if x.is_some(){
consecutive_fails = 0;
true
}
else{
consecutive_fails += 1;
consecutive_fails != max_consecutive_fails
}
})
}
Ниже приведен минимизированный пример. Я пытался указать, в чем проблема, но все еще не мог понять сообщение об ошибке rustc.
use futures::stream::Stream;
pub fn minified_example<'a>(stream: impl Stream<Item = bool> + 'a) -> impl Stream + 'a
{
use futures::stream::StreamExt;
stream.take_while( |x| async { *x })
}
2 ответа
Асинхронные блоки (async { ... }
) похожи на замыкания по способу захвата своего окружения. По умолчанию каждое использование переменной из другой области осуществляется по ссылке, что означаетimpl core::future::Future
созданный блоком не может пережить фиксируемые им переменные.
Тебе нужно переехать x
в блок с async move { ... }
(Как и с закрытием)
Так Future
захватывает переменную, а компилятор недостаточно умен, чтобы удалить ненужные захваты, и что нужно сделать, так это явно расчленить захваты с помощью отдельного асинхронного блока.
use futures::stream::Stream;
pub fn max_fail<'a, T>(
stream: impl Stream<Item = Option<T>> + 'a,
max_consecutive_fails: usize,
) -> impl Stream + 'a
where
T: 'a,
{
use futures::stream::StreamExt;
let mut consecutive_fails = 0;
stream.take_while(move |x| {
let t = if x.is_some() {
consecutive_fails = 0;
true
} else {
consecutive_fails += 1;
consecutive_fails != max_consecutive_fails
};
return async move { t };
})
}