Как закрыть измененный и исполняющий поток `futures::sync::mpsc::Receiver`?
Я хотел бы иметь возможность сделать что-то в этом направлении, чтобы закрыть Receiver
Поток асинхронно:
extern crate futures;
extern crate tokio;
use futures::future::lazy;
use futures::stream::AndThen;
use futures::sync::mpsc::Receiver;
use futures::{Future, Sink, Stream};
use std::sync::{Arc, Mutex};
use tokio::timer::{Delay, Interval};
fn main() {
tokio::run(lazy(|| {
let (tx, rx) = futures::sync::mpsc::channel(1000);
let arc = Arc::new(Mutex::<Option<AndThen<Receiver<u32>, _, _>>>::new(None));
{
let mut and_then = arc.lock().unwrap();
*and_then = Some(rx.and_then(|num| {
println!("{}", num);
Ok(())
}));
}
let arc_clone = arc.clone();
// This is the part I'd like to be able to do
// After one second, close the `Receiver` so that future
// calls to the `Sender` don't call the callback above in the
// closure passed to `rx.and_then`
tokio::spawn(
Delay::new(std::time::Instant::now() + std::time::Duration::from_secs(1))
.map_err(|e| eprintln!("Some delay err {:?}", e))
.and_then(move |_| {
let mut maybe_stream = arc_clone.lock().unwrap();
match maybe_stream.take() {
Some(stream) => stream.into_inner().close(),
None => eprintln!("Can't close non-existent stream"), // line "A"
}
Ok(())
}),
);
{
let mut maybe_stream = arc.lock().unwrap();
let stream = maybe_stream.take().expect("Stream already ripped out"); // line "B"
let rx = stream.for_each(|_| Ok(()));
tokio::spawn(rx);
}
tokio::spawn(
Interval::new_interval(std::time::Duration::from_millis(10))
.take(10)
.map_err(|e| {
eprintln!("Interval error?! {:?}", e);
})
.fold((tx, 0), |(tx, i), _| {
tx.send(i as u32)
.map_err(|e| eprintln!("Send error?! {:?}", e))
.map(move |tx| (tx, i + 1))
})
.map(|_| ()),
);
Ok(())
}));
}
Тем не менее, строка A работает, потому что мне нужно переместить поток на линии B, чтобы вызвать .for_each
в теме. Если я не позвоню .for_each
(или что-то подобное), я не могу выполнить AndThen
Насколько я знаю. Я не могу позвонить .for_each
фактически не перемещая объект, потому что for_each
это движущийся метод.
Могу ли я сделать то, что я пытаюсь сделать? Кажется, что это определенно должно быть возможно, но, возможно, я упускаю что-то очевидное.
Я использую фьючерсы на 0,1 и Токио на 0,1.
2 ответа
Не буду врать, я с @shepmaster об этом, ваш вопрос довольно неясен. Тем не менее, кажется, что вы пытаетесь сделать что-то mpsc
часть futures
не приспособлен для этого.
Тем не мение. Время объяснения.
Всякий раз, когда вы объединяете / компонуете потоки (или фьючерсы!), Каждый отдельный метод композиции занимает self
не &self
или же &mut self
как я думаю, вы могли бы надеяться.
В тот момент, когда вы доберетесь до этого своего блока кода:
{
let mut maybe_stream = arc.lock().unwrap();
let stream = maybe_stream.take().expect("Stream already ripped out"); // line "B"
let rx = stream.for_each(|_| Ok(()));
tokio::spawn(rx);
}
... Поток извлекается из Arc<Option<Receiver<T>>>
когда ты take()
это, и содержание этого заменяется None
, Затем вы создаете его на реакторе Токио, который начинает обработку этой части. это rx
сейчас на связи, и больше не доступен для вас. Кроме того, ваш maybe_stream
теперь содержит None
,
После задержки вы затем пытаетесь take()
содержание Arc<Option<Receiver<T>>>
(строка А). Так как теперь ничего не осталось, вы остались ни с чем, и поэтому не осталось ничего, чтобы закрыть. Ваш код ошибки.
Вместо того, чтобы обойти mpsc::Receiver
и надеясь уничтожить его, используйте механизм, чтобы остановить сам поток. Вы можете сделать это самостоятельно или использовать ящик stream-cancel
сделать это для вас.
Сделай сам версия здесь, модифицированная из твоего кода:
extern crate futures;
extern crate tokio;
use futures::future::lazy;
use futures::{future, Future, Sink, Stream};
use std::sync::{Arc, RwLock};
use std::sync::atomic::{Ordering, AtomicBool};
use tokio::timer::{Delay, Interval};
fn main() {
tokio::run(lazy(|| {
let (tx, rx) = futures::sync::mpsc::channel(1000);
let circuit_breaker:Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
let c_b_copy = Arc::clone(&circuit_breaker);
tokio::spawn(
Delay::new(std::time::Instant::now() + std::time::Duration::from_secs(1))
.map_err(|e| eprintln!("Some delay err {:?}", e))
.and_then(move |_| {
// We set the CB to true in order to stop processing of the stream
circuit_breaker.store(true, Ordering::Relaxed);
Ok(())
}),
);
{
let rx2 = rx.for_each(|e| {
println!("{:?}", e);
Ok(())
});
tokio::spawn(rx2);
}
tokio::spawn(
Interval::new_interval(std::time::Duration::from_millis(100))
.take(100)
// take_while causes the stream to continue as long as its argument returns a future resolving to true.
// In this case, we're checking every time if the circuit-breaker we've introduced is false
.take_while(move |_| {
future::ok(
c_b_copy.load(Ordering::Relaxed) == false
);
})
.map_err(|e| {
eprintln!("Interval error?! {:?}", e);
})
.fold((tx, 0), |(tx, i), _| {
tx.send(i as u32)
.map_err(|e| eprintln!("Send error?! {:?}", e))
.map(move |tx| (tx, i + 1))
})
.map(|_| ()),
);
Ok(())
}));
}
Добавленный take_while()
позволяет вам работать либо с содержимым потока, либо с внешним предикатом, чтобы продолжить или остановить поток. Обратите внимание, что хотя мы используем AtomicBool
нам все еще нужно Arc
из-за 'static
Пожизненные требования от Токио.
Обратный поток
После некоторого обсуждения в комментариях, это решение может быть более подходящим для вашего варианта использования. Я эффективно реализовал поток разветвления, охватываемый автоматическим выключателем. Волшебство происходит здесь:
impl<S> Stream for FanOut<S> where S:Stream, S::Item:Clone {
type Item = S::Item;
type Error = S::Error;
fn poll(&mut self) -> Result<Async<Option<S::Item>>, S::Error> {
match self.inner.as_mut() {
Some(ref mut r) => {
let mut breaker = self.breaker.write().expect("Poisoned lock");
match breaker.status {
false => {
let item = r.poll();
match &item {
&Ok(Async::Ready(Some(ref i))) => {
breaker.registry.iter_mut().for_each(|sender| {
sender.try_send(i.clone()).expect("Dead channel");
});
item
},
_ => item
}
},
true => Ok(Async::Ready(None))
}
}
_ => {
let mut breaker = self.breaker.write().expect("Poisoned lock");
// Stream is over, drop all the senders
breaker.registry = vec![];
Ok(Async::Ready(None))
}
}
}
}
Если индикатор состояния имеет значение false, вышеуказанный поток опрашивается; Затем результат отправляется всем слушателям. Если результат poll
является Async::Ready(None)
(указывая, что поток закончен), все каналы слушателя закрыты.
Если индикатор состояния установлен в значение true, все каналы слушателя закрываются, и поток возвращается Async::Ready(None)
(и снят с казни Токио).
FanOut
Объект является клонируемым, но только начальный экземпляр будет делать что угодно.
Вы можете использовать ящик как поток-отмена, чтобы достигнуть этого. Здесь я использовал Valved
обертка потока, которая берет существующий поток и возвращает значение, которое вы можете использовать для последующего отмены потока:
use futures::{
future::lazy,
{Future, Sink, Stream},
}; // 0.1.25
use stream_cancel::Valved; // 0.4.4
use tokio::timer::{Delay, Interval}; // 0.1.13
fn main() {
tokio::run(lazy(|| {
let (tx, rx) = futures::sync::mpsc::channel(1000);
let (trigger, rx) = Valved::new(rx);
tokio::spawn({
rx.for_each(|num| {
println!("{}", num);
Ok(())
})
});
tokio::spawn({
Delay::new(std::time::Instant::now() + std::time::Duration::from_secs(1))
.map_err(|e| eprintln!("Some delay err {:?}", e))
.map(move |_| trigger.cancel()),
});
tokio::spawn({
Interval::new_interval(std::time::Duration::from_millis(10))
.take(10)
.map_err(|e| eprintln!("Interval error?! {:?}", e))
.fold((tx, 0), |(tx, i), _| {
tx.send(i)
.map_err(|e| eprintln!("Send error?! {:?}", e))
.map(move |tx| (tx, i + 1))
})
.map(|_| ()),
});
Ok(())
}));
}
Ящик имеет другие типы, подходящие для немного разных вариантов использования. Обязательно ознакомьтесь с документацией.
Посмотрите ответ Себастьяна Рено для одного способа реализовать это самостоятельно.