Как завершить поток в службе исполнителя, если поток занимает слишком много времени?

Образец услуги исполнителя

 static class MyRunnable implements Runnable {

    private String serverName;

    public MyRunnable(String serverName) {
        super();
        this.serverName = serverName;
    }

    @Override
    public void run() {
        ...
        conn = new ch.ethz.ssh2.Connection(serverName);
        conn.connect();

        boolean isAuthenticated = conn.authenticateWithPassword(user, pass);
        logger.info("Connecting to " + server);

        if (isAuthenticated == false) {
            logger.info(server + " Please check credentials");
        }

        sess = conn.openSession();
        ...

    }

}

public static void main(String[] args) {
    List<String> serverList = ...;
    ExecutorService executor = Executors.newFixedThreadPool(20);

    for (String serverName : serverList) {
        MyRunnable r = new MyRunnable(serverName);
        executor.execute(r);
    }

    executor.shutdown();
    executor.awaitTermination(1, TimeUnit.HOURS);
}

Вот пример кода моей службы исполнителя. Но с такой логикой, когда я встречаю сервер, который не может подключиться или требует слишком много времени для подключения, это создает время зависания в моем приложении. Я хочу завершить / убить поток, если для подключения требуется больше времени, чем x. Как я могу завершить задачу потока, если она не подключается к серверу в течение 2 секунд.

попытка

       ThreadPoolExecutor executor = new ThreadPoolExecutor(
                10, 25, 500, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1));

Я добавил следующий код, но, по-видимому, он не завершает поток, если он занимает более 2000 миллисекунд.

Попытка 2

Future<?> future = executor.submit( new task));
            try {
                future.get(2000, TimeUnit.MILLISECONDS); // This waits timeout seconds; returns null
            }

            catch(TimeoutException e) {
                future.cancel(true);
               // System.out.println(server + "name");
            } 

4 ответа

Решение

Как я могу завершить задачу потока, если она не подключается к серверу в течение 2 секунд.

Обычно это трудно сделать, потому что даже если вы прервете поток (как упоминалось в других ответах), нет гарантии, что поток остановится. Прерывание просто устанавливает флаг в потоке, и это зависит от кода для определения состояния и остановки. Это означает, что тонна потоков может находиться в фоновом режиме в ожидании соединений.

Однако в вашем случае вы используете ch.ethz.ssh2.Connection.connect() метод. Оказывается, есть метод подключения, который требует времени ожидания. Я думаю, что вы хотите следующее:

// try to connect for 2 seconds
conn.connect(null, 2000, 0);

Чтобы процитировать из метода подключения Javadocs:

В случае таймаута (connectTimeout или kexTimeout) генерируется исключение SocketTimeoutException.

Ты должен сделать awaitTermination() сначала, затем проверьте возвращаемое значение, а затем выполните shutdownNow(), shutdown() не гарантирует мгновенную остановку службы, она просто прекращает прием новых заданий и ожидает завершения всех заданий в порядке. shutdownNow() с другой стороны, прекращает принимать новые задания, активно пытается остановить все запущенные задачи и не запускает новые, возвращая список всех ожидающих выполнения заданий.

Из JavaDocs:

Следующий метод завершает работу ExecutorService в два этапа: сначала вызывая shutdown для отклонения входящих задач, а затем вызывая shutdownNow, если необходимо, чтобы отменить любые устаревшие задачи:

 void shutdownAndAwaitTermination(ExecutorService pool) {
   pool.shutdown(); // Disable new tasks from being submitted
   try {
     // Wait a while for existing tasks to terminate
     if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
       pool.shutdownNow(); // Cancel currently executing tasks
       // Wait a while for tasks to respond to being cancelled
       if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("Pool did not terminate");
     }
   } catch (InterruptedException ie) {
     // (Re-)Cancel if current thread also interrupted
     pool.shutdownNow();
     // Preserve interrupt status
     Thread.currentThread().interrupt();
   }
 }

Вы всегда можете вызвать future.get(timeout...). Он вернет исключение тайм-аута, если оно еще не закончилось... тогда вы можете вызвать future.cancel().

Пока вы имеете дело с потоками в Java, единственный безопасный способ остановить поток - это прервать его. Ты можешь позвонить shutdown() сначала а потом ждать. Этот метод не прерывает потоки.

Если это не поможет, тогда вы звоните shutdownNow() который пытается отменить задачи, устанавливая флаг прерывания каждого потока в true. В этом случае, если потоки заблокированы / ожидают, тогда InterruptedException будет брошен. Если вы проверите флаг прерывания где-то внутри ваших задач, значит, вы тоже хороши.

Но если у вас нет другого выбора, кроме как остановить потоки, вы все равно можете это сделать. Одним из возможных решений получения доступа к рабочим является отслеживание всех созданных потоков внутри ThreadPoolExecutor с помощью фабрики пользовательских потоков.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class TestThreadPoolEx {

    static class CustomThreadFactory implements ThreadFactory {
        private List<Thread> threads = new ArrayList<>();

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            threads.add(t);
            return t;
        }

        public List<Thread> getThreads() {
            return threads;
        }

        public void stopThreads() {
            for(Thread t : threads) {
                if(t.isAlive()) {
                    try {
                        t.stop();
                    } catch (Exception e) {
                        //NOP
                    }
                }
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CustomThreadFactory factory = new CustomThreadFactory();
        ExecutorService ex = Executors.newFixedThreadPool(1, factory);
        ex.submit(() -> {
            while(true);
        });
        ex.shutdown();
        ex.awaitTermination(5, TimeUnit.SECONDS);
        ex.shutdownNow();
        ex.awaitTermination(5, TimeUnit.SECONDS);
        factory.stopThreads();
    }
}

Это, конечно, небезопасно, но должно соответствовать вашим требованиям. В этом случае он может остановить цикл while(true). Отмена задач не сможет этого сделать.

Другие вопросы по тегам