Запись в ConfluentCloud из Apache Beam (поток данных GCP)
Я пытаюсь написать для записи в Confluent Cloud/Kafka из потока данных (Apache Beam), используя следующее:
kafkaKnowledgeGraphKVRecords.apply("Write to Kafka", KafkaIO.<String, String>write()
.withBootstrapServers("<mybootstrapserver>.confluent.cloud:9092")
.withTopic("testtopic").withKeySerializer(StringSerializer.class)
.withProducerConfigUpdates(props).withValueSerializer(StringSerializer.class));
где Map<String, Object> props = new HashMap<>();
(т.е. пока пусто)
В журналах получаю: send failed : 'Topic testtopic not present in metadata after 60000 ms.'
Тема действительно существует в этом кластере, поэтому я предполагаю, что существует проблема с входом в систему, что имеет смысл, поскольку я не мог найти способ передать APIKey.
Я пробовал различные комбинации, чтобы передать APIKey/Secret, который у меня есть из Confluent Cloud, для аутентификации с помощью props
выше, но мне не удалось найти рабочую установку.
1 ответ
Нашел решение, благодаря указателям в комментариях @RobinMoffatt под вопросом
Вот настройки, которые у меня есть сейчас:
Map<String, Object> props = new HashMap<>()
props.put("ssl.endpoint.identification.algorithm", "https");
props.put("sasl.mechanism", "PLAIN");
props.put("request.timeout.ms", 20000);
props.put("retry.backoff.ms", 500);
props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<APIKEY>\" password=\"<SECRET>";");
props.put("security.protocol", "SASL_SSL");
kafkaKnowledgeGraphKVRecords.apply("Write to Kafka-TESTTOPIC", KafkaIO.<String, String>write()
.withBootstrapServers("<CLUSTER>.confluent.cloud:9092")
.withTopic("test").withKeySerializer(StringSerializer.class)
.withProducerConfigUpdates(props).withValueSerializer(StringSerializer.class));
Ключевое слово, которое я ошибся, - это sasl.jaas.config
(Обратите внимание ;
в конце!)