Java KafkaConsumer Poll не работает - постоянно обнаруживает, отмечает и повторно присоединяется

Мой клиент KafkaConsumer не может получить записи из моего кластера Kafka, размещенного на AWS. У меня есть KafkaProducer с той же конфигурацией, что и у потребителя, и я могу успешно отправить событие в кластер Kafka. Мой потребитель непрерывно регистрирует нижеследующее без получения записей.

2017-11-21 13:48:09.028  INFO 14025 --- [       Thread-3] o.a.k.c.c.internals.AbstractCoordinator  : Discovered coordinator x.x.x.x:9094 (id: 2147483480 rack: null) for group pay123.
2017-11-21 13:48:09.051  INFO 14025 --- [       Thread-3] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [] for group pay123
offsets:{}
2017-11-21 13:48:09.051  INFO 14025 --- [       Thread-3] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group pay123
2017-11-21 13:48:27.486  INFO 14025 --- [       Thread-3] o.a.k.c.c.internals.AbstractCoordinator  : Marking the coordinator x.x.x.x:9094 (id: 2147483480 rack: null) dead for group pay123
2017-11-21 13:48:30.545  INFO 14025 --- [       Thread-3] o.a.k.c.c.internals.AbstractCoordinator  : Discovered coordinator x.x.x.x:9094 (id: 2147483480 rack: null) for group pay123.
2017-11-21 13:48:30.546  INFO 14025 --- [       Thread-3] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group pay123

У меня есть аналогичные настройки в моей локальной виртуальной машине, и все работает, как ожидалось. Не уверен, что это проблема только на стороне потребителя, когда он подключается как потребитель к кластеру kafka, размещенному в AWS.

Ниже приведен мой потребительский код.

private void initialize () {

    Properties props = new Properties();

    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    props.put(SslConfigs.SSL_PROTOCOL_CONFIG, "SSL");
    props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

    props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreFilePath);
    props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,  trustStorePassword);
    props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStoreFilePath);
    props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keyStorePassword);
    props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, clientKeyPassword);

    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put("session.timeout.ms", "60000");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "600000");
    props.put("auto.offset.reset", "earliest");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    consumer = new KafkaConsumer<>(props);


}

public void run() {
    try {

        consumer.subscribe(Collections.singletonList("event.t"));

        while (true) {
            System.out.println("polling kafka");
            ConsumerRecords<String, String> records = consumer.poll(6000000);
            System.out.println("polling done");
            for (ConsumerRecord<String, String> record : records) {

                currentOffsets.put(new TopicPartition(record.topic(),
                        record.partition()), new
                        OffsetAndMetadata(record.offset()+1, "no metadata"));

                System.out.println("event received from kafka - key : " + record.key() + " value : " + record.value());

            }

        }
    } catch (WakeupException e) {
        // ignore for shutdown

    } catch (Exception e) {
        System.out.println("Kafka exception :"+ e.toString());
    }
    finally {
        consumer.close();
    }
}


public void shutdown() {
    System.out.println("kafka shutdown called: ");
    LOGGER.info("kafka shutdown called: ");
    consumer.wakeup();
}

Пожалуйста, дайте мне знать, если я что-то упустил в потребительской конфигурации.

0 ответов

Другие вопросы по тегам