PGM Получать очень медленно, вызывая удаление сообщений?

Я смотрю в ZeroMQ для его поддержки PGM. Запуск в Windows (в VirtualBox с MacOS в качестве хоста, если это может иметь значение) с использованием библиотеки NetMQ.

Тест, который я хочу сделать, очень прост: посылать сообщения от А до Б как можно быстрее...

Сначала я использовал TCP в качестве транспорта; это легко достигало>150 000 сообщений в секунду, причем два получателя не отставали. Тогда я хотел проверить PGM; все, что я сделал, это заменил адрес "tcp://*:5556" на "pgm://239.0.0.1:5557" с обеих сторон.

Теперь тесты PGM дают очень странные результаты: отправитель легко достигает>200 000 сообщений / с; хотя получателю удается обработать только около 500 сообщений в секунду!

Итак, я не понимаю, что происходит. После замедления отправителя (спит 10 мс после каждого сообщения, так как в противном случае практически невозможно исследовать поток) мне кажется, что получатель пытается не отставать, сначала видит каждое проходящее сообщение, затем задыхается, пропускает диапазон сообщений, затем снова пытается не отставать... Я играл с настройками HWM и Recovery Interval, но это, похоже, не имело большого значения (?!).

Кто-нибудь может объяснить, что происходит?

Большое спасибо, Фредерик

Примечание. Не уверен, что это важно: насколько я понимаю, я не использую OpenPGM - я просто загружаю настройку ZeroMQ и включаю поддержку многоадресной рассылки в Windows.

Это код отправителя:

class MassSender
{
    private const string TOPIC_PREFIX = "Hello:";

    private static int messageCounter = 0;
    private static int timerCounter = 0;

    public static void Main(string[] args)
    {
        Timer timer = new Timer(1000);
        timer.Elapsed += timer_Elapsed;

        SendMessages_0MQ_NetMQ(timer);
    }

    private static void SendMessages_0MQ_NetMQ(Timer timer)
    {
        using (NetMQContext context = NetMQContext.Create())
        {
            using (NetMQSocket publisher = context.CreateSocket(ZmqSocketType.Pub))
            {
                //publisher.Bind("tcp://*:5556");
                publisher.Bind("pgm://239.0.0.1:5557"); // IP of interface is not specified so use default interface.

                timer.Start();
                while (true)
                {
                    string message = GetMessage();

                    byte[] body = Encoding.UTF8.GetBytes(message);
                    publisher.Send(body);
                }
            }
        }
    }

    private static string GetMessage()
    {
        return TOPIC_PREFIX + "Message " + (++messageCounter).ToString();
    }
    static void timer_Elapsed(object sender, ElapsedEventArgs e)
    {
        Console.WriteLine("=== SENT {0} MESSAGES SO FAR - TOTAL AVERAGE IS {1}/s ===", messageCounter, messageCounter / ++timerCounter);
    }
}

и получатель:

class MassReceiver
{
    private const string TOPIC_PREFIX = "Hello:";

    private static int messageCounter = 0;
    private static int timerCounter = 0;
    private static string lastMessage = String.Empty;

    static void Main(string[] args)
    {
        // Assume that sender and receiver are started simultaneously.
        Timer timer = new Timer(1000);
        timer.Elapsed += timer_Elapsed;

        ReceiveMessages_0MQ_NetMQ(timer);
    }

    private static void ReceiveMessages_0MQ_NetMQ(Timer timer)
    {
        using (NetMQContext context = NetMQContext.Create())
        {
            using (NetMQSocket subscriber = context.CreateSocket(ZmqSocketType.Sub))
            {
                subscriber.Subscribe(""); // Subscribe to everything

                //subscriber.Connect("tcp://localhost:5556");
                subscriber.Connect("pgm://239.0.0.1:5557"); // IP of interface is not specified so use default interface.

                timer.Start();
                while (true)
                {
                    messageCounter++;

                    byte[] body = subscriber.Receive();

                    string message = Encoding.UTF8.GetString(body);                        
                    lastMessage = message; // Only show message when timer elapses, otherwise throughput drops dramatically.  
                }
            }
        }
    }

    static void timer_Elapsed(object sender, ElapsedEventArgs e)
    {
        Console.WriteLine("=== RECEIVED {0} MESSAGES SO FAR - TOTAL AVERAGE IS {1}/s === (Last: {2})", messageCounter, messageCounter / ++timerCounter, lastMessage);
    }
}

2 ответа

Решение

Каков размер каждого сообщения?

Вы не используете OpenPGM, вы используете то, что называется ms-pgm (реализация PGM от Microsoft).

В любом случае вам может потребоваться изменить MulticastRate сокета (по умолчанию он равен 100 Кбит / с).

Кроме того, какую сеть вы используете?

Я столкнулся с той же проблемой, отправитель может отправлять тысячи сообщений в секунду. Но мой получатель может получать только двести сообщений в секунду.

Я думаю, что это может быть отправка или получение ограничено. я проверяю

ZMQ_RATE: Установите скорость многоадресной передачи данных в http://api.zeromq.org/3-0:zmq-setsockopt

Скриншот документа

Скорость по умолчанию составляет всего 100 КБ / с.

Когда я увеличиваю его до 1 Гбит / с, все в порядке.

const int rate = 1000000;                              // 1Gb TX- and RX- rate
m_socket.setsockopt(ZMQ_RATE, &rate, sizeof(rate));
Другие вопросы по тегам