Как закрыть измененный и исполняющий поток `futures::sync::mpsc::Receiver`?

Я хотел бы иметь возможность сделать что-то в этом направлении, чтобы закрыть Receiver Поток асинхронно:

extern crate futures;
extern crate tokio;

use futures::future::lazy;
use futures::stream::AndThen;
use futures::sync::mpsc::Receiver;
use futures::{Future, Sink, Stream};
use std::sync::{Arc, Mutex};

use tokio::timer::{Delay, Interval};

fn main() {
    tokio::run(lazy(|| {
        let (tx, rx) = futures::sync::mpsc::channel(1000);

        let arc = Arc::new(Mutex::<Option<AndThen<Receiver<u32>, _, _>>>::new(None));

        {
            let mut and_then = arc.lock().unwrap();
            *and_then = Some(rx.and_then(|num| {
                println!("{}", num);
                Ok(())
            }));
        }

        let arc_clone = arc.clone();
        // This is the part I'd like to be able to do
        // After one second, close the `Receiver` so that future
        // calls to the `Sender` don't call the callback above in the
        // closure passed to `rx.and_then`
        tokio::spawn(
            Delay::new(std::time::Instant::now() + std::time::Duration::from_secs(1))
                .map_err(|e| eprintln!("Some delay err {:?}", e))
                .and_then(move |_| {
                    let mut maybe_stream = arc_clone.lock().unwrap();
                    match maybe_stream.take() {
                        Some(stream) => stream.into_inner().close(),
                        None => eprintln!("Can't close non-existent stream"), // line "A"
                    }
                    Ok(())
                }),
        );

        {
            let mut maybe_stream = arc.lock().unwrap();
            let stream = maybe_stream.take().expect("Stream already ripped out"); // line "B"

            let rx = stream.for_each(|_| Ok(()));
            tokio::spawn(rx);
        }

        tokio::spawn(
            Interval::new_interval(std::time::Duration::from_millis(10))
                .take(10)
                .map_err(|e| {
                    eprintln!("Interval error?! {:?}", e);
                })
                .fold((tx, 0), |(tx, i), _| {
                    tx.send(i as u32)
                        .map_err(|e| eprintln!("Send error?! {:?}", e))
                        .map(move |tx| (tx, i + 1))
                })
                .map(|_| ()),
        );

        Ok(())
    }));
}

Детская площадка

Тем не менее, строка A работает, потому что мне нужно переместить поток на линии B, чтобы вызвать .for_each в теме. Если я не позвоню .for_each (или что-то подобное), я не могу выполнить AndThen Насколько я знаю. Я не могу позвонить .for_each фактически не перемещая объект, потому что for_each это движущийся метод.

Могу ли я сделать то, что я пытаюсь сделать? Кажется, что это определенно должно быть возможно, но, возможно, я упускаю что-то очевидное.

Я использую фьючерсы на 0,1 и Токио на 0,1.

2 ответа

Решение

Не буду врать, я с @shepmaster об этом, ваш вопрос довольно неясен. Тем не менее, кажется, что вы пытаетесь сделать что-то mpsc часть futures не приспособлен для этого.

Тем не мение. Время объяснения.

Всякий раз, когда вы объединяете / компонуете потоки (или фьючерсы!), Каждый отдельный метод композиции занимает selfне &self или же &mut self как я думаю, вы могли бы надеяться.

В тот момент, когда вы доберетесь до этого своего блока кода:

    {
        let mut maybe_stream = arc.lock().unwrap();
        let stream = maybe_stream.take().expect("Stream already ripped out"); // line "B"

        let rx = stream.for_each(|_| Ok(()));
        tokio::spawn(rx);
    }

... Поток извлекается из Arc<Option<Receiver<T>>> когда ты take() это, и содержание этого заменяется None, Затем вы создаете его на реакторе Токио, который начинает обработку этой части. это rx сейчас на связи, и больше не доступен для вас. Кроме того, ваш maybe_stream теперь содержит None,

После задержки вы затем пытаетесь take() содержание Arc<Option<Receiver<T>>> (строка А). Так как теперь ничего не осталось, вы остались ни с чем, и поэтому не осталось ничего, чтобы закрыть. Ваш код ошибки.

Вместо того, чтобы обойти mpsc::Receiver и надеясь уничтожить его, используйте механизм, чтобы остановить сам поток. Вы можете сделать это самостоятельно или использовать ящик stream-cancel сделать это для вас.

Сделай сам версия здесь, модифицированная из твоего кода:

extern crate futures;
extern crate tokio;

use futures::future::lazy;
use futures::{future, Future, Sink, Stream};
use std::sync::{Arc, RwLock};
use std::sync::atomic::{Ordering, AtomicBool};
use tokio::timer::{Delay, Interval};

