ClassNotFoundException: clojure.lang.PersistentList, компилирование:(clojure/core.clj:20:8) при отправке топологии в локальный кластер в Storm
Я новичок в Apache Storm и пытаюсь создать топологию в шторме, которую можно использовать для обработки потоков в реальном времени. Но когда я пытаюсь представить свою топологию в локальном кластере, я получаю исключения ниже
Exception in thread "main" java.lang.ExceptionInInitializerError
at clojure.lang.Namespace.<init>(Namespace.java:34)
at clojure.lang.Namespace.findOrCreate(Namespace.java:176)
at clojure.lang.Var.internPrivate(Var.java:151)
at org.apache.storm.LocalCluster.<clinit>(Unknown Source)
at KafkaCEPTopology.main(KafkaCEPTopology.java:52)
Caused by: java.lang.ClassNotFoundException:
clojure.lang.PersistentList, compiling:(clojure/core.clj:20:8)
at clojure.lang.Compiler.analyzeSeq(Compiler.java:6730)
at clojure.lang.Compiler.analyze(Compiler.java:6524)
at clojure.lang.Compiler.access$300(Compiler.java:38)
at clojure.lang.Compiler$DefExpr$Parser.parse(Compiler.java:577)
at clojure.lang.Compiler.analyzeSeq(Compiler.java:6723)
at clojure.lang.Compiler.analyze(Compiler.java:6524)
at clojure.lang.Compiler.analyze(Compiler.java:6485)
at clojure.lang.Compiler.eval(Compiler.java:6786)
at clojure.lang.Compiler.load(Compiler.java:7227)
at clojure.lang.RT.loadResourceScript(RT.java:371)
at clojure.lang.RT.loadResourceScript(RT.java:362)
at clojure.lang.RT.load(RT.java:446)
at clojure.lang.RT.load(RT.java:412)
at clojure.lang.RT.doInit(RT.java:454)
at clojure.lang.RT.<clinit>(RT.java:330)
Немного предыстории:
Я пытаюсь разработать топологию Kafka-CEP, в которой я использую Kafka как носик, а мои экземпляры CEP - как болты в топологии шторма. Ниже приведены мой основной код KafkaCEPTopology и pom.xml для справки. Я использую IntelliJ для запуска своей топологии.
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.stormadvance</groupId>
<artifactId>storm_esper</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<storm.topology>com.stormadvance.storm_esper.KafkaCEPTopology</storm.topology>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>7</source>
<target>7</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<executable>java</executable>
<includeProjectDependencies>true</includeProjectDependencies>
<includePluginDependencies>false</includePluginDependencies>
<classpathScope>compile</classpathScope>
<mainClass>${storm.topology}</mainClass>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>com.espertech</groupId>
<artifactId>esper</artifactId>
<version>5.3.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.2.2</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.facebook.presto.hadoop</groupId>
<artifactId>hadoop-apache2</artifactId>
<version>2.7.3-1</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>1.0.2</version>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka-client</artifactId>
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>org.influxdb</groupId>
<artifactId>influxdb-java</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>1.1.1</version>
<exclusions>
<exclusion>
<groupId>com.sun.jdmk</groupId>
<artifactId>jmxtools</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jmx</groupId>
<artifactId>jmxri</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>15.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-json</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.4</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
<version>1.6</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.9.0</version>
</dependency>
</dependencies>
<repositories>
<repository>
<id>clojars</id>
<url>http://clojars.org/repo/</url>
</repository>
</repositories>
KafkaCEPTopology.java:
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.BoltDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
public class KafkaCEPTopology {
public static void main(String[] args) {
try {
// ZooKeeper hosts for the Kafka cluster
BrokerHosts zkHosts = new ZkHosts("localhost:2181");
//call kafka producer
KafkaEventProducer kprod = new KafkaEventProducer();
kprod.initKafkaConfig();
kprod.initFileConfig("sampleData.csv");
kprod.sendFileDataToKafka("weatherdata");
// Create the KafkaSpout configuartion
// Second argument is the topic name
// Third argument is the zookeepr root for Kafka
// Fourth argument is consumer group id
SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "weatherdata", "", "weather-consumer-group");
// Specify that the kafka messages are String
// We want to consume all the first messages in the topic everytime
// we run the topology to help in debugging. In production,this
// property should be false
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
//kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
//KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
// Now we create the topology
TopologyBuilder builder = new TopologyBuilder();
// set the kafka spout class
builder.setSpout("KafkaSpout", new KafkaSpout<>(KafkaSpoutConfig.builder("localhost:9092", "weatherdata").build()),2);
// set the word and sentence bolt class
builder.setBolt("FeatureSelectionBolt", new FeatureSelectionBolt(), 1).globalGrouping("KafkaSpout");
builder.setBolt("TrendDetectionBolt", new TrendDetectionBolt(), 1).globalGrouping("FeatureSelectionBolt");
BoltDeclarer bd = builder.setBolt("PushToInfluxDbBolt", new PushToInfluxDbBolt(), 1);
bd.allGrouping("FeatureSelectionBolt", "fsBolt");
bd.globalGrouping("TrendDetectionBolt", "tdBolt");
// create an instance of LocalCluster class for executing topology
// in local mode.
LocalCluster cluster = new LocalCluster();
Config conf = new Config();
conf.setDebug(true);
if (args.length > 0) {
conf.setNumWorkers(2);
conf.setMaxSpoutPending(5000);
StormSubmitter.submitTopology("KafkaCEPTopology", conf,
builder.createTopology());
} else {
// Submit topology for execution
cluster.submitTopology("KafkaCEPTopology", conf,
builder.createTopology());
System.out.println("called1");
Thread.sleep(1000000);
// Wait for sometime before exiting
System.out.println("Waiting to consume from kafka");
System.out.println("called2");
// kill the KafkaCEPTopology
cluster.killTopology("KafkaCEPTopology");
System.out.println("called3");
// shutdown the storm test cluster
cluster.shutdown();
}
} catch (Exception exception) {
System.out.println("Thread interrupted exception : " +
exception);
}
}
}
Но я не понимаю, почему я получаю такие исключения, связанные с Clojure. Я пропустил некоторые зависимости? Пожалуйста помоги
0 ответов
Несколько вещей выпрыгивают из меня.
Не установлен storm-core
в compile
область действия, иначе вы не сможете запустить свою топологию в реальном кластере. Так должно быть provided
,
Не создавайте LocalCluster, который вы не используете. Переместить творение в else
ветвь и используйте try-with-resources, чтобы закрыть кластер, когда вы закончите.
Попробуйте использовать maven-shade-plugin вместо maven-assembly-plugin.
Я хотел бы начать с примера POM от штормового стартера и изменить его в соответствии с вашими потребностями. Вы можете найти пример POM по адресу https://github.com/apache/storm/blob/v1.2.3/examples/storm-starter/pom.xml. Обратите внимание, что он содержит некоторые части, которые вам не нужны, например, зависимости HBase, потому что проект storm-starter содержит множество примеров топологий.
Также maven-exec-plugin неправильно настроен как здесь, так и в шторм-стартере. Если вы действительно хотите использовать его, вам нужно переключиться на exec:java
Цель. Плагин не работал так, как мне хотелось бы (топология, казалось, запустилась, не было очевидно, обрабатывались ли кортежи), поэтому подумайте о том, чтобы не использовать его.