Kafka Consumer пустые записи
По этой теме много вопросов, но это НЕ повторяющийся вопрос!
Проблема, с которой я столкнулся, заключается в том, что я попытался настроить проект SpringBoot с Java 14 и Kafka 2.5.0, и мой потребитель возвращает пустой список записей. Большинство ответов здесь указывают на некоторые забытые свойства, на частый опрос или на установку режима смещения на самый ранний.
Я не вижу никакой логической разницы с docs.confluent.io, хотя мои настройки конфигурации кажутся нетрадиционными (см. Мои настройки jaas.conf во фрагменте ниже).
@EnableKafka
@Configuration
public class KafkaConfig {
@Bean
public KafkaConsumer<Long, MyClass> consumerConfigs() {
Properties config = new Properties();
config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
config.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
config.put(ConsumerConfig.CLIENT_ID_CONFIG, "serviceName");
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "confluent-cloud");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "serviceName");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyClass.class);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 300_000);
System.setProperty("java.security.auth.login.config", ".\\src\\main\\resources\\jaas.conf");
return new KafkaConsumer<>(config);
}
}
Однако это работает. Я не получаю никаких исключений (Кафка или иначе), и соединение установлено.
// jaas.conf-file
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
serviceName="serviceName"
username="username"
password="password";
};
Вот где я на самом деле опрашиваю:
try {
KafkaConsumer<Long, MyClass> consumer = kafkaConfig.consumerConfigs();
consumer.subscribe(Collections.singletonList(inputTopic));
int count = 0;
Long start = System.currentTimeMillis();
Long end = System.currentTimeMillis();
while (end - start < 900_000) {
// boolean would be set to true in production
ConsumerRecords<Long, MyClass> records = consumer.poll(Duration.ofMillis(1000));
records.forEach(record -> {
MyOtherClass result = myOwnKafkaService.getSomethingFromRecord(record.key(), record.value());
System.out.println(result);
});
consumer.commitSync();
System.out.println("visualize number of loops made: " + ++count);
end = System.currentTimeMillis();
}
} catch (KafkaException e) {
e.printStackTrace();
} catch (Exception e) {
System.out.println(e.getMessage());
}
Я добавил отпечатки и другой беспорядок, чтобы попытаться найти проблему. Я запускаю свою программу в режиме отладки и помещаю точку останова в эту строку:
MyOtherClass result = myOwnKafkaService.getSomethingFromRecord(record.key(), record.value());
В результате я вижу напечатанную строку со счетом каждую секунду, как и следовало ожидать. Но поскольку мой Потребитель не возвращает записей, он никогда не входит вforEach
и поэтому никогда не срабатывает моя точка останова.
Я точно вижу свою тему в облаке, с двумя разделами. Сообщения создаются в непрерывном потоке, поэтому я знаю, что смогу что-то уловить.
Я знаю, что для подключения к кластеру потребуется некоторое время, но, если текущее время установлено на четверть часа, я должен получить хоть что-то, верно? В качестве альтернативы я попытался переключитьconsumer.subscribe()
к consumer.assign()
был указан мой TopicPartition, установив для потребителя consumer.seekToBeginning()
. Он работал нормально, но ничего не вернул.
Еще одна вещь, которой нет в наиболее распространенных примерах, - это то, что я использую свои собственные классы. Так что вместоKafkaConsumer<String, String>
, Я реализовал пользовательские (де) сериализаторы согласно этому руководству.
Может быть, это мои настройки конфигурации? Что-то не так с тайм-аутом опроса? (Де) сериализация или что-то совсем другое? Я действительно не могу точно определить причину, почему я не получаю никаких записей. Любая обратная связь будет принята с благодарностью!
1 ответ
Задача решена. Это не было чем-то, что вы могли бы определить из моего опубликованного вопроса, тем не менее, я хочу прояснить несколько вещей, если кто-то еще застрянет с подобными конфигурациями.
- убедитесь, что полученный пароль действительно правильный. Facepalm
Было так, что я думал, что он подключается к кластеру, но вместо этого мой цикл продолжал печатать счетчик, потому что .poll(Duration.ofMillis(1000))
метод выполняется -> проверяет, может ли он подключиться в течение заданного тайм-аута -> перемещается, возвращая нулевые записи, если соединение не удалось. Ошибка не возникает. Обычно соединение должно быть установлено примерно через 2 секунды.
- проверьте ваше подключение к базе данных.
Вы никогда не хотите, чтобы приложение останавливалось, поэтому я разработал myOwnKafkaService.getSomethingFromRecord(record.key(), record.value())
метод для регистрации всех ошибок, но все исключения перехватываются. Только когда я проверил журналы, я понял, что мои разрешения на доступ к удаленной базе данных не в порядке.
- то, что известно как TimeStamp, следует десериализовать в java.util.Date
Неправильно проанализированный, он вызывает исключение, но мой метод вернул null
. Как и все замечания в этом ответе, этот также сводится к неопытности в таких настройках. Ниже вы найдете исправленные классы, которые могут служить рабочим примером (но это не совсем лучшая практика).
KafkaConfig:
@EnableKafka
@Configuration
public class KafkaConfig {
@Bean
public KafkaConsumer<Long, MyClass> consumerConfigs() {
Properties config = new Properties();
config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
config.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
config.put(ConsumerConfig.CLIENT_ID_CONFIG, "serviceName");
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "confluent-cloud");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "serviceName");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyClass.class);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100_000);
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 300_000);
config.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 300_000);
System.setProperty("java.security.auth.login.config", ".\\src\\main\\resources\\jaas.conf");
return new KafkaConsumer<>(config);
}
}
тело метода опроса:
KafkaConsumer<Long, MyClass> consumer = kafkaConfig.consumerConfigs();
consumer.subscribe(Collections.singletonList(inputTopic));
while (true) {
ConsumerRecords<Long, MyClass> records = consumer.poll(Duration.ofMillis(1000));
records.forEach(record -> {
MyOtherClass result = myOwnKafkaService.getSomethingFromRecord(record.key(), record.value());
System.out.println(result);
});
consumer.commitSync();
}
небольшой пример MyClass с Deserializer:
@Data
@Slf4J
public class MyClass implements Deserializer<MyClass> {
@JsonProperty("UNIQUE_KEY")
private Long uniqueKey;
@JsonProperty("EVENT_TIMESTAMP")
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS")
private Date eventTimestamp;
@JsonProperty("SOME_OTHER_FIELD")
private String someOtherField;
@Override
public MyClass deserialize(String s, byte[] bytes) {
ObjectMapper mapper = new ObjectMapper();
MyClass event = null;
try {
event = mapper
.registerModule(new JavaTimeModule())
.readValue(bytes, MyClass.class);
} catch (Exception e) {
log.error("Something went wrong during the deserialization of the MyClass: {}", e.getMessage());
}
return event;
}
}
Я надеюсь, что это послужит кому-то еще в будущем. Я многому научился на своих неудачах и ошибках.