Как завершить поток в службе исполнителя, если поток занимает слишком много времени?
Образец услуги исполнителя
static class MyRunnable implements Runnable {
private String serverName;
public MyRunnable(String serverName) {
super();
this.serverName = serverName;
}
@Override
public void run() {
...
conn = new ch.ethz.ssh2.Connection(serverName);
conn.connect();
boolean isAuthenticated = conn.authenticateWithPassword(user, pass);
logger.info("Connecting to " + server);
if (isAuthenticated == false) {
logger.info(server + " Please check credentials");
}
sess = conn.openSession();
...
}
}
public static void main(String[] args) {
List<String> serverList = ...;
ExecutorService executor = Executors.newFixedThreadPool(20);
for (String serverName : serverList) {
MyRunnable r = new MyRunnable(serverName);
executor.execute(r);
}
executor.shutdown();
executor.awaitTermination(1, TimeUnit.HOURS);
}
Вот пример кода моей службы исполнителя. Но с такой логикой, когда я встречаю сервер, который не может подключиться или требует слишком много времени для подключения, это создает время зависания в моем приложении. Я хочу завершить / убить поток, если для подключения требуется больше времени, чем x. Как я могу завершить задачу потока, если она не подключается к серверу в течение 2 секунд.
попытка
ThreadPoolExecutor executor = new ThreadPoolExecutor(
10, 25, 500, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1));
Я добавил следующий код, но, по-видимому, он не завершает поток, если он занимает более 2000 миллисекунд.
Попытка 2
Future<?> future = executor.submit( new task));
try {
future.get(2000, TimeUnit.MILLISECONDS); // This waits timeout seconds; returns null
}
catch(TimeoutException e) {
future.cancel(true);
// System.out.println(server + "name");
}
4 ответа
Как я могу завершить задачу потока, если она не подключается к серверу в течение 2 секунд.
Обычно это трудно сделать, потому что даже если вы прервете поток (как упоминалось в других ответах), нет гарантии, что поток остановится. Прерывание просто устанавливает флаг в потоке, и это зависит от кода для определения состояния и остановки. Это означает, что тонна потоков может находиться в фоновом режиме в ожидании соединений.
Однако в вашем случае вы используете ch.ethz.ssh2.Connection.connect()
метод. Оказывается, есть метод подключения, который требует времени ожидания. Я думаю, что вы хотите следующее:
// try to connect for 2 seconds
conn.connect(null, 2000, 0);
Чтобы процитировать из метода подключения Javadocs:
В случае таймаута (connectTimeout или kexTimeout) генерируется исключение SocketTimeoutException.
Ты должен сделать awaitTermination()
сначала, затем проверьте возвращаемое значение, а затем выполните shutdownNow()
, shutdown()
не гарантирует мгновенную остановку службы, она просто прекращает прием новых заданий и ожидает завершения всех заданий в порядке. shutdownNow()
с другой стороны, прекращает принимать новые задания, активно пытается остановить все запущенные задачи и не запускает новые, возвращая список всех ожидающих выполнения заданий.
Из JavaDocs:
Следующий метод завершает работу ExecutorService в два этапа: сначала вызывая shutdown для отклонения входящих задач, а затем вызывая shutdownNow, если необходимо, чтобы отменить любые устаревшие задачи:
void shutdownAndAwaitTermination(ExecutorService pool) { pool.shutdown(); // Disable new tasks from being submitted try { // Wait a while for existing tasks to terminate if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { pool.shutdownNow(); // Cancel currently executing tasks // Wait a while for tasks to respond to being cancelled if (!pool.awaitTermination(60, TimeUnit.SECONDS)) System.err.println("Pool did not terminate"); } } catch (InterruptedException ie) { // (Re-)Cancel if current thread also interrupted pool.shutdownNow(); // Preserve interrupt status Thread.currentThread().interrupt(); } }
Вы всегда можете вызвать future.get(timeout...). Он вернет исключение тайм-аута, если оно еще не закончилось... тогда вы можете вызвать future.cancel().
Пока вы имеете дело с потоками в Java, единственный безопасный способ остановить поток - это прервать его. Ты можешь позвонить shutdown()
сначала а потом ждать. Этот метод не прерывает потоки.
Если это не поможет, тогда вы звоните shutdownNow()
который пытается отменить задачи, устанавливая флаг прерывания каждого потока в true. В этом случае, если потоки заблокированы / ожидают, тогда InterruptedException будет брошен. Если вы проверите флаг прерывания где-то внутри ваших задач, значит, вы тоже хороши.
Но если у вас нет другого выбора, кроме как остановить потоки, вы все равно можете это сделать. Одним из возможных решений получения доступа к рабочим является отслеживание всех созданных потоков внутри ThreadPoolExecutor с помощью фабрики пользовательских потоков.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class TestThreadPoolEx {
static class CustomThreadFactory implements ThreadFactory {
private List<Thread> threads = new ArrayList<>();
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
threads.add(t);
return t;
}
public List<Thread> getThreads() {
return threads;
}
public void stopThreads() {
for(Thread t : threads) {
if(t.isAlive()) {
try {
t.stop();
} catch (Exception e) {
//NOP
}
}
}
}
}
public static void main(String[] args) throws InterruptedException {
CustomThreadFactory factory = new CustomThreadFactory();
ExecutorService ex = Executors.newFixedThreadPool(1, factory);
ex.submit(() -> {
while(true);
});
ex.shutdown();
ex.awaitTermination(5, TimeUnit.SECONDS);
ex.shutdownNow();
ex.awaitTermination(5, TimeUnit.SECONDS);
factory.stopThreads();
}
}
Это, конечно, небезопасно, но должно соответствовать вашим требованиям. В этом случае он может остановить цикл while(true). Отмена задач не сможет этого сделать.