Как добиться нулевого копирования в tokio_core::io::Codec::decode(...)?

Моя цель — реализовать Codec, который вычитывает предоставленный EasyBuf до границы сообщения и декодирует его в структуры, которые лишь ссылаются на содержимое буфера, чтобы избежать ненужного копирования. Судя по реализации EasyBuf, сейчас это, по-видимому, невозможно, но, может быть, я что-то упустил.

Вот код, с которым я экспериментировал; такая возможность действительно пригодилась бы для tokio-cassandra:

/// Borrowed view: holds a byte slice that lives for `'a`.
struct V<'a> {
    /// The borrowed bytes.
    s: &'a [u8],
}

/// Pairs an owned `EasyBuf` with an optional borrowed view `V<'a>`.
///
/// NOTE(review): the code that follows stores in `v` a slice taken from `b`,
/// i.e. it uses this type as a self-referential struct.
struct R<'a> {
    /// Owned buffer handle.
    b: EasyBuf,
    /// Optional borrowed view; starts out as `None` in the code below.
    v: Option<V<'a>>,
}


/// Unit struct that carries the experimental `Codec` implementation.
struct C;

// Experimental codec: `decode` tries to hand back an `R` whose view `v`
// borrows from the `EasyBuf` stored in that same `R`.
impl Codec for C {
    // `Codec::In` takes no lifetime parameter, so the view's lifetime is
    // spelled `'static` here.
    type In = R<'static>;
    type Out = String;

    fn decode(&mut self, buf: &mut EasyBuf) -> io::Result<Option<Self::In>> {
        let mut r = R {
            // `EasyBuf` derives `Clone` over an `Arc`, so this clones the
            // handle; `buf` itself is not drained here.
            b: buf.clone(),
            v: None,
        };
        // NOTE(review): this borrows the local `r.b`, stores the borrow in
        // `r` and then moves `r` out, while `In` demands `'static` — expected
        // to be rejected by the borrow checker; this is the problem being asked about.
        r.v = Some(V { s: r.b.as_slice() });
        Ok(Some(r))
    }
    // Stub: ignores `msg` and `buf` and reports success.
    fn encode(&mut self, msg: Self::Out, buf: &mut Vec<u8>) -> io::Result<()> {
        Ok(())
    }
}


// Stand-alone version of the same experiment: build an `R`, then point
// `r.v` at the bytes of `r.b`.
fn main() {
    let b = EasyBuf::new();
    let mut r = R { b: b, v: None };
    // `r.v` now holds a slice borrowed from `r.b`, a field of the same value.
    r.v = Some(V { s: r.b.as_slice() });
}


use std::fmt;
use std::io;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;



/// A reference-counted byte vector plus the `start..end` window of it that
/// this handle exposes.
///
/// `Clone` is derived, so cloning copies the `Arc` handle and the two indices.
#[derive(Clone)]
pub struct EasyBuf {
    /// Shared backing storage.
    buf: Arc<Vec<u8>>,
    /// Index in `buf` of the first visible byte.
    start: usize,
    /// Index in `buf` one past the last visible byte.
    end: usize,
}

/// Mutable guard returned by `EasyBuf::get_mut`.
///
/// It dereferences to the backing `Vec<u8>`; when dropped it writes the
/// vector's length into the borrowed `end` index.
pub struct EasyBufMut<'a> {
    /// Exclusive borrow of the backing vector.
    buf: &'a mut Vec<u8>,
    /// Exclusive borrow of the owning `EasyBuf`'s `end` index.
    end: &'a mut usize,
}

impl EasyBuf {
    /// Creates an empty buffer whose backing vector has a capacity of 8 KiB.
    pub fn new() -> EasyBuf {
        EasyBuf::with_capacity(8 * 1024)
    }

    /// Creates an empty buffer whose backing vector has capacity `cap`.
    pub fn with_capacity(cap: usize) -> EasyBuf {
        EasyBuf {
            buf: Arc::new(Vec::with_capacity(cap)),
            start: 0,
            end: 0,
        }
    }

    /// Moves the start of the visible window to `start` and returns `self`.
    ///
    /// # Panics
    ///
    /// Panics if `start` exceeds the backing vector's length or `self.end`.
    fn set_start(&mut self, start: usize) -> &mut EasyBuf {
        assert!(start <= self.buf.len());
        assert!(start <= self.end);
        self.start = start;
        self
    }

