ClassNotFoundException: clojure.lang.PersistentList, компилирование:(clojure/core.clj:20:8) при отправке топологии в локальный кластер в Storm

Я новичок в Apache Storm и пытаюсь создать топологию в шторме, которую можно использовать для обработки потоков в реальном времени. Но когда я пытаюсь представить свою топологию в локальном кластере, я получаю исключения ниже

Exception in thread "main" java.lang.ExceptionInInitializerError
at clojure.lang.Namespace.<init>(Namespace.java:34)
at clojure.lang.Namespace.findOrCreate(Namespace.java:176)
at clojure.lang.Var.internPrivate(Var.java:151)
at org.apache.storm.LocalCluster.<clinit>(Unknown Source)
at KafkaCEPTopology.main(KafkaCEPTopology.java:52)
Caused by: java.lang.ClassNotFoundException: 
clojure.lang.PersistentList, compiling:(clojure/core.clj:20:8)
at clojure.lang.Compiler.analyzeSeq(Compiler.java:6730)
at clojure.lang.Compiler.analyze(Compiler.java:6524)
at clojure.lang.Compiler.access$300(Compiler.java:38)
at clojure.lang.Compiler$DefExpr$Parser.parse(Compiler.java:577)
at clojure.lang.Compiler.analyzeSeq(Compiler.java:6723)
at clojure.lang.Compiler.analyze(Compiler.java:6524)
at clojure.lang.Compiler.analyze(Compiler.java:6485)
at clojure.lang.Compiler.eval(Compiler.java:6786)
at clojure.lang.Compiler.load(Compiler.java:7227)
at clojure.lang.RT.loadResourceScript(RT.java:371)
at clojure.lang.RT.loadResourceScript(RT.java:362)
at clojure.lang.RT.load(RT.java:446)
at clojure.lang.RT.load(RT.java:412)
at clojure.lang.RT.doInit(RT.java:454)
at clojure.lang.RT.<clinit>(RT.java:330)

Немного предыстории:

Я пытаюсь разработать топологию Kafka-CEP, в которой я использую Kafka как носик, а мои экземпляры CEP - как болты в топологии шторма. Ниже приведены мой основной код KafkaCEPTopology и pom.xml для справки. Я использую IntelliJ для запуска своей топологии.

pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
     <project xmlns="http://maven.apache.org/POM/4.0.0"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
     http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>

<groupId>com.stormadvance</groupId>
<artifactId>storm_esper</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>

        <storm.topology>com.stormadvance.storm_esper.KafkaCEPTopology</storm.topology>

</properties>
<build>
    <plugins>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>7</source>
                <target>7</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
            <archive>
                <manifest>
                    <mainClass></mainClass>
                </manifest>
            </archive>
        </configuration>
        <executions>
            <execution>
                <id>make-assembly</id>
                <phase>package</phase>
                <goals>
                    <goal>single</goal>
                </goals>
            </execution>
        </executions>
    </plugin>
        <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>exec-maven-plugin</artifactId>
            <version>1.2.1</version>
            <executions>
                <execution>
                    <goals>
                        <goal>exec</goal>
                    </goals>
                </execution>
            </executions>
            <configuration>
                <executable>java</executable>
                <includeProjectDependencies>true</includeProjectDependencies>
                <includePluginDependencies>false</includePluginDependencies>
                <classpathScope>compile</classpathScope>
                <mainClass>${storm.topology}</mainClass>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
        </plugin>
    </plugins>
</build>
<dependencies>
<dependency>
    <groupId>com.espertech</groupId>
    <artifactId>esper</artifactId>
    <version>5.3.0</version>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>3.8.1</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.2.2</version>
    <scope>compile</scope>
</dependency>
    <dependency>
        <groupId>com.facebook.presto.hadoop</groupId>
        <artifactId>hadoop-apache2</artifactId>
        <version>2.7.3-1</version>
    </dependency>

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>1.0.2</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka-client</artifactId>
        <version>1.2.2</version>
    </dependency>
    <dependency>
        <groupId>org.influxdb</groupId>
        <artifactId>influxdb-java</artifactId>
        <version>2.5</version>
    </dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>1.1.1</version>
    <exclusions>
        <exclusion>
            <groupId>com.sun.jdmk</groupId>
            <artifactId>jmxtools</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.sun.jmx</groupId>
            <artifactId>jmxri</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>commons-collections</groupId>
    <artifactId>commons-collections</artifactId>
    <version>3.2.1</version>
