Почему NetMQ DealerSocket в Mono не отправляет сообщения на сервер в Debian Wheezy, а в Windows?

У меня есть проблемы с NetMQ 4.0.0.1 на Mono 4.8 на Debian Wheezy.

Где сокет Дилер не отправляет никаких сообщений, пока я не перестану звонить, чтобы отправить новое сообщение. Когда буду ставить Thread.Sleep( 1000 ) между созданием задач с чем все ок. Я хотел бы признать, что все работает на Windows в.Net Framework 4.5 и.Net Core 1.1 без каких-либо Thread.Sleep(),

У меня есть шаблон, как это: введите описание изображения здесь

Я добавил отладочные сообщения и вижу, что создаю 100 REQ Сокеты в Задачах в цикле, и Маршрутизатор получает запросы в очереди, чем отправляет их через Дилера, и ничего не происходит на другой стороне TCP, пока я не остановлю отправку вызова на REQ-сокетах. Просто Thread.Sleep() на каждые 5 заданий работает. Это похоже на ошибку Поллера, или ошибку Дилера, или я делаю что-то не так.

Вот код среднего поля:

public class CollectorDevice : IDisposable
{
    private NetMQPoller _poller;
    private RouterSocket _frontendSocket;
    private DealerSocket _backendSocket;
    private readonly string _backEndAddress;
    private readonly string _frontEndAddress;
    private readonly int _expectedFrameCount;
    private readonly ManualResetEvent _startSemaphore = new ManualResetEvent(false);
    private readonly Thread _localThread;
    private static Logger _logger = LogManager.GetCurrentClassLogger();

    /// <summary>
    /// Constructor
    /// </summary>
    /// <param name="backEndAddress"></param>
    /// <param name="frontEndAddress"></param>
    /// <param name="expectedFrameCount"></param>
    public CollectorDevice(string backEndAddress, string frontEndAddress, int expectedFrameCount)
    {
        _expectedFrameCount = expectedFrameCount;

        _backEndAddress = backEndAddress;
        _frontEndAddress = frontEndAddress;

        _localThread = new Thread(DoWork) { Name = "IPC Collector Device Thread" };
    }

    public void Start()
    {
        _localThread.Start();
        _startSemaphore.WaitOne();


    }

    public void Stop()
    {
        _poller.Stop();
    }

    #region Implementation of IDisposable

    public void Dispose()
    {
        Stop();
    }

    #endregion


    #region Private Methods
    private void DoWork()
    {
        try
        {
            using (_poller = new NetMQPoller())
            using (_frontendSocket = new RouterSocket(_frontEndAddress))
            using (_backendSocket = new DealerSocket(_backEndAddress))
            {
                _backendSocket.ReceiveReady += OnBackEndReady;
                _frontendSocket.ReceiveReady += OnFrontEndReady;


                _poller.Add(_frontendSocket);
                _poller.Add(_backendSocket);

                _startSemaphore.Set();

                _poller.Run();
            }
        }
        catch (Exception e)
        {
            _logger.Error(e);
        }
    }

    private void OnBackEndReady(object sender, NetMQSocketEventArgs e)
    {
        NetMQMessage message = _backendSocket.ReceiveMultipartMessage(_expectedFrameCount);
        _frontendSocket.SendMultipartMessage(message);
    }

    private void OnFrontEndReady(object sender, NetMQSocketEventArgs e)
    {
        NetMQMessage message = _frontendSocket.ReceiveMultipartMessage(_expectedFrameCount);
        _backendSocket.SendMultipartMessage(message);
    }

    #endregion
}

Вот клиентская сторона:

class Program
{
    private static Logger _logger = LogManager.GetCurrentClassLogger();


    private static void Main(string[] args)
    {
        Console.WriteLine("Client. Please enter message for server. Enter 'QUIT' to turn off server");
        Console.ReadKey();


        using (var collectorDevice = new CollectorDevice(">tcp://localhost:5556", "inproc://broker", 3))
        {
            collectorDevice.Start();

            List<Task> tasks = new List<Task>();
            for (int i = 0; i < 100; i++)
            {
                Console.WriteLine(i);
                int j = i;       
                Task t = Task.Factory.StartNew(() =>
                {
                    try
                    {
                        using (var req = new RequestSocket("inproc://broker"))
                        {
                            req.SendFrame(String.Format("Request client: {0} id: {1}", j, Task.CurrentId));
                            _logger.Debug(String.Format("Request client: {0} id: {1}", j, Task.CurrentId));
                            Console.WriteLine(String.Format("Request client: {0} id: {1}", j, Task.CurrentId));

                            string responseMessage = req.ReceiveFrameString();
                            _logger.Debug(String.Format("Response from server: {0} id: {1} message: {2}", j, Task.CurrentId, responseMessage));
                            Console.WriteLine(String.Format("Response from server: {0} id: {1} message: {2}", j, Task.CurrentId, responseMessage));
                        }
                    }
                    catch (Exception e)
                    {
                        Console.WriteLine(e);
                        _logger.Error(e);
                    }
                });
                tasks.Add(t);
                //Thread.Sleep (100);//<- This thread sleep is fixing problem?
            }

            Task.WaitAll(tasks.ToArray());
        }

    }
}

