Использование NamedPipeServerStream и NamedPipeClientStream асинхронно

У меня есть следующие требования к архитектуре сервер / клиент:

  1. Напишите сервер / клиент, который работает асинхронно.

  2. Связь должна быть дуплексной, т.е. читать и писать на обоих концах.

  3. Несколько клиентов могут подключиться к серверу в любой момент времени.

  4. Сервер / клиент должен подождать, пока они станут доступными, и, наконец, установить соединение.

  5. Как только клиент подключается, он должен записать в поток.

  6. Затем сервер должен прочитать из потока и записать ответ обратно клиенту.

  7. Наконец, клиент должен прочитать ответ, и связь должна прекратиться.

Итак, имея в виду следующие требования, я написал следующий код, но я не слишком уверен в этом, потому что документы по каналам в некоторой степени отсутствуют, к сожалению, и, похоже, код не работает правильно, он зависает в определенный момент,

namespace PipesAsyncAwait471
{
    using System;
    using System.Collections.Generic;
    using System.IO.Pipes;
    using System.Linq;
    using System.Threading.Tasks;

    internal class Program
    {
        private static async Task Main()
        {
            List<Task> tasks = new List<Task> {
                HandleRequestAsync(),
            };

            tasks.AddRange(Enumerable.Range(0, 10).Select(i => SendRequestAsync(i, 0, 5)));

            await Task.WhenAll(tasks);
        }

        private static async Task HandleRequestAsync()
        {
            using (NamedPipeServerStream server = new NamedPipeServerStream("MyPipe",
                                                                            PipeDirection.InOut,
                                                                            NamedPipeServerStream.MaxAllowedServerInstances,
                                                                            PipeTransmissionMode.Message,
                                                                            PipeOptions.Asynchronous))
            {
                Console.WriteLine("Waiting...");

                await server.WaitForConnectionAsync().ConfigureAwait(false);

                if (server.IsConnected)
                {
                    Console.WriteLine("Connected");

                    if (server.CanRead) {
                        // Read something...
                    }

                    if (server.CanWrite) {
                        // Write something... 

                        await server.FlushAsync().ConfigureAwait(false);

                        server.WaitForPipeDrain();
                    }

                    server.Disconnect();

                    await HandleRequestAsync().ConfigureAwait(false);
                }
            }
        }

        private static async Task SendRequestAsync(int index, int counter, int max)
        {
            using (NamedPipeClientStream client = new NamedPipeClientStream(".", "MyPipe", PipeDirection.InOut, PipeOptions.Asynchronous))
            {
                await client.ConnectAsync().ConfigureAwait(false);

                if (client.IsConnected)
                {
                    Console.WriteLine($"Index: {index} Counter: {counter}");

                    if (client.CanWrite) {
                        // Write something...

                        await client.FlushAsync().ConfigureAwait(false);

                        client.WaitForPipeDrain();
                    }

                    if (client.CanRead) {
                        // Read something...
                    }
                }

                if (counter <= max) {
                    await SendRequestAsync(index, ++counter, max).ConfigureAwait(false);
                }
                else {
                    Console.WriteLine($"{index} Done!");
                }
            }
        }
    }
}

Предположения:

Я ожидаю, что это сработает, для всех запросов, которые я делаю, когда звоню SendRequestAsync выполнять одновременно, когда каждый запрос делает дополнительные запросы, пока не достигнет 6 и, наконец, должно быть напечатано "Готово!".

Примечания:

  1. Я протестировал его на.NET Framework 4.7.1 и.NET Core 2.0 и получил те же результаты.

  2. Связь между клиентами и сервером всегда локальна для компьютера, где клиенты являются веб-приложениями, которые могут ставить в очередь некоторые задания, такие как запуск сторонних процессов, и сервер будет развернут как служба Windows на той же машине, что и веб-сервер, который эти клиенты развернуты на.

2 ответа

Решение

При отключении, WaitForPipeDrain() может бросить IOException из-за сломанной трубы.

