Простой TCP-сервер с Akka.NET
Привет, я учусь использовать Akka.net, и я хочу создать простой TCP-сервер, который будет периодически отправлять данные в TCP-соединение. (который затем будет выбран клиентом processingjs и отображен в выводе)
Не уверен, что мне здесь не хватает. Не мог бы кто-нибудь из вас, экспертов, пролить свет на эту проблему?
Это мои актеры:
using Akka.Actor;
using Akka.IO;
using System;
using System.Text;
using System.Threading.Tasks;
namespace ActorTcpProcessing
{
public class ProcessingServer : ReceiveActor
{
public ProcessingServer()
{
Receive<Tcp.Bound>(TcpBoundHandler);
Receive<Tcp.Connected>(TcpConnectedHandler);
}
internal static Props Create()
{
return Props.Create(() => new ProcessingServer());
}
private void TcpBoundHandler(Tcp.Bound bound)
{
Console.WriteLine("Listening on {0}", bound.LocalAddress);
}
private void TcpConnectedHandler(Tcp.Connected connected)
{
var tcpConnection = Sender;
var connectionHandler = Context.ActorOf(SensorReader.CreateReader(tcpConnection));
tcpConnection.Tell(new Tcp.Register(connectionHandler));
}
}
internal class SensorReader : ReceiveActor
{
private readonly IActorRef _tcpConnection;
public SensorReader(IActorRef tcpConnection)
{
_tcpConnection = tcpConnection;
Receive<SensorDataReceived>(TcpWriteSensorData);
}
internal static Props CreateReader(IActorRef connectionHandler)
{
return Props.Create(() => new SensorReader(connectionHandler));
}
private void TcpWriteSensorData(SensorDataReceived receivedData)
{
_tcpConnection.Tell(Tcp.Write.Create(ByteString.FromString(receivedData.SensorData + "\n")));
}
}
}
У меня сложилось впечатление, что когда я регистрирую сообщение, все, что мне нужно сделать, это отправить
SensorData
сообщение в поток. как таковой:
_sys.EventStream.Publish(new SensorData("sensor-data[x]"));
Однако, похоже, это не работает. Я получаю следующее сообщение журнала:
[INFO][11/09/2021 10:22:05 AM][Thread 0004][akka://tcp-processing/deadLetters] Message [Bound] from [akka://tcp-processing/system/IO-TCP/$a#1070400244] to [akka://tcp-processing/deadLetters] was not delivered. [1] dead letters encountered. If this is not an expected behavior then [akka://tcp-processing/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
Вот как я отправляю сообщение Tcp.Bind своему серверу.
using System;
using Akka.IO;
using Akka.Actor;
using System.Net;
using Topshelf;
namespace ActorTcpProcessing
{
class StreamTcpService : ServiceControl
{
ActorSystem _sys;
public bool Start(HostControl hostControl)
{
_sys = ActorSystem.Create("tcp-processing");
var server = _sys.ActorOf(ProcessingServer.Create(), nameof(ProcessingServer));
var tcpManager = _sys.Tcp();
tcpManager.Tell(new Tcp.Bind(
server,
new IPEndPoint(IPAddress.Any, 10002)));
_sys.EventStream.Publish(new SensorData("sensor-data[x]"));
return true;
}
public bool Stop(HostControl hostControl)
{
_sys.Terminate().ContinueWith(task =>
{
if (task.IsCompleted && !task.IsCanceled)
Console.WriteLine("Stream Tcp stopped");
});
return true;
}
}
}
Как видите, я указываю
server
как обработчик связанного сообщения. Или я совершенно не прав?
Кроме того, я должен упомянуть, что когда я запускаю свой клиент processingjs, я получаю
Tcp.Connected
сообщение для моего
ProcessingServer
, что, в свою очередь, создает
SensorReader
и регистрируется как обработчик.
Я также хотел бы знать, использую ли я
EventStream
правильно опубликовать?
_sys.EventStream.Publish(new SensorData("sensor-data[x]"));
Любая помощь в этом приветствуется. Спасибо.
1 ответ
Ваша проблема в строке:
tcpManager.Tell(new Tcp.Bind(server,new IPEndPoint(IPAddress.Any, 10002)));
Это немного вводит в заблуждение, но
server
в конструкторе находится обработчик всех входящих подключений в виде
Tcp.Connected
, а не актер, который получит сообщение.
В
Tcp
актер ответит на
Sender
принадлежащий
Tcp.Bind
сообщение. Чтобы решить вашу проблему, вы должны пройти
IActorRef
ссылка Актёра, ожидающего получения
Tcp.Bound
сообщение. В твоем случае:
tcpManager.Tell(new Tcp.Bind(server, new IPEndPoint(IPAddress.Any, 10002)), server);