Java Akka одновременное ограничение в использовании маршрутизатора?

Код является:

public class TestAkka {

    public static void main(String[] args) throws InterruptedException {

        ActorSystem system = ActorSystem.create("ExampleRouter", ConfigFactory.load().getConfig("MyRouter"));
        ActorRef router = system.actorOf(Props.create(Hello.class).withRouter(new FromConfig()), "exampleRouter");

        for (int i = 0; i < 100; i++) {
            router.tell(new Website().getNameByIndex(i), router);
        }
    }

    public static class Hello extends UntypedActor {

        @Override
        public void onReceive(Object message) throws Exception {
            if (message instanceof String) {
                System.out.println("Hello " + message);
                URL url = new URL("http://" + message + ":80");
                HttpURLConnection conn = (HttpURLConnection) url.openConnection();
                System.out.println(conn.getResponseCode());
                Thread.sleep(10000);  // <-- Sim the job take a short time
            } else {
                unhandled(message);
            }
        }
    }
}

Application.conf - это:

MyRouter{
    akka {
        actor {
            deployment {
                /exampleRouter {
                    router = round-robin-pool
                    nr-of-instances = 100
                }
            }
        }
    }
}

В результате я вижу только 8 одновременно выполняемых заданий, но я ожидаю, что 100 одновременных заданий должны выполняться одновременно! все еще требуются какие-либо настройки?

ОБНОВЛЕНО в 2016/06/06: я изменил свой код, и в результате я ожидаю перезаписать application.conf, он может одновременно выполнять 100 одновременных заданий. На самом деле, как оптимизировать default-dispatcher для высокопараллельных приложений?

String s = ""
        + "akka {\n"
        + "    actor {\n"
        + "        deployment {\n"
        + "            /router {\n"
        + "                router = round-robin-pool\n"
        + "                nr-of-instances = 10000\n"
        + "            }\n"
        + "        }\n"
        + "        default-dispatcher {\n"
        + "            fork-join-executor {\n"
        + "                parallelism-min = 200\n"
        + "                parallelism-max = 5000\n"
        + "            }\n"
        + "        }\n"
        + "    }\n"
        + "}\n";
ActorSystem as = ActorSystem.create("as", ConfigFactory.parseString(s));
ActorRef ar = as.actorOf(Props.create(Hello.class).withRouter(new FromConfig()), "router");

2 ответа

Решение

Вы запутались между количеством действующих лиц и потоков диспетчера:

  • Количество действующих лиц в маршрутизаторе: это число экземпляров, созданных в памяти, которые будут обрабатывать сообщения, поступающие на маршрутизатор в соответствии с выбранной логикой.

  • Потоки диспетчера. Диспетчер - это пул потоков (или служба исполнителя), который отвечает за управление потоками для получения сообщений из почтовых ящиков субъекта и выполнения receive метод.

Максимальное количество одновременных задач, выполняемых в вашей системе, будет ограничено конфигурацией диспетчера. Наличие маршрутизаторов (и, следовательно, большего количества участников для обработки сообщений) позволит этим сообщениям обрабатываться потоками в диспетчере одновременно.

Я бы порекомендовал почитать больше о диспетчерах akka и, в частности, о диспетчере по умолчанию: http://doc.akka.io/docs/akka/current/scala/dispatchers.html

Ваш звонок в Thread.sleep блокирует поток, поэтому у вас заканчиваются потоки. Если вы хотите увидеть все 100 из них, вы должны выполнить операции блокировки в их собственных выделенных потоках.

Другие вопросы по тегам