Java ScheduledExecutorService - Предотвращение голодания в нескольких параллельных задачах
Я работаю над программой, которая должна проверять несколько ресурсов параллельно и периодически:
public class JobRunner {
private final SensorService sensorService;
private ScheduledExecutorService executor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
public void run() {
sensorService.finalAll().forEach(sensor -> {
Runnable task = () -> {
// read and save new data to log
List<Double> data = sensor.fetchNewData();
this.save(data);
};
// execute every 10 sec
executor.scheduleWithFixedDelay(task, 0, 10, TimeUnit.SECONDS);
});
}
public void save(List<Double> data) {
// ...
}
}
findAll
call возвращает список из примерно 50 датчиков, но когда я запускаю программу, я вижу, что, хотя все датчики запрашиваются в первый период, при последующих выполнениях вызывается только 2-3 (например, - через 20 секунд, 30 секунд и т. д.). Я думаю, что, поскольку некоторые датчики возвращаются быстрее, чем другие, они завершают цикл ожидания задачи раньше и захватываются следующим потоком в пуле, тем самым истощая другие задачи, которые завершаются медленнее.
Как я могу обеспечить одинаковое отношение ко всем задачам (датчикам)? Каковы некоторые лучшие практики здесь; я должен использовать очередь заданий или другой механизм параллелизма? Благодарю.
1 ответ
В вашем коде есть N=count service.findAll()
таймеры, что затрудняет отладку и тестирование. Более того, нет никакой гарантии, что старое задание будет выполнено, а новое не выполнено в разумные сроки. Что делать, если вы
- Используйте один таймер, который запускает проверку датчиков через 10 секунд после последней проверки всех датчиков
- Проходить через датчики одновременно, когда проверка запускается таймером
Пожалуйста, посмотрите следующий код в качестве примера. Он печатает 50 целых чисел каждые 10 секунд, а затем EOL. Параллелизм достигается с Stream API
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
executor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
IntStream.range(0, 50).parallel().forEach(i -> System.out.print(i + " "));
System.out.println();
}
}, 0, 10, TimeUnit.SECONDS);
Вы можете заменить ScheduledExecutorService
с Timer
чтобы сделать код более понятным. И, как вариант, вместо использования параллельных потоков вы можете использовать другой ExecutorService
, отправив следующий N
задачи к нему на Timer
и ждем, пока они не будут завершены:
ExecutorService workerExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
List<Future<Void>> futures = new ArrayList<>();
for (int i = 0; i < 50; i++) {
final int index = i;
Future<Void> future = workerExecutor.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
System.out.print(index + " ");
return null;
}
});
futures.add(future);
}
for (Future<Void> future : futures) {
try {
future.get();
} catch (InterruptedException|ExecutionException e) {
throw new RuntimeException();
}
}
System.out.println();
}
}, 0, 10_000);