Запретить `chan::Receiver` блокировать пустой буфер

Я хотел бы построить канал Multi-Producer Multi-Consumer (MPMC) с различными параллельными задачами, обрабатывающими и производящими в нем данные. Некоторые из этих задач отвечают за взаимодействие с файловой системой или сетью.

Два примера:

  • PrintOutput(String) будет использоваться регистратором, консольным выходом или графическим интерфейсом.

  • NewJson(String) будет потребляться регистратором или парсером.

Чтобы добиться этого, я выбрал chan в качестве поставщика каналов MPMC и tokio как система для управления циклами событий для каждого слушателя на канале.

Прочитав пример на сайте Токио, я начал реализовывать futures::stream::Stream за chan::Receiver, Это позволило бы использовать для каждого будущего прослушивания на канале. Однако документация этих двух библиотек подчеркивает конфликт:

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>

Попытайтесь извлечь следующее значение этого потока, возвращая None, если поток завершен.

Этот метод, как и Future::poll, является единственным методом извлечения значения из потока. Этот метод также должен выполняться в контексте задачи, как правило, и разработчики этой черты должны гарантировать, что реализации этого метода не блокируются, так как это может привести к плохому поведению потребителей.

fn recv(&self) -> Option<T>

Получите значение на этом канале.

Если это асинхронный канал, recv блокируется только тогда, когда буфер пуст.

Если это синхронный канал, recv блокируется только тогда, когда буфер пуст.

Если это канал рандеву, recv блокируется, пока соответствующий send не отправит значение.

Для всех каналов, если канал закрыт и буфер пуст, то recv всегда и сразу возвращает None. (Если буфер закрыт на закрытом канале, возвращаются значения из буфера.)

Значения гарантированно будут получены в том же порядке, в котором они были отправлены.

Эта операция никогда не паникует! но он может зайти в тупик, если канал никогда не будет закрыт.

chan::Receiver может блокировать, когда буфер пуст, но futures::stream::Stream ожидает никогда не блокировать при опросе.

Если пустой буфер блокируется, нет четкого способа подтвердить, что он пуст. Как проверить, пуст ли буфер, чтобы предотвратить блокировку?

Хотя Кабуки находится на моем радаре и кажется самым зрелым из ящиков актерской модели, в ней почти полностью отсутствует документация.


Это моя реализация до сих пор:

extern crate chan;
extern crate futures;

struct RX<T>(chan::Receiver<T>);

impl<T> futures::stream::Stream for RX<T> {
    type Item = T;
    type Error = Box<std::error::Error>;

    fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
        let &mut RX(ref receiver) = self;
        let item = receiver.recv();

        match item {
            Some(value) => Ok(futures::Async::Ready(Some(value))),
            None => Ok(futures::Async::NotReady),
        }
    }
}

Я закончил быстрый тест, чтобы увидеть, как это работает. Кажется, все в порядке, но, как и ожидалось, блокируется после завершения буфера. Хотя это должно сработать, меня несколько беспокоит то, что для потребителя означает "плохо себя вести". Сейчас я продолжу тестировать этот подход и, надеюсь, не столкнусь с плохим поведением.

extern crate chan;
extern crate futures;
use futures::{Stream, Future};

fn my_test() {
    let mut core = tokio_core::reactor::Core::new().unwrap();
    let handle = core.handle();

    let (tx, rx) = chan::async::<String>();

    tx.send("Hello".to_string()); // fill the buffer before it blocks; single thread here.

    let incoming = RX(rx).for_each(|s| {
        println!("Result: {}", s);

        Ok(())
    });

    core.run(incoming).unwrap()
}

1 ответ

Решение

chan ящик обеспечивает chan_select макрос, который позволил бы неблокировать recv; но реализовать Future для таких примитивов вам также нужно разбудить задачу, когда канал станет готовым (см. futures::task::current()).

Вы можете реализовать Future используя существующие примитивы; Внедрение новых, как правило, сложнее. В этом случае вам, вероятно, придется раскошелиться chan сделать это Future совместимы.

Кажется multiqueue ящик имеет Future совместимый канал MPMC mpmc_fut_queue,

Другие вопросы по тегам