Настройка нового потока для Warc Bolt не удалась

Я пытаюсь настроить новый поток для соединения болта Тика с болтом варка.

import com.digitalpebble.stormcrawler.tika.ParserBolt;
import com.digitalpebble.stormcrawler.warc.WARCHdfsBolt;

builder.setBolt("tika", new ParserBolt(), numWorkers)
  .localOrShuffleGrouping("shunt","tika");

WARCHdfsBolt warcbolt = getWarcBolt("XX");

builder.setBolt("warc", warcbolt, numWorkers)
  .localOrShuffleGrouping("tika",  "warc");

В определении Tika я изменил функцию outputDeclarerFields следующим образом, чтобы определить мой новый поток "warc":

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
  declarer.declare(new Fields("url", "content", "metadata", "text"));
  declarer.declareStream(StatusStreamName, new Fields("url", "metadata", "status"));
  declarer.declareStream("warc",   new Fields("url", "content", "metadata", "text"));
}

Однако, когда я запускаю топологию в локальном режиме, я получаю:

14308 [main] WARN oasdsSlot - SLOT debian8:1027 Запуск в состоянии EMPTY - нулевое назначение 14308 [main] WARN oasdsSlot - SLOT debian8:1028 Запуск в состоянии EMPTY - нулевое назначение 14308 [main] WARN oasdsSlot - SLOT debian8:1029 Запуск в состоянии EMPTY - присвоение null 14309 [main] INFO oaslAsyncLocalizer - очистка неиспользуемых топологий в /tmp/a1e3b7f5-e251-40ae-a032-b0839ca103c8/supervisor/stormdist 14318 [main] INFO oasdsSupervisor - запуск супервизора с id-9366-ffc 88564 -4b7751ed2d6a на хосте debian8. 15030 [main] WARN oasdnimbus - исключение отправки топологии. (имя топологии ='xxCrawler') #error {: причиной nil:via [{: тип org.apache.storm.generated.InvalidTopologyException: сообщение nil
: at [org.apache.storm.daemon.common $ validate_structure_BANG_ invoke common.clj 185]}]: trace [[org.apache.storm.daemon.common $ validate_structure_BANG_ invoke common.clj 185]
[org.apache.storm.daemon.common $ system_topology_BANG_ invoke common.clj 378]
[org.apache.storm.daemon.nimbus $ mk_reified_nimbus $ reify__10782 submitTopologyWithOpts nimbus.clj 1694]
[org.apache.storm.daemon.nimbus $ mk_reified_nimbus $ reify__10782 submitTopology nimbus.clj 1726]
[sun.reflect.NativeMethodAccessorImpl invoke0 NativeMethodAccessorImpl.java -2]
[sun.reflect.NativeMethodAccessorImpl вызывает NativeMethodAccessorImpl.java 62]
[sun.reflect.DelegatingMethodAccessorImpl invoke DelegatingMethodAccessorImpl.java 43] [java.lang.reflect.Method invoke Method.java 498] [clojure.lang.Reflector invokeMatchingMethod Reflector.java 93] возвращать рефлекторный метод.java 93 [org.apache.storm.testing $ submit_local_topology invoke testing.clj 310]
[org.apache.storm.LocalCluster $ _submitTopology invoke LocalCluster.clj 49] [org.apache.storm.LocalCluster submitTopology nil -1]
[com.digitalpebble.stormcrawler.ConfigurableTopology submit ConfigurableTopology.java 76]
[com.digitalpebble.stormcrawler.ConfigurableTopology submit ConfigurableTopology.java 65] [xx.xx.xx.xx.xxTopology run xxTopology.java 111]
[com.digitalpebble.stormcrawler.ConfigurableTopology start ConfigurableTopology.java 50] [xx.xx.xx.xx.xxTopology main xxTopology.java 53]]} 15035 [main] ОШИБКА oassoazsNIOServerCnxnFactory - Поток потока [main,5 main, главный].apache.storm.generated.InvalidTopologyException: пусто в org.apache.storm.daemon.common$validate_structure_BANG_.invoke(common.clj:185) ~[storm-core-1.1.0.jar:1.1.0] в орг.apache.storm.daemon.common$system_topology_BANG_.invoke(common.clj:378) ~[storm-core-1.1.0.jar:1.1.0] в org.apache.storm.daemon.nimbus$mk_reified_nimbus$reify__10782.submitTopologyWithOpts(nimbus.clj:1694) ~[storm-core-1.1.0.jar:1.1.0] в org.apache.storm.daemon.nimbus$mk_reified_nimbus$reify__10782.submitTopology(nimbus.clj:1726) ~[storm-core-1.1.0.jar:1.1.0] at sun.reflect.NativeMethodAccessorImpl.invoke0(собственный метод) ~[?:1.8.0_131] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_131] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_131] в java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_131] в clojure.lang.Reflector.invokeMatchingMethod(Reflector. Java:93) ~[clojure-1.7.0.jar:?] в clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:28) ~[clojure-1.7.0.jar:?] в org.apache.storm. тестирование $submit_local_topology.invoke(testing.clj:310) ~[storm-core-1.1.0.jar:1.1.0] в org.apache.storm.LocalCluster$_submitTopology.invoke(LocalCluster.clj:49) ~[storm-core-1.1.0.jar:1.1.0] в org.apache.storm.LocalCluster.submitTopology(неизвестный источник) ~ [storm-core-1.1.0.jar: 1.1.0] в com.digitalpebble.stormcrawler.ConfigurableTopology.submit(ConfigurableTopology.java:76) ~[xx-crawler-1.1.jar:?] at com.digitalpebble.stormcrawler.ConfigurableTopology.submit(ConfigurableTopology.java:65) ~[xx-1.1.jar:?] в xx.xx.xx.xx.xxTopology.run(xxTopology.java:111) ~[xx-crawler-1.1.jar:?] в com.digitalpebble.stormcrawler.ConfigurableTopology.start(ConfigurableTopology.java:50) ~[xx-crawler-1.1.jar:?] at xx.xx.xx.xx.xxTopology.main(xxTopology.java:53) ~[xx-crawler-1.1.jar:?]

Любая помощь будет высоко оценен!!

Обратите внимание, что если я использую поток StatusStreamName ("status") для соединения болтов tika и warc, то он работает нормально.

Спасибо,

Etienne

1 ответ

Решение

WARC генерируются из необработанного, не проанализированного содержимого. Вы должны подключить WARC к выходу сборщика вместо болта анализатора.

Вам не нужно объявлять новый поток только для варка, вы можете просто подключить болт варка к стандартному потоку, выходящему из болта Тика.

Я вижу в вашем коде

import com.digitalpebble.stormcrawler.tika.ParserBolt;

что указывает на то, что вы полагаетесь на реализацию по умолчанию (которая не генерирует поток 'warc'). Не могли бы вы забыть заменить это измененной реализацией?

Другие вопросы по тегам