Если это происходит на вашем сервере Taskтогда он никогда не будет прослушивать следующее соединение, и все остальные клиентские соединения будут зависать ConnectAsync(),

Если это происходит в одной из задач клиента, то он не будет продолжать повторять и увеличивать счетчик для этого индекса.

Если вы заверните звонок WaitForPipeDrain() в try/catchпрограмма будет работать вечно, потому что ваша функция HandleRequestAsync() бесконечно рекурсивен.

Короче говоря, чтобы заставить это работать:

  1. Справиться IOException от WaitForPipeDrain()
  2. HandleRequestAsync() должен закончить в какой-то момент.

Вот полный код после нескольких итераций:

namespace PipesAsyncAwait471
{
    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.IO.Pipes;
    using System.Linq;
    using System.Threading.Tasks;

    internal class Program
    {
        private const int MAX_REQUESTS = 1000;

        private static void Main()
        {
            var tasks = new List<Task> {
                //Task.Run(() => HandleRequest(0))
                HandleRequestAsync(0)
            };

            tasks.AddRange(Enumerable.Range(0, MAX_REQUESTS).Select(i => Task.Factory.StartNew(() => SendRequest(i), TaskCreationOptions.LongRunning)));

            Task.WhenAll(tasks);

            Console.ReadKey();
        }

        private static void HandleRequest(int counter)
        {
            try {
                var server = new NamedPipeServerStream("MyPipe",
                                                    PipeDirection.InOut,
                                                    NamedPipeServerStream.MaxAllowedServerInstances,
                                                    PipeTransmissionMode.Message,
                                                    PipeOptions.Asynchronous);

                Console.WriteLine($"Waiting a client... {counter}");

                server.BeginWaitForConnection(WaitForConnectionCallback, server);
            }
            catch (Exception ex) {
                Console.WriteLine(ex);
            }

            void WaitForConnectionCallback(IAsyncResult result)
            {
                var server = (NamedPipeServerStream)result.AsyncState;

                int index = -1;

                try {
                    server.EndWaitForConnection(result);

                    HandleRequest(++counter);

                    if (server.IsConnected) {
                        var request = new byte[4];
                        server.BeginRead(request, 0, request.Length, ReadCallback, server);
                        index = BitConverter.ToInt32(request, 0);
                        Console.WriteLine($"{index} Request.");

                        var response = BitConverter.GetBytes(index);
                        server.BeginWrite(response, 0, response.Length, WriteCallback, server);
                        server.Flush();
                        server.WaitForPipeDrain();
                        Console.WriteLine($"{index} Pong.");

                        server.Disconnect();
                        Console.WriteLine($"{index} Disconnected.");
                    }
                }
                catch (IOException ex) {
                    Console.WriteLine($"{index}\n\t{ex}");
                }
                finally {
                    server.Dispose();
                }
            }

            void ReadCallback(IAsyncResult result) 
            {
                var server = (NamedPipeServerStream)result.AsyncState;

                try {
                    server.EndRead(result);
                }
                catch (IOException ex) {
                    Console.WriteLine(ex);
                }
            }

            void WriteCallback(IAsyncResult result) 
            {
                var server = (NamedPipeServerStream)result.AsyncState;

                try {
                    server.EndWrite(result);
                }
                catch (IOException ex) {
                    Console.WriteLine(ex);
                }
            }
        }