    /// Moves the end of the visible window to `end` and returns `self`.
    ///
    /// # Panics
    ///
    /// Panics if `end` exceeds the backing vector's length or precedes
    /// `self.start`.
    fn set_end(&mut self, end: usize) -> &mut EasyBuf {
        assert!(end <= self.buf.len());
        assert!(self.start <= end);
        self.end = end;
        self
    }

    /// Number of bytes in the visible window.
    pub fn len(&self) -> usize {
        self.end - self.start
    }

    /// Returns the visible window as a byte slice.
    pub fn as_slice(&self) -> &[u8] {
        self.as_ref()
    }

    /// Splits the window at `at`: `self` keeps the first `at` bytes and the
    /// returned buffer views the rest. Both share the same backing storage.
    ///
    /// # Panics
    ///
    /// Panics if `at` is greater than `self.len()`.
    pub fn split_off(&mut self, at: usize) -> EasyBuf {
        let mut other = EasyBuf { buf: Arc::clone(&self.buf), ..*self };
        let idx = self.start + at;
        other.set_start(idx);
        self.set_end(idx);
        other
    }

    /// Splits the window at `at`: the returned buffer views the first `at`
    /// bytes and `self` keeps the rest. Both share the same backing storage.
    ///
    /// # Panics
    ///
    /// Panics if `at` is greater than `self.len()`.
    pub fn drain_to(&mut self, at: usize) -> EasyBuf {
        let mut other = EasyBuf { buf: Arc::clone(&self.buf), ..*self };
        let idx = self.start + at;
        other.set_end(idx);
        self.set_start(idx);
        other
    }

    /// Returns a guard giving mutable access to a vector that holds exactly
    /// the bytes of the current window.
    ///
    /// If this handle is the only owner of the storage, the vector is trimmed
    /// in place; otherwise the window is copied into a fresh vector. In both
    /// cases `start` becomes 0, and when the guard is dropped `end` is set to
    /// the vector's length.
    pub fn get_mut(&mut self) -> EasyBufMut {
        // Uniqueness is tested with `is_some()` and the vector re-borrowed
        // inside the branch, so the borrow is confined to this early return.
        if Arc::get_mut(&mut self.buf).is_some() {
            let buf = Arc::get_mut(&mut self.buf).expect("uniqueness was just checked");
            // Discard bytes beyond the window first. Without this, data left
            // behind by an earlier `drain_to`/`split_off` would become visible
            // once the guard's `Drop` resets `end` to the vector's length.
            buf.truncate(self.end);
            buf.drain(..self.start);
            self.start = 0;
            return EasyBufMut {
                buf: buf,
                end: &mut self.end,
            };
        }

        // Storage is shared: copy just the window into a new vector.
        let mut v = Vec::with_capacity(self.buf.capacity());
        v.extend_from_slice(self.as_ref());
        self.start = 0;
        self.buf = Arc::new(v);
        EasyBufMut {
            buf: Arc::get_mut(&mut self.buf).expect("freshly created Arc is unique"),
            end: &mut self.end,
        }
    }
}

impl AsRef<[u8]> for EasyBuf {
    /// Returns the bytes of the backing vector that fall inside `start..end`.
    fn as_ref(&self) -> &[u8] {
        let window = self.start..self.end;
        &self.buf[window]
    }
}

impl<'a> Deref for EasyBufMut<'a> {
    type Target = Vec<u8>;

    /// Shared access to the borrowed backing vector.
    fn deref(&self) -> &Vec<u8> {
        &*self.buf
    }
}

impl<'a> DerefMut for EasyBufMut<'a> {
    /// Exclusive access to the borrowed backing vector.
    fn deref_mut(&mut self) -> &mut Vec<u8> {
        &mut *self.buf
    }
}

impl From<Vec<u8>> for EasyBuf {
    /// Wraps `vec` in a new buffer whose window spans the whole vector.
    fn from(vec: Vec<u8>) -> EasyBuf {
        let len = vec.len();
        let storage = Arc::new(vec);
        EasyBuf { buf: storage, start: 0, end: len }
    }
}

impl<'a> Drop for EasyBufMut<'a> {
    /// On release, records the vector's current length in the borrowed `end`
    /// index.
    fn drop(&mut self) {
        let len = self.buf.len();
        *self.end = len;
    }
}

