C#: Реализация NetworkStream.Peek?
В настоящее время нет NetworkStream.Peek
метод в C#. Каков наилучший способ реализации такого метода, который функционирует так же, как NetworkStream.ReadByte
кроме того, что вернулся byte
на самом деле не удаляется из Stream
?
5 ответов
Если вам не нужно на самом деле получить байт, вы можете обратиться к DataAvailable
имущество.
В противном случае вы можете обернуть его StreamReader
и вызвать его Peek
метод.
Обратите внимание, что ни один из них не является особенно надежным для чтения из сетевого потока из-за проблем с задержкой. Данные могут стать доступными (присутствующими в буфере чтения) в самый момент после просмотра.
Я не уверен, что вы собираетесь делать с этим, но Read
метод на NetworkStream
это блокирующий вызов, поэтому вам не нужно проверять статус, даже если вы получаете порциями. Если вы пытаетесь поддерживать адаптацию приложения во время чтения из потока, вместо этого вы должны использовать поток или асинхронный вызов для получения данных.
Редактировать: Согласно этому сообщению, StreamReader.Peek
глючит на NetworkStream
или, по крайней мере, имеет недокументированное поведение, поэтому будьте осторожны, если вы решите пойти по этому пути.
Обновлено - ответ на комментарии
Понятие "взглянуть" на сам реальный поток фактически невозможно; это просто поток, и как только байт получен, он больше не находится в потоке. Некоторые потоки поддерживают поиск, так что вы можете технически перечитать этот байт, но NetworkStream
не один из них.
Peeking применяется только тогда, когда чтение потока в буфер; как только данные находятся в буфере, просмотр легко, потому что вы просто проверяете все, что находится в текущей позиции в буфере. Вот почему StreamReader
умеет это делать; нет Stream
класс, как правило, будет иметь свой собственный Peek
метод.
Теперь, специально для этой проблемы, я спрашиваю, действительно ли это правильный ответ. Я понимаю идею динамического выбора метода для обработки потока, но вам действительно нужно делать это в сыром потоке? Вы не можете сначала прочитать поток в байтовый массив или даже скопировать его в MemoryStream
и обработать его с этого момента?
Основная проблема, которую я вижу, заключается в том, что если что-то плохое случается, когда вы читаете из сетевого потока, данные исчезают. Но если вы сначала прочитаете его во временное место, вы можете отладить его. Вы можете узнать, что это были за данные и почему объект, который пытался обработать данные, потерпел неудачу на полпути.
В общем, самое первое, что вы хотите сделать с NetworkStream
считывается в локальный буфер. Единственная причина, по которой я могу думать об этом, заключается в том, что вы читаете огромное количество данных - и даже тогда я мог бы рассмотреть возможность использования файловой системы в качестве промежуточного буфера, если она не помещается в памяти.
Я не знаю ваших точных требований, но из того, что я узнал до сих пор, мой совет: не пытайтесь обрабатывать ваши данные непосредственно из NetworkStream
если нет веской причины для этого. Попробуйте сначала прочитать данные в память или на диск, а затем обработать копию.
Я столкнулся с тем же "поиском по магическому номеру", а затем решил, к какому потоковому процессору отправить поток ", и, к сожалению, не могу уклониться от этой проблемы - как это предлагается в комментариях к ответу Ааронаута - путем передачи уже использованных байтов в методы обработки потока в отдельных параметрах, так как эти методы являются данными, и они ожидают System.IO.Stream и ничего больше.
Я решил это, создав более или менее универсальный класс PeekableStream, который оборачивает поток. Это работает для NetworkStreams, но также и для любого другого потока, если вы Stream.CanRead его.
редактировать
В качестве альтернативы, вы можете использовать новый ReadSeekableStream
и делать
var readSeekableStream = new ReadSeekableStream(networkStream, /* >= */ count);
...
readSeekableStream.Read(..., count);
readSeekableStream.Seek(-count, SeekOrigin.Current);
В любом случае, здесь приходит PeekableStream
:
/// <summary>
/// PeekableStream wraps a Stream and can be used to peek ahead in the underlying stream,
/// without consuming the bytes. In other words, doing Peek() will allow you to look ahead in the stream,
/// but it won't affect the result of subsequent Read() calls.
///
/// This is sometimes necessary, e.g. for peeking at the magic number of a stream of bytes and decide which
/// stream processor to hand over the stream.
/// </summary>
public class PeekableStream : Stream
{
private readonly Stream underlyingStream;
private readonly byte[] lookAheadBuffer;
private int lookAheadIndex;
public PeekableStream(Stream underlyingStream, int maxPeekBytes)
{
this.underlyingStream = underlyingStream;
lookAheadBuffer = new byte[maxPeekBytes];
}
protected override void Dispose(bool disposing)
{
if (disposing)
underlyingStream.Dispose();
base.Dispose(disposing);
}
/// <summary>
/// Peeks at a maximum of count bytes, or less if the stream ends before that number of bytes can be read.
///
/// Calls to this method do not influence subsequent calls to Read() and Peek().
///
/// Please note that this method will always peek count bytes unless the end of the stream is reached before that - in contrast to the Read()
/// method, which might read less than count bytes, even though the end of the stream has not been reached.
/// </summary>
/// <param name="buffer">An array of bytes. When this method returns, the buffer contains the specified byte array with the values between offset and
/// (offset + number-of-peeked-bytes - 1) replaced by the bytes peeked from the current source.</param>
/// <param name="offset">The zero-based byte offset in buffer at which to begin storing the data peeked from the current stream.</param>
/// <param name="count">The maximum number of bytes to be peeked from the current stream.</param>
/// <returns>The total number of bytes peeked into the buffer. If it is less than the number of bytes requested then the end of the stream has been reached.</returns>
public virtual int Peek(byte[] buffer, int offset, int count)
{
if (count > lookAheadBuffer.Length)
throw new ArgumentOutOfRangeException("count", "must be smaller than peekable size, which is " + lookAheadBuffer.Length);
while (lookAheadIndex < count)
{
int bytesRead = underlyingStream.Read(lookAheadBuffer, lookAheadIndex, count - lookAheadIndex);
if (bytesRead == 0) // end of stream reached
break;
lookAheadIndex += bytesRead;
}
int peeked = Math.Min(count, lookAheadIndex);
Array.Copy(lookAheadBuffer, 0, buffer, offset, peeked);
return peeked;
}
public override bool CanRead { get { return true; } }
public override long Position
{
get
{
return underlyingStream.Position - lookAheadIndex;
}
set
{
underlyingStream.Position = value;
lookAheadIndex = 0; // this needs to be done AFTER the call to underlyingStream.Position, as that might throw NotSupportedException,
// in which case we don't want to change the lookAhead status
}
}
public override int Read(byte[] buffer, int offset, int count)
{
int bytesTakenFromLookAheadBuffer = 0;
if (count > 0 && lookAheadIndex > 0)
{
bytesTakenFromLookAheadBuffer = Math.Min(count, lookAheadIndex);
Array.Copy(lookAheadBuffer, 0, buffer, offset, bytesTakenFromLookAheadBuffer);
count -= bytesTakenFromLookAheadBuffer;
offset += bytesTakenFromLookAheadBuffer;
lookAheadIndex -= bytesTakenFromLookAheadBuffer;
if (lookAheadIndex > 0) // move remaining bytes in lookAheadBuffer to front
// copying into same array should be fine, according to http://msdn.microsoft.com/en-us/library/z50k9bft(v=VS.90).aspx :
// "If sourceArray and destinationArray overlap, this method behaves as if the original values of sourceArray were preserved
// in a temporary location before destinationArray is overwritten."
Array.Copy(lookAheadBuffer, lookAheadBuffer.Length - bytesTakenFromLookAheadBuffer + 1, lookAheadBuffer, 0, lookAheadIndex);
}
return count > 0
? bytesTakenFromLookAheadBuffer + underlyingStream.Read(buffer, offset, count)
: bytesTakenFromLookAheadBuffer;
}
public override int ReadByte()
{
if (lookAheadIndex > 0)
{
lookAheadIndex--;
byte firstByte = lookAheadBuffer[0];
if (lookAheadIndex > 0) // move remaining bytes in lookAheadBuffer to front
Array.Copy(lookAheadBuffer, 1, lookAheadBuffer, 0, lookAheadIndex);
return firstByte;
}
else
{
return underlyingStream.ReadByte();
}
}
public override long Seek(long offset, SeekOrigin origin)
{
long ret = underlyingStream.Seek(offset, origin);
lookAheadIndex = 0; // this needs to be done AFTER the call to underlyingStream.Seek(), as that might throw NotSupportedException,
// in which case we don't want to change the lookAhead status
return ret;
}
// from here on, only simple delegations to underlyingStream
public override bool CanSeek { get { return underlyingStream.CanSeek; } }
public override bool CanWrite { get { return underlyingStream.CanWrite; } }
public override bool CanTimeout { get { return underlyingStream.CanTimeout; } }
public override int ReadTimeout { get { return underlyingStream.ReadTimeout; } set { underlyingStream.ReadTimeout = value; } }
public override int WriteTimeout { get { return underlyingStream.WriteTimeout; } set { underlyingStream.WriteTimeout = value; } }
public override void Flush() { underlyingStream.Flush(); }
public override long Length { get { return underlyingStream.Length; } }
public override void SetLength(long value) { underlyingStream.SetLength(value); }
public override void Write(byte[] buffer, int offset, int count) { underlyingStream.Write(buffer, offset, count); }
public override void WriteByte(byte value) { underlyingStream.WriteByte(value); }
}
Если у вас есть доступ к Socket
объект, вы можете попробовать метод Receive, передавая SocketFlags.Peek
, Это аналогично MSG_PEEK
флаг, который может быть передан recv
позвоните в BSD сокеты или Winsock.
Здесь очень просто PeekStream
реализация, которая позволяет вам просматривать определенное количество байтов только в начале потока (в отличие от возможности просмотра в любое время). Просмотренные байты возвращаются как Stream
сами по себе, чтобы минимизировать изменения в существующем коде.
Вот как вы используете это:
Stream nonSeekableStream = ...;
PeekStream peekStream = new PeekStream(nonSeekableStream, 30); // Peek max 30 bytes
Stream initialBytesStream = peekStream.GetInitialBytesStream();
ParseHeaders(initialBytesStream); // Work on initial bytes of nonSeekableStream
peekStream.Read(...) // Read normally, the read will start from the beginning
GetInitialBytesStream()
возвращает искомый поток, содержащий до peekSize
начальные байты базового потока (меньше, если поток короче, чем peekSize
).
Из-за своей простоты чтение PeekStream должно быть только немного медленнее (если вообще), чем чтение основного потока напрямую.
public class PeekStream : Stream
{
private Stream m_stream;
private byte[] m_buffer;
private int m_start;
private int m_end;
public PeekStream(Stream stream, int peekSize)
{
if (stream == null)
{
throw new ArgumentNullException("stream");
}
if (!stream.CanRead)
{
throw new ArgumentException("Stream is not readable.");
}
if (peekSize < 0)
{
throw new ArgumentOutOfRangeException("peekSize");
}
m_stream = stream;
m_buffer = new byte[peekSize];
m_end = stream.Read(m_buffer, 0, peekSize);
}
public override bool CanRead
{
get
{
return true;
}
}
public override bool CanWrite
{
get
{
return false;
}
}
public override bool CanSeek
{
get
{
return false;
}
}
public override long Length
{
get
{
throw new NotSupportedException();
}
}
public override long Position
{
get
{
throw new NotSupportedException();
}
set
{
throw new NotSupportedException();
}
}
public MemoryStream GetInitialBytesStream()
{
return new MemoryStream(m_buffer, 0, m_end, false);
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
public override void SetLength(long value)
{
throw new NotSupportedException();
}
public override int Read(byte[] buffer, int offset, int count)
{
// Validate arguments
if (buffer == null)
{
throw new ArgumentNullException("buffer");
}
if (offset < 0)
{
throw new ArgumentOutOfRangeException("offset");
}
if (offset + count > buffer.Length)
{
throw new ArgumentOutOfRangeException("count");
}
int totalRead = 0;
// Read from buffer
if (m_start < m_end)
{
int toRead = Math.Min(m_end - m_start, count);
Array.Copy(m_buffer, m_start, buffer, offset, toRead);
m_start += toRead;
offset += toRead;
count -= toRead;
totalRead += toRead;
}
// Read from stream
if (count > 0)
{
totalRead += m_stream.Read(buffer, offset, count);
}
// Return total bytes read
return totalRead;
}
public override void Write(byte[] buffer, int offset, int count)
{
throw new NotImplementedException();
}
public override int ReadByte()
{
if (m_start < m_end)
{
return m_buffer[m_start++];
}
else
{
return m_stream.ReadByte();
}
}
public override void Flush()
{
m_stream.Flush();
}
protected override void Dispose(bool disposing)
{
if (disposing)
{
m_stream.Dispose();
}
base.Dispose(disposing);
}
}
Отказ от ответственности: PeekStream выше взят из работающей программы, но не всесторонне протестирован, поэтому может содержать ошибки. Это работает для меня, но вы могли бы раскрыть некоторые случаи, когда это не удается.
FWIW, вот поток для просмотра по сравнению с потоком без поиска, оптимизированный всего на один байт вперед:
public class OneBytePeekableStream : Stream
{
private readonly bool _disposeStreamOnDispose;
private readonly Stream _stream;
private int _buffer; // byte or -1
private int _bufferLength; // 0 or 1
public OneBytePeekableStream(Stream stream, bool disposeStreamOnDispose)
{
if (stream == null)
throw new ArgumentNullException(nameof(stream));
_stream = stream;
_disposeStreamOnDispose = disposeStreamOnDispose;
}
public override long Length => _stream.Length;
public override bool CanRead => _stream.CanRead;
public override bool CanSeek => _stream.CanSeek;
public override bool CanWrite => _stream.CanWrite;
public override bool CanTimeout => _stream.CanTimeout;
public override int ReadTimeout { get => _stream.ReadTimeout; set => _stream.ReadTimeout = value; }
public override int WriteTimeout { get => _stream.WriteTimeout; set => _stream.WriteTimeout = value; }
public override long Position { get => _stream.Position - _bufferLength; set { _stream.Position = value; _bufferLength = 0; } }
public override int Read(byte[] buffer, int offset, int count)
{
if (buffer == null)
throw new ArgumentNullException(nameof(buffer));
if (offset < 0)
throw new ArgumentOutOfRangeException(nameof(offset));
if (count < 0)
throw new ArgumentOutOfRangeException(nameof(count));
if (buffer.Length - offset < count)
throw new ArgumentOutOfRangeException(nameof(count));
if (count == 0)
return 0;
if (_bufferLength == 0)
return _stream.Read(buffer, offset, count);
if (_buffer < 0)
return 0;
_bufferLength = 0;
buffer[offset] = (byte)_buffer;
if (count == 1)
return count;
var read = _stream.Read(buffer, offset + 1, count - 1);
return read + 1;
}
// this is the sole reason of this class
// returns -1 is stream is EOF
public virtual int PeekByte()
{
if (_bufferLength > 0)
return _buffer;
_buffer = _stream.ReadByte();
_bufferLength = 1;
return _buffer;
}
public override int ReadByte()
{
if (_bufferLength == 0)
return _stream.ReadByte();
if (_buffer < 0)
return -1;
_bufferLength = 0;
return _buffer;
}
public override long Seek(long offset, SeekOrigin origin)
{
var ret = _stream.Seek(offset, origin);
_bufferLength = 0;
return ret;
}
public override void Flush() => _stream.Flush();
public override void SetLength(long value) => _stream.SetLength(value);
public override void WriteByte(byte value) => _stream.WriteByte(value);
public override void Write(byte[] buffer, int offset, int count) => _stream.Write(buffer, offset, count);
protected override void Dispose(bool disposing)
{
if (disposing)
{
if (_disposeStreamOnDispose)
{
_stream.Dispose();
}
}
base.Dispose(disposing);
}
}