ExecutorService никогда не останавливается. Когда выполнить новое задание внутри другого выполняющегося задания
Добрый день.
У меня проблема с блокировщиком в моем проекте веб-сканера. Логика проста. Первый создает один Runnable
, он загружает HTML-документ, сканирует все ссылки, а затем по всем финансируемым ссылкам создает новые Runnable
объекты. Каждый новый создан Runnable
в свою очередь создает новые Runnable
объекты для каждой ссылки и выполнить их.
Проблема в том, что ExecutorService
никогда не останавливается
CrawlerTest.java
public class CrawlerTest {
public static void main(String[] args) throws InterruptedException {
new CrawlerService().crawlInternetResource("https://jsoup.org/");
}
}
CrawlerService.java
import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;
public class CrawlerService {
private Set<String> uniqueUrls = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>(10000));
private ExecutorService executorService = Executors.newFixedThreadPool(8);
private String baseDomainUrl;
public void crawlInternetResource(String baseDomainUrl) throws InterruptedException {
this.baseDomainUrl = baseDomainUrl;
System.out.println("Start");
executorService.execute(new Crawler(baseDomainUrl)); //Run first thread and scan main domain page. This thread produce new threads.
executorService.awaitTermination(10, TimeUnit.MINUTES);
System.out.println("End");
}
private class Crawler implements Runnable { // Inner class that encapsulates thread and scan for links
private String urlToCrawl;
public Crawler(String urlToCrawl) {
this.urlToCrawl = urlToCrawl;
}
public void run() {
try {
findAllLinks();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void findAllLinks() throws InterruptedException {
/*Try to add new url in collection, if url is unique adds it to collection,
* scan document and start new thread for finded links*/
if (uniqueUrls.add(urlToCrawl)) {
System.out.println(urlToCrawl);
Document htmlDocument = loadHtmlDocument(urlToCrawl);
Elements findedLinks = htmlDocument.select("a[href]");
for (Element link : findedLinks) {
String absLink = link.attr("abs:href");
if (absLink.contains(baseDomainUrl) && !absLink.contains("#")) { //Check that we are don't go out of domain
executorService.execute(new Crawler(absLink)); //Start new thread for each funded link
}
}
}
}
private Document loadHtmlDocument(String internetResourceUrl) {
Document document = null;
try {
document = Jsoup.connect(internetResourceUrl).ignoreHttpErrors(true).ignoreContentType(true)
.userAgent("Mozilla/5.0 (Windows NT 6.1; WOW64; rv:48.0) Gecko/20100101 Firefox/48.0")
.timeout(10000).get();
} catch (IOException e) {
System.out.println("Page load error");
e.printStackTrace();
}
return document;
}
}
}
Этому приложению требуется около 20 секунд для сканирования всех уникальных ссылок на сайте jsoup.org. Но это просто подождать 10 минут executorService.awaitTermination(10, TimeUnit.MINUTES);
и тогда я вижу мертвую главную ветку и все еще работающего исполнителя.
Как заставить ExecutorService
работать правильно?
Я думаю, проблема в том, что он вызывает executorService.execute внутри другой задачи, а не в основном потоке.
4 ответа
Я вижу ваш комментарий от ранее:
Я не могу использовать CountDownLatch, потому что заранее не знаю, сколько уникальных ссылок я соберу с ресурса.
Прежде всего, vsminkov находится на месте с ответом относительно того, почему awaitTermniation
будет сидеть и ждать 10 минут. Я предложу альтернативное решение.
Вместо использования CountDownLatch
используйте фазер. Для каждого нового задания вы можете зарегистрироваться и дождаться завершения.
Создайте один фазер и register
каждый раз execute.submit
вызывается и arrive
каждый раз Runnable
завершается.
public void crawlInternetResource(String baseDomainUrl) {
this.baseDomainUrl = baseDomainUrl;
Phaser phaser = new Phaser();
executorService.execute(new Crawler(phaser, baseDomainUrl));
int phase = phaser.getPhase();
phase.awaitAdvance(phase);
}
private class Crawler implements Runnable {
private final Phaser phaser;
private String urlToCrawl;
public Crawler(Phaser phaser, String urlToCrawl) {
this.urlToCrawl = urlToCrawl;
this.phaser = phaser;
phaser.register(); // register new task
}
public void run(){
...
phaser.arrive(); //may want to surround this in try/finally
}
Вы злоупотребляете awaitTermination
, По словам Javadoc вы должны позвонить shutdown
первый:
Блокируется до тех пор, пока все задачи не завершат выполнение после запроса на выключение, или не истечет время ожидания, или текущий поток не прервется, в зависимости от того, что произойдет раньше.
Для достижения вашей цели я бы предложил использовать CountDownLatch
(или защелка, поддерживающая приращения, подобные этой), чтобы определить точный момент, когда не осталось задач, чтобы вы могли безопасно выполнять shutdown
,
Вы не вызываете выключение.
Это может работать - переменная AtomicLong в CrawlerService. Инкремент перед каждой новой подзадачей передается в службу исполнителя.
Измените метод run(), чтобы уменьшить этот счетчик, и, если 0, отключите службу executor.
public void run() {
try {
findAllLinks();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//decrements counter
//If 0, shutdown executor from here or just notify CrawlerService who would be doing wait().
}
}
В поле "наконец" уменьшите счетчик, а когда счетчик станет равным нулю, завершите работу исполнителя или просто уведомите CrawlerService. 0 означает, что это последний, другой не запущен, ни один не находится в очереди. Ни одно задание не отправит никаких новых подзадач.
Как заставить ExecutorService работать правильно?
Я думаю, проблема в том, что он вызывает executorService.execute внутри другой задачи, а не в основном потоке.
Нет. Проблема не в ExecutorService. Вы используете API неправильно и, следовательно, не получаете правильный результат.
Вы должны использовать три API в определенном порядке, чтобы получить правильный результат.
1. shutdown
2. awaitTermination
3. shutdownNow
Рекомендуемый путь со страницы документации оракула ExecutorService:
void shutdownAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
pool.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
System.err.println("Pool did not terminate");
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
shutdown():
Инициирует упорядоченное завершение, при котором выполняются ранее отправленные задачи, но новые задачи не принимаются.
shutdownNow():
Пытается остановить все активно выполняющиеся задачи, останавливает обработку ожидающих задач и возвращает список задач, ожидающих выполнения.
awaitTermination():
Блокируется до тех пор, пока все задачи не завершат выполнение после запроса на выключение, или не истечет время ожидания, или текущий поток не прервется, в зависимости от того, что произойдет раньше.
С другой стороны: если вы хотите дождаться завершения всех задач, обратитесь к этому связанному вопросу SE:
ждать пока все потоки не закончат свою работу в Java
Я предпочитаю использовать invokeAll()
или же ForkJoinPool()
, которые лучше всего подходят для вашего случая использования.