Akka.Net Потоки и удаленное взаимодействие (Sink.ActorRefWithAck)

Я сделал довольно простую реализацию с помощью Akka.net Streams, используя Sink.ActorRefWithAck: подписчик запрашивает большую строку у издателя, который отправляет ее по частям. Он отлично работает локально (UT), но не удаленно. И я не могу понять, что не так? Конкретно: подписчик может отправить запрос издателю, который отвечает OnInit сообщение, но затем OnInit.Ack никогда не вернется к издателю. это Ack сообщение заканчивается как мертвая буква:

INFO  Akka.Actor.EmptyLocalActorRef - Message Ack from akka.tcp://OutOfProcessTaskProcessing@localhost:12100/user/Streamer_636568240846733287 to akka://OutOfProcessTaskProcessing/user/StreamSupervisor-0/StageActorRef-0 was not delivered. 1 dead letters encountered.

Обратите внимание, что журнал получен от действующего субъекта, поэтому сообщение обрабатывается в нужном процессе. Там нет очевидной ошибки пути.

Глядя на код издателя, который не обрабатывает это сообщение, я действительно не знаю, что я делаю неправильно:

    public static void ReplyWithStreamedString(IUntypedActorContext context, string toStream, int chunkSize = 2000)
    {
        Source<string, NotUsed> source = Source.From(toStream.SplitBy(chunkSize));
        source.To(Sink.ActorRefWithAck<string>(context.Sender, new StreamMessage.OnInit(),
                new StreamMessage.OnInit.Ack(),
                new StreamMessage.Completed(),
                exception => new StreamMessage.Failure(exception.Message)))
            .Run(context.System.Materializer());
    }

Вот код подписчика:

public static Task<string> AskStreamedString(this ICanTell self, object message, ActorSystem context, TimeSpan? timeout = null)
    {
        var tcs = new TaskCompletionSource<string>();
        if (timeout.HasValue)
        {
            CancellationTokenSource ct = new CancellationTokenSource(timeout.Value);
            ct.Token.Register(() => tcs.TrySetCanceled());
        }

        var props = Props.Create(() => new StreamerActorRef(tcs));
        var tempActor = context.ActorOf(props, $"Streamer_{DateTime.Now.Ticks}");

        self.Tell(message, tempActor);

        return tcs.Task.ContinueWith(task =>
        {
            context.Stop(tempActor);
            if(task.IsCanceled)
                throw new OperationCanceledException();
            if (task.IsFaulted)                    
                throw task.Exception.GetBaseException();
            return task.Result;
        });
    }

    internal class StreamerActorRef : ReceiveActor
    {
        readonly TaskCompletionSource<string> _tcs;

        private readonly StringBuilder _stringBuilder = new StringBuilder();

        public StreamerActorRef(TaskCompletionSource<string> tcs)
        {
            _tcs = tcs;
            Ready();
        }

        private void Ready()
        {
            ReceiveAny(message =>
            {
                switch (message)
                {
                    case StreamMessage.OnInit _:
                        Sender.Tell(new StreamMessage.OnInit.Ack());
                        break;
                    case StreamMessage.Completed _:
                        string result = _stringBuilder.ToString();
                        _tcs.TrySetResult(result);
                        break;
                    case string slice:
                        _stringBuilder.Append(slice);
                        Sender.Tell(new StreamMessage.OnInit.Ack());
                        break;
                    case StreamMessage.Failure error:
                        _tcs.TrySetException(new InvalidOperationException(error.Reason));
                        break;
                }
            });
        }
    }

С сообщениями:

public class StreamMessage
{
        public class OnInit
        {
            public class Ack{}
        }

        public class Completed { }

        public class Failure
        {
            public string Reason { get; }

            public Failure(string reason)
            {
                Reason = reason;
            }
        }
    }

1 ответ

Решение

В общем случае источники и приемники, работающие с ссылками на акторов, не предназначены для работы через удаленные соединения - они не охватывают повторные попытки отправки сообщений, которые могут вызвать взаимные блокировки в вашей системе, если не будет передано какое-либо сообщение управления потоком.

Функция, которую вы ищете, называется StreamRefs (которая работает как ссылки на актеров, но для потоков) и будет поставляться как часть релиза v1.4 (более подробную информацию смотрите в запросе github pull).

Другие вопросы по тегам