Исключения закрытых каналов при закрытии транспортного клиента и массового процессора в asticsearch

У меня есть приложение Java SE, которое использует транспортный клиент и высокоуровневый клиент отдыха для индексации данных в asticsearch, я использую массовый процессор для массовой обработки запросов (индексации, удаления, обновления), например

 processor.add(client.prepareIndex(index, type).setSource(s).request());

Массовый процессор создается как

BulkProcessor
    .builder(client, initBulkProcessorListener())
    .setBulkActions(Integer.valueOf(getEsProperty("elasticsearch.bulkrequestsize")))
    .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
    .setFlushInterval(TimeValue.timeValueSeconds(5))
    .setConcurrentRequests(1)
    .setBackoffPolicy(BackoffPolicy.exponentialBackoff())
    .build();

я делаю поток ожидающих запросов, используя следующий метод

public void flushPendingRequests(BulkProcessor processor) {
 processor.flush();
 try {
  processor.awaitClose(10000, TimeUnit.MILLISECONDS);
 } catch (InterruptedException e) {
  LOGGER.error("Interrupted Exception : " + e.getMessage());
  Thread.currentThread().interrupt();
 }
 processor.close();
 }

позже я закрываю экземпляр клиента, используя следующий метод

  public void close() {
LOGGER.info("Closing transport client");
if (client == null) {
  LOGGER.info("Transport client already closed");
  return;
}

// This is to prevent closed channel exceptions
shutdownThreadPool(client.threadPool().scheduler());
try {
  Thread.sleep(1000);
} catch (InterruptedException e) {
  e.printStackTrace();
}
client.close();

}

private static void shutdownThreadPool(ScheduledExecutorService pool) {
pool.shutdown();
try {
  boolean terminated = pool.awaitTermination(THREADPOOL_TIMEOUT, THREADPOOL_TIMEOUT_UNIT);
  if (!terminated) {
    LOGGER.warn("Thread pool timeout elapsed before termination, wait again for " + THREADPOOL_TIMEOUT + " "
        + THREADPOOL_TIMEOUT_UNIT.toString() + "...");
    pool.shutdownNow();
    if (!pool.awaitTermination(THREADPOOL_TIMEOUT, THREADPOOL_TIMEOUT_UNIT)) {
      LOGGER.error("Thread pool did not terminate");
    }
  }
} catch (InterruptedException e) {
  LOGGER.error("Error occurred while shutting down the thread pool: " + e.getMessage(), e);
  pool.shutdownNow();
  Thread.currentThread().interrupt();
}
}

Даже если кажется, что все работает нормально, я всегда получаю исключения closeChannel в журналах эластичного поиска, когда приложение закрывается, оно исчезает, когда я добавляю Thread.sleep(1000); хотя я не понимаю, в чем именно проблема, помощь очень ценится. Ура!

0 ответов

Другие вопросы по тегам