Отправка потока (например, файла) от актера к актеру

Я хочу отправить данные из потока одного актера в поток другого актера. В конце концов это должно работать с удаленными актерами. С Akka.NET Streams это должно быть легкой задачей, но, возможно, я неправильно понял это.

Это часть SenderActor:

var stream = new FileStream(FILENAME, FileMode.Open);
var materializer = Context.Materializer();
var source = StreamConverters.FromInputStream(() => stream, CHUNK_SIZE);
var result = source.To(Sink.ActorRef<ByteString>(this.receiver, new Messages.StreamComplete()))
    .Run(materializer);

Замечания: this.receiver является IActorRef куда данные должны быть отправлены.

ReceiverActor теперь получает все ByteString сообщения и Messages.StreamCompleted после того как поток закончился.

Как это можно легко собрать в ReceiverActor? В лучшем случае как Stream снова.

В ReceiverActor я пытался отправить все это ByteString сообщения на Source который должен заполнить MemoryStream:

class ReceiverActor : ReceiveActor
{
    private readonly IActorRef streamReceiver;
    private readonly MemoryStream stream;

    public ReceiverActor()
    {
        this.stream = new MemoryStream();
        this.streamReceiver = Source.ActorRef<ByteString>(128, Akka.Streams.OverflowStrategy.Fail)
            .To(StreamConverters.FromOutputStream(() => this.stream, true))
            .Run(Context.Materializer());
        Context.Watch(this.streamReceiver);

        Receive<ByteString>((message) => ReceivedStreamChunk(message));
        Receive<Messages.StreamComplete>((message) => ReceivedStreamComplete(message));
        Receive<Terminated>((message) => ReceivedTerminated(message));
        ReceiveAny((message) => ReceivedAnyMessage(message));
    }
    private void ReceivedTerminated(Terminated message)
    {
        Console.WriteLine($"[receiver] {message.ActorRef.Path.ToStringWithoutAddress()} terminated, local stream length {(this.stream.CanRead ? this.stream.Length : -1)}");
    }
    private void ReceivedStreamComplete(Messages.StreamComplete message)
    {
        Console.WriteLine($"[receiver] got signaled that the stream completed, local stream length {this.stream.Length}");
    }
    private void ReceivedStreamChunk(object message)
    {
        Console.WriteLine($"[receiver] got chunk, previous stream length {this.stream.Length}");
        this.streamReceiver.Forward(message);
    }
    private void ReceivedAnyMessage(object message)
    {
        Console.WriteLine($"[receiver] got message {message.GetType().FullName}");
    }
}

Но MemoryStream заполнен асинхронно и когда streamReceiver завершает это закрывает поток, поэтому я не могу получить данные.

Как я могу получить поток правильно?


Обновление я получил это локально работает:

Благодаря вкладу Horusiath в канал поддержки Akka.NET я смог получить ByteStrings, не нужно использовать Akka.Stream там.

Receive<ByteString>((message) => ReceivedStreamChunk(message));

А также:

private void ReceivedStreamChunk(ByteString message)
{
    var bytes = message.ToArray();
    targetStream.Write(bytes, 0, bytes.Length);
    Console.WriteLine($"[receiver] got chunk, now stream length {this.stream.Length}");
}

Обратите внимание, что Akka.NET 1.3 добавит методы WriteTo(Stream) а также WriteToAsync(Stream, CancellationToken) в ByteString,

Тем не менее, это не работает с удаленными субъектами, так как система принимающего субъекта получает эту ошибку (Serializer - это Hyperion):

Ошибка [Для этого объекта не определен конструктор без параметров.]

На самом деле ByteString имеет конструктор без параметров, но это protected,

Я так понимаю, что ByteString не сериализуем?

1 ответ

Я получил это работает с удаленным актером при преобразовании ByteString в byte[] вставив поток преобразования между источником и приемником.

Пример:

FileIO.FromFile(...).Via(Flow.Create<ByteString>().Select(x => x.ToArray())).To(...)

Другие вопросы по тегам