Apache Storm: исключение при отправке топологии: [x] подписывается из несуществующего потока

Извините, если вопрос решен, но я попытался найти его, но у меня не было успеха. Есть некоторые похожие, но я не нашел помощи там, где видел. У меня есть следующая проблема:

603  [main] WARN  b.s.StormSubmitter - Topology submission exception: 
    Component: [escribirFichero] subscribes from non-existent stream: 
               [default] of component [buscamosEnKlout]
Exception in thread "main" java.lang.RuntimeException: 
    InvalidTopologyException(msg:Component: 
               [escribirFichero] subscribes from non-existent stream: 
                   [default] of component [buscamosEnKlout])

Я не могу понять, почему у меня есть это исключение. Я объявляю болт "buscamosEnKlout", прежде чем использовать "escribirFichero". Рядом с моей топологией я поставлю элементарные линии болтов. Я знаю, что носик в порядке, потому что метод проб и ошибок.

Код моей топологии:

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.stats.RollingWindow;
import backtype.storm.topology.BoltDeclarer;
import backtype.storm.topology.TopologyBuilder;
import bolt.*;
import spout.TwitterSpout;
import twitter4j.FilterQuery;

public class TwitterTopologia {
    private static String consumerKey = "xxx1";
    private static String consumerSecret = "xxx2";
    private static String accessToken = "yyy1";
    private static String accessTokenSecret="yyy2";

    public static void main(String[] args) throws Exception {
        /**************** SETUP ****************/
        String remoteClusterTopologyName = null;
        if (args!=null) { ... } 

        TopologyBuilder builder = new TopologyBuilder();
        FilterQuery tweetFilterQuery = new FilterQuery();
        tweetFilterQuery.track(new String[]{"Vacaciones","Holy Week", "Semana Santa","Holidays","Vacation"});
        tweetFilterQuery.language(new String[]{"en","es"});


        TwitterSpout spout = new TwitterSpout(consumerKey, consumerSecret, accessToken, accessTokenSecret, tweetFilterQuery);

        KloutBuscador buscamosEnKlout = new KloutBuscador();
        FileWriterBolt fileWriterBolt = new FileWriterBolt("idUsuarios.txt");

        builder.setSpout("spoutLeerTwitter",spout,1);
        builder.setBolt("buscamosEnKlout",buscamosEnKlout,1).shuffleGrouping("spoutLeerTwitter");
        builder.setBolt("escribirFichero",fileWriterBolt,1).shuffleGrouping("buscamosEnKlout");


        Config conf = new Config();
        conf.setDebug(true);
        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);

            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        }
        else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("twitter-fun", conf, builder.createTopology());
            Thread.sleep(460000);
            cluster.shutdown();
        }
    }
}

Болт "KloutBuscador", псевдоним "buscamosEnKlout", является следующим кодом:

String text = tuple.getStringByField("id");

String cadenaUrl;

cadenaUrl = "http://api.klout.com/v2/identity.json/twitter?screenName=";
cadenaUrl += text.replaceAll("\\[", "").replaceAll("\\]","");
cadenaUrl += "&key=" + kloutKey;
URL url = new URL(cadenaUrl);
HttpURLConnection c = (HttpURLConnection) url.openConnection();
        ...........c.setRequestMethod("GET");c.setRequestProperty("Content-length", "0");c.setUseCaches(false);c.setAllowUserInteraction(false);c.connect();
int status = c.getResponseCode();
StringBuilder sb = new StringBuilder();
switch (status) {
    case 200:
    case 201:
       BufferedReader br = new BufferedReader(new InputStreamReader(c.getInputStream()));
       String line;
       while ((line = br.readLine()) != null) sb.append(line + "\n");
           br.close();
       }

JSONObject jsonResponse = new JSONObject(sb.toString());
//getJSONArray("id");
String results = jsonResponse.toString();
_collector.emit(new Values(text,results));

И второй болт, fileWriterBolt, псевдоним "escribirFichero", является следующим:

public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
    _collector = outputCollector;
    try {
        writer = new PrintWriter(filename, "UTF-8");...}...}

    public void execute(Tuple tuple) {
        writer.println((count++)+":::"+tuple.getValues());

 //+"+++"+tweet.getUser().getId()+"__FINAL__"+tweet.getUser().getName()
        writer.flush();
        // Confirm that this tuple has been treated.
        //_collector.ack(tuple);

    }

Если я перейду через болт Klous и только напишу результат из носика, он работает. Я не понимаю, почему болт Клоуса вызывает эту неудачу

1 ответ

Ваш болт buscamosEnKlout должен объявить формат кортежей, которые он будет излучать, а также какие потоки он будет излучать. Скорее всего, в этом болте вы не правильно реализовали DeclareOutputFields. Он должен содержать что-то вроде declarer.declare(new Fields("your-text-field", "your-results-field"))

Другие вопросы по тегам