Ответ на запрос в кластерном маршрутизаторе
Я создаю проект, в котором на данный момент есть Актер (Пользователь), который вызывает другого актера (Концерт) через маршрутизатор с последовательной хэш-группой. Все работает нормально, но моя проблема в том, что от актера концерта я не могу ответить на сообщение Ask. Каким-то образом сообщение теряется и в клиенте ничего не происходит. Я попробовал все без удачи
- Sender.Tell <- создает временную? отправитель
- Передача User IActorRef по ссылке в сообщении и его использование.
Вот полный код: https://github.com/pablocastilla/AkkaConcert
Основными деталями являются следующие:
Актер пользователя:
protected IActorRef concertRouter;
public User(IActorRef concertRouter, int eventId)
{
this.concertRouter = concertRouter;
this.eventId = eventId;
JobStarter = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(TimeSpan.FromMilliseconds(20),
TimeSpan.FromMilliseconds(1000), Self, new AttemptToStartJob(), Self);
Receive<AttemptToStartJob>(start =>
{
var self = Self;
concertRouter.Ask<Routees>(new GetRoutees()).ContinueWith(tr =>
{
if (tr.Result.Members.Count() > 0)
{
var m = new GetAvailableSeats() { User = self, ConcertId = eventId };
self.Tell(m);
// JobStarter.Cancel();
}
}, TaskContinuationOptions.ExecuteSynchronously);
});
Receive<GetAvailableSeats>(rs =>
{
rs.User = Self;
//get free seats
concertRouter.Ask(rs).ContinueWith(t=>
{
Console.WriteLine("response received!!");
}
);
});
Конфигурация клиента HOCON:
<akka>
<hocon>
<![CDATA[
akka {
actor {
provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
deployment {
/eventpool {
router = consistent-hashing-group
routees.paths = ["/user/HugeEvent"]
virtual-nodes-factor = 8
cluster {
enabled = on
max-nr-of-instances-per-node = 2
allow-local-routees = off
use-role = cluster
}
}
}
}
remote {
log-remote-lifecycle-events = DEBUG
helios.tcp {
transport-class = "Akka.Remote.Transport.Helios.HeliosTcpTransport, Akka.Remote"
applied-adapters = []
transport-protocol = tcp
#will be populated with a dynamic host-name at runtime if left uncommented
#public-hostname = "POPULATE STATIC IP HERE"
hostname = "127.0.0.1"
port = 0
}
}
cluster {
#will inject this node as a self-seed node at run-time
seed-nodes = ["akka.tcp://akkaconcert@127.0.0.1:8080"] #manually populate other seed nodes here, i.e. "akka.tcp://lighthouse@127.0.0.1:4053", "akka.tcp://lighthouse@127.0.0.1:4044"
roles = [client]
auto-down-unreachable-after = 60s
}
}
]]>
</hocon>
На внутренней стороне:
Актер создан
private ActorSystem actorSystem;
private IActorRef event1;
public bool Start(HostControl hostControl)
{
actorSystem = ActorSystem.Create("akkaconcert");
SqlServerPersistence.Init(actorSystem);
event1 = actorSystem.ActorOf(
Props.Create(() => new Concert(1,100000)), "HugeEvent");
return true;
}
Обработка сообщений участников концерта
private void ReadyCommands()
{
Command<GetAvailableSeats>(message => GetFreeSeatsHandler(message));
Command<ReserveSeats>(message => ReserveSeatsHandler(message));
Command<BuySeats>(message => Persist(message, BuySeatsHandler));
}
private bool GetFreeSeatsHandler(GetAvailableSeats message)
{
var freeSeats = seats.Where(s => s.Value.State == Actors.Seat.SeatState.Free).Select(s2 => s2.Value).ToList();
//1. Trying passing the user actor
//message.User.Tell(new GetFreeSeatsResponse() { FreeSeats = freeSeats }, Context.Self);
//2. Trying with the sender
Context.Sender.Tell(new GetAvailableSeatsResponse() { FreeSeats = freeSeats }, Context.Self);
printMessagesPerSecond(messagesReceived++);
printfreeSeats(freeSeats);
return true;
}
Конфигурация HOCON на стороне сервера:
<akka>
<hocon>
<![CDATA[
akka {
actor {
provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
}
remote {
log-remote-lifecycle-events = DEBUG
helios.tcp {
transport-class = "Akka.Remote.Transport.Helios.HeliosTcpTransport, Akka.Remote"
applied-adapters = []
transport-protocol = tcp
#will be populated with a dynamic host-name at runtime if left uncommented
#public-hostname = "POPULATE STATIC IP HERE"
hostname = "127.0.0.1"
port = 8080
}
}
cluster {
#will inject this node as a self-seed node at run-time
seed-nodes = ["akka.tcp://akkaconcert@127.0.0.1:8080"] #manually populate other seed nodes here, i.e. "akka.tcp://lighthouse@127.0.0.1:4053", "akka.tcp://lighthouse@127.0.0.1:4044"
roles = [cluster]
auto-down-unreachable-after = 10s
}
}
]]>
</hocon>
Спасибо!
1 ответ
Проблема возникает из-за размеров сообщения, сообщение было слишком большим, и оно было отброшено.
Конфигурация для получения больших сообщений:
akka {
helios.tcp {
# Maximum frame size: 4 MB
maximum-frame-size = 4000000b
}
}