Как я могу проверить будущее, которое связано с Токио TcpStream?

У меня есть будущее, которое оборачивает поток TCP в Framed с использованием LinesCodec,

Когда я пытаюсь обернуть это в тесте, будущее блокируется примерно в 20% случаев, но из-за того, что я ничего не слушаю в сокете, к которому пытаюсь подключиться, я всегда получаю ошибку:

thread 'tokio-runtime-worker-0' panicked at 'error: Os { code: 111, kind: ConnectionRefused, message: "Connection refused" }', src/lib.rs:35:24 note: Run with 'RUST_BACKTRACE=1' for a backtrace.

Это тестовый код, который я использовал:

#[macro_use(try_ready)]
extern crate futures; // 0.1.24
extern crate tokio;   // 0.1.8

use std::io;
use std::net::SocketAddr;
use tokio::codec::{Framed, LinesCodec};
use tokio::net::TcpStream;
use tokio::prelude::*;

struct MyFuture {
    addr: SocketAddr,
}

impl Future for MyFuture {
    type Item = Framed<TcpStream, LinesCodec>;
    type Error = io::Error;
    fn poll(&mut self) -> Result<Async<Framed<TcpStream, LinesCodec>>, io::Error> {
        let strm = try_ready!(TcpStream::connect(&self.addr).poll());
        Ok(Async::Ready(Framed::new(strm, LinesCodec::new())))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::net::Shutdown;

    #[test]
    fn connect() {
        let addr: SocketAddr = "127.0.0.1:4222".parse().unwrap();
        let fut = MyFuture { addr: addr }
            .and_then(|f| {
                println!("connected");
                let cn = f.get_ref();
                cn.shutdown(Shutdown::Both)
            }).map_err(|e| panic!("error: {:?}", e));
        tokio::run(fut)
    }
}

детская площадка

Я видел шаблоны в других языках, где сам тестовый двоичный файл предлагает механизм для асинхронного возврата результатов, но я не нашел хорошего способа использования подобного механизма в Rust.

2 ответа

Решение

Простой способ тестирования асинхронного кода может заключаться в использовании выделенной среды выполнения для каждого теста: запустить ее, дождаться будущего завершения и завершить работу в конце теста.

#[test]
fn my_case() {

    // setup future f
    // ...

    tokio::run(f);
}

Я не знаю, есть ли уже консолидированные модели в экосистеме Rust; посмотрите это обсуждение эволюции поддержки тестирования для будущего кода.

Почему ваш код не работает должным образом

Когда вы вызываете poll(), будущее запрашивается, чтобы проверить, доступно ли значение.

Если значение недоступно, проценты регистрируются так, чтобы poll() будет вызван снова, когда что-то случится, что может решить будущее.

Когда ваш MyFuture::poll() вызывается:

  1. TcpStream::connect создает новое будущее TcpStreamNew
  2. TcpStreamNew::poll вызывается сразу только один раз при создании будущего на шаге 1.
  3. Будущее выходит за рамки, так что в следующий раз вы вызываете MyFuture::poll Вы никогда не разрешите ранее созданные фьючерсы.

Вы зарегистрировали интерес к будущему, который, если не разрешен в первый раз, когда вы опрашиваете его, вы никогда не будете запрашивать снова (опрос) для разрешенного значения или для ошибки.

Причина "недетерминированного" поведения заключается в том, что первый poll иногда решить сразу с ConnectionRefused ошибка, а иногда он всегда ждет будущего события подключения или сбоя, который он никогда не получает.

смотреть на mio::sys::unix::tcp::TcpStream используется Токио:

 impl TcpStream {
     pub fn connect(stream: net::TcpStream, addr: &SocketAddr) -> io::Result<TcpStream> {
         set_nonblock(stream.as_raw_fd())?;

         match stream.connect(addr) {
             Ok(..) => {}
             Err(ref e) if e.raw_os_error() == Some(libc::EINPROGRESS) => {}
             Err(e) => return Err(e),
         }

         Ok(TcpStream {
             inner: stream,
         })
     }

Когда ты connect на неблокирующем сокете системный вызов может немедленно соединиться / потерпеть неудачу или вернуться EINPROGRESS, в этом последнем случае должен быть запущен опрос для получения значения ошибки.

Проблема не в тесте, а в реализации.

Этот рабочий тестовый сценарий, основанный на вашем, не имеет собственной будущей реализации и только вызывает TcpStream::connect(), Это работает так, как вы ожидаете.

extern crate futures;
extern crate tokio;

#[cfg(test)]
mod tests {
    use super::*;
    use std::net::Shutdown;
    use std::net::SocketAddr;
    use tokio::net::TcpStream;
    use tokio::prelude::*;

    #[test]
    fn connect() {
        let addr: SocketAddr = "127.0.0.1:4222".parse().unwrap();
        let fut = TcpStream::connect(&addr)
            .and_then(|f| {
                println!("connected");
                f.shutdown(Shutdown::Both)
            }).map_err(|e| panic!("error: {:?}", e));
        tokio::run(fut)
    }
}

детская площадка

Вы подключаетесь к одной и той же конечной точке снова и снова в своем poll() метод. Это не так, как работает будущее. poll() метод будет вызываться повторно, ожидая, что в какой-то момент он вернет либо Ok(Async::Ready(..)) или же Err(..),

Если вы каждый раз инициируете новое TCP-соединение poll() называется, это вряд ли будет завершено вовремя.

Вот модифицированный пример, который делает то, что вы ожидаете:

#[macro_use(try_ready)]
extern crate futures;
extern crate tokio;
use std::io;
use std::net::SocketAddr;
use tokio::codec::{Framed, LinesCodec};
use tokio::net::{ConnectFuture, TcpStream};
use tokio::prelude::*;

struct MyFuture {
    tcp: ConnectFuture,
}

impl MyFuture {
    fn new(addr: SocketAddr) -> MyFuture {
        MyFuture {
            tcp: TcpStream::connect(&addr),
        }
    }
}

impl Future for MyFuture {
    type Item = Framed<TcpStream, LinesCodec>;
    type Error = io::Error;

    fn poll(&mut self) -> Result<Async<Framed<TcpStream, LinesCodec>>, io::Error> {
        let strm = try_ready!(self.tcp.poll());
        Ok(Async::Ready(Framed::new(strm, LinesCodec::new())))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::net::Shutdown;

    #[test]
    fn connect() {
        let addr: SocketAddr = "127.0.0.1:4222".parse().unwrap();
        let fut = MyFuture::new(addr)
            .and_then(|f| {
                println!("connected");
                let cn = f.get_ref();
                cn.shutdown(Shutdown::Both)
            }).map_err(|e| panic!("error: {:?}", e));
        tokio::run(fut)
    }
}

Я не уверен, что вы намерены сделать в будущем, хотя; Я не могу комментировать, если это правильный подход.

До некоторой степени вы можете зайти в тестовую библиотеку tokio, чтобы упростить задачу; он поддерживает async/await в модульных тестах.

#[tokio::test]
async fn my_future_test() {
  let addr: SocketAddr = "127.0.0.1:4222".parse().unwrap();
  match MyFuture { addr }.poll().await {
    Ok(f) => assert!("something good")
    Err(e) => assert!("something bad")
  }
}

https://docs.rs/tokio/0.3.3/tokio/attr.test.html

Другие вопросы по тегам