PGM Получать очень медленно, вызывая удаление сообщений?
Я смотрю в ZeroMQ для его поддержки PGM. Запуск в Windows (в VirtualBox с MacOS в качестве хоста, если это может иметь значение) с использованием библиотеки NetMQ.
Тест, который я хочу сделать, очень прост: посылать сообщения от А до Б как можно быстрее...
Сначала я использовал TCP в качестве транспорта; это легко достигало>150 000 сообщений в секунду, причем два получателя не отставали. Тогда я хотел проверить PGM; все, что я сделал, это заменил адрес "tcp://*:5556" на "pgm://239.0.0.1:5557" с обеих сторон.
Теперь тесты PGM дают очень странные результаты: отправитель легко достигает>200 000 сообщений / с; хотя получателю удается обработать только около 500 сообщений в секунду!
Итак, я не понимаю, что происходит. После замедления отправителя (спит 10 мс после каждого сообщения, так как в противном случае практически невозможно исследовать поток) мне кажется, что получатель пытается не отставать, сначала видит каждое проходящее сообщение, затем задыхается, пропускает диапазон сообщений, затем снова пытается не отставать... Я играл с настройками HWM и Recovery Interval, но это, похоже, не имело большого значения (?!).
Кто-нибудь может объяснить, что происходит?
Большое спасибо, Фредерик
Примечание. Не уверен, что это важно: насколько я понимаю, я не использую OpenPGM - я просто загружаю настройку ZeroMQ и включаю поддержку многоадресной рассылки в Windows.
Это код отправителя:
class MassSender
{
private const string TOPIC_PREFIX = "Hello:";
private static int messageCounter = 0;
private static int timerCounter = 0;
public static void Main(string[] args)
{
Timer timer = new Timer(1000);
timer.Elapsed += timer_Elapsed;
SendMessages_0MQ_NetMQ(timer);
}
private static void SendMessages_0MQ_NetMQ(Timer timer)
{
using (NetMQContext context = NetMQContext.Create())
{
using (NetMQSocket publisher = context.CreateSocket(ZmqSocketType.Pub))
{
//publisher.Bind("tcp://*:5556");
publisher.Bind("pgm://239.0.0.1:5557"); // IP of interface is not specified so use default interface.
timer.Start();
while (true)
{
string message = GetMessage();
byte[] body = Encoding.UTF8.GetBytes(message);
publisher.Send(body);
}
}
}
}
private static string GetMessage()
{
return TOPIC_PREFIX + "Message " + (++messageCounter).ToString();
}
static void timer_Elapsed(object sender, ElapsedEventArgs e)
{
Console.WriteLine("=== SENT {0} MESSAGES SO FAR - TOTAL AVERAGE IS {1}/s ===", messageCounter, messageCounter / ++timerCounter);
}
}
и получатель:
class MassReceiver
{
private const string TOPIC_PREFIX = "Hello:";
private static int messageCounter = 0;
private static int timerCounter = 0;
private static string lastMessage = String.Empty;
static void Main(string[] args)
{
// Assume that sender and receiver are started simultaneously.
Timer timer = new Timer(1000);
timer.Elapsed += timer_Elapsed;
ReceiveMessages_0MQ_NetMQ(timer);
}
private static void ReceiveMessages_0MQ_NetMQ(Timer timer)
{
using (NetMQContext context = NetMQContext.Create())
{
using (NetMQSocket subscriber = context.CreateSocket(ZmqSocketType.Sub))
{
subscriber.Subscribe(""); // Subscribe to everything
//subscriber.Connect("tcp://localhost:5556");
subscriber.Connect("pgm://239.0.0.1:5557"); // IP of interface is not specified so use default interface.
timer.Start();
while (true)
{
messageCounter++;
byte[] body = subscriber.Receive();
string message = Encoding.UTF8.GetString(body);
lastMessage = message; // Only show message when timer elapses, otherwise throughput drops dramatically.
}
}
}
}
static void timer_Elapsed(object sender, ElapsedEventArgs e)
{
Console.WriteLine("=== RECEIVED {0} MESSAGES SO FAR - TOTAL AVERAGE IS {1}/s === (Last: {2})", messageCounter, messageCounter / ++timerCounter, lastMessage);
}
}
2 ответа
Каков размер каждого сообщения?
Вы не используете OpenPGM, вы используете то, что называется ms-pgm (реализация PGM от Microsoft).
В любом случае вам может потребоваться изменить MulticastRate сокета (по умолчанию он равен 100 Кбит / с).
Кроме того, какую сеть вы используете?
Я столкнулся с той же проблемой, отправитель может отправлять тысячи сообщений в секунду. Но мой получатель может получать только двести сообщений в секунду.
Я думаю, что это может быть отправка или получение ограничено. я проверяю
ZMQ_RATE:
Установите скорость многоадресной передачи данных в http://api.zeromq.org/3-0:zmq-setsockopt
Скорость по умолчанию составляет всего 100 КБ / с.
Когда я увеличиваю его до 1 Гбит / с, все в порядке.
const int rate = 1000000; // 1Gb TX- and RX- rate
m_socket.setsockopt(ZMQ_RATE, &rate, sizeof(rate));