Error while crawling after an arbitrary amount of time
So, I have two classes: one responsible for injecting the seed URLs and one for crawling.
The ESSeedInjector class:
public class ESSeedInjector extends ConfigurableTopology {

    public static void main(String[] args) {
        ConfigurableTopology.start(new ESSeedInjector(),
                new String[] { ".", "seeds.txt", "-local", "-conf", "es-conf.yaml", "--sleep", "5000" });
    }

    @Override
    public int run(String[] args) {
        if (args.length == 0) {
            System.err.println("ESSeedInjector seed_dir file_filter");
            return -1;
        }

        conf.setDebug(true);

        TopologyBuilder builder = new TopologyBuilder();

        // reads the seed file(s) as tab-separated url/metadata and emits them as DISCOVERED
        Scheme scheme = new StringTabScheme(Status.DISCOVERED);
        builder.setSpout("spout", new FileSpout(args[0], args[1], scheme));

        Fields key = new Fields("url");
        builder.setBolt("filter", new URLFilterBolt()).fieldsGrouping("spout", key);

        // writes the URLs into the Elasticsearch status index
        builder.setBolt("enqueue", new StatusUpdaterBolt(), 10)
                .customGrouping("filter", new URLStreamGrouping());

        return submit("ESSeedInjector", conf, builder);
    }
}
The crawler class, ESCrawlTopology:
public class ESCrawlTopology extends ConfigurableTopology {

    public static void main(String[] args) {
        ConfigurableTopology.start(new ESCrawlTopology(),
                new String[] { "-conf", "es-conf.yaml", "-local" });
    }

    @Override
    protected int run(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();

        int numWorkers = ConfUtils.getInt(getConf(), "topology.workers", 1);
        int numShards = 1;

        builder.setSpout("spout", new CollapsingSpout(), numShards);

        builder.setBolt("status_metrics", new StatusMetricsBolt())
                .shuffleGrouping("spout");

        builder.setBolt("partitioner", new URLPartitionerBolt(), numWorkers)
                .shuffleGrouping("spout");

        builder.setBolt("fetch", new FetcherBolt(), numWorkers)
                .fieldsGrouping("partitioner", new Fields("key"));

        builder.setBolt("sitemap", new SiteMapParserBolt(), numWorkers)
                .localOrShuffleGrouping("fetch");

        builder.setBolt("parse", new JSoupParserBolt(), numWorkers)
                .localOrShuffleGrouping("sitemap");

        builder.setBolt("indexer", new IndexerBolt(), numWorkers)
                .localOrShuffleGrouping("parse");

        Fields furl = new Fields("url");

        builder.setBolt("status", new StatusUpdaterBolt(), numWorkers)
                .fieldsGrouping("fetch", Constants.StatusStreamName, furl)
                .fieldsGrouping("sitemap", Constants.StatusStreamName, furl)
                .fieldsGrouping("parse", Constants.StatusStreamName, furl)
                .fieldsGrouping("indexer", Constants.StatusStreamName, furl);

        builder.setBolt("deleter", new DeletionBolt(), numWorkers)
                .localOrShuffleGrouping("status", Constants.DELETION_STREAM_NAME);

        conf.registerMetricsConsumer(MetricsConsumer.class);
        conf.registerMetricsConsumer(LoggingMetricsConsumer.class);

        return submit("crawl", conf, builder);
    }
}
The flow is:
1. Run the ESSeedInjector class (this injects the URLs successfully).
2. Run the crawler class (ESCrawlTopology), as shown in the sketch below.
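For completeness, this is the order in which the two main() methods are invoked. RunOrder is not a real class in my project (in practice each main() is launched as its own run from the IDE); it is only a sketch of the sequence and of the arguments each class hard-codes, assuming both classes are on the classpath:

// Illustration only: documents the order in which the two topologies are started.
public class RunOrder {
    public static void main(String[] args) {
        // Step 1: ESSeedInjector.main() uses its own hard-coded arguments
        // (".", "seeds.txt", "-local", "-conf", "es-conf.yaml", "--sleep", "5000")
        // and injects the seed URLs into the Elasticsearch status index.
        ESSeedInjector.main(new String[0]);

        // Step 2: after the injector run has returned, the crawl topology is started
        // with "-conf es-conf.yaml -local"; the error below shows up some time later.
        ESCrawlTopology.main(new String[0]);
    }
}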
It then starts crawling, but at an arbitrary point in time it fails with the error below.
18892 [elasticsearch[_client_][listener][T#2]] ERROR c.d.s.e.p.CollapsingSpout - Exception with ES query
org.elasticsearch.transport.RemoteTransportException: [2rbuRko][127.0.0.1:9300][indices:data/read/search]
Caused by: org.elasticsearch.transport.RemoteTransportException: [2rbuRko][127.0.0.1:9300][indices:data/read/msearch]
Caused by: java.lang.IllegalArgumentException: Validation Failed: 1: no requests added;
at org.elasticsearch.action.ValidateActions.addValidationError(ValidateActions.java:29) ~[elasticsearch-5.3.0.jar:5.3.0]
at org.elasticsearch.action.search.MultiSearchRequest.validate(MultiSearchRequest.java:90) ~[elasticsearch-5.3.0.jar:5.3.0]
at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:131) ~[elasticsearch-5.3.0.jar:5.3.0]
at org.elasticsearch.action.support.HandledTransportAction$TransportHandler.messageReceived(HandledTransportAction.java:64) ~[elasticsearch-5.3.0.jar:5.3.0]
at org.elasticsearch.action.support.HandledTransportAction$TransportHandler.messageReceived(HandledTransportAction.java:54) ~[elasticsearch-5.3.0.jar:5.3.0]
at org.elasticsearch.transport.RequestHandlerRegistry.processMessageReceived(RequestHandlerRegistry.java:69) ~[elasticsearch-5.3.0.jar:5.3.0]
at org.elasticsearch.transport.TransportService.sendLocalRequest(TransportService.java:621) ~[elasticsearch-5.3.0.jar:5.3.0]
at org.elasticsearch.transport.TransportService.access$000(TransportService.java:73) ~[elasticsearch-5.3.0.jar:5.3.0]
at org.elasticsearch.transport.TransportService$3.sendRequest(TransportService.java:133) ~[elasticsearch-5.3.0.jar:5.3.0]
at org.elasticsearch.transport.TransportService.sendRequestInternal(TransportService.java:569) ~[elasticsearch-5.3.0.jar:5.3.0]
at org.elasticsearch.transport.TransportService.sendRequest(TransportService.java:502) ~[elasticsearch-5.3.0.jar:5.3.0]
at org.elasticsearch.transport.TransportService.sendChildRequest(TransportService.java:529) ~[elasticsearch-5.3.0.jar:5.3.0]
at org.elasticsearch.transport.TransportService.sendChildRequest(TransportService.java:520) ~[elasticsearch-5.3.0.jar:5.3.0]
at org.elasticsearch.action.search.SearchTransportService.sendExecuteMultiSearch(SearchTransportService.java:182) ~[elasticsearch-5.3.0.jar:5.3.0]
at org.elasticsearch.action.search.ExpandSearchPhase.run(ExpandSearchPhase.java:93) ~[?:?]
at org.elasticsearch.action.search.AbstractSearchAsyncAction.executePhase(AbstractSearchAsyncAction.java:144) ~[elasticsearch-5.3.0.jar:5.3.0]
at org.elasticsearch.action.search.AbstractSearchAsyncAction.executeNextPhase(AbstractSearchAsyncAction.java:138) ~[elasticsearch-5.3.0.jar:5.3.0]
at org.elasticsearch.action.search.FetchSearchPhase.moveToNextPhase(FetchSearchPhase.java:207) ~[?:?]
at org.elasticsearch.action.search.FetchSearchPhase.lambda$innerRun$2(FetchSearchPhase.java:105) ~[?:?]
at org.elasticsearch.action.search.FetchSearchPhase.innerRun(FetchSearchPhase.java:117) ~[?:?]
at org.elasticsearch.action.search.FetchSearchPhase.access$000(FetchSearchPhase.java:45) ~[?:?]
at org.elasticsearch.action.search.FetchSearchPhase$1.doRun(FetchSearchPhase.java:87) ~[?:?]
at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:638) [elasticsearch-5.3.0.jar:5.3.0]
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-5.3.0.jar:5.3.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_151]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_151]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]
I'm not sure what causes the error, but the pattern I've seen is this: if I wipe the data from Elasticsearch by running ESIndex.Init, then run ESSeedInjector and then ESCrawlTopology, the exception is thrown right at the start of the crawl (just after the seed URLs have been parsed).
However, if I then run ESCrawlTopology again (without doing anything else), it still throws the exception, but much later.
EDIT: when I switch from CollapsingSpout to AggregationSpout, I now get this log entry:
15409 [elasticsearch[_client_][listener][T#1]] INFO c.d.s.e.p.AggregationSpout - ES query returned 0 hits from 0 buckets in 2 msec with 0 already being processed
Nothing else gets processed or indexed in ES after that.
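A quick way to confirm that nothing new lands in the status index is a count query like the one below. This is only a sketch: it talks to the same local node as in the stack trace (127.0.0.1:9300) and assumes the default index name "status" and the default cluster name "elasticsearch", so adjust both to whatever es-conf.yaml actually sets.

import java.net.InetAddress;

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

public class StatusIndexCount {
    public static void main(String[] args) throws Exception {
        // assumption: default cluster name; change if the cluster is named differently
        Settings settings = Settings.builder()
                .put("cluster.name", "elasticsearch")
                .build();
        try (TransportClient client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress(
                        InetAddress.getByName("127.0.0.1"), 9300))) {
            // "status" is the default StormCrawler status index name; adjust if overridden
            SearchResponse resp = client.prepareSearch("status").setSize(0).get();
            System.out.println("docs in status index: " + resp.getHits().getTotalHits());
        }
    }
}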