Исключения закрытых каналов при закрытии транспортного клиента и массового процессора в asticsearch
У меня есть приложение Java SE, которое использует транспортный клиент и высокоуровневый клиент отдыха для индексации данных в asticsearch, я использую массовый процессор для массовой обработки запросов (индексации, удаления, обновления), например
processor.add(client.prepareIndex(index, type).setSource(s).request());
Массовый процессор создается как
BulkProcessor
.builder(client, initBulkProcessorListener())
.setBulkActions(Integer.valueOf(getEsProperty("elasticsearch.bulkrequestsize")))
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
.setFlushInterval(TimeValue.timeValueSeconds(5))
.setConcurrentRequests(1)
.setBackoffPolicy(BackoffPolicy.exponentialBackoff())
.build();
я делаю поток ожидающих запросов, используя следующий метод
public void flushPendingRequests(BulkProcessor processor) {
processor.flush();
try {
processor.awaitClose(10000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOGGER.error("Interrupted Exception : " + e.getMessage());
Thread.currentThread().interrupt();
}
processor.close();
}
позже я закрываю экземпляр клиента, используя следующий метод
public void close() {
LOGGER.info("Closing transport client");
if (client == null) {
LOGGER.info("Transport client already closed");
return;
}
// This is to prevent closed channel exceptions
shutdownThreadPool(client.threadPool().scheduler());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
client.close();
}
private static void shutdownThreadPool(ScheduledExecutorService pool) {
pool.shutdown();
try {
boolean terminated = pool.awaitTermination(THREADPOOL_TIMEOUT, THREADPOOL_TIMEOUT_UNIT);
if (!terminated) {
LOGGER.warn("Thread pool timeout elapsed before termination, wait again for " + THREADPOOL_TIMEOUT + " "
+ THREADPOOL_TIMEOUT_UNIT.toString() + "...");
pool.shutdownNow();
if (!pool.awaitTermination(THREADPOOL_TIMEOUT, THREADPOOL_TIMEOUT_UNIT)) {
LOGGER.error("Thread pool did not terminate");
}
}
} catch (InterruptedException e) {
LOGGER.error("Error occurred while shutting down the thread pool: " + e.getMessage(), e);
pool.shutdownNow();
Thread.currentThread().interrupt();
}
}
Даже если кажется, что все работает нормально, я всегда получаю исключения closeChannel в журналах эластичного поиска, когда приложение закрывается, оно исчезает, когда я добавляю Thread.sleep(1000); хотя я не понимаю, в чем именно проблема, помощь очень ценится. Ура!