Прекращение выполнения потока Java и роль объединения ()
Моя программа анализирует большое количество документов и иногда получает страницу, которая вызывает бесконечный или очень длинный цикл. Это не анализируется заранее. Я хочу убить определенную страницу и перейти к следующей (отбрасывая любые результаты для оскорбительной страницы). Я прочитал ТАК ответы , как этот Как остановить выполнение после определенного времени в Java? и написал следующий код:
// main program
private void runThread() throws InterruptedException {
long timeout = 15000L;
RunPageAnalyzer runPageAnalyzer = new RunPageAnalyzer(this);
Thread t = new Thread(runPageAnalyzer);
long startTime = System.currentTimeMillis();
t.start();
while (t.isAlive()) {
t.join(1000);
long delta = System.currentTimeMillis() - startTime;
LOG.debug("delta: "+delta);
if (delta > timeout && t.isAlive()) {
t.interrupt();
t.join;
break;
}
}
}
метод в том же классе, вызываемый потоком
void runActions() {
// variable length calculation which should be abandoned if too long
}
и Runnable:
class RunPageAnalyzer implements Runnable {
private PageAnalyzerAction pageAnalyzerAction;
public RunPageAnalyzer(PageAnalyzerAction pageAnalyzerAction) {
this.pageAnalyzerAction = pageAnalyzerAction;
}
public void run() {
try {
pageAnalyzerAction.runActions();
} catch (Exception e) {
LOG.debug("Exception running thread ", e);
}
}
Вывод для нормального завершения runActions() выглядит нормально:
=========== page 1 =============
13863 [main] DEBUG org.xmlcml.graphics.control.page.PageAnalyzerAction - pageActions: 24 on page 0
14863 [main] DEBUG org.xmlcml.graphics.control.page.PageAnalyzerAction - delta: 1000
15864 [main] DEBUG org.xmlcml.graphics.control.page.PageAnalyzerAction - delta: 2001
16864 [main] DEBUG org.xmlcml.graphics.control.page.PageAnalyzerAction - delta: 3001
16975 [main] DEBUG org.xmlcml.graphics.control.page.PageAnalyzerAction - delta: 3112
16975 [main] DEBUG org.xmlcml.graphics.control.page.PageAnalyzerAction - finished page
но когда превышен лимит времени, процесс зависает t.join()
,
=========== page 2 =============
16975 [main] DEBUG org.xmlcml.graphics.control.page.PageAnalyzerAction - pageActions: 24 on page 0
17976 [main] DEBUG org.xmlcml.graphics.control.page.PageAnalyzerAction - delta: 1001
18976 [main] DEBUG org.xmlcml.graphics.control.page.PageAnalyzerAction - delta: 2001
// ...
30976 [main] DEBUG org.xmlcml.graphics.control.page.PageAnalyzerAction - delta: 14001
31976 [main] DEBUG org.xmlcml.graphics.control.page.PageAnalyzerAction - delta: 15001
Если я опущу t.join()
затем процесс ведет себя так, как я ожидал, но я беспокоюсь, что это может быть просто создание огромного количества потоков, что позже станет проблемой.
ОБНОВЛЕНИЕ: ответы до сих пор предположили, что это нетривиально (и я не нашел стандартные примеры / руководства по Java очень полезными). Ключевым моментом является то, что runActions()
должен четко знать, что это может быть прервано. join()
это не основная проблема, потому что потоки просто продолжают идти.
ДАЛЬНЕЙШИЙ ВОПРОС: Должен ли я вставить Thread.currentThread().isInterrupted()
во всех местах в runActions()
который я ожидаю в непредсказуемо длинных циклах?
3 ответа
Я предполагаю, что здесь pageAnalyzerAction.runActions();
может быть прервано (то есть обрабатывает прерывания путем довольно быстрого выхода).
Если вас не устраивает API-интерфейс потока низкого уровня, вы можете использовать executor и futures из пакета java.concurrent, чтобы иметь дело с управлением потоками и политикой тайм-аута:
- исполнитель будет обрабатывать управление потоками с помощью пула потоков, используя их при необходимости
- будущее, возвращенное при отправке задачи, может быть запрошено с тайм-аутом - если задача не завершится в течение тайм-аута, будущее сгенерирует исключение TimeOutException, и вы сможете отменить свою задачу
Придуманный пример будет:
//declare an executor somewhere in your code, at a high level to recycle threads
ExecutorService executor = Executors.newFixedThreadPool(10); //number of threads: to be adjusted
private void runThread() throws InterruptedException {
long timeout = 15000L;
RunPageAnalyzer runPageAnalyzer = new RunPageAnalyzer(this);
Future future = executor.submit(runPageAnalyzer);
try {
future.get(timeout, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
//the runnable threw an exception: handle it
} catch (TimeoutException e) {
//the task could not complete before the timeout
future.cancel(true); //interrrupt it
}
}
Есть еще кое-что, что здесь не упоминается в ответах. Если вы хотите отменить ввод / вывод из потока, вы просто не можете "отменить" его и ожидать, что фактический ввод / вывод будет отменен. В основном вам нужно обработать исключение прерывания в вашей "задаче" и обработать его соответствующим образом, возможно, даже закрыть соединение с сокетом. У меня есть небольшой фрагмент, посвященный "остановке" задач, выполняемых с использованием потоков, которые могут оказаться полезными (извините, если в нем есть опечатки, он был написан очень давно).
public class ThreadStopTest {
public static void main(String[] args) {
testSqlThreadStop();
}
private static void testSocketReadStop() {
ExecutorService executor = Executors.newFixedThreadPool(2);
SocketTask task = new SocketTask("http://www.yahoo.com", 80);
Future<Integer> future = executor.submit(task);
try {
Integer result = future.get(1, TimeUnit.SECONDS);
System.out.println("Computation complete; result: " + result);
} catch(TimeoutException te) {
future.cancel(true);
task.cleanupAfterCancel();
System.out.println("Computation cancelled");
} catch(Exception e) {
e.printStackTrace();
}
executor.shutdown();
}
}
class SocketTask implements CleanableTask<Integer> {
private final String host;
private final int port;
private Socket socket;
public SocketTask(final String host, final int port) {
this.host = host;
this.port = port;
}
@Override
public Integer call() throws Exception {
InputStream in = null;
// TODO: Actually update the count and cleanly handle exceptions
int bytesRead = 0;
try {
this.socket = new Socket(this.host, this.port);
in = this.socket.getInputStream();
byte[] bytes = new byte[1000000];
System.out.println("Started reading bytes");
// The below behavior of waiting for a forceful close can be avoided
// if we modify the FutureTask class (the default Future impl)
// by passing in a CleanupHandler whose cleanup() method would be
// invoked after invoking the `cancel` method or by making all
// your tasks implement a CancelledTask interface which has a
// `cleanupAfterCancel` method to do the same. :)
try {
in.read(bytes);
} catch(SocketException se) {
if(Thread.currentThread().isInterrupted()) {
System.out.println("All OK; this socket was forcefully closed");
} else {
se.printStackTrace(); // something was seriously wrong
}
}
} catch(Exception e) {
e.printStackTrace();
} finally {
if(in != null) in.close();
}
return Integer.valueOf(bytesRead);
}
@Override
public void cleanupAfterCancel() {
try {
this.socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
Это звучит как твой runActions
-method не реагирует на состояние прерывания устанавливаемого потока. Последний join
звонок после звонка interrupt
не имеет таймаута, и будет ждать в течение неопределенного времени t
умереть. Вы должны проверить состояние прерывания в вашем runActions
-метод и реагирование путем прекращения операции, если установлен прерванный статус (Thread.interrupted()
возвращает истину).