Исключение в KafkaSpout
Я получаю следующую исключительную топологию шторма.
java.lang.NoSuchMethodError: org.apache.kafka.common.network.NetworkSend.<init>(Ljava/lang/String;[Ljava/nio/ByteBuffer;)V
at kafka.network.RequestOrResponseSend.<init>(RequestOrResponseSend.scala:41) ~[stormjar.jar:?]
at kafka.network.RequestOrResponseSend.<init>(RequestOrResponseSend.scala:44) ~[stormjar.jar:?]
at kafka.network.BlockingChannel.send(BlockingChannel.scala:112) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:98) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) ~[stormjar.jar:?]
at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[stormjar.jar:?]
at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:81) ~[stormjar.jar:?]
at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:71) ~[stormjar.jar:?]
at org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:135) ~[stormjar.jar:?]
at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:110) ~[stormjar.jar:?]
at org.apache.storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:71) ~[stormjar.jar:?]
at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) ~[stormjar.jar:?]
at org.apache.storm.daemon.executor$fn__10727$fn__10742$fn__10773.invoke(executor.clj:654) ~[storm-core-1.2.2.jar:1.2.2]
at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484) [storm-core-1.2.2.jar:1.2.2]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]
Конфигурация POM:
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<!-- <version>0.10.0</version> -->
<version>1.2.2</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>log4j-core</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-api</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<!-- <version>0.10.0</version> -->
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>1.0.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
Я использую библиотеку Storm-Kafka, которая устарела. Если это является причиной вышеупомянутого исключения, тогда дайте мне знать, как создать носик kafka, используя библиотеку storm-kafka-client, и передать ему собственную схему.
Благодарю.
1 ответ
Не могли бы вы попытаться также поставить org.apache.kafka:kafka-clients
артефакт в ваших зависимостях, в той же версии, что и kafka_2.11
?
Относительно того, как использовать storm-kafka-client, есть документация на странице Storm по адресу https://storm.apache.org/releases/2.0.0-SNAPSHOT/storm-kafka-client.html и примеры на https: // github..com / Apache / ливневый / блоб / ведущий / примеры / ливневые Кафка-клиент-примеры / SRC / Основной / Java / орг / Apache / ливневый / Kafka / излив / KafkaSpoutTopologyMainNamedTopics.java
В частности, что вы хотите, это RecordTranslator
,
ByTopicRecordTranslator<String, String> trans = new ByTopicRecordTranslator<>(
(r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
new Fields("topic", "partition", "offset", "key", "value"), TOPIC_0_1_STREAM);
trans.forTopic(TOPIC_2,
(r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
new Fields("topic", "partition", "offset", "key", "value"), TOPIC_2_STREAM);
return KafkaSpoutConfig.builder(bootstrapServers, new String[]{TOPIC_0, TOPIC_1, TOPIC_2})
.setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
.setRetry(getRetryService())
.setRecordTranslator(trans)
.setOffsetCommitPeriodMs(10_000)
.setFirstPollOffsetStrategy(EARLIEST)
.setMaxUncommittedOffsets(250)
.build();
Этот, например, будет выводить тему, раздел, смещение, ключ и значение из каждой записи в перечисленных полях и будет генерировать кортежи из TOPIC_2 в другой поток из других подписанных тем. Если вам не нужны разные схемы для разных тем, вы можете использовать SimpleRecordTranslator
вместо.
Ссылаясь на этот https://storm.apache.org/releases/2.0.0-SNAPSHOT/storm-kafka-client.html.
Я попытался запустить топологию с помощью storm-kafka-client. но опять получаю исключение:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/storm/kafka/spout/KafkaSpoutConfig
at BuildTopology.buidTopology(BuildTopology.java:16)
at BuildTopology.runTopology(BuildTopology.java:34)
at Main.main(Main.java:6)
Caused by: java.lang.ClassNotFoundException: org.apache.storm.kafka.spout.KafkaSpoutConfig
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 3 more
Конфигурация POM такая же, как и выше, только что добавлена зависимость storm-kafka-client вместо зависимости stom-kafka
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka-client</artifactId>
<version>1.2.2</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.1</version>
</dependency>
Образец кода:
//spouts
topologyBuilder.setSpout("spout-1", new KafkaSpout<>(KafkaSpoutConfig.builder("127.0.0.1:9092", Pattern.compile("test-topic-1")).build()), 1);
//bolts
topologyBuilder.setBolt("test-bolt-1", new TestBolt(), 1).shuffleGrouping("spout-1");
Я что-то пропустил?