Kafka Consumer пустые записи

По этой теме много вопросов, но это НЕ повторяющийся вопрос!

Проблема, с которой я столкнулся, заключается в том, что я попытался настроить проект SpringBoot с Java 14 и Kafka 2.5.0, и мой потребитель возвращает пустой список записей. Большинство ответов здесь указывают на некоторые забытые свойства, на частый опрос или на установку режима смещения на самый ранний.

Я не вижу никакой логической разницы с docs.confluent.io, хотя мои настройки конфигурации кажутся нетрадиционными (см. Мои настройки jaas.conf во фрагменте ниже).

@EnableKafka
@Configuration
public class KafkaConfig {

    @Bean
    public KafkaConsumer<Long, MyClass> consumerConfigs() {
        Properties config = new Properties();

        config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        config.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

        config.put(ConsumerConfig.CLIENT_ID_CONFIG, "serviceName");
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "confluent-cloud");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "serviceName");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyClass.class);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 300_000);

        System.setProperty("java.security.auth.login.config", ".\\src\\main\\resources\\jaas.conf");

        return new KafkaConsumer<>(config);
    }
}

Однако это работает. Я не получаю никаких исключений (Кафка или иначе), и соединение установлено.

// jaas.conf-file
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
serviceName="serviceName"
username="username"
password="password";
};

Вот где я на самом деле опрашиваю:

try {
            KafkaConsumer<Long, MyClass> consumer = kafkaConfig.consumerConfigs();
            consumer.subscribe(Collections.singletonList(inputTopic));

            int count = 0;
            Long start = System.currentTimeMillis();
            Long end = System.currentTimeMillis();

            while (end - start < 900_000) { 
                // boolean would be set to true in production
                ConsumerRecords<Long, MyClass> records = consumer.poll(Duration.ofMillis(1000));
                records.forEach(record -> {
                    MyOtherClass result = myOwnKafkaService.getSomethingFromRecord(record.key(), record.value());
                    System.out.println(result);
                });
               
                consumer.commitSync();

                System.out.println("visualize number of loops made: " + ++count);
                end = System.currentTimeMillis();
            }
        } catch (KafkaException e) {
            e.printStackTrace();
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }

Я добавил отпечатки и другой беспорядок, чтобы попытаться найти проблему. Я запускаю свою программу в режиме отладки и помещаю точку останова в эту строку:

MyOtherClass result = myOwnKafkaService.getSomethingFromRecord(record.key(), record.value());

В результате я вижу напечатанную строку со счетом каждую секунду, как и следовало ожидать. Но поскольку мой Потребитель не возвращает записей, он никогда не входит вforEach и поэтому никогда не срабатывает моя точка останова.

Я точно вижу свою тему в облаке, с двумя разделами. Сообщения создаются в непрерывном потоке, поэтому я знаю, что смогу что-то уловить.

Я знаю, что для подключения к кластеру потребуется некоторое время, но, если текущее время установлено на четверть часа, я должен получить хоть что-то, верно? В качестве альтернативы я попытался переключитьconsumer.subscribe() к consumer.assign() был указан мой TopicPartition, установив для потребителя consumer.seekToBeginning(). Он работал нормально, но ничего не вернул.

Еще одна вещь, которой нет в наиболее распространенных примерах, - это то, что я использую свои собственные классы. Так что вместоKafkaConsumer<String, String>, Я реализовал пользовательские (де) сериализаторы согласно этому руководству.

Может быть, это мои настройки конфигурации? Что-то не так с тайм-аутом опроса? (Де) сериализация или что-то совсем другое? Я действительно не могу точно определить причину, почему я не получаю никаких записей. Любая обратная связь будет принята с благодарностью!

1 ответ

Решение

Задача решена. Это не было чем-то, что вы могли бы определить из моего опубликованного вопроса, тем не менее, я хочу прояснить несколько вещей, если кто-то еще застрянет с подобными конфигурациями.

  1. убедитесь, что полученный пароль действительно правильный. Facepalm

Было так, что я думал, что он подключается к кластеру, но вместо этого мой цикл продолжал печатать счетчик, потому что .poll(Duration.ofMillis(1000))метод выполняется -> проверяет, может ли он подключиться в течение заданного тайм-аута -> перемещается, возвращая нулевые записи, если соединение не удалось. Ошибка не возникает. Обычно соединение должно быть установлено примерно через 2 секунды.

  1. проверьте ваше подключение к базе данных.

Вы никогда не хотите, чтобы приложение останавливалось, поэтому я разработал myOwnKafkaService.getSomethingFromRecord(record.key(), record.value())метод для регистрации всех ошибок, но все исключения перехватываются. Только когда я проверил журналы, я понял, что мои разрешения на доступ к удаленной базе данных не в порядке.

  1. то, что известно как TimeStamp, следует десериализовать в java.util.Date

Неправильно проанализированный, он вызывает исключение, но мой метод вернул null. Как и все замечания в этом ответе, этот также сводится к неопытности в таких настройках. Ниже вы найдете исправленные классы, которые могут служить рабочим примером (но это не совсем лучшая практика).

KafkaConfig:

@EnableKafka
@Configuration
public class KafkaConfig {

    @Bean
    public KafkaConsumer<Long, MyClass> consumerConfigs() {
        Properties config = new Properties();

        config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        config.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

        config.put(ConsumerConfig.CLIENT_ID_CONFIG, "serviceName");
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "confluent-cloud");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "serviceName");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyClass.class);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100_000);
        config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 300_000);
        config.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 300_000);

        System.setProperty("java.security.auth.login.config", ".\\src\\main\\resources\\jaas.conf");

        return new KafkaConsumer<>(config);
    }
}

тело метода опроса:

            KafkaConsumer<Long, MyClass> consumer = kafkaConfig.consumerConfigs();
            consumer.subscribe(Collections.singletonList(inputTopic));

            while (true) {
                ConsumerRecords<Long, MyClass> records = consumer.poll(Duration.ofMillis(1000));
                records.forEach(record -> {
                    MyOtherClass result = myOwnKafkaService.getSomethingFromRecord(record.key(), record.value());
                    System.out.println(result);
                });
                consumer.commitSync();
            }

небольшой пример MyClass с Deserializer:

@Data
@Slf4J
public class MyClass implements Deserializer<MyClass> {

    @JsonProperty("UNIQUE_KEY")
    private Long uniqueKey;
    @JsonProperty("EVENT_TIMESTAMP")
    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS")
    private Date eventTimestamp;
    @JsonProperty("SOME_OTHER_FIELD")
    private String someOtherField;

@Override
    public MyClass deserialize(String s, byte[] bytes) {
        ObjectMapper mapper = new ObjectMapper();
        MyClass event = null;
        try {
            event = mapper
                    .registerModule(new JavaTimeModule())
                    .readValue(bytes, MyClass.class);
        } catch (Exception e) {
            log.error("Something went wrong during the deserialization of the MyClass: {}", e.getMessage());
        }
        return event;
    }
}

Я надеюсь, что это послужит кому-то еще в будущем. Я многому научился на своих неудачах и ошибках.

Другие вопросы по тегам