NetMQ PUSH сокет блокируется на неопределенный срок, когда он достигает HWM
Я использую NetMQ (Nuget 3.3.2.2) в.NET 4.5, и у меня есть один быстрый процесс генератора с сокетом PUSH и один медленный потребительский процесс, использующий сокет PULL. Если я отправляю достаточно сообщений, чтобы попасть в отправляющий HWM, процесс отправки блокирует поток на неопределенный срок.
Придуманный код (генератор), который иллюстрирует проблему:
using (var ctx = NetMQContext.Create())
using (var pushSocket = ctx.CreatePushSocket())
{
pushSocket.Connect("tcp://127.0.0.1:42404");
var template = GenerateMessageBody(i);
for (int i = 1; i <= 100000; i++)
{
pushSocket.SendMoreFrame("SampleMessage").SendFrame(Messages.SerializeToByteArray(template));
if (i % 1000 == 0)
Console.WriteLine("Sent " + i + " messages");
}
Console.WriteLine("All finished");
Console.ReadKey();
}
В моей конфигурации это обычно сообщает, что отправлено около 5000 или 6000 сообщений, а затем просто блокируется. Если я установил для HWM отправки большое значение (или 0), он отправит все сообщения, как ожидалось.
Похоже, что он ждет получения другой команды, прежде чем попытаться снова, здесь: (SocketBase.TrySend)
// Oops, we couldn't send the message. Wait for the next
// command, process it and try to send the message again.
// If timeout is reached in the meantime, return EAGAIN.
while (true)
{
ProcessCommands(timeoutMillis, false);
Из того, что я прочитал в руководстве 0MQ, блокировка полной сокета PUSH - это правильное поведение (и именно это я и хочу), однако я ожидаю, что оно восстановится после того, как потребитель очистит свою очередь.
Если не считать использования какого-либо шаблона TrySend и работы с блоком самостоятельно, есть ли какая-либо опция, которую я могу установить, или какая-либо другая возможность, которую я могу использовать, чтобы PUSH-сокет пытался периодически пересылать заблокированные сообщения?