fn main() {
    tokio::run(lazy(|| {
        let (tx, rx) = futures::sync::mpsc::channel(1000);

        let circuit_breaker:Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
        let c_b_copy = Arc::clone(&circuit_breaker);
        tokio::spawn(
            Delay::new(std::time::Instant::now() + std::time::Duration::from_secs(1))
                .map_err(|e| eprintln!("Some delay err {:?}", e))
                .and_then(move |_| {
                    // We set the CB to true in order to stop processing of the stream
                    circuit_breaker.store(true, Ordering::Relaxed);
                    Ok(())
                }),
        );

        {
            let rx2 = rx.for_each(|e| {
                println!("{:?}", e);
                Ok(())
            });
            tokio::spawn(rx2);
        }

        tokio::spawn(
            Interval::new_interval(std::time::Duration::from_millis(100))
                .take(100)
                // take_while causes the stream to continue as long as its argument returns a future resolving to true.
                // In this case, we're checking every time if the circuit-breaker we've introduced is false
                .take_while(move |_| {
                    future::ok(
                        c_b_copy.load(Ordering::Relaxed) == false
                    );
                })
                .map_err(|e| {
                    eprintln!("Interval error?! {:?}", e);
                })
                .fold((tx, 0), |(tx, i), _| {
                    tx.send(i as u32)
                        .map_err(|e| eprintln!("Send error?! {:?}", e))
                        .map(move |tx| (tx, i + 1))
                })
                .map(|_| ()),
        );

        Ok(())
    }));
}

Детская площадка

Добавленный take_while() позволяет вам работать либо с содержимым потока, либо с внешним предикатом, чтобы продолжить или остановить поток. Обратите внимание, что хотя мы используем AtomicBoolнам все еще нужно Arc из-за 'static Пожизненные требования от Токио.

Обратный поток

После некоторого обсуждения в комментариях, это решение может быть более подходящим для вашего варианта использования. Я эффективно реализовал поток разветвления, охватываемый автоматическим выключателем. Волшебство происходит здесь:

impl<S> Stream for FanOut<S> where S:Stream, S::Item:Clone {

    type Item = S::Item;

    type Error = S::Error;

    fn poll(&mut self) -> Result<Async<Option<S::Item>>, S::Error> {
        match self.inner.as_mut() {
            Some(ref mut r) => {
                let mut breaker = self.breaker.write().expect("Poisoned lock");
                match breaker.status {
                    false => {
                        let item = r.poll();
                        match &item {
                            &Ok(Async::Ready(Some(ref i))) => {
                                breaker.registry.iter_mut().for_each(|sender| {
                                    sender.try_send(i.clone()).expect("Dead channel");
                                });
                                item
                            },
                            _ => item
                        }
                    },
                    true => Ok(Async::Ready(None))
                }
            }
            _ => {

                let mut breaker = self.breaker.write().expect("Poisoned lock");
                // Stream is over, drop all the senders

                breaker.registry = vec![];
                Ok(Async::Ready(None))
            }
        }
    }
}

Если индикатор состояния имеет значение false, вышеуказанный поток опрашивается; Затем результат отправляется всем слушателям. Если результат poll является Async::Ready(None) (указывая, что поток закончен), все каналы слушателя закрыты.

Если индикатор состояния установлен в значение true, все каналы слушателя закрываются, и поток возвращается Async::Ready(None) (и снят с казни Токио).

FanOut Объект является клонируемым, но только начальный экземпляр будет делать что угодно.

Вы можете использовать ящик как поток-отмена, чтобы достигнуть этого. Здесь я использовал Valved обертка потока, которая берет существующий поток и возвращает значение, которое вы можете использовать для последующего отмены потока:

use futures::{
    future::lazy,
    {Future, Sink, Stream},
}; // 0.1.25
use stream_cancel::Valved; // 0.4.4
use tokio::timer::{Delay, Interval}; // 0.1.13

fn main() {
    tokio::run(lazy(|| {
        let (tx, rx) = futures::sync::mpsc::channel(1000);
        let (trigger, rx) = Valved::new(rx);

        tokio::spawn({
            rx.for_each(|num| {
                println!("{}", num);
                Ok(())
            })
        });

        tokio::spawn({
            Delay::new(std::time::Instant::now() + std::time::Duration::from_secs(1))
                .map_err(|e| eprintln!("Some delay err {:?}", e))
                .map(move |_| trigger.cancel()),
        });

        tokio::spawn({
            Interval::new_interval(std::time::Duration::from_millis(10))
                .take(10)
                .map_err(|e| eprintln!("Interval error?! {:?}", e))
                .fold((tx, 0), |(tx, i), _| {
                    tx.send(i)
                        .map_err(|e| eprintln!("Send error?! {:?}", e))
                        .map(move |tx| (tx, i + 1))
                })
                .map(|_| ()),
        });

        Ok(())
    }));
}

Ящик имеет другие типы, подходящие для немного разных вариантов использования. Обязательно ознакомьтесь с документацией.

Посмотрите ответ Себастьяна Рено для одного способа реализовать это самостоятельно.

Другие вопросы по тегам