Как я могу проверить будущее, которое связано с Токио TcpStream?
У меня есть будущее, которое оборачивает поток TCP в Framed
с использованием LinesCodec
,
Когда я пытаюсь обернуть это в тесте, будущее блокируется примерно в 20% случаев, но из-за того, что я ничего не слушаю в сокете, к которому пытаюсь подключиться, я всегда получаю ошибку:
thread 'tokio-runtime-worker-0' panicked at 'error: Os { code: 111, kind: ConnectionRefused, message: "Connection refused" }', src/lib.rs:35:24 note: Run with 'RUST_BACKTRACE=1' for a backtrace.
Это тестовый код, который я использовал:
#[macro_use(try_ready)]
extern crate futures; // 0.1.24
extern crate tokio; // 0.1.8
use std::io;
use std::net::SocketAddr;
use tokio::codec::{Framed, LinesCodec};
use tokio::net::TcpStream;
use tokio::prelude::*;
struct MyFuture {
addr: SocketAddr,
}
impl Future for MyFuture {
type Item = Framed<TcpStream, LinesCodec>;
type Error = io::Error;
fn poll(&mut self) -> Result<Async<Framed<TcpStream, LinesCodec>>, io::Error> {
let strm = try_ready!(TcpStream::connect(&self.addr).poll());
Ok(Async::Ready(Framed::new(strm, LinesCodec::new())))
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::net::Shutdown;
#[test]
fn connect() {
let addr: SocketAddr = "127.0.0.1:4222".parse().unwrap();
let fut = MyFuture { addr: addr }
.and_then(|f| {
println!("connected");
let cn = f.get_ref();
cn.shutdown(Shutdown::Both)
}).map_err(|e| panic!("error: {:?}", e));
tokio::run(fut)
}
}
Я видел шаблоны в других языках, где сам тестовый двоичный файл предлагает механизм для асинхронного возврата результатов, но я не нашел хорошего способа использования подобного механизма в Rust.
2 ответа
Простой способ тестирования асинхронного кода может заключаться в использовании выделенной среды выполнения для каждого теста: запустить ее, дождаться будущего завершения и завершить работу в конце теста.
#[test]
fn my_case() {
// setup future f
// ...
tokio::run(f);
}
Я не знаю, есть ли уже консолидированные модели в экосистеме Rust; посмотрите это обсуждение эволюции поддержки тестирования для будущего кода.
Почему ваш код не работает должным образом
Когда вы вызываете poll()
, будущее запрашивается, чтобы проверить, доступно ли значение.
Если значение недоступно, проценты регистрируются так, чтобы poll()
будет вызван снова, когда что-то случится, что может решить будущее.
Когда ваш MyFuture::poll()
вызывается:
TcpStream::connect
создает новое будущееTcpStreamNew
TcpStreamNew::poll
вызывается сразу только один раз при создании будущего на шаге 1.- Будущее выходит за рамки, так что в следующий раз вы вызываете
MyFuture::poll
Вы никогда не разрешите ранее созданные фьючерсы.
Вы зарегистрировали интерес к будущему, который, если не разрешен в первый раз, когда вы опрашиваете его, вы никогда не будете запрашивать снова (опрос) для разрешенного значения или для ошибки.
Причина "недетерминированного" поведения заключается в том, что первый poll
иногда решить сразу с ConnectionRefused
ошибка, а иногда он всегда ждет будущего события подключения или сбоя, который он никогда не получает.
смотреть на mio::sys::unix::tcp::TcpStream
используется Токио:
impl TcpStream {
pub fn connect(stream: net::TcpStream, addr: &SocketAddr) -> io::Result<TcpStream> {
set_nonblock(stream.as_raw_fd())?;
match stream.connect(addr) {
Ok(..) => {}
Err(ref e) if e.raw_os_error() == Some(libc::EINPROGRESS) => {}
Err(e) => return Err(e),
}
Ok(TcpStream {
inner: stream,
})
}
Когда ты connect
на неблокирующем сокете системный вызов может немедленно соединиться / потерпеть неудачу или вернуться EINPROGRESS
, в этом последнем случае должен быть запущен опрос для получения значения ошибки.
Проблема не в тесте, а в реализации.
Этот рабочий тестовый сценарий, основанный на вашем, не имеет собственной будущей реализации и только вызывает TcpStream::connect()
, Это работает так, как вы ожидаете.
extern crate futures;
extern crate tokio;
#[cfg(test)]
mod tests {
use super::*;
use std::net::Shutdown;
use std::net::SocketAddr;
use tokio::net::TcpStream;
use tokio::prelude::*;
#[test]
fn connect() {
let addr: SocketAddr = "127.0.0.1:4222".parse().unwrap();
let fut = TcpStream::connect(&addr)
.and_then(|f| {
println!("connected");
f.shutdown(Shutdown::Both)
}).map_err(|e| panic!("error: {:?}", e));
tokio::run(fut)
}
}
Вы подключаетесь к одной и той же конечной точке снова и снова в своем poll()
метод. Это не так, как работает будущее. poll()
метод будет вызываться повторно, ожидая, что в какой-то момент он вернет либо Ok(Async::Ready(..))
или же Err(..)
,
Если вы каждый раз инициируете новое TCP-соединение poll()
называется, это вряд ли будет завершено вовремя.
Вот модифицированный пример, который делает то, что вы ожидаете:
#[macro_use(try_ready)]
extern crate futures;
extern crate tokio;
use std::io;
use std::net::SocketAddr;
use tokio::codec::{Framed, LinesCodec};
use tokio::net::{ConnectFuture, TcpStream};
use tokio::prelude::*;
struct MyFuture {
tcp: ConnectFuture,
}
impl MyFuture {
fn new(addr: SocketAddr) -> MyFuture {
MyFuture {
tcp: TcpStream::connect(&addr),
}
}
}
impl Future for MyFuture {
type Item = Framed<TcpStream, LinesCodec>;
type Error = io::Error;
fn poll(&mut self) -> Result<Async<Framed<TcpStream, LinesCodec>>, io::Error> {
let strm = try_ready!(self.tcp.poll());
Ok(Async::Ready(Framed::new(strm, LinesCodec::new())))
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::net::Shutdown;
#[test]
fn connect() {
let addr: SocketAddr = "127.0.0.1:4222".parse().unwrap();
let fut = MyFuture::new(addr)
.and_then(|f| {
println!("connected");
let cn = f.get_ref();
cn.shutdown(Shutdown::Both)
}).map_err(|e| panic!("error: {:?}", e));
tokio::run(fut)
}
}
Я не уверен, что вы намерены сделать в будущем, хотя; Я не могу комментировать, если это правильный подход.
До некоторой степени вы можете зайти в тестовую библиотеку tokio, чтобы упростить задачу; он поддерживает async/await в модульных тестах.
#[tokio::test]
async fn my_future_test() {
let addr: SocketAddr = "127.0.0.1:4222".parse().unwrap();
match MyFuture { addr }.poll().await {
Ok(f) => assert!("something good")
Err(e) => assert!("something bad")
}
}