Почему NetMQ DealerSocket в Mono не отправляет сообщения на сервер в Debian Wheezy, а в Windows?
У меня есть проблемы с NetMQ 4.0.0.1 на Mono 4.8 на Debian Wheezy.
Где сокет Дилер не отправляет никаких сообщений, пока я не перестану звонить, чтобы отправить новое сообщение. Когда буду ставить Thread.Sleep( 1000 )
между созданием задач с чем все ок. Я хотел бы признать, что все работает на Windows в.Net Framework 4.5 и.Net Core 1.1 без каких-либо Thread.Sleep()
,
Я добавил отладочные сообщения и вижу, что создаю 100 REQ
Сокеты в Задачах в цикле, и Маршрутизатор получает запросы в очереди, чем отправляет их через Дилера, и ничего не происходит на другой стороне TCP, пока я не остановлю отправку вызова на REQ-сокетах. Просто Thread.Sleep()
на каждые 5 заданий работает. Это похоже на ошибку Поллера, или ошибку Дилера, или я делаю что-то не так.
Вот код среднего поля:
public class CollectorDevice : IDisposable
{
private NetMQPoller _poller;
private RouterSocket _frontendSocket;
private DealerSocket _backendSocket;
private readonly string _backEndAddress;
private readonly string _frontEndAddress;
private readonly int _expectedFrameCount;
private readonly ManualResetEvent _startSemaphore = new ManualResetEvent(false);
private readonly Thread _localThread;
private static Logger _logger = LogManager.GetCurrentClassLogger();
/// <summary>
/// Constructor
/// </summary>
/// <param name="backEndAddress"></param>
/// <param name="frontEndAddress"></param>
/// <param name="expectedFrameCount"></param>
public CollectorDevice(string backEndAddress, string frontEndAddress, int expectedFrameCount)
{
_expectedFrameCount = expectedFrameCount;
_backEndAddress = backEndAddress;
_frontEndAddress = frontEndAddress;
_localThread = new Thread(DoWork) { Name = "IPC Collector Device Thread" };
}
public void Start()
{
_localThread.Start();
_startSemaphore.WaitOne();
}
public void Stop()
{
_poller.Stop();
}
#region Implementation of IDisposable
public void Dispose()
{
Stop();
}
#endregion
#region Private Methods
private void DoWork()
{
try
{
using (_poller = new NetMQPoller())
using (_frontendSocket = new RouterSocket(_frontEndAddress))
using (_backendSocket = new DealerSocket(_backEndAddress))
{
_backendSocket.ReceiveReady += OnBackEndReady;
_frontendSocket.ReceiveReady += OnFrontEndReady;
_poller.Add(_frontendSocket);
_poller.Add(_backendSocket);
_startSemaphore.Set();
_poller.Run();
}
}
catch (Exception e)
{
_logger.Error(e);
}
}
private void OnBackEndReady(object sender, NetMQSocketEventArgs e)
{
NetMQMessage message = _backendSocket.ReceiveMultipartMessage(_expectedFrameCount);
_frontendSocket.SendMultipartMessage(message);
}
private void OnFrontEndReady(object sender, NetMQSocketEventArgs e)
{
NetMQMessage message = _frontendSocket.ReceiveMultipartMessage(_expectedFrameCount);
_backendSocket.SendMultipartMessage(message);
}
#endregion
}
Вот клиентская сторона:
class Program
{
private static Logger _logger = LogManager.GetCurrentClassLogger();
private static void Main(string[] args)
{
Console.WriteLine("Client. Please enter message for server. Enter 'QUIT' to turn off server");
Console.ReadKey();
using (var collectorDevice = new CollectorDevice(">tcp://localhost:5556", "inproc://broker", 3))
{
collectorDevice.Start();
List<Task> tasks = new List<Task>();
for (int i = 0; i < 100; i++)
{
Console.WriteLine(i);
int j = i;
Task t = Task.Factory.StartNew(() =>
{
try
{
using (var req = new RequestSocket("inproc://broker"))
{
req.SendFrame(String.Format("Request client: {0} id: {1}", j, Task.CurrentId));
_logger.Debug(String.Format("Request client: {0} id: {1}", j, Task.CurrentId));
Console.WriteLine(String.Format("Request client: {0} id: {1}", j, Task.CurrentId));
string responseMessage = req.ReceiveFrameString();
_logger.Debug(String.Format("Response from server: {0} id: {1} message: {2}", j, Task.CurrentId, responseMessage));
Console.WriteLine(String.Format("Response from server: {0} id: {1} message: {2}", j, Task.CurrentId, responseMessage));
}
}
catch (Exception e)
{
Console.WriteLine(e);
_logger.Error(e);
}
});
tasks.Add(t);
//Thread.Sleep (100);//<- This thread sleep is fixing problem?
}
Task.WaitAll(tasks.ToArray());
}
}
}
И на стороне сервера:
class Program
{
private static Logger _logger = LogManager.GetCurrentClassLogger();
static void Main(string[] args)
{
try{
using (var routerSocket = new RouterSocket("@tcp://*:5556"))
{
var poller = new NetMQPoller();
routerSocket.ReceiveReady += RouterSocketOnReceiveReady;
poller.Add(routerSocket);
poller.Run();
}
}
catch(Exception e)
{
Console.WriteLine (e);
}
Console.ReadKey ();
}
private static void RouterSocketOnReceiveReady(object sender, NetMQSocketEventArgs netMqSocketEventArgs)
{
NetMQMessage clientMessage = new NetMQMessage();
bool result = netMqSocketEventArgs.Socket.TryReceiveMultipartMessage(new TimeSpan(0, 0, 0, 5),
ref clientMessage, 5);
if (result == false)
{
Console.WriteLine ("Something went wrong?!");
}
var address = clientMessage[0];
var address2 = clientMessage[1];
var clientMessageString = clientMessage[3].ConvertToString();
//_logger.Debug("Message from client received: '{0}'", clientMessageString);
Console.WriteLine (String.Format ("Message from client received: '{0}'", clientMessageString));
netMqSocketEventArgs
.Socket.SendMoreFrame(address.Buffer)
.SendMoreFrame(address2.Buffer)
.SendMoreFrameEmpty()
.SendFrame("I have received your message");
}
}
У кого-нибудь есть идеи?
Я думал, что, возможно, я использую сокет из разных потоков, поэтому я упаковал его в структуру ThreadLocal, но это не помогло. Тогда я прочитал о некоторых проблемах в единстве с NetMQ, поэтому я добавил 'AsyncIO.ForceDotNet.Force();" перед каждым вызовом конструктора сокета, и это тоже не помогло. Затем я обновил свой моно до 4.8 с 4.4, и он все еще выглядит так же.
Я проверил, что Thread.Sleep(100) между задачами решает проблему. Это странно
1 ответ
Я проверил код, это занимает много времени, но в конечном итоге сервер получает все сообщения (занимает около минуты).
Проблема заключается в количестве потоков, все асинхронные операции, которые должны выполняться на портах завершения портов ввода-вывода, занимают много времени при наличии 100 потоков. Я смог воспроизвести его без NetMQ с помощью следующего кода
public static void Main(string[] args)
{
ManualResetEvent resetEvent = new ManualResetEvent(false);
List<Task> tasks = new List<Task>();
for (int i = 0; i < 100; i++)
{
tasks.Add(Task.Run(() =>
{
resetEvent.WaitOne();
}));
}
Thread.Sleep(100);
Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
listener.Bind(new IPEndPoint(IPAddress.Any, 5556));
listener.Listen(1);
SocketAsyncEventArgs args1 = new SocketAsyncEventArgs();
args1.Completed += (sender, eventArgs) =>
{
Console.WriteLine($"Accepted {args1.SocketError}");
resetEvent.Set();
};
listener.AcceptAsync(args1);
SocketAsyncEventArgs args2 = new SocketAsyncEventArgs();
args2.RemoteEndPoint = new IPEndPoint(IPAddress.Loopback, 5556);
args2.Completed += (sender, eventArgs) => Console.WriteLine("Connected");
Socket client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
client.ConnectAsync(args2);
Task.WaitAll(tasks.ToArray());
Console.WriteLine("all tasks completed");
}
Вы можете видеть, что это также занимает около минуты. Только с 5 нитями это закончено немедленно.
В любом случае вы можете захотеть запустить меньше потоков и / или восстановить ошибку в моно-проекте.