Использование NamedPipeServerStream и NamedPipeClientStream асинхронно
У меня есть следующие требования к архитектуре сервер / клиент:
Напишите сервер / клиент, который работает асинхронно.
Связь должна быть дуплексной, т.е. читать и писать на обоих концах.
Несколько клиентов могут подключиться к серверу в любой момент времени.
Сервер / клиент должен подождать, пока они станут доступными, и, наконец, установить соединение.
Как только клиент подключается, он должен записать в поток.
Затем сервер должен прочитать из потока и записать ответ обратно клиенту.
Наконец, клиент должен прочитать ответ, и связь должна прекратиться.
Итак, имея в виду следующие требования, я написал следующий код, но я не слишком уверен в этом, потому что документы по каналам в некоторой степени отсутствуют, к сожалению, и, похоже, код не работает правильно, он зависает в определенный момент,
namespace PipesAsyncAwait471
{
using System;
using System.Collections.Generic;
using System.IO.Pipes;
using System.Linq;
using System.Threading.Tasks;
internal class Program
{
private static async Task Main()
{
List<Task> tasks = new List<Task> {
HandleRequestAsync(),
};
tasks.AddRange(Enumerable.Range(0, 10).Select(i => SendRequestAsync(i, 0, 5)));
await Task.WhenAll(tasks);
}
private static async Task HandleRequestAsync()
{
using (NamedPipeServerStream server = new NamedPipeServerStream("MyPipe",
PipeDirection.InOut,
NamedPipeServerStream.MaxAllowedServerInstances,
PipeTransmissionMode.Message,
PipeOptions.Asynchronous))
{
Console.WriteLine("Waiting...");
await server.WaitForConnectionAsync().ConfigureAwait(false);
if (server.IsConnected)
{
Console.WriteLine("Connected");
if (server.CanRead) {
// Read something...
}
if (server.CanWrite) {
// Write something...
await server.FlushAsync().ConfigureAwait(false);
server.WaitForPipeDrain();
}
server.Disconnect();
await HandleRequestAsync().ConfigureAwait(false);
}
}
}
private static async Task SendRequestAsync(int index, int counter, int max)
{
using (NamedPipeClientStream client = new NamedPipeClientStream(".", "MyPipe", PipeDirection.InOut, PipeOptions.Asynchronous))
{
await client.ConnectAsync().ConfigureAwait(false);
if (client.IsConnected)
{
Console.WriteLine($"Index: {index} Counter: {counter}");
if (client.CanWrite) {
// Write something...
await client.FlushAsync().ConfigureAwait(false);
client.WaitForPipeDrain();
}
if (client.CanRead) {
// Read something...
}
}
if (counter <= max) {
await SendRequestAsync(index, ++counter, max).ConfigureAwait(false);
}
else {
Console.WriteLine($"{index} Done!");
}
}
}
}
}
Предположения:
Я ожидаю, что это сработает, для всех запросов, которые я делаю, когда звоню SendRequestAsync
выполнять одновременно, когда каждый запрос делает дополнительные запросы, пока не достигнет 6
и, наконец, должно быть напечатано "Готово!".
Примечания:
Я протестировал его на.NET Framework 4.7.1 и.NET Core 2.0 и получил те же результаты.
Связь между клиентами и сервером всегда локальна для компьютера, где клиенты являются веб-приложениями, которые могут ставить в очередь некоторые задания, такие как запуск сторонних процессов, и сервер будет развернут как служба Windows на той же машине, что и веб-сервер, который эти клиенты развернуты на.
2 ответа
При отключении, WaitForPipeDrain()
может бросить IOException
из-за сломанной трубы.
Если это происходит на вашем сервере Task
тогда он никогда не будет прослушивать следующее соединение, и все остальные клиентские соединения будут зависать ConnectAsync()
,
Если это происходит в одной из задач клиента, то он не будет продолжать повторять и увеличивать счетчик для этого индекса.
Если вы заверните звонок WaitForPipeDrain()
в try
/catch
программа будет работать вечно, потому что ваша функция HandleRequestAsync()
бесконечно рекурсивен.
Короче говоря, чтобы заставить это работать:
- Справиться
IOException
отWaitForPipeDrain()
HandleRequestAsync()
должен закончить в какой-то момент.
Вот полный код после нескольких итераций:
namespace PipesAsyncAwait471
{
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipes;
using System.Linq;
using System.Threading.Tasks;
internal class Program
{
private const int MAX_REQUESTS = 1000;
private static void Main()
{
var tasks = new List<Task> {
//Task.Run(() => HandleRequest(0))
HandleRequestAsync(0)
};
tasks.AddRange(Enumerable.Range(0, MAX_REQUESTS).Select(i => Task.Factory.StartNew(() => SendRequest(i), TaskCreationOptions.LongRunning)));
Task.WhenAll(tasks);
Console.ReadKey();
}
private static void HandleRequest(int counter)
{
try {
var server = new NamedPipeServerStream("MyPipe",
PipeDirection.InOut,
NamedPipeServerStream.MaxAllowedServerInstances,
PipeTransmissionMode.Message,
PipeOptions.Asynchronous);
Console.WriteLine($"Waiting a client... {counter}");
server.BeginWaitForConnection(WaitForConnectionCallback, server);
}
catch (Exception ex) {
Console.WriteLine(ex);
}
void WaitForConnectionCallback(IAsyncResult result)
{
var server = (NamedPipeServerStream)result.AsyncState;
int index = -1;
try {
server.EndWaitForConnection(result);
HandleRequest(++counter);
if (server.IsConnected) {
var request = new byte[4];
server.BeginRead(request, 0, request.Length, ReadCallback, server);
index = BitConverter.ToInt32(request, 0);
Console.WriteLine($"{index} Request.");
var response = BitConverter.GetBytes(index);
server.BeginWrite(response, 0, response.Length, WriteCallback, server);
server.Flush();
server.WaitForPipeDrain();
Console.WriteLine($"{index} Pong.");
server.Disconnect();
Console.WriteLine($"{index} Disconnected.");
}
}
catch (IOException ex) {
Console.WriteLine($"{index}\n\t{ex}");
}
finally {
server.Dispose();
}
}
void ReadCallback(IAsyncResult result)
{
var server = (NamedPipeServerStream)result.AsyncState;
try {
server.EndRead(result);
}
catch (IOException ex) {
Console.WriteLine(ex);
}
}
void WriteCallback(IAsyncResult result)
{
var server = (NamedPipeServerStream)result.AsyncState;
try {
server.EndWrite(result);
}
catch (IOException ex) {
Console.WriteLine(ex);
}
}
}
private static async Task HandleRequestAsync(int counter)
{
NamedPipeServerStream server = null;
int index = -1;
try {
server = new NamedPipeServerStream("MyPipe",
PipeDirection.InOut,
NamedPipeServerStream.MaxAllowedServerInstances,
PipeTransmissionMode.Message,
PipeOptions.Asynchronous);
Console.WriteLine($"Waiting a client... {counter}");
await server.WaitForConnectionAsync()
.ContinueWith(async t => await HandleRequestAsync(++counter).ConfigureAwait(false))
.ConfigureAwait(false);
if (server.IsConnected) {
var request = new byte[4];
await server.ReadAsync(request, 0, request.Length).ConfigureAwait(false);
index = BitConverter.ToInt32(request, 0);
Console.WriteLine($"{index} Request.");
var response = BitConverter.GetBytes(index);
await server.WriteAsync(response, 0, response.Length).ConfigureAwait(false);
await server.FlushAsync().ConfigureAwait(false);
server.WaitForPipeDrain();
Console.WriteLine($"{index} Pong.");
server.Disconnect();
Console.WriteLine($"{index} Disconnected.");
}
}
catch (IOException ex) {
Console.WriteLine($"{index}\n\t{ex}");
}
finally {
server?.Dispose();
}
}
private static void SendRequest(int index)
{
NamedPipeClientStream client = null;
try {
client = new NamedPipeClientStream(".", "MyPipe", PipeDirection.InOut, PipeOptions.None);
client.Connect();
var request = BitConverter.GetBytes(index);
client.Write(request, 0, request.Length);
client.Flush();
client.WaitForPipeDrain();
Console.WriteLine($"{index} Ping.");
var response = new byte[4];
client.Read(response, 0, response.Length);
index = BitConverter.ToInt32(response, 0);
Console.WriteLine($"{index} Response.");
}
catch (Exception ex) {
Console.WriteLine($"{index}\n\t{ex}");
}
finally {
client?.Dispose();
}
}
}
}
Вы можете сортировать сообщения и наблюдать следующее:
Соединения открыты и закрыты правильно.
Данные отправлены и получены правильно.
Наконец, сервер все еще ожидает дальнейших подключений.
Обновления:
ИзмененоPipeOptions.Asynchronous
вPipeOptions.None
в противном случае кажется, что он зависает на время выполнения запросов и только затем обрабатывает их сразу.PipeOptions.Asynchronous просто вызывает порядок выполнения, отличный от PipeOptions.None, и это указывает на состояние гонки / тупик в вашем коде. Вы можете увидеть эффект этого, если вы используете диспетчер задач, например, для отслеживания количества потоков вашего процесса... вы должны видеть, как он набирает скорость со скоростью приблизительно 1 поток в секунду, пока не достигнет примерно 100 потоков (может быть 110 или около того), после чего ваш код выполняется до завершения. Или если вы добавите ThreadPool.SetMinThreads(200, 200) в начале. В вашем коде есть проблема, когда в случае неправильного упорядочения (что становится более вероятным благодаря использованию асинхронного), вы создаете цикл, в котором он не может быть выполнен до тех пор, пока не будет достаточно потоков для запуска всех одновременных ConnectAsyncs, поставленных в очередь вашим основным методом. которые не являются действительно асинхронными и вместо этого просто создают рабочий элемент для вызова синхронного метода Connect (это прискорбно, и именно такие проблемы являются одной из причин, по которым я призываю людей не предоставлять асинхронные API, которые просто ставят в очередь рабочие элементы вызовите методы синхронизации). Источник
Пересмотрел и упростил пример:
Там нет истинного асинхронного
Connect
метод для труб,ConnectAsync
использованияTask.Factory.StartNew
за кулисами, чтобы вы могли использоватьConnect
а затем передать метод (SendRequest
в нашем примере), который вызывает синхронныйConnect
версия дляTask.Factory.StartNew
,Сервер теперь полностью асинхронный и, насколько я могу судить, работает без проблем.
Для сервера добавлены две реализации, одна из которых использует обратные вызовы, а другая использует преимущество над асинхронной / ожидающей функцией только потому, что я не смог найти хороший пример для этих двух.
Я надеюсь, что это помогает.