/// Encoding and decoding of frames via buffers.
///
/// This trait is used when constructing an instance of `Framed`. It provides
/// two types: `In`, for decoded input frames, and `Out`, for outgoing frames
/// that need to be encoded. It also provides methods to actually perform the
/// encoding and decoding, which work with corresponding buffer types.
///
/// The trait itself is implemented on a type that can track state for decoding
/// or encoding, which is particularly useful for streaming parsers. In many
/// cases, though, this type will simply be a unit struct (e.g. `struct
/// HttpCodec`).
pub trait Codec {
    /// The type of decoded frames.
    type In;

    /// The type of frames to be encoded.
    type Out;

    /// Attempts to decode a frame from the provided buffer of bytes.
    ///
    /// This method is called by `Framed` whenever bytes are ready to be parsed.
    /// The provided buffer of bytes is what's been read so far, and this
    /// instance of `Codec` can determine whether an entire frame is in the
    /// buffer and is ready to be returned.
    ///
    /// If an entire frame is available, then this instance will remove those
    /// bytes from the buffer provided and return them as a decoded
    /// frame. Note that removing bytes from the provided buffer doesn't always
    /// necessarily copy the bytes, so this should be an efficient operation in
    /// most circumstances.
    ///
    /// If the bytes look valid, but a frame isn't fully available yet, then
    /// `Ok(None)` is returned. This indicates to the `Framed` instance that
    /// it needs to read some more bytes before calling this method again.
    ///
    /// Finally, if the bytes in the buffer are malformed then an error is
    /// returned indicating why. This informs `Framed` that the stream is now
    /// corrupt and should be terminated.
    fn decode(&mut self, buf: &mut EasyBuf) -> io::Result<Option<Self::In>>;

    /// A default method available to be called when there are no more bytes
    /// available to be read from the underlying I/O.
    ///
    /// This method defaults to calling `decode` and returns an error if
    /// `Ok(None)` is returned. Typically this doesn't need to be implemented
    /// unless the framing protocol differs near the end of the stream.
    ///
    /// # Errors
    ///
    /// Propagates any error from `decode`, and returns an
    /// `io::ErrorKind::Other` error when `decode` yields `Ok(None)`.
    fn decode_eof(&mut self, buf: &mut EasyBuf) -> io::Result<Self::In> {
        // `?` replaces the deprecated `try!` macro, which no longer parses in
        // edition 2018 and later.
        match self.decode(buf)? {
            Some(frame) => Ok(frame),
            None => Err(io::Error::new(io::ErrorKind::Other, "bytes remaining on stream")),
        }
    }

    /// Encodes a frame into the buffer provided.
    ///
    /// This method will encode `msg` into the byte buffer provided by `buf`.
    /// The `buf` provided is an internal buffer of the `Framed` instance and
    /// will be written out when possible.
    fn encode(&mut self, msg: Self::Out, buf: &mut Vec<u8>) -> io::Result<()>;
}

Rust Playground

Мысли

  • Я не думаю, что сейчас это можно сделать: у Codec::In нет параметра времени жизни, поэтому его приходится объявлять как 'static, а это не проходит проверку заимствований (borrow checker).
  • Это можно реализовать, возвращая сам EasyBuf (Codec::In = EasyBuf) и выполняя декодирование на более позднем этапе, например в future, присоединённой по цепочке, как в этом примере.
  • Это также можно реализовать, разбирая лишь индексы внутри EasyBuf и лениво создавая настоящие типы данных со ссылками. Например, строка будет просто парой (usize, *const u8), которая впоследствии превращается в &str по запросу пользователя.

1 ответ

Последние версии tokio-core не содержат модуля codec: он был перенесён в tokio-io. При переносе он также был переведён с EasyBuf на BytesMut из крейта bytes. После этого изменения буферы приходят в виде BytesMut, который специально рассчитан на потребление буфера без копирования. Потреблять буфер можно с помощью split_to(): он возвращает BytesMut с данными до позиции разделения и сдвигает начало буфера к этой позиции. Вот пример из сервера rustygear, который я портировал на Tokio:

/// Decoder state for the packet protocol.
pub struct PacketCodec {
    /// Number of body bytes still to be emitted for the current packet;
    /// `None` while the decoder is waiting for a packet header.
    pub data_todo: Option<usize>,
}

