.NET Асинхронный поток чтения / записи
Я пытался решить это упражнение "Параллельное программирование" (в C#):
Знаю это
Stream
класс содержитint Read(byte[] buffer, int offset, int size)
а такжеvoid Write(byte[] buffer, int offset, int size)
методы, реализующие в C#NetToFile
метод, который копирует все данные, полученные отNetworkStream net
экземпляр кFileStream file
пример. Чтобы выполнить передачу, используйте асинхронное чтение и синхронную запись, избегая блокировки одного потока во время операций чтения. Передача заканчивается, когдаnet
Операция чтения возвращает значение 0. Для упрощения, нет необходимости поддерживать контролируемую отмену операции.
void NetToFile(NetworkStream net, FileStream file);
Я пытался решить это упражнение, но я борюсь с вопросом, связанным с самим вопросом. Но сначала вот мой код:
public static void NetToFile(NetworkStream net, FileStream file) {
byte[] buffer = new byte[4096]; // buffer with 4 kB dimension
int offset = 0; // read/write offset
int nBytesRead = 0; // number of bytes read on each cycle
IAsyncResult ar;
do {
// read partial content of net (asynchronously)
ar = net.BeginRead(buffer,offset,buffer.Length,null,null);
// wait until read is completed
ar.AsyncWaitHandle.WaitOne();
// get number of bytes read on each cycle
nBytesRead = net.EndRead(ar);
// write partial content to file (synchronously)
fs.Write(buffer,offset,nBytesRead);
// update offset
offset += nBytesRead;
}
while( nBytesRead > 0);
}
Вопрос, который у меня есть, заключается в том, что в постановке вопроса сказано:
Для передачи используйте асинхронное чтение и синхронную запись, избегая блокировки одного потока во время операций чтения.
Я не совсем уверен, что мое решение выполняет то, что нужно в этом упражнении, потому что я использую AsyncWaitHandle.WaitOne()
ждать завершения асинхронного чтения.
С другой стороны, я не совсем понимаю, что в этом сценарии должно быть "неблокирующим" решением, так как FileStream
запись должна выполняться синхронно... и для этого нужно подождать, пока NetworkStream
чтение завершено, чтобы продолжить FileStream
пишу, не так ли?
Можете ли вы помочь мне с этим?
[РЕДАКТИРОВАТЬ 1 ] Использование решения обратного вызова
Хорошо, если я понял, что ответили Mitchel Sellers и willvv, мне посоветовали использовать метод обратного вызова, чтобы превратить это в "неблокирующее" решение. Вот мой код:
byte[] buffer; // buffer
public static void NetToFile(NetworkStream net, FileStream file) {
// buffer with same dimension as file stream data
buffer = new byte[file.Length];
//start asynchronous read
net.BeginRead(buffer,0,buffer.Length,OnEndRead,net);
}
//asynchronous callback
static void OnEndRead(IAsyncResult ar) {
//NetworkStream retrieve
NetworkStream net = (NetworkStream) ar.IAsyncState;
//get number of bytes read
int nBytesRead = net.EndRead(ar);
//write content to file
//... and now, how do I write to FileStream instance without
//having its reference??
//fs.Write(buffer,0,nBytesRead);
}
Как вы могли заметить, я застрял в методе обратного вызова, так как у меня нет ссылки на FileStream
экземпляр, где я хочу вызвать метод "Write(...)".
Кроме того, это не многопоточное решение, так как byte[]
поле открыто и может быть разделено между NetToFile
вызовы. Я не знаю, как решить эту проблему, не подвергая это byte[]
поле во внешней области видимости... и я почти уверен, что это не может быть выставлено таким образом.
Я не хочу использовать лямбда-решение или метод анонимного метода, потому что это не входит в программу курса "Параллельное программирование".
6 ответов
Вам понадобится использовать обратный вызов из чтения NetStream для обработки этого. И, честно говоря, может быть проще обернуть логику копирования в свой собственный класс, чтобы вы могли поддерживать экземпляр активных потоков.
Вот как я подхожу к этому (не проверено):
public class Assignment1
{
public static void NetToFile(NetworkStream net, FileStream file)
{
var copier = new AsyncStreamCopier(net, file);
copier.Start();
}
public static void NetToFile_Option2(NetworkStream net, FileStream file)
{
var completedEvent = new ManualResetEvent(false);
// copy as usual but listen for completion
var copier = new AsyncStreamCopier(net, file);
copier.Completed += (s, e) => completedEvent.Set();
copier.Start();
completedEvent.WaitOne();
}
/// <summary>
/// The Async Copier class reads the input Stream Async and writes Synchronously
/// </summary>
public class AsyncStreamCopier
{
public event EventHandler Completed;
private readonly Stream input;
private readonly Stream output;
private byte[] buffer = new byte[4096];
public AsyncStreamCopier(Stream input, Stream output)
{
this.input = input;
this.output = output;
}
public void Start()
{
GetNextChunk();
}
private void GetNextChunk()
{
input.BeginRead(buffer, 0, buffer.Length, InputReadComplete, null);
}
private void InputReadComplete(IAsyncResult ar)
{
// input read asynchronously completed
int bytesRead = input.EndRead(ar);
if (bytesRead == 0)
{
RaiseCompleted();
return;
}
// write synchronously
output.Write(buffer, 0, bytesRead);
// get next
GetNextChunk();
}
private void RaiseCompleted()
{
if (Completed != null)
{
Completed(this, EventArgs.Empty);
}
}
}
}
Даже если это идет против зерна, чтобы помочь людям с домашней работой, учитывая, что этому больше года, вот правильный способ сделать это. Все, что вам нужно для перекрытия ваших операций чтения / записи - не требует создания дополнительных потоков или чего-либо еще.
public static class StreamExtensions
{
private const int DEFAULT_BUFFER_SIZE = short.MaxValue ; // +32767
public static void CopyTo( this Stream input , Stream output )
{
input.CopyTo( output , DEFAULT_BUFFER_SIZE ) ;
return ;
}
public static void CopyTo( this Stream input , Stream output , int bufferSize )
{
if ( !input.CanRead ) throw new InvalidOperationException( "input must be open for reading" );
if ( !output.CanWrite ) throw new InvalidOperationException( "output must be open for writing" );
byte[][] buf = { new byte[bufferSize] , new byte[bufferSize] } ;
int[] bufl = { 0 , 0 } ;
int bufno = 0 ;
IAsyncResult read = input.BeginRead( buf[bufno] , 0 , buf[bufno].Length , null , null ) ;
IAsyncResult write = null ;
while ( true )
{
// wait for the read operation to complete
read.AsyncWaitHandle.WaitOne() ;
bufl[bufno] = input.EndRead(read) ;
// if zero bytes read, the copy is complete
if ( bufl[bufno] == 0 )
{
break ;
}
// wait for the in-flight write operation, if one exists, to complete
// the only time one won't exist is after the very first read operation completes
if ( write != null )
{
write.AsyncWaitHandle.WaitOne() ;
output.EndWrite(write) ;
}
// start the new write operation
write = output.BeginWrite( buf[bufno] , 0 , bufl[bufno] , null , null ) ;
// toggle the current, in-use buffer
// and start the read operation on the new buffer.
//
// Changed to use XOR to toggle between 0 and 1.
// A little speedier than using a ternary expression.
bufno ^= 1 ; // bufno = ( bufno == 0 ? 1 : 0 ) ;
read = input.BeginRead( buf[bufno] , 0 , buf[bufno].Length , null , null ) ;
}
// wait for the final in-flight write operation, if one exists, to complete
// the only time one won't exist is if the input stream is empty.
if ( write != null )
{
write.AsyncWaitHandle.WaitOne() ;
output.EndWrite(write) ;
}
output.Flush() ;
// return to the caller ;
return ;
}
public static async Task CopyToAsync( this Stream input , Stream output )
{
await input.CopyToAsync( output , DEFAULT_BUFFER_SIZE ) ;
return;
}
public static async Task CopyToAsync( this Stream input , Stream output , int bufferSize )
{
if ( !input.CanRead ) throw new InvalidOperationException( "input must be open for reading" );
if ( !output.CanWrite ) throw new InvalidOperationException( "output must be open for writing" );
byte[][] buf = { new byte[bufferSize] , new byte[bufferSize] } ;
int[] bufl = { 0 , 0 } ;
int bufno = 0 ;
Task<int> read = input.ReadAsync( buf[bufno] , 0 , buf[bufno].Length ) ;
Task write = null ;
while ( true )
{
await read ;
bufl[bufno] = read.Result ;
// if zero bytes read, the copy is complete
if ( bufl[bufno] == 0 )
{
break;
}
// wait for the in-flight write operation, if one exists, to complete
// the only time one won't exist is after the very first read operation completes
if ( write != null )
{
await write ;
}
// start the new write operation
write = output.WriteAsync( buf[bufno] , 0 , bufl[bufno] ) ;
// toggle the current, in-use buffer
// and start the read operation on the new buffer.
//
// Changed to use XOR to toggle between 0 and 1.
// A little speedier than using a ternary expression.
bufno ^= 1; // bufno = ( bufno == 0 ? 1 : 0 ) ;
read = input.ReadAsync( buf[bufno] , 0 , buf[bufno].Length );
}
// wait for the final in-flight write operation, if one exists, to complete
// the only time one won't exist is if the input stream is empty.
if ( write != null )
{
await write;
}
output.Flush();
// return to the caller ;
return;
}
}
Приветствия.
Я сомневаюсь, что это самый быстрый код (есть некоторые издержки от абстракции.NET Task), но я думаю, что это более чистый подход ко всему асинхронному копированию.
Мне нужен был CopyTransformAsync
где я мог передать делегату что-то сделать, так как чанки проходили через операцию копирования. например, вычислить дайджест сообщения при копировании. Вот почему я заинтересовался катанием своего собственного варианта.
Выводы:
- CopyToAsync bufferSize является чувствительным (требуется большой буфер)
- FileOptions.Asynchronous -> делает его ужасно медленным (не знаю точно, почему это так)
- BufferSize объектов FileStream может быть меньше (это не так важно)
Serial
тест явно самый быстрый и самый ресурсоемкий
Вот что я нашел и полный исходный код программы, которую я использовал для проверки этого. На моей машине эти тесты выполнялись на диске SSD и являются эквивалентом копии файла. Обычно вы не захотите использовать это только для копирования файлов, вместо этого, когда у вас есть сетевой поток (что и является моим вариантом использования), именно тогда вы захотите использовать что-то подобное.
4K buffer
Serial... in 0.474s
CopyToAsync... timed out
CopyToAsync (Asynchronous)... timed out
CopyTransformAsync... timed out
CopyTransformAsync (Asynchronous)... timed out
8K buffer
Serial... in 0.344s
CopyToAsync... timed out
CopyToAsync (Asynchronous)... timed out
CopyTransformAsync... in 1.116s
CopyTransformAsync (Asynchronous)... timed out
40K buffer
Serial... in 0.195s
CopyToAsync... in 0.624s
CopyToAsync (Asynchronous)... timed out
CopyTransformAsync... in 0.378s
CopyTransformAsync (Asynchronous)... timed out
80K buffer
Serial... in 0.190s
CopyToAsync... in 0.355s
CopyToAsync (Asynchronous)... in 1.196s
CopyTransformAsync... in 0.300s
CopyTransformAsync (Asynchronous)... in 0.886s
160K buffer
Serial... in 0.432s
CopyToAsync... in 0.252s
CopyToAsync (Asynchronous)... in 0.454s
CopyTransformAsync... in 0.447s
CopyTransformAsync (Asynchronous)... in 0.555s
Здесь вы можете увидеть Process Explorer, график производительности при запуске теста. По сути, каждая вершина (на нижнем из трех графиков) является началом последовательного теста. Вы можете ясно видеть, как пропускная способность резко увеличивается с ростом размера буфера. Казалось бы, как будто он планирует где-то около 80 КБ, что является.NET Framework CopyToAsync
Метод использует, внутренне.
Приятно, что окончательная реализация не была такой сложной:
static Task CompletedTask = ((Task)Task.FromResult(0));
static async Task CopyTransformAsync(Stream inputStream
, Stream outputStream
, Func<ArraySegment<byte>, ArraySegment<byte>> transform = null
)
{
var temp = new byte[bufferSize];
var temp2 = new byte[bufferSize];
int i = 0;
var readTask = inputStream
.ReadAsync(temp, 0, bufferSize)
.ConfigureAwait(false);
var writeTask = CompletedTask.ConfigureAwait(false);
for (; ; )
{
// synchronize read
int read = await readTask;
if (read == 0)
{
break;
}
if (i++ > 0)
{
// synchronize write
await writeTask;
}
var chunk = new ArraySegment<byte>(temp, 0, read);
// do transform (if any)
if (!(transform == null))
{
chunk = transform(chunk);
}
// queue write
writeTask = outputStream
.WriteAsync(chunk.Array, chunk.Offset, chunk.Count)
.ConfigureAwait(false);
// queue read
readTask = inputStream
.ReadAsync(temp2, 0, bufferSize)
.ConfigureAwait(false);
// swap buffer
var temp3 = temp;
temp = temp2;
temp2 = temp3;
}
await writeTask; // complete any lingering write task
}
Этот метод чередования чтения / записи, несмотря на огромные буферы, где-то на 18% быстрее, чем BCL CopyToAsync
,
Из любопытства я изменил асинхронные вызовы на типичные начальные / конечные асинхронные вызовы шаблонов, и это не улучшило ситуацию ни на один бит, а только ухудшило ситуацию. Для всех, кого я люблю использовать для абстракции Task, они делают некоторые изящные вещи, когда вы пишете код с ключевыми словами async / await, и читать этот код гораздо приятнее!
Вау, это все очень сложно! Вот мое асинхронное решение, и это всего лишь одна функция. Read() и BeginWrite() работают одновременно.
/// <summary>
/// Copies a stream.
/// </summary>
/// <param name="source">The stream containing the source data.</param>
/// <param name="target">The stream that will receive the source data.</param>
/// <remarks>
/// This function copies until no more can be read from the stream
/// and does not close the stream when done.<br/>
/// Read and write are performed simultaneously to improve throughput.<br/>
/// If no data can be read for 60 seconds, the copy will time-out.
/// </remarks>
public static void CopyStream(Stream source, Stream target)
{
// This stream copy supports a source-read happening at the same time
// as target-write. A simpler implementation would be to use just
// Write() instead of BeginWrite(), at the cost of speed.
byte[] readbuffer = new byte[4096];
byte[] writebuffer = new byte[4096];
IAsyncResult asyncResult = null;
for (; ; )
{
// Read data into the readbuffer. The previous call to BeginWrite, if any,
// is executing in the background..
int read = source.Read(readbuffer, 0, readbuffer.Length);
// Ok, we have read some data and we're ready to write it, so wait here
// to make sure that the previous write is done before we write again.
if (asyncResult != null)
{
// This should work down to ~0.01kb/sec
asyncResult.AsyncWaitHandle.WaitOne(60000);
target.EndWrite(asyncResult); // Last step to the 'write'.
if (!asyncResult.IsCompleted) // Make sure the write really completed.
throw new IOException("Stream write failed.");
}
if (read <= 0)
return; // source stream says we're done - nothing else to read.
// Swap the read and write buffers so we can write what we read, and we can
// use the then use the other buffer for our next read.
byte[] tbuf = writebuffer;
writebuffer = readbuffer;
readbuffer = tbuf;
// Asynchronously write the data, asyncResult.AsyncWaitHandle will
// be set when done.
asyncResult = target.BeginWrite(writebuffer, 0, read, null, null);
}
}
Странно, что никто не упомянул TPL.
Вот очень хороший пост от команды PFX (Стивен Тауб) о том, как реализовать одновременную асинхронную потоковую копию. Пост содержит устаревшие ссылки на образцы, так что следующий:
Получите дополнения Parallel Extensions из code.msdn затем
var task = sourceStream.CopyStreamToStreamAsync(destinationStream);
// do what you want with the task, for example wait when it finishes:
task.Wait();
Также рассмотрите возможность использования AsyncEnumerator Дж. Ричера.
Вы правы, то, что вы делаете, это в основном синхронное чтение, потому что вы используете метод WaitOne(), и он просто останавливает выполнение до тех пор, пока данные не будут готовы, это в основном то же самое, что делать это с использованием Read() вместо BeginRead() и EndRead().
Что вам нужно сделать, это использовать аргумент обратного вызова в методе BeginRead(), с его помощью вы определяете метод обратного вызова (или лямбда-выражение), этот метод будет вызываться, когда информация будет прочитана (в методе обратного вызова, который вы необходимо проверить конец потока и записать в выходной поток), таким образом, вы не будете блокировать основной поток (вам не понадобятся WaitOne() и EndRead().
Надеюсь это поможет.