Java ScheduledExecutorService - Предотвращение голодания в нескольких параллельных задачах

Я работаю над программой, которая должна проверять несколько ресурсов параллельно и периодически:

public class JobRunner {

    private final SensorService sensorService;
    private ScheduledExecutorService executor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());

    public void run() {
        sensorService.finalAll().forEach(sensor -> {
            Runnable task = () -> {
                // read and save new data to log
                List<Double> data = sensor.fetchNewData();
                this.save(data);
            };

            // execute every 10 sec
            executor.scheduleWithFixedDelay(task, 0, 10, TimeUnit.SECONDS);
        });
    }

    public void save(List<Double> data) {
        // ...
    }
}

findAll call возвращает список из примерно 50 датчиков, но когда я запускаю программу, я вижу, что, хотя все датчики запрашиваются в первый период, при последующих выполнениях вызывается только 2-3 (например, - через 20 секунд, 30 секунд и т. д.). Я думаю, что, поскольку некоторые датчики возвращаются быстрее, чем другие, они завершают цикл ожидания задачи раньше и захватываются следующим потоком в пуле, тем самым истощая другие задачи, которые завершаются медленнее.

Как я могу обеспечить одинаковое отношение ко всем задачам (датчикам)? Каковы некоторые лучшие практики здесь; я должен использовать очередь заданий или другой механизм параллелизма? Благодарю.

1 ответ

В вашем коде есть N=count service.findAll() таймеры, что затрудняет отладку и тестирование. Более того, нет никакой гарантии, что старое задание будет выполнено, а новое не выполнено в разумные сроки. Что делать, если вы

  1. Используйте один таймер, который запускает проверку датчиков через 10 секунд после последней проверки всех датчиков
  2. Проходить через датчики одновременно, когда проверка запускается таймером

Пожалуйста, посмотрите следующий код в качестве примера. Он печатает 50 целых чисел каждые 10 секунд, а затем EOL. Параллелизм достигается с Stream API

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
executor.scheduleWithFixedDelay(new Runnable() {
    @Override
    public void run() {
        IntStream.range(0, 50).parallel().forEach(i -> System.out.print(i + " "));
        System.out.println();
    }
}, 0, 10, TimeUnit.SECONDS);

Вы можете заменить ScheduledExecutorService с Timer чтобы сделать код более понятным. И, как вариант, вместо использования параллельных потоков вы можете использовать другой ExecutorService, отправив следующий N задачи к нему на Timer и ждем, пока они не будут завершены:

ExecutorService workerExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
Timer timer = new Timer();
timer.schedule(new TimerTask() {
    @Override
    public void run() {
        List<Future<Void>> futures = new ArrayList<>();
        for (int i = 0; i < 50; i++) {
            final int index = i;
            Future<Void> future = workerExecutor.submit(new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    System.out.print(index + " ");
                    return null;
                }
            });
            futures.add(future);
        }
        for (Future<Void> future : futures) {
            try {
                future.get();
            } catch (InterruptedException|ExecutionException e) {
                throw new RuntimeException();
            }
        }
        System.out.println();
    }
}, 0, 10_000);
Другие вопросы по тегам