Vertx выполнить задачу 100 раз и проанализировать результат

Мне интересно, как я могу написать программу на Java, используя Vertx для одновременного выполнения задачи (вызова на удаленном сервере) 100 раз, дождаться завершения или сбоя всех 100 удаленных вызовов, а затем проанализировать результат. У меня есть некоторые идеи, но я не уверен, что это эффективно. Пока я просто использую random.nextBoolean вместо вызова удаленного сервера.

Я читал, что могу создать 100 статей и прослушивать ответы по шине событий, но все это мне нужно периодически запускать, поэтому у меня будут некоторые проблемы с определением того, какой ответ был на какой запрос.

Я не знаю, как правильно это построить.

Если у вас есть предложения, пожалуйста. Очень признателен за любую помощь.

public static void main(String[] args) {
    Vertx vertx = Vertx.vertx();
    WorkerExecutor sharedWorkerExecutor = vertx.createSharedWorkerExecutor("netstate-pool");
    List<Future> works = new ArrayList<>(100);
    Random random = new Random();

    IntStream.iterate(0, operand -> ++operand).limit(100)
            .forEach(itaration -> {
                works.add(Future.future());
                sharedWorkerExecutor.executeBlocking(future -> {
                    future.complete(random.nextBoolean());
                }, false, asyncResult -> {
                    works.get(itaration).complete(asyncResult.result());
                });
            });

    CompositeFuture.join(works).setHandler(result -> {
        if (result.succeeded()) {
            System.out.println(works.stream().map(future -> (Boolean) future.result()).filter(res -> res.equals(true)).count());
            System.out.println(works.stream().map(future -> !(Boolean) future.result()).filter(res -> res.equals(true)).count());
        } else {
            System.out.println(works.stream().map(future -> (Boolean) future.result()).filter(res -> res.equals(true)).count());
            System.out.println("fail at least one");
        }
    });

1 ответ

Может быть, этот пример поможет вам начать:

import java.util.ArrayList;
import java.util.List;

import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpServer;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestOptions;
import io.vertx.ext.unit.TestSuite;
import io.vertx.ext.unit.report.ReportOptions;

public class HttpConcurrencyTest {

    public static void main(String[] args) {

        Vertx vertx = Vertx.vertx();
        TestSuite suite = TestSuite.create("HttpConcurrencyTest");

        Integer requestCount = 100;
        Integer responseDelay = 200;
        Integer timeAllowed = 5000;
        String title = String.format("Sends %d HTTP requests with a response delay of %dms each within less than %dms.", requestCount,
                responseDelay, timeAllowed);

        suite.test(title, context -> {

            Async async = context.async();
            HttpServer server = vertx.createHttpServer();
            Long time = System.currentTimeMillis();

            server.requestHandler(request -> {
                vertx.setTimer(responseDelay, event -> {
                    request.response().end("ok");
                });
            });
            Integer port = 8181;
            server.listen(port, ar -> {
                context.assertTrue(ar.succeeded(), "Server not started");

                HttpClient client = vertx.createHttpClient();
                List<Future> futures = new ArrayList<>();
                for (int count = 0; count < requestCount; count++) {
                    Future future = Future.future();
                    futures.add(future);
                }
                CompositeFuture.all(futures).setHandler(result -> {
                    context.assertTrue(result.succeeded());
                    server.close();

                    Long duration = System.currentTimeMillis() - time;
                    context.assertTrue(duration < timeAllowed, duration + " >= " + timeAllowed);

                    async.complete();
                    System.exit(0);
                });

                futures.stream().forEach(future -> {
                    vertx.runOnContext(action -> {
                        client.getNow(port, "localhost", "/", result -> {
                            result.bodyHandler(body -> {
                                future.complete();
                            });
                        });
                    });
                });

            });
        });
        suite.run(new TestOptions().addReporter(new ReportOptions().setTo("console")));
    }
}

Внутри CompositeFuture.all(futures) Вы можете анализировать результаты всех отдельных звонков.

Код также можно найти готовым для запуска здесь: https://github.com/thokari/vertx-http-concurrency

Другие вопросы по тегам