Отправка потока (например, файла) от актера к актеру
Я хочу отправить данные из потока одного актера в поток другого актера. В конце концов это должно работать с удаленными актерами. С Akka.NET Streams это должно быть легкой задачей, но, возможно, я неправильно понял это.
Это часть SenderActor:
var stream = new FileStream(FILENAME, FileMode.Open);
var materializer = Context.Materializer();
var source = StreamConverters.FromInputStream(() => stream, CHUNK_SIZE);
var result = source.To(Sink.ActorRef<ByteString>(this.receiver, new Messages.StreamComplete()))
.Run(materializer);
Замечания: this.receiver
является IActorRef
куда данные должны быть отправлены.
ReceiverActor теперь получает все ByteString
сообщения и Messages.StreamCompleted
после того как поток закончился.
Как это можно легко собрать в ReceiverActor? В лучшем случае как Stream
снова.
В ReceiverActor я пытался отправить все это ByteString
сообщения на Source
который должен заполнить MemoryStream
:
class ReceiverActor : ReceiveActor
{
private readonly IActorRef streamReceiver;
private readonly MemoryStream stream;
public ReceiverActor()
{
this.stream = new MemoryStream();
this.streamReceiver = Source.ActorRef<ByteString>(128, Akka.Streams.OverflowStrategy.Fail)
.To(StreamConverters.FromOutputStream(() => this.stream, true))
.Run(Context.Materializer());
Context.Watch(this.streamReceiver);
Receive<ByteString>((message) => ReceivedStreamChunk(message));
Receive<Messages.StreamComplete>((message) => ReceivedStreamComplete(message));
Receive<Terminated>((message) => ReceivedTerminated(message));
ReceiveAny((message) => ReceivedAnyMessage(message));
}
private void ReceivedTerminated(Terminated message)
{
Console.WriteLine($"[receiver] {message.ActorRef.Path.ToStringWithoutAddress()} terminated, local stream length {(this.stream.CanRead ? this.stream.Length : -1)}");
}
private void ReceivedStreamComplete(Messages.StreamComplete message)
{
Console.WriteLine($"[receiver] got signaled that the stream completed, local stream length {this.stream.Length}");
}
private void ReceivedStreamChunk(object message)
{
Console.WriteLine($"[receiver] got chunk, previous stream length {this.stream.Length}");
this.streamReceiver.Forward(message);
}
private void ReceivedAnyMessage(object message)
{
Console.WriteLine($"[receiver] got message {message.GetType().FullName}");
}
}
Но MemoryStream
заполнен асинхронно и когда streamReceiver
завершает это закрывает поток, поэтому я не могу получить данные.
Как я могу получить поток правильно?
Обновление я получил это локально работает:
Благодаря вкладу Horusiath в канал поддержки Akka.NET я смог получить ByteStrings
, не нужно использовать Akka.Stream там.
Receive<ByteString>((message) => ReceivedStreamChunk(message));
А также:
private void ReceivedStreamChunk(ByteString message)
{
var bytes = message.ToArray();
targetStream.Write(bytes, 0, bytes.Length);
Console.WriteLine($"[receiver] got chunk, now stream length {this.stream.Length}");
}
Обратите внимание, что Akka.NET 1.3 добавит методы WriteTo(Stream)
а также WriteToAsync(Stream, CancellationToken)
в ByteString
,
Тем не менее, это не работает с удаленными субъектами, так как система принимающего субъекта получает эту ошибку (Serializer - это Hyperion):
Ошибка [Для этого объекта не определен конструктор без параметров.]
На самом деле ByteString
имеет конструктор без параметров, но это protected
,
Я так понимаю, что ByteString
не сериализуем?
1 ответ
Я получил это работает с удаленным актером при преобразовании ByteString
в byte[]
вставив поток преобразования между источником и приемником.
Пример:
FileIO.FromFile(...).Via(Flow.Create<ByteString>().Select(x => x.ToArray())).To(...)