Как взаимодействовать с Akka Actors через Akka HTTP (Java)
Тема
Я хотел бы взаимодействовать с Akka Actor через Akka HTTP. Идея состоит в том, чтобы создать систему, в которой HTTP-клиент вызывает метод HTTP-сервера Akka, который обрабатывает запрос к Akka Actor. Актер обрабатывает сообщение и отвечает обратно вызывающей стороне (Akka HTTP), которая отвечает HTTP-клиенту. Мне удалось сделать, как описано выше, но я думаю, что я делаю это неправильно, так как моя реализация кажется блокирующей.
Я объясню лучше: если я делаю много одновременных HTTP-запросов, я вижу, что Akka HTTP "создает очередь", ожидая, пока субъект обработает запрос, прежде чем отправить его следующим образом.
Вместо этого я хотел бы получить то, что HTTP-сервер Akka перенаправляет запросы, поступающие от HTTP-клиентов, немедленно целевому субъекту akka, не дожидаясь, пока субъект завершит разработку. Я хотел бы использовать параметр размера почтового ящика субъекта, чтобы определить, насколько велика очередь сообщений, и отклонить сообщения, если их слишком много.
Таким образом, мне нужен способ заставить Akka HTTP асинхронно ждать ответа от актера.
Я знаю, что емкость почтового ящика работает правильно, потому что если я вместо этого делаю много запросов своему актеру, используя простой actor2.tell ("Prova1", system.deadLetters ()) (только для тестирования), запросы, превышающие размер почтового ящика, выполняются правильно отвергнуто.
Рекомендации
Чтобы протестировать мою систему, я создал простую конфигурацию, следуя минимальным примерам, предоставленным документацией akka. Это для Акки http: https://doc.akka.io/docs/akka-http/current/routing-dsl/index.html
и следующее для создания моего актера: https://doc.akka.io/docs/akka/current/actors.html
Мой код
Сначала я создал систему с одним актером (actor1), настроив akka HTTP следующим образом:
public class TestActor {
private static ActorSystem system;
public static void main(String[] args) throws InterruptedException
{
String httpBindAddress = "0.0.0.0";
int httpPort = 8086;
system = ActorSystem.create("deupnp");
ActorMaterializer materializer = ActorMaterializer.create(system);
Http http = Http.get(system);
AllDirectives app = new AllDirectives() {
};
Route routeActor = app.get(() ->
app.pathPrefix("mysuburl", () ->
app.pathPrefix(akka.http.javadsl.unmarshalling.StringUnmarshallers.STRING, actor ->
app.path(akka.http.javadsl.unmarshalling.StringUnmarshallers.STRING, message ->
app.onSuccess(() ->
CompletableFuture.supplyAsync(() -> actorFunctionCall(actor, message)), response ->
app.complete(StatusCodes.get(200), response))))));
Flow<HttpRequest, HttpResponse, NotUsed> routeFlow = app.route(routeActor).flow(system, materializer);
CompletionStage<ServerBinding> binding = http.bindAndHandle(routeFlow, ConnectHttp.toHost(httpBindAddress, httpPort), materializer);
// create system with one actor
ActorRef actor1 = system.actorOf(Props.create(ActorTest.class,"actor1").withMailbox("my-mailbox"),"actor1");
}
private static String actorFunctionCall(String actor, String message)
{
try {
Inbox inbox = Inbox.create(system);
system.actorSelection("user/"+actor).tell(message, inbox.getRef());
String response = (String) inbox.receive(Duration.create(20000, TimeUnit.SECONDS));
return response;
} catch (Exception e) {
//return new ResponseMessage(204,"Error");
e.printStackTrace();
return null;
}
}
}
где мой ActorTest выглядит следующим образом:
public class ActorTest extends AbstractActor {
private String myName = "";
public ActorTest(String nome){
this.myName = nome;
}
@Override
public void preStart()
{
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(String.class,
message -> {
Thread.sleep(5000l);
System.out.println(this.getClass().getName() + " >> " + myName + " >> " + message);
})
.matchAny(mex->{
System.out.println("Error");
})
.build();
}
}
мое application.conf очень просто:
akka
{
stdout-loglevel = "DEBUG"
loglevel = "DEBUG"
actor {
default-dispatcher {
throughput = 10
}
}
}
my-mailbox {
mailbox-type = "akka.dispatch.NonBlockingBoundedMailbox"
mailbox-capacity = 1
}
Ожидаемые результаты
Как вы можете видеть, с mailbox-acity = 1 я бы ожидал, что, если я сделаю более 1 одновременных запросов, обрабатывается только один, а остальные отбрасываются.
Я думаю, что приведенный выше код не является правильным для того, что я хочу получить, так как я использую маршрутизацию Akka HTTP для получения HTTP-запросов на http://127.0.0.1/mysuburl/actor1/my_msg и затем использую Inbox для отправки сообщения Актер и ждите ответа.
Итак, мой вопрос: как правильно связать мой HTTP-запрос Akka с моим актером Akka Actor 1 асинхронным способом?
Пожалуйста, дайте мне знать, если вам нужна дополнительная информация.
Заметка
Я даже прочитал следующую статью: https://doc.akka.io/docs/akka-http/current/handling-blocking-operations-in-akka-http-routes.html
в котором объясняется, как создать конечное число потоков для обработки нескольких запросов на блокировку, но я думаю, что это только "смягчает" последствия моего кода, который блокирует, но должен быть написан так, чтобы не блокировать.