        private static async Task HandleRequestAsync(int counter)
        {
            NamedPipeServerStream server = null;

            int index = -1;

            try {
                server = new NamedPipeServerStream("MyPipe",
                                                PipeDirection.InOut,
                                                NamedPipeServerStream.MaxAllowedServerInstances,
                                                PipeTransmissionMode.Message,
                                                PipeOptions.Asynchronous);

                Console.WriteLine($"Waiting a client... {counter}");

                await server.WaitForConnectionAsync()
                            .ContinueWith(async t => await HandleRequestAsync(++counter).ConfigureAwait(false))
                            .ConfigureAwait(false);

                if (server.IsConnected) {
                    var request = new byte[4];
                    await server.ReadAsync(request, 0, request.Length).ConfigureAwait(false);
                    index = BitConverter.ToInt32(request, 0);
                    Console.WriteLine($"{index} Request.");

                    var response = BitConverter.GetBytes(index);
                    await server.WriteAsync(response, 0, response.Length).ConfigureAwait(false);
                    await server.FlushAsync().ConfigureAwait(false);
                    server.WaitForPipeDrain();
                    Console.WriteLine($"{index} Pong.");

                    server.Disconnect();
                    Console.WriteLine($"{index} Disconnected.");
                }
            }
            catch (IOException ex) {
                Console.WriteLine($"{index}\n\t{ex}");
            }
            finally {
                server?.Dispose();
            }
        }

        private static void SendRequest(int index)
        {
            NamedPipeClientStream client = null;

            try {
                client = new NamedPipeClientStream(".", "MyPipe", PipeDirection.InOut, PipeOptions.None);

                client.Connect();

                var request = BitConverter.GetBytes(index);
                client.Write(request, 0, request.Length);
                client.Flush();
                client.WaitForPipeDrain();
                Console.WriteLine($"{index} Ping.");

                var response = new byte[4];
                client.Read(response, 0, response.Length);
                index = BitConverter.ToInt32(response, 0);
                Console.WriteLine($"{index} Response.");
            }
            catch (Exception ex) {
                Console.WriteLine($"{index}\n\t{ex}");
            }
            finally {
                client?.Dispose();
            }
        }
    }
}

Вы можете сортировать сообщения и наблюдать следующее:

  1. Соединения открыты и закрыты правильно.

  2. Данные отправлены и получены правильно.

  3. Наконец, сервер все еще ожидает дальнейших подключений.

Обновления:

  1. Изменено PipeOptions.Asynchronous в PipeOptions.None в противном случае кажется, что он зависает на время выполнения запросов и только затем обрабатывает их сразу.

    PipeOptions.Asynchronous просто вызывает порядок выполнения, отличный от PipeOptions.None, и это указывает на состояние гонки / тупик в вашем коде. Вы можете увидеть эффект этого, если вы используете диспетчер задач, например, для отслеживания количества потоков вашего процесса... вы должны видеть, как он набирает скорость со скоростью приблизительно 1 поток в секунду, пока не достигнет примерно 100 потоков (может быть 110 или около того), после чего ваш код выполняется до завершения. Или если вы добавите ThreadPool.SetMinThreads(200, 200) в начале. В вашем коде есть проблема, когда в случае неправильного упорядочения (что становится более вероятным благодаря использованию асинхронного), вы создаете цикл, в котором он не может быть выполнен до тех пор, пока не будет достаточно потоков для запуска всех одновременных ConnectAsyncs, поставленных в очередь вашим основным методом. которые не являются действительно асинхронными и вместо этого просто создают рабочий элемент для вызова синхронного метода Connect (это прискорбно, и именно такие проблемы являются одной из причин, по которым я призываю людей не предоставлять асинхронные API, которые просто ставят в очередь рабочие элементы вызовите методы синхронизации). Источник

  2. Пересмотрел и упростил пример:

    1. Там нет истинного асинхронного Connect метод для труб, ConnectAsync использования Task.Factory.StartNew за кулисами, чтобы вы могли использовать Connect а затем передать метод (SendRequest в нашем примере), который вызывает синхронный Connect версия для Task.Factory.StartNew,

    2. Сервер теперь полностью асинхронный и, насколько я могу судить, работает без проблем.

    3. Для сервера добавлены две реализации, одна из которых использует обратные вызовы, а другая использует преимущество над асинхронной / ожидающей функцией только потому, что я не смог найти хороший пример для этих двух.

Я надеюсь, что это помогает.

Другие вопросы по тегам