Простой TCP-сервер с Akka.NET

Привет, я учусь использовать Akka.net, и я хочу создать простой TCP-сервер, который будет периодически отправлять данные в TCP-соединение. (который затем будет выбран клиентом processingjs и отображен в выводе)

Не уверен, что мне здесь не хватает. Не мог бы кто-нибудь из вас, экспертов, пролить свет на эту проблему?

Это мои актеры:

      using Akka.Actor;
using Akka.IO;
using System;
using System.Text;
using System.Threading.Tasks;

namespace ActorTcpProcessing
{
    public class ProcessingServer : ReceiveActor
    {  
        public ProcessingServer()
        { 
            Receive<Tcp.Bound>(TcpBoundHandler);
            Receive<Tcp.Connected>(TcpConnectedHandler);
        }

        internal static Props Create()
        {
            return Props.Create(() => new ProcessingServer());
        }

        private void TcpBoundHandler(Tcp.Bound bound)
        {
            Console.WriteLine("Listening on {0}", bound.LocalAddress);
        }
         
        private void TcpConnectedHandler(Tcp.Connected connected)
        { 
            var tcpConnection = Sender; 

            var connectionHandler = Context.ActorOf(SensorReader.CreateReader(tcpConnection));
           
            tcpConnection.Tell(new Tcp.Register(connectionHandler));
        }
    }

    internal class SensorReader : ReceiveActor
    {
        private readonly IActorRef _tcpConnection;

        public SensorReader(IActorRef tcpConnection)
        {
            _tcpConnection = tcpConnection;

            Receive<SensorDataReceived>(TcpWriteSensorData);
        }

        internal static Props CreateReader(IActorRef connectionHandler)
        {
            return Props.Create(() => new SensorReader(connectionHandler));
        }

        private void TcpWriteSensorData(SensorDataReceived receivedData)
        {
            _tcpConnection.Tell(Tcp.Write.Create(ByteString.FromString(receivedData.SensorData + "\n")));
        }
    }
}

У меня сложилось впечатление, что когда я регистрирую сообщение, все, что мне нужно сделать, это отправить SensorDataсообщение в поток. как таковой: _sys.EventStream.Publish(new SensorData("sensor-data[x]"));

Однако, похоже, это не работает. Я получаю следующее сообщение журнала:

      [INFO][11/09/2021 10:22:05 AM][Thread 0004][akka://tcp-processing/deadLetters] Message [Bound] from [akka://tcp-processing/system/IO-TCP/$a#1070400244] to [akka://tcp-processing/deadLetters] was not delivered. [1] dead letters encountered. If this is not an expected behavior then [akka://tcp-processing/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

Вот как я отправляю сообщение Tcp.Bind своему серверу.

      using System;
using Akka.IO;
using Akka.Actor;
using System.Net;
using Topshelf;

namespace ActorTcpProcessing
{
    class StreamTcpService : ServiceControl
    {
        ActorSystem _sys;

        public bool Start(HostControl hostControl)
        {
            _sys = ActorSystem.Create("tcp-processing");
            var server = _sys.ActorOf(ProcessingServer.Create(), nameof(ProcessingServer));
            var tcpManager = _sys.Tcp();

            
            tcpManager.Tell(new Tcp.Bind(
                server, 
                new IPEndPoint(IPAddress.Any, 10002)));

            _sys.EventStream.Publish(new SensorData("sensor-data[x]"));

            return true;
        }

        public bool Stop(HostControl hostControl)
        {
            _sys.Terminate().ContinueWith(task =>
            {
                if (task.IsCompleted && !task.IsCanceled)
                    Console.WriteLine("Stream Tcp stopped");
            });

            return true;
        }
    }
}

Как видите, я указываю serverкак обработчик связанного сообщения. Или я совершенно не прав?

Кроме того, я должен упомянуть, что когда я запускаю свой клиент processingjs, я получаю Tcp.Connected сообщение для моего ProcessingServer, что, в свою очередь, создает SensorReader и регистрируется как обработчик.

Я также хотел бы знать, использую ли я EventStream правильно опубликовать?

      _sys.EventStream.Publish(new SensorData("sensor-data[x]"));

Любая помощь в этом приветствуется. Спасибо.

1 ответ

Ваша проблема в строке:

      tcpManager.Tell(new Tcp.Bind(server,new IPEndPoint(IPAddress.Any, 10002)));

Это немного вводит в заблуждение, но server в конструкторе находится обработчик всех входящих подключений в виде Tcp.Connected, а не актер, который получит сообщение.

В Tcp актер ответит на Sender принадлежащий Tcp.Bindсообщение. Чтобы решить вашу проблему, вы должны пройти IActorRef ссылка Актёра, ожидающего получения Tcp.Boundсообщение. В твоем случае:

      tcpManager.Tell(new Tcp.Bind(server, new IPEndPoint(IPAddress.Any, 10002)), server);
Другие вопросы по тегам