Планирование задач с максимальной / минимальной продолжительностью между задачами
Обновление записей из БД. Мы либо получаем явное уведомление для обновления, либо опрашиваем каждые 60 секунд. Не более одного обновления в секунду.
Если поступает запрос, он должен поставить в очередь немедленное обновление, если оно не произошло в течение одной секунды. В противном случае следует запланировать обновление на 1 секунду после окончания последнего обновления, если только такая задача не запланирована на это время или раньше.
После одной минуты без явного обновления таймер должен включиться и обновить, если уведомления не были отправлены.
Может появиться большое количество уведомлений (несколько сотен в секунду).
Обновление может быть выполнено отдельным потоком.
Какой элегантный способ создать это?
Вот что у меня есть, но это может привести к слишком большому количеству запросов:
private NotificationCenter() {
recordFetchService = Executors.newSingleThreadScheduledExecutor();
recordFetchService.scheduleWithFixedDelay(refreshCommand, minTimeBetweenRefresh, maxTimeBetweenRefresh, TimeUnit.MILLISECONDS);
}
private void queueRefresh() {
// explicit refresh requested. Schedule a refreshCommand to fire immediately, unless that would break our contract
if (!pending.isDone() && pending.getDelay(TimeUnit.MILLISECONDS) < minTimeBetweenRefresh) {
// a refresh is already scheduled
} else {
pending = recordFetchService.schedule(refreshCommand, 0L, TimeUnit.MILLISECONDS);
}
}
1 ответ
С "сотнями уведомлений в секунду" AtomicBoolean
приходит в голову переключить состояние ровно один раз с "ничего не делать" на "что-то делать" и наоборот. Соедините состояние "собираюсь что-то сделать" с Semaphore
и у вас есть возможность определить точный момент, когда происходит "что-то делать".
Ниже (работоспособный) пример реализации / дизайна, который объединяет AtomicBoolean
а также Semaphore
регулярно обновлять данные при использовании уведомлений. Вероятно, это не самый элегантный способ, но я думаю, что он достигает цели относительно простым способом.
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
public class RefreshTask {
private static final long MIN_WAIT_MS = 100L;
private static final long MAX_WAIT_MS = 1000L;
private ScheduledExecutorService scheduler;
private ExecutorService executor;
private volatile boolean stopping;
private final Semaphore refreshLock = new Semaphore(0);
private final AtomicBoolean refreshing = new AtomicBoolean();
private volatile long lastRefresh;
public void start() {
stopping = false;
refreshing.set(true);
lastRefresh = System.currentTimeMillis();
executor = Executors.newSingleThreadExecutor();
executor.execute(new RefreshLoop());
scheduler = Executors.newSingleThreadScheduledExecutor();
}
public void stop() {
stopping = true;
if (executor != null) {
refreshLock.release();
scheduler.shutdownNow();
executor.shutdownNow();
}
}
/** Trigger a (scheduled) refresh of data. */
public void refresh() {
if (refreshing.compareAndSet(false, true)) {
final long dataAge = System.currentTimeMillis() - lastRefresh;
if (dataAge >= MIN_WAIT_MS) {
refreshLock.release();
// println("Refresh lock released.");
} else {
long waitTime = MIN_WAIT_MS - dataAge;
scheduler.schedule(new RefreshReleaser(), waitTime, TimeUnit.MILLISECONDS);
println("Refresh scheduled in " + waitTime + " ms.");
}
} else {
// println("Refresh already triggered.");
}
}
protected void refreshData() {
// Refresh data from database
println("DATA refresh");
}
class RefreshLoop implements Runnable {
@Override
public void run() {
while (!stopping) {
try {
refreshData();
} catch (Exception e) {
e.printStackTrace();
}
lastRefresh = System.currentTimeMillis();
refreshing.set(false);
try {
if (!refreshLock.tryAcquire(MAX_WAIT_MS, TimeUnit.MILLISECONDS)) {
if (!refreshing.compareAndSet(false, true)) {
// Unlikely state, but can happen if "dataAge" in the refresh-method is around MAX_WAIT_MS.
// Resolve the race-condition by removing the extra permit.
if (refreshLock.tryAcquire()) {
println("Refresh lock race-condition detected, removed additional permit.");
} else {
println("Refresh lock race-condition detected, but no additional permit found.");
}
}
println("Refreshing after max waiting time.");
} // else refreshing already set to true
} catch (InterruptedException ie) {
if (!stopping) {
ie.printStackTrace();
}
}
}
println("Refresh loop stopped.");
}
}
class RefreshReleaser implements Runnable {
@Override
public void run() {
if (refreshing.get()) {
refreshLock.release();
println("Scheduled refresh lock release.");
} else {
println("Programming error, scheduled refresh lock release can only be done in refreshing state.");
}
}
}
/* *** some testing *** */
public static void main(String[] args) {
RefreshTask rt = new RefreshTask();
try {
println("Starting");
rt.start();
Thread.sleep(2 * MIN_WAIT_MS);
println("Triggering refresh");
rt.refresh();
Thread.sleep(MAX_WAIT_MS + (MIN_WAIT_MS / 2));
println("Triggering refresh 2");
rt.refresh();
Thread.sleep(MIN_WAIT_MS);
} catch (Exception e) {
e.printStackTrace();
} finally {
rt.stop();
}
}
public static final long startTime = System.currentTimeMillis();
public static void println(String msg) {
println(System.currentTimeMillis() - startTime, msg);
}
public static void println(long tstamp, String msg) {
System.out.println(String.format("%05d ", tstamp) + msg);
}
}