И на стороне сервера:

class Program
{
    private static Logger _logger = LogManager.GetCurrentClassLogger();

    static void Main(string[] args)
    {
        try{
        using (var routerSocket = new RouterSocket("@tcp://*:5556"))
        {
            var poller = new NetMQPoller();
            routerSocket.ReceiveReady += RouterSocketOnReceiveReady;
            poller.Add(routerSocket);
            poller.Run();
        }
        }
        catch(Exception e) 
        {      
            Console.WriteLine (e);
        }

        Console.ReadKey ();
    }

    private static void RouterSocketOnReceiveReady(object sender, NetMQSocketEventArgs netMqSocketEventArgs)
    {
        NetMQMessage clientMessage = new NetMQMessage();
        bool result = netMqSocketEventArgs.Socket.TryReceiveMultipartMessage(new TimeSpan(0, 0, 0, 5),
            ref clientMessage, 5);

        if (result == false)
        {
            Console.WriteLine ("Something went wrong?!");
        }

        var address = clientMessage[0];
        var address2 = clientMessage[1];
        var clientMessageString = clientMessage[3].ConvertToString();

        //_logger.Debug("Message from client received: '{0}'", clientMessageString);
        Console.WriteLine (String.Format ("Message from client received: '{0}'", clientMessageString));

        netMqSocketEventArgs
            .Socket.SendMoreFrame(address.Buffer)
            .SendMoreFrame(address2.Buffer)
            .SendMoreFrameEmpty()
            .SendFrame("I have received your message");
    }
}

У кого-нибудь есть идеи?

Я думал, что, возможно, я использую сокет из разных потоков, поэтому я упаковал его в структуру ThreadLocal, но это не помогло. Тогда я прочитал о некоторых проблемах в единстве с NetMQ, поэтому я добавил 'AsyncIO.ForceDotNet.Force();" перед каждым вызовом конструктора сокета, и это тоже не помогло. Затем я обновил свой моно до 4.8 с 4.4, и он все еще выглядит так же.

Я проверил, что Thread.Sleep(100) между задачами решает проблему. Это странно

1 ответ

Решение

Я проверил код, это занимает много времени, но в конечном итоге сервер получает все сообщения (занимает около минуты).

Проблема заключается в количестве потоков, все асинхронные операции, которые должны выполняться на портах завершения портов ввода-вывода, занимают много времени при наличии 100 потоков. Я смог воспроизвести его без NetMQ с помощью следующего кода

    public static void Main(string[] args)
    {
        ManualResetEvent resetEvent = new ManualResetEvent(false);

        List<Task> tasks = new List<Task>();

        for (int i = 0; i < 100; i++)
        {
            tasks.Add(Task.Run(() =>
            {
                resetEvent.WaitOne();
            }));
        }

        Thread.Sleep(100);

        Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        listener.Bind(new IPEndPoint(IPAddress.Any, 5556));
        listener.Listen(1);

        SocketAsyncEventArgs args1 = new SocketAsyncEventArgs();
        args1.Completed += (sender, eventArgs) =>
        {
            Console.WriteLine($"Accepted {args1.SocketError}");
            resetEvent.Set();
        };
        listener.AcceptAsync(args1);

        SocketAsyncEventArgs args2 = new SocketAsyncEventArgs();
        args2.RemoteEndPoint = new IPEndPoint(IPAddress.Loopback, 5556);
        args2.Completed += (sender, eventArgs) => Console.WriteLine("Connected");
        Socket client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        client.ConnectAsync(args2);

        Task.WaitAll(tasks.ToArray());

        Console.WriteLine("all tasks completed");
    }

Вы можете видеть, что это также занимает около минуты. Только с 5 нитями это закончено немедленно.

В любом случае вы можете захотеть запустить меньше потоков и / или восстановить ошибку в моно-проекте.

Другие вопросы по тегам