Правильный способ остановить пользовательский вход в систему асинхронного приложения

Я создал приложение для входа в систему Amazon SQS и SNS, используя Amazon SDK для Java. Базовые приложения используют синхронные API-интерфейсы Java, но я также создал асинхронные версии обоих, расширив ch.qos.logback.classic.AsyncAppender учебный класс.

Остановка контекста logger logback с асинхронными дополнениями не работает должным образом. Когда контекст останавливается, все асинхронные приложения пытаются сбросить оставшиеся события перед выходом. Проблема возникает из ch.qos.logback.core.AsyncAppenderBase#stop метод, который прерывает рабочий поток. Прерывание срабатывает, когда Amazon SDK все еще обрабатывает события в очереди и выдает com.amazonaws.AbortedException, В моих тестах AbortedException произошло, когда SDK обрабатывал ответ от API, поэтому фактическое сообщение прошло, но это не всегда так.

Предполагается ли, что logback прерывает рабочий поток, даже если рабочие все еще должны обрабатывать оставшуюся очередь событий? И если так, как я могу обойти AbortedException вызвано прерыванием? Я мог бы переопределить все методы stop и удалить прерывание, но это потребовало бы копирования, вставляющего большую часть реализации.

1 ответ

Решение

Мне наконец удалось найти решение, которое, я думаю, не является оптимальным и далеко не простым, но оно работает.

Моей первой попыткой было использование асинхронных версий API-интерфейсов AWS SDK с функцией logback, предоставленной исполнителем, поскольку с внутренним исполнителем можно было избежать проблемы прерывания. Но это не сработало, потому что рабочие очереди являются общими, и в этом случае очередь должна быть привязана к конкретному приложению, чтобы ее можно было правильно остановить. Поэтому мне нужно было использовать собственного исполнителя с каждым аппендером.

Сначала мне нужен был исполнитель для клиентов AWS. Подвох исполнителя заключается в том, что предоставленная фабрика потоков должна создавать потоки демонов, в противном случае она будет блокироваться на неопределенный срок, если используется хук отключения JVM logback.

public static ExecutorService newExecutor(Appender<?> appender, int threadPoolSize) {
    final String name = appender.getName();
    return Executors.newFixedThreadPool(threadPoolSize, new ThreadFactory() {

        private final AtomicInteger idx = new AtomicInteger(1);

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName(name + "-" + idx.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        }
    });
}

Следующая проблема заключалась в том, как правильно остановить аппендера с помощью прерывания? Это требовало обработки прерванного исключения с повторной попыткой, потому что в противном случае исполнитель пропустил бы ожидание сброса очереди.

public static void shutdown(Appender<?> appender, ExecutorService executor, long waitMillis) {
    executor.shutdown();
    boolean completed = awaitTermination(appender, executor, waitMillis);
    if (!completed) {
        appender.addWarn(format("Executor for %s did not shut down in %d milliseconds, " +
                                "logging events might have been discarded",
                                appender.getName(), waitMillis));
    }
}

private static boolean awaitTermination(Appender<?> appender, ExecutorService executor, long waitMillis) {
    long started = System.currentTimeMillis();
    try {
        return executor.awaitTermination(waitMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException ie1) {
        // the worker loop is stopped by interrupt, but the remaining queue should still be handled
        long waited = System.currentTimeMillis() - started;
        if (waited < waitMillis) {
            try {
                return executor.awaitTermination(waitMillis - waited, TimeUnit.MILLISECONDS);
            } catch (InterruptedException ie2) {
                appender.addError(format("Shut down of executor for %s was interrupted",
                                         appender.getName()));
            }
        }
        Thread.currentThread().interrupt();
    }
    return false;
}

Ожидается, что нормальные приложения для входа в систему будут работать синхронно и, следовательно, не должны терять события журналирования даже без надлежащего завершения работы. Это проблема текущих асинхронных вызовов API AWS SDK. Я решил использовать защелку обратного отсчета, чтобы обеспечить блокировку поведения приложения.

public class LoggingEventHandler<REQUEST extends AmazonWebServiceRequest, RESULT> implements AsyncHandler<REQUEST, RESULT> {

    private final ContextAware contextAware;
    private final CountDownLatch latch;
    private final String errorMessage;

    public LoggingEventHandler(ContextAware contextAware, CountDownLatch latch, String errorMessage) {
        this.contextAware = contextAware;
        this.latch = latch;
        this.errorMessage = errorMessage;
    }

    @Override
    public void onError(Exception exception) {
        contextAware.addWarn(errorMessage, exception);
        latch.countDown();
    }

    @Override
    public void onSuccess(REQUEST request, RESULT result) {
        latch.countDown();
    }
}

И справиться с ожиданием с защелкой.

public static void awaitLatch(Appender<?> appender, CountDownLatch latch, long waitMillis) {
    if (latch.getCount() > 0) {
        try {
            boolean completed = latch.await(waitMillis, TimeUnit.MILLISECONDS);
            if (!completed) {
                appender.addWarn(format("Appender '%s' did not complete sending event in %d milliseconds, " +
                                        "the event might have been lost",
                                        appender.getName(), waitMillis));
            }
        } catch (InterruptedException ex) {
            appender.addWarn(format("Appender '%s' was interrupted, " +
                                    "a logging event might have been lost or shutdown was initiated",
                                    appender.getName()));
            Thread.currentThread().interrupt();
        }
    }
}

И тогда все в комплекте. Следующий пример является упрощенной версией реальной реализации, просто показывая соответствующие части для этой проблемы.

public class SqsAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {

    private AmazonSQSAsyncClient sqs;

    @Override
    public void start() {
        sqs = new AmazonSQSAsyncClient(
                getCredentials(),
                getClientConfiguration(),
                Executors.newFixedThreadPool(getThreadPoolSize())
        );
        super.start();
    }

    @Override
    public void stop() {
        super.stop();
        if (sqs != null) {
            AppenderExecutors.shutdown(this, sqs.getExecutorService(), getMaxFlushTime());
            sqs.shutdown();
            sqs = null;
        }
    }

    @Override
    protected void append(final ILoggingEvent eventObject) {
        SendMessageRequest request = ...
        CountDownLatch latch = new CountDownLatch(1);
        sqs.sendMessageAsync(request, new LoggingEventHandler<SendMessageRequest, SendMessageResult>(this, latch, "Error"));
        AppenderExecutors.awaitLatch(this, latch, getMaxFlushTime());
    }
}

Все это требовалось для правильного рассмотрения следующих случаев:

  • Сбросить оставшуюся очередь событий при остановке контекста logback или перехвате отключения при использовании оболочки асинхронного приложения
  • Не блокируйте на неопределенный срок, если используется отсрочка отложенного отключения logback
  • Обеспечить поведение блокировки, когда асинхронный аппендир не используется
  • Выдержать прерывание асинхронной остановки приложения, вызвавшее прерывание всех реализаций потока AWS SDK

Вышеуказанное используется в расширениях Logback проекта с открытым исходным кодом, который я поддерживаю.

Другие вопросы по тегам