Использование портов завершения ввода-вывода из нескольких сокетов в ограниченном пуле потоков в C#

Я пытаюсь прослушивать UDP-пакеты от различных входящих портов (~20). Я хотел бы посвятить ~3-5 потоков для получения и обработки этих пакетов. Это похоже на идеальную ситуацию для портов завершения ввода-вывода в Windows. Чего я не понимаю, так это как сделать многократное сопоставление нескольких сокетов, чтобы проверить меньший набор потоков.

Следующий код создает все мои сокеты и начинает асинхронную операцию получения.

for(int ix = 0; ix < 20; ix++)
{
    var socket = new Socket(AddressFamily.InterNetwork,
                            SocketType.Dgram, ProtocolType.Udp);
    socket.Bind(new IPEndPoint(IPAddress.Any, ix+6000));
    var e = new SocketAsyncEventArgs();
    e.Completed+=OnReceive;
    e.SetBuffer(buffer, ix*1024*1024, 1024*1024);
    socket.ReceiveFromAsync(e);
    _sockets.Add(socket);
}

Я понимаю, что каждое сообщение OnReceive будет вызываться при получении пакета...

static void OnReceive(object sender, SocketAsyncEventArgs e)
{
    Console.WriteLine("Received {0} bytes", e.BytesTransfered);
    if(!((Socket)sender).ReceiveFromAsync(e))
        e_Completed(sender, e);
}
  1. Как я могу ограничить количество потоков, запускающих события OnReceive?
  2. Каков наилучший способ предотвратить переполнение стека в том редком случае, когда метод OnReceive рекурсивно вызывает себя слишком много раз?

1 ответ

Не уверен, что я понимаю, что вы имеете в виду с темами для получения и обработки. Прием происходит в фоновом режиме.

В любом случае, я бы использовал BlockingCollection.

OnReceive может выглядеть примерно так

private static BlockingCollection<byte[]> _received = new ...

static void OnReceive(object sender, SocketAsyncEventArgs e) {
    byte[] data = new byte[e.BytesTransfered];
    Array.Copy(e.buffer, e.Offset, data, 0, e.BytesTransfered);
    _received.Add(data)
    ...
}

Затем вы просто используете TPL/PLinq для обработки полученных данных по запрошенному количеству потоков.

var parallelOptions = new ParalellOptions { MaxDegreeOfParallelism = 3 };
Parallel.ForEach(_received.GetConsumingPartitioner(),
                parallelOptions, 
                data => {
                    // do processing
                    ...
                });
Другие вопросы по тегам