</dependency>
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>15.0</version>
</dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>connect-json</artifactId>
        <version>1.1.1</version>
    </dependency>
    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-csv</artifactId>
        <version>1.6</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.9.0</version>
    </dependency>
</dependencies>
<repositories>
    <repository>
        <id>clojars</id>
        <url>http://clojars.org/repo/</url>
    </repository>
</repositories>

KafkaCEPTopology.java:

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.BoltDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
public class KafkaCEPTopology {
    public static void main(String[] args) {
    try {
        // ZooKeeper hosts for the Kafka cluster
        BrokerHosts zkHosts = new ZkHosts("localhost:2181");

        //call kafka producer

        KafkaEventProducer kprod = new KafkaEventProducer();
        kprod.initKafkaConfig();
        kprod.initFileConfig("sampleData.csv");
        kprod.sendFileDataToKafka("weatherdata");

        // Create the KafkaSpout configuartion
        // Second argument is the topic name
        // Third argument is the zookeepr root for Kafka
        // Fourth argument is consumer group id
        SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "weatherdata", "", "weather-consumer-group");
        // Specify that the kafka messages are String
        // We want to consume all the first messages in the topic everytime
        // we run the topology to help in debugging. In production,this
        // property should be false
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        //kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
        //KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
        // Now we create the topology
        TopologyBuilder builder = new TopologyBuilder();
        // set the kafka spout class
        builder.setSpout("KafkaSpout", new KafkaSpout<>(KafkaSpoutConfig.builder("localhost:9092", "weatherdata").build()),2);
         // set the word and sentence bolt class
        builder.setBolt("FeatureSelectionBolt", new FeatureSelectionBolt(), 1).globalGrouping("KafkaSpout");
        builder.setBolt("TrendDetectionBolt", new TrendDetectionBolt(), 1).globalGrouping("FeatureSelectionBolt");
        BoltDeclarer bd = builder.setBolt("PushToInfluxDbBolt", new PushToInfluxDbBolt(), 1);
        bd.allGrouping("FeatureSelectionBolt", "fsBolt");
        bd.globalGrouping("TrendDetectionBolt", "tdBolt");
        // create an instance of LocalCluster class for executing topology
        // in local mode.
        LocalCluster cluster = new LocalCluster();
        Config conf = new Config();
        conf.setDebug(true);
        if (args.length > 0) {
            conf.setNumWorkers(2);
            conf.setMaxSpoutPending(5000);
            StormSubmitter.submitTopology("KafkaCEPTopology", conf, 
            builder.createTopology());
        } else {
            // Submit topology for execution
            cluster.submitTopology("KafkaCEPTopology", conf, 
            builder.createTopology());
            System.out.println("called1");
            Thread.sleep(1000000);
            // Wait for sometime before exiting
            System.out.println("Waiting to consume from kafka");
            System.out.println("called2");
            // kill the KafkaCEPTopology
            cluster.killTopology("KafkaCEPTopology");
            System.out.println("called3");
            // shutdown the storm test cluster
            cluster.shutdown();
        }
    } catch (Exception exception) {
        System.out.println("Thread interrupted exception : " +
                exception);
       }
    }
  }

Но я не понимаю, почему я получаю такие исключения, связанные с Clojure. Я пропустил некоторые зависимости? Пожалуйста помоги

0 ответов

Несколько вещей выпрыгивают из меня.

Не установлен storm-core в compile область действия, иначе вы не сможете запустить свою топологию в реальном кластере. Так должно быть provided,

Не создавайте LocalCluster, который вы не используете. Переместить творение в else ветвь и используйте try-with-resources, чтобы закрыть кластер, когда вы закончите.

Попробуйте использовать maven-shade-plugin вместо maven-assembly-plugin.

Я хотел бы начать с примера POM от штормового стартера и изменить его в соответствии с вашими потребностями. Вы можете найти пример POM по адресу https://github.com/apache/storm/blob/v1.2.3/examples/storm-starter/pom.xml. Обратите внимание, что он содержит некоторые части, которые вам не нужны, например, зависимости HBase, потому что проект storm-starter содержит множество примеров топологий.

Также maven-exec-plugin неправильно настроен как здесь, так и в шторм-стартере. Если вы действительно хотите использовать его, вам нужно переключиться на exec:java Цель. Плагин не работал так, как мне хотелось бы (топология, казалось, запустилась, не было очевидно, обрабатывались ли кортежи), поэтому подумайте о том, чтобы не использовать его.

Другие вопросы по тегам