Как создать Kafka ZKStringSerializer в Java?
В поисках того, как создать тему Kafka через API, я нашел этот пример в Scala:
import kafka.admin.AdminUtils
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient
// Create a ZooKeeper client
val sessionTimeoutMs = 10000
val connectionTimeoutMs = 10000
val zkClient = new ZkClient("zookeeper1:2181", sessionTimeoutMs,
connectionTimeoutMs, ZKStringSerializer)
// Create a topic with 8 partitions and a replication factor of 3
val topicName = "myTopic"
val numPartitions = 8
val replicationFactor = 3
val topicConfig = new Properties
AdminUtils.createTopic(zkClient, topicName,
numPartitions, replicationFactor, topicConfig)
Источник: /questions/3007083/kak-myi-mozhem-sozdat-temu-v-kafke-iz-ide-ispolzuya-api/3007097#3007097
Последний аргумент ZKStringSerializer
по-видимому, объект Scala. Мне не понятно, как заставить этот пример работать на Java.
Этот пост Как создать объект scala в clojure, задает тот же вопрос в Clojure, и ответ был:
ZKStringSerializer$/MODULE$
который в Java будет (я думаю) перевести на:
ZKStringSerializer$.MODULE$
Но когда я пытаюсь это сделать (или любое количество других вариантов), ни один из них не компилируется.
Ошибка компиляции:
KafkaTopicCreator.java:[16,18] cannot find symbol
symbol: variable ZKStringSerializer$
location: class org.sample.KafkaTopicCreator
Я использую kafka_2.9.2-0.8.1.1 и Java 8.
1 ответ
Для Java попробуйте следующее,
Первый импорт под заявлением
import kafka.utils.ZKStringSerializer$;
Создайте объект для ZkClient следующим образом,
String zkHosts = "127.0.0.1:2181"; //If more than one zookeeper then "127.0.0.1:2181,127.0.0.2:2181"
ZkClient zkClient = new ZkClient(zkHosts, 10000, 10000, ZKStringSerializer$.MODULE$);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());
Приведенный выше код не будет работать для kafka > 0.9, так как API был изменен. Используйте приведенный ниже код для kafka > 0.9
import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
public class KafkaTopicCreationInJava
{
public static void main(String[] args) throws Exception {
ZkClient zkClient = null;
ZkUtils zkUtils = null;
try {
String zookeeperHosts = "192.168.20.1:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";
int sessionTimeOutInMs = 15 * 1000; // 15 secs
int connectionTimeOutInMs = 10 * 1000; // 10 secs
zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);
String topicName = "testTopic";
int noOfPartitions = 2;
int noOfReplication = 3;
Properties topicConfiguration = new Properties();
AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration);
} catch (Exception ex) {
ex.printStackTrace();
} finally {
if (zkClient != null) {
zkClient.close();
}
}
}
}