время жизни вокруг async и stream

Я пытаюсь сделать функцию, которая потребляет Stream и обрезать его, когда есть max_consecutive_failsпоследовательные неудачи. Однако все пошло не так (E0495). Я изменилсяStreamс к Iterators (и удален asyncs) и это просто сработало. Почему так происходит? Как я могу реорганизовать этот код (чтобы он работал)?

use futures::stream::Stream;
pub fn max_fail<'a, T>(stream : impl Stream<Item = Option<T>> +'a , max_consecutive_fails: usize) -> impl Stream +'a where T : 'a
{
    use futures::stream::StreamExt;
    let mut consecutive_fails = 0;
    stream.take_while(move |x| async {
        if x.is_some(){
            consecutive_fails = 0;
            true
        }
        else{
            consecutive_fails += 1;
            consecutive_fails != max_consecutive_fails
        }
    })
}

Ниже приведен минимизированный пример. Я пытался указать, в чем проблема, но все еще не мог понять сообщение об ошибке rustc.

use futures::stream::Stream;
pub fn minified_example<'a>(stream: impl Stream<Item = bool> + 'a) -> impl Stream + 'a
{
    use futures::stream::StreamExt;
    stream.take_while( |x| async { *x })
}

2 ответа

Асинхронные блоки (async { ... }) похожи на замыкания по способу захвата своего окружения. По умолчанию каждое использование переменной из другой области осуществляется по ссылке, что означаетimpl core::future::Future созданный блоком не может пережить фиксируемые им переменные.

Тебе нужно переехать x в блок с async move { ... } (Как и с закрытием)

Так Future захватывает переменную, а компилятор недостаточно умен, чтобы удалить ненужные захваты, и что нужно сделать, так это явно расчленить захваты с помощью отдельного асинхронного блока.

use futures::stream::Stream;
pub fn max_fail<'a, T>(
    stream: impl Stream<Item = Option<T>> + 'a,
    max_consecutive_fails: usize,
) -> impl Stream + 'a
where
    T: 'a,
{
    use futures::stream::StreamExt;
    let mut consecutive_fails = 0;
    stream.take_while(move |x| {
        let t = if x.is_some() {
            consecutive_fails = 0;
            true
        } else {
            consecutive_fails += 1;
            consecutive_fails != max_consecutive_fails
        };
        return async move { t };
    })
}
Другие вопросы по тегам