Java KafkaConsumer Poll не работает - постоянно обнаруживает, отмечает и повторно присоединяется
Мой клиент KafkaConsumer не может получить записи из моего кластера Kafka, размещенного на AWS. У меня есть KafkaProducer с той же конфигурацией, что и у потребителя, и я могу успешно отправить событие в кластер Kafka. Мой потребитель непрерывно регистрирует нижеследующее без получения записей.
2017-11-21 13:48:09.028 INFO 14025 --- [ Thread-3] o.a.k.c.c.internals.AbstractCoordinator : Discovered coordinator x.x.x.x:9094 (id: 2147483480 rack: null) for group pay123.
2017-11-21 13:48:09.051 INFO 14025 --- [ Thread-3] o.a.k.c.c.internals.ConsumerCoordinator : Revoking previously assigned partitions [] for group pay123
offsets:{}
2017-11-21 13:48:09.051 INFO 14025 --- [ Thread-3] o.a.k.c.c.internals.AbstractCoordinator : (Re-)joining group pay123
2017-11-21 13:48:27.486 INFO 14025 --- [ Thread-3] o.a.k.c.c.internals.AbstractCoordinator : Marking the coordinator x.x.x.x:9094 (id: 2147483480 rack: null) dead for group pay123
2017-11-21 13:48:30.545 INFO 14025 --- [ Thread-3] o.a.k.c.c.internals.AbstractCoordinator : Discovered coordinator x.x.x.x:9094 (id: 2147483480 rack: null) for group pay123.
2017-11-21 13:48:30.546 INFO 14025 --- [ Thread-3] o.a.k.c.c.internals.AbstractCoordinator : (Re-)joining group pay123
У меня есть аналогичные настройки в моей локальной виртуальной машине, и все работает, как ожидалось. Не уверен, что это проблема только на стороне потребителя, когда он подключается как потребитель к кластеру kafka, размещенному в AWS.
Ниже приведен мой потребительский код.
private void initialize () {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SslConfigs.SSL_PROTOCOL_CONFIG, "SSL");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreFilePath);
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassword);
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStoreFilePath);
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keyStorePassword);
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, clientKeyPassword);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put("session.timeout.ms", "60000");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "600000");
props.put("auto.offset.reset", "earliest");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumer = new KafkaConsumer<>(props);
}
public void run() {
try {
consumer.subscribe(Collections.singletonList("event.t"));
while (true) {
System.out.println("polling kafka");
ConsumerRecords<String, String> records = consumer.poll(6000000);
System.out.println("polling done");
for (ConsumerRecord<String, String> record : records) {
currentOffsets.put(new TopicPartition(record.topic(),
record.partition()), new
OffsetAndMetadata(record.offset()+1, "no metadata"));
System.out.println("event received from kafka - key : " + record.key() + " value : " + record.value());
}
}
} catch (WakeupException e) {
// ignore for shutdown
} catch (Exception e) {
System.out.println("Kafka exception :"+ e.toString());
}
finally {
consumer.close();
}
}
public void shutdown() {
System.out.println("kafka shutdown called: ");
LOGGER.info("kafka shutdown called: ");
consumer.wakeup();
}
Пожалуйста, дайте мне знать, если я что-то упустил в потребительской конфигурации.