Использование портов завершения ввода-вывода из нескольких сокетов в ограниченном пуле потоков в C#
Я пытаюсь прослушивать UDP-пакеты от различных входящих портов (~20). Я хотел бы посвятить ~3-5 потоков для получения и обработки этих пакетов. Это похоже на идеальную ситуацию для портов завершения ввода-вывода в Windows. Чего я не понимаю, так это как сделать многократное сопоставление нескольких сокетов, чтобы проверить меньший набор потоков.
Следующий код создает все мои сокеты и начинает асинхронную операцию получения.
for(int ix = 0; ix < 20; ix++)
{
var socket = new Socket(AddressFamily.InterNetwork,
SocketType.Dgram, ProtocolType.Udp);
socket.Bind(new IPEndPoint(IPAddress.Any, ix+6000));
var e = new SocketAsyncEventArgs();
e.Completed+=OnReceive;
e.SetBuffer(buffer, ix*1024*1024, 1024*1024);
socket.ReceiveFromAsync(e);
_sockets.Add(socket);
}
Я понимаю, что каждое сообщение OnReceive будет вызываться при получении пакета...
static void OnReceive(object sender, SocketAsyncEventArgs e)
{
Console.WriteLine("Received {0} bytes", e.BytesTransfered);
if(!((Socket)sender).ReceiveFromAsync(e))
e_Completed(sender, e);
}
- Как я могу ограничить количество потоков, запускающих события OnReceive?
- Каков наилучший способ предотвратить переполнение стека в том редком случае, когда метод OnReceive рекурсивно вызывает себя слишком много раз?
1 ответ
Не уверен, что я понимаю, что вы имеете в виду с темами для получения и обработки. Прием происходит в фоновом режиме.
В любом случае, я бы использовал BlockingCollection.
OnReceive может выглядеть примерно так
private static BlockingCollection<byte[]> _received = new ...
static void OnReceive(object sender, SocketAsyncEventArgs e) {
byte[] data = new byte[e.BytesTransfered];
Array.Copy(e.buffer, e.Offset, data, 0, e.BytesTransfered);
_received.Add(data)
...
}
Затем вы просто используете TPL/PLinq для обработки полученных данных по запрошенному количеству потоков.
var parallelOptions = new ParalellOptions { MaxDegreeOfParallelism = 3 };
Parallel.ForEach(_received.GetConsumingPartitioner(),
parallelOptions,
data => {
// do processing
...
});