/// Frame type produced by the decoders: a `PacketHeader` message, `BytesMut`
/// body chunks, and `io::Error` as the error type.
type PacketItem = Frame<PacketHeader, BytesMut, io::Error>;
impl PacketHeader {
    pub fn admin_decode(buf: &mut BytesMut) -> Result<Option<PacketItem>, io::Error> {
        let newline = buf[..].iter().position(|b| *b == b'\n');
        if let Some(n) = newline {
            let line = buf.split_to(n);
            buf.split_to(1); // drop the newline itself
            let data_str = match str::from_utf8(&line[..]) {
                Ok(s) => s,
                Err(_) => return Err(io::Error::new(io::ErrorKind::Other, "invalid string")),
            };
            info!("admin command data: {:?}", data_str);
            let command = match data_str.trim() {
                "version" => ADMIN_VERSION,
                "status" => ADMIN_STATUS,
                _ => ADMIN_UNKNOWN,
            };
            return Ok(Some(Frame::Message {
                message: PacketHeader {
                    magic: PacketMagic::TEXT,
                    ptype: command,
                    psize: 0,
                },
                body: false,
            }));
        }
        Ok(None) // Wait for more data
    }

    pub fn decode(buf: &mut BytesMut) -> Result<Option<PacketItem>, io::Error> {
        debug!("Decoding {:?}", buf);
        // Peek at first 4
        // Is this a req/res
        if buf.len() < 4 {
            return Ok(None);
        }
        let mut magic_buf: [u8; 4] = [0; 4];
        magic_buf.clone_from_slice(&buf[0..4]);
        let magic = match magic_buf {
            REQ => PacketMagic::REQ,
            RES => PacketMagic::RES,
            // TEXT/ADMIN protocol
            _ => PacketMagic::TEXT,
        };
        debug!("Magic is {:?}", magic);
        if magic == PacketMagic::TEXT {
            debug!("admin protocol detected");
            return PacketHeader::admin_decode(buf);
        }
        if buf.len() < 12 {
            return Ok(None);
        }
        buf.split_to(4);
        // Now get the type
        let ptype = buf.split_to(4).into_buf().get_u32::<BigEndian>();
        debug!("We got a {}", &PTYPES[ptype as usize].name);
        // Now the length
        let psize = buf.split_to(4).into_buf().get_u32::<BigEndian>();
        debug!("Data section is {} bytes", psize);
        Ok(Some(Frame::Message {
            message: PacketHeader {
                magic: magic,
                ptype: ptype,
                psize: psize,
            },
            body: true, // TODO: false for 0 psize?
        }))
    }
}

impl Decoder for PacketCodec {
    type Item = Frame<PacketHeader, BytesMut, io::Error>;
    type Error = io::Error;

    /// Decodes the next frame, driven by `self.data_todo`:
    ///
    /// * `None` — parse a header with `PacketHeader::decode` and remember its
    ///   `psize` as the number of body bytes to expect.
    /// * `Some(0)` — the body is finished: reset the state and emit a
    ///   terminating `Frame::Body { chunk: None }`.
    /// * `Some(n)` — emit up to `n` buffered bytes as a body chunk, split off
    ///   `buf` without copying, or `Ok(None)` if nothing is buffered yet.
    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, io::Error> {
        match self.data_todo {
            None => {
                match PacketHeader::decode(buf)? {
                    Some(Frame::Message { message, body }) => {
                        self.data_todo = Some(message.psize as usize);
                        Ok(Some(Frame::Message {
                            message: message,
                            body: body,
                        }))
                    }
                    Some(_) => panic!("Expecting Frame::Message, got something else"),
                    None => Ok(None),
                }
            }
            Some(0) => {
                self.data_todo = None;
                Ok(Some(Frame::Body { chunk: None }))
            }
            Some(data_todo) => {
                // With nothing buffered, returning an empty chunk would yield
                // a frame without consuming any input, so the caller would
                // keep receiving empty chunks instead of reading more data.
                // Ask for more bytes instead.
                if buf.is_empty() {
                    return Ok(None);
                }
                let chunk_size = min(buf.len(), data_todo);
                self.data_todo = Some(data_todo - chunk_size);
                Ok(Some(Frame::Body { chunk: Some(buf.split_to(chunk_size)) }))
            }
        }
    }
}
Другие вопросы по тегам