Проблемы с фазером (использование JSR166y с последней версией JDK версии 6)
Итак, у меня есть этот Phaser, который действительно гибкий, но, похоже, я что-то упустил.
Я успешно использовал CyclicBarrier, но теперь я также хочу что-то более гибкое, как я уже сказал. Итак, вот код:
Объявления:
private static final CountDownLatch synchronizer = new CountDownLatch(1);
private static AtomicBoolean HAS_TIMED_OUT = new AtomicBoolean(false);
Код:
try {
logger.INFO("CONNECTED - Peer ID properties: " + SYS_NEWLINE + peerSocket + SYS_NEWLINE + pID, true);
final int peerKQueries = sp.getInteger(peerSocket);
peerObjects = new String[peerKQueries];
peerValues = new BigDecimal[peerKQueries];
for ( int i = 0; i < peerObjects.length; i++ )
peerObjects[i] = sp.getString(peerSocket);
for ( int i = 0; i < peerValues.length; i++ )
peerValues[i] = sp.getBigDecimal(peerSocket);
final int phase1a = htPhaser1a.arrive();
if ( phase1a < 0 ) {
logger.ERROR("Rejecting Super Peer thread " + THREAD_ID + " because it arrived lately for Phase 1a!", true);
sp.close(peerSocket);
throw new IllegalThreadStateException();
} else {
logger.INFO(pID + " -> Arrived in HT phase 1a. Total arrivals: "+htPhaser1a.getArrivedParties(), true);
logger.INFO("Super Peer thread " + THREAD_ID + " will advance to HT Phase 1b/2 (phase number is "+phase1a+").", true);
// The last peer should also unblock the barrier.
if ( htPhaser1a.getArrivedParties() == TOTAL_PEERS.get() ) {
htPhaser1a.arrive();
synchronizer.countDown();
}
htPhaser1a.awaitAdvanceInterruptibly(phase1a, 30, TimeUnit.SECONDS);
}
} catch (IOException e) {
logger.ERROR("Super Peer thread " + THREAD_ID + " encountered an I/O error.", true);
sp.close(peerSocket);
throw new IllegalThreadStateException();
} catch (TimeoutException e) {
logger.INFO("Super Peer thread " + THREAD_ID + " timed out but will advance to HT Phase 1b/2.", true);
if ( HAS_TIMED_OUT.compareAndSet(false, true) ) {
logger.INFO("Parties NOT arrived in the timeout: "+(htPhaser1a.getUnarrivedParties()-1), true);
resetCriticalData(htPhaser1a.getArrivedParties());
htPhaser1a.forceTermination();
instantiateHTPhase1b();
instantiateHTPhase2();
instantiateHTPatch();
synchronizer.countDown();
}
} finally {
logger.INFO("Super Peer thread "+THREAD_ID+" is blocked!", true);
synchronizer.await();
logger.INFO("Super Peer thread's "+THREAD_ID+" blocking waived!", true);
}
sp.getSomething();
являются вызовами ввода / вывода.
Учтите, что этот пример кода выполняется несколькими потоками.
Вот моя проблема: я гарантировал, что не более MAX_CLIENTS прибудет в фазер, поэтому, если MAX_CLIENTS прибудет, все будет хорошо. Однако у меня возникла проблема с TimeoutException. Во-первых, это временное окно (то есть состояние гонки), когда клиент (скажем, Поток A) сможет прибыть на фазу, затем возникает исключение TimeoutException в потоке B, я динамически создаю экземпляр другого фазера в потоке B с количеством прибывающих сторон (скажем, 5), но затем поток A уже прибыл в фазу (он же был найден, что phase1a был < 0). Как я могу это исправить? Я думал об использовании семафора, но думаю, что это не стоит усилий, потому что тогда мне, вероятно, придется переосмыслить способ, которым я делаю это. Я также думал об использовании таймера и увеличении AtomicInteger
переменная и когда таймер истекает, инстанцируя динамически Phaser. Есть идеи, как бы вы подошли к этой проблеме?
РЕДАКТИРОВАТЬ:
В документации есть bulkRegister(int parties)
метод, но это отчасти странно сформулировано:
Добавляет заданное количество новых неисследованных сторон в этот фазер. Если выполняется текущий вызов onAdvance (int, int), этот метод может дождаться своего завершения перед возвратом. Если у этого фазера есть родитель, и данное число сторон больше нуля, и у этого фазера ранее не было зарегистрированных сторон, этот дочерний фазер также регистрируется с его родителем. Если этот фазер завершается, попытка регистрации не имеет никакого эффекта, и возвращается отрицательное значение.
Вопрос: слово "может" меня смущает! "Может" как в силе или "может" как в воле?
РЕДАКТИРОВАТЬ:
Решаемые. Проверьте мой ответ ниже.
1 ответ
Объявления:
private static final CountDownLatch PEER = new CountDownLatch(1);
private static AtomicBoolean HAS_TIMED_OUT = new AtomicBoolean(false);
htPeerPhaser = new Phaser();
Код:
...
htPeerPhaser.register(); // Called only once.
...
// Note: Server application has guaranteed that no more than the maximum number of peers will arrive.
try {
logger.INFO("CONNECTED - Peer ID properties: " + SYS_NEWLINE + peerSocket + SYS_NEWLINE + pID, true);
final int peerKQueries = sp.getInteger(peerSocket);
peerObjects = new String[peerKQueries];
peerValues = new BigDecimal[peerKQueries];
for ( int i = 0; i < peerObjects.length; i++ )
peerObjects[i] = sp.getString(peerSocket);
for ( int i = 0; i < peerValues.length; i++ )
peerValues[i] = sp.getBigDecimal(peerSocket);
final int registrationID = htPeerPhaser.bulkRegister(1);
if ( registrationID < 0 ) {
logger.ERROR("Rejecting Super Peer thread " + THREAD_ID + " because peer registration has stopped!", true);
sp.close(peerSocket);
throw new IllegalThreadStateException();
}
logger.INFO(pID + " -> Registered for HT phase 1.", true);
logger.INFO("Super Peer thread " + THREAD_ID + " will advance to HT Phase 1/2.", true);
// The last peer should also unblock the barrier.
if ( htPeerPhaser.getRegisteredParties() == TOTAL_PEERS.get()+1 ) {
htPeerPhaser.forceTermination();
PEER.countDown();
}
htPeerPhaser.awaitAdvanceInterruptibly(registrationID, 30, TimeUnit.SECONDS);
} catch (IOException e) {
logger.ERROR("Super Peer thread " + THREAD_ID + " encountered an I/O error.", true);
sp.close(peerSocket);
throw new IllegalThreadStateException();
} catch (TimeoutException e) {
htPeerPhaser.forceTermination();
logger.INFO("Super Peer thread " + THREAD_ID + " timed out but will advance to HT Phase 1b/2.", true);
if ( HAS_TIMED_OUT.compareAndSet(false, true) && htPeerPhaser.getRegisteredParties() < TOTAL_PEERS.get()+1 ) {
final int arrivedPeers = htPeerPhaser.getRegisteredParties()-1;
logger.INFO("Parties that arrived before timeout: "+arrivedPeers, true);
final int unarrivedPeers = TOTAL_PEERS.get()-arrivedPeers;
logger.INFO("Parties NOT arrived due to timeout: "+unarrivedPeers, true);
resetCriticalData(arrivedPeers);
instantiateHTPhase1b();
instantiateHTPhase2();
instantiateHTPatch();
PEER.countDown();
logger.INFO("Super Peer thread " + THREAD_ID + " re-instantiated critical data.", true);
}
}
logger.INFO("Super Peer thread "+THREAD_ID+" is blocked!", true);
PEER.await();
logger.INFO("Super Peer thread's "+THREAD_ID+" blocking waived!", true);