Apache Kafka + интеграция с Apache Camel + проблема POC + java.lang.NullPointerException в java.util.Hashtable.put(Hashtable.java:459)
Это мой строитель маршрута. Здесь я пытаюсь вставить данные из моего файла в topic.Later, я прохожу через мой основной метод и использую верблюжий контекст, я запускаю его. Я пробовал несколько кодов, но ни один не помог мне. Я работаю над POC Apache Kafka - Camel.
public class SimpleRouteBuilder extends RouteBuilder {
@Override
public void configure() throws Exception {
String topicName = "test120";
String kafkaServer = "kafka:localhost:9092";
String zooKeeperHost = "zookeeperHost=localhost&zookeeperPort=2181";
String serializerClass = "serializerClass=kafka.serializer.StringEncoder";
String toKafka = "kafka:localhost:9092?topic=test120;zookeeperHost=localhost;zookeeperPort=2181;groupId=group1";
// toKafka = new StringBuilder().append("&").append(serializerClass).toString();
/*new StringBuilder().append(kafkaServer).append("?").append(topicName).append("&")
.append(zooKeeperHost).append("&").append(serializerClass).toString();*/
from("file:C:/inbox?noop=true").to(toKafka);
}
}
Это мой pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>HelloWorld</groupId>
<artifactId>Pallavi</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
<version>2.20.1</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-kafka</artifactId>
<version>2.20.1</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.0</version>
</dependency>
</dependencies>
</project>
Это мой основной класс:
import org.apache.camel.CamelContext;
import org.apache.camel.impl.DefaultCamelContext;
public class MainApp {
public static void main(String[] args) {
SimpleRouteBuilder routeBuilder = new SimpleRouteBuilder();
CamelContext ctx = new DefaultCamelContext();
try {
ctx.addRoutes(routeBuilder);
ctx.start();
Thread.sleep(5 * 60 * 1000);
ctx.stop();
System.out.println("hi i am working");
}
catch (Exception e) {
e.printStackTrace();
}
}
}
ошибка:
java.lang.NullPointerException
at java.util.Hashtable.put(Hashtable.java:459)
at org.apache.camel.component.kafka.KafkaProducer.getProps(KafkaProducer.java:63)
at org.apache.camel.component.kafka.KafkaProducer.doStart(KafkaProducer.java:89)
at org.apache.camel.support.ServiceSupport.start(ServiceSupport.java:61)
at org.apache.camel.util.ServiceHelper.startService(ServiceHelper.java:75)
at org.apache.camel.impl.DeferServiceStartupListener.onCamelContextStarted(DeferServiceStartupListener.java:49)
at org.apache.camel.impl.DefaultCamelContext.safelyStartRouteServices(DefaultCamelContext.java:3859)
at org.apache.camel.impl.DefaultCamelContext.doStartOrResumeRoutes(DefaultCamelContext.java:3638)
at org.apache.camel.impl.DefaultCamelContext.doStartCamel(DefaultCamelContext.java:3490)
at org.apache.camel.impl.DefaultCamelContext.access$000(DefaultCamelContext.java:208)
at org.apache.camel.impl.DefaultCamelContext$2.call(DefaultCamelContext.java:3249)
at org.apache.camel.impl.DefaultCamelContext$2.call(DefaultCamelContext.java:3245)
at org.apache.camel.impl.DefaultCamelContext.doWithDefinedClassLoader(DefaultCamelContext.java:3268)
at org.apache.camel.impl.DefaultCamelContext.doStart(DefaultCamelContext.java:3245)
at org.apache.camel.support.ServiceSupport.start(ServiceSupport.java:61)
at org.apache.camel.impl.DefaultCamelContext.start(DefaultCamelContext.java:3168)
at demo.MainApp.main(MainApp.java:13)
Picked up _JAVA_OPTIONS: -Xmx512M -Xms512M
4 ответа
Я следовал тому же учебнику. Но изначально я не смог запустить успешно (кто использовал последнюю версию сервера kafka). Я получил следующее исключение вместо исключения с нулевой точкой:
org.apache.camel.FailedToCreateRouteException: Failed to create route route1 at: >>> Split[tokenize{body() using token:
} -> [To[kafka:localhost:9092?topic=test&zookeeperHost=localhost&zookeeperPort=2181&serializerClass=kafka.serializer.StringEncoder]]] <<< in route: Route(route1)[[From[file:C:/input?noop=true]] -> [Split[toke... because of Failed to resolve endpoint: kafka://localhost:9092?serializerClass=kafka.serializer.StringEncoder&topic=test&zookeeperHost=localhost&zookeeperPort=2181 due to: Failed to resolve endpoint: kafka://localhost:9092?serializerClass=kafka.serializer.StringEncoder&topic=test&zookeeperHost=localhost&zookeeperPort=2181 due to: There are 2 parameters that couldn't be set on the endpoint. Check the uri if the parameters are spelt correctly and that they are properties of the endpoint. Unknown parameters=[{zookeeperHost=localhost, zookeeperPort=2181}]
at org.apache.camel.model.RouteDefinition.addRoutes(RouteDefinition.java:1303)
at org.apache.camel.model.RouteDefinition.addRoutes(RouteDefinition.java:204)
at org.apache.camel.impl.DefaultCamelContext.startRoute(DefaultCamelContext.java:1143)
at org.apache.camel.impl.DefaultCamelContext.startRouteDefinitions(DefaultCamelContext.java:3729)
at org.apache.camel.impl.DefaultCamelContext.doStartCamel(DefaultCamelContext.java:3443)
at org.apache.camel.impl.DefaultCamelContext.access$000(DefaultCamelContext.java:209)
at org.apache.camel.impl.DefaultCamelContext$2.call(DefaultCamelContext.java:3251)
at org.apache.camel.impl.DefaultCamelContext$2.call(DefaultCamelContext.java:3247)
at org.apache.camel.impl.DefaultCamelContext.doWithDefinedClassLoader(DefaultCamelContext.java:3270)
at org.apache.camel.impl.DefaultCamelContext.doStart(DefaultCamelContext.java:3247)
at org.apache.camel.support.ServiceSupport.start(ServiceSupport.java:61)
at org.apache.camel.impl.DefaultCamelContext.start(DefaultCamelContext.java:3163)
at com.parane.kafka.demo.DemoApplication.main(DemoApplication.java:17)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: org.apache.camel.ResolveEndpointFailedException: Failed to resolve endpoint: kafka://localhost:9092?serializerClass=kafka.serializer.StringEncoder&topic=test&zookeeperHost=localhost&zookeeperPort=2181 due to: Failed to resolve endpoint: kafka://localhost:9092?serializerClass=kafka.serializer.StringEncoder&topic=test&zookeeperHost=localhost&zookeeperPort=2181 due to: There are 2 parameters that couldn't be set on the endpoint. Check the uri if the parameters are spelt correctly and that they are properties of the endpoint. Unknown parameters=[{zookeeperHost=localhost, zookeeperPort=2181}]
at org.apache.camel.impl.DefaultCamelContext.getEndpoint(DefaultCamelContext.java:758)
at org.apache.camel.util.CamelContextHelper.getMandatoryEndpoint(CamelContextHelper.java:80)
at org.apache.camel.model.RouteDefinition.resolveEndpoint(RouteDefinition.java:219)
at org.apache.camel.impl.DefaultRouteContext.resolveEndpoint(DefaultRouteContext.java:115)
at org.apache.camel.impl.DefaultRouteContext.resolveEndpoint(DefaultRouteContext.java:121)
at org.apache.camel.model.SendDefinition.resolveEndpoint(SendDefinition.java:62)
at org.apache.camel.model.SendDefinition.createProcessor(SendDefinition.java:56)
at org.apache.camel.model.ProcessorDefinition.createProcessor(ProcessorDefinition.java:511)
at org.apache.camel.model.ProcessorDefinition.createOutputsProcessorImpl(ProcessorDefinition.java:474)
at org.apache.camel.model.ProcessorDefinition.createOutputsProcessor(ProcessorDefinition.java:441)
at org.apache.camel.model.ProcessorDefinition.createOutputsProcessor(ProcessorDefinition.java:185)
at org.apache.camel.model.ProcessorDefinition.createChildProcessor(ProcessorDefinition.java:204)
at org.apache.camel.model.SplitDefinition.createProcessor(SplitDefinition.java:101)
at org.apache.camel.model.ProcessorDefinition.makeProcessorImpl(ProcessorDefinition.java:562)
at org.apache.camel.model.ProcessorDefinition.makeProcessor(ProcessorDefinition.java:523)
at org.apache.camel.model.ProcessorDefinition.addRoutes(ProcessorDefinition.java:239)
at org.apache.camel.model.RouteDefinition.addRoutes(RouteDefinition.java:1300)
... 17 more
Caused by: org.apache.camel.ResolveEndpointFailedException: Failed to resolve endpoint: kafka://localhost:9092?serializerClass=kafka.serializer.StringEncoder&topic=test&zookeeperHost=localhost&zookeeperPort=2181 due to: There are 2 parameters that couldn't be set on the endpoint. Check the uri if the parameters are spelt correctly and that they are properties of the endpoint. Unknown parameters=[{zookeeperHost=localhost, zookeeperPort=2181}]
at org.apache.camel.impl.DefaultComponent.validateParameters(DefaultComponent.java:215)
at org.apache.camel.impl.DefaultComponent.createEndpoint(DefaultComponent.java:139)
at org.apache.camel.impl.DefaultCamelContext.getEndpoint(DefaultCamelContext.java:711)
... 33 more
Как уже упоминалось Клаусом Ибсеном, нужно добавить
брокеры
Пример кода
toKafka="kafka://localhost:9092?topic=test&brokers=localhost:9092";
Я надеюсь, что это будет полезно, кто следит за учебником.
Убедитесь, что каталог C:\inbox действительно существует. Кроме того, убедитесь, что вы изменили версии зависимостей в pom-файле на версии, упомянутые в руководстве, так как все изменилось со времен camel-kafka 2.17.0, и он больше не принимает значения zookeeperHost и zookeeperPort, как в строке toKafka.,
Вам необходимо настроить brokers
вариант на конечной точке. Или глобально на компоненте kafka.
Я зарегистрировал тикет, чтобы сделать отчет Camel лучшим исключением в следующих выпусках: https://issues.apache.org/jira/browse/CAMEL-12090
Здесь также есть пример Kafka: https://github.com/apache/camel/tree/master/examples/camel-example-kafka
Возможно, вы не установили свойства производителя, для которого есть конфигурации, предоставленные для производителя Kafka.
Компонент Kafka для верблюда должен быть настроен с обязательными свойствами производителя Kafka.