Kafka Consumer не работает с транзакционной семантикой (изоляция. Level = read_commited)
Kafka Consumer не может использовать ни одно сообщение, если у него есть транзакционная семантика в свойствах. Но когда я удаляю это свойство или обновляю это свойство до read_uncommited, его сообщения потребляют.
Ниже приведены мои потребительские свойства Kafka:-
Properties props = new Properties();
props.put("bootstrap.servers", "10.2.200.15:9092");
String consumeGroup = "cg3";
props.put("group.id", consumeGroup);
// Below is a key setting to turn off the auto commit.
props.put("enable.auto.commit", "false");
props.put("heartbeat.interval.ms", "2000");
props.put("session.timeout.ms", "6001");
// Control maximum data on each poll, make sure this value is bigger than the
// maximum // single message size
props.put("max.partition.fetch.bytes", "140");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("isolation.level","read_committed");
Kafka Producer имеет идентификатор Transactional в своих свойствах и после нажатия некоторых сообщений он совершает транзакцию в целом. Ниже приведены свойства производителя Kafka:-
log.info ("Инициализирующие свойства"); Свойства реквизита = новые свойства ();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, System.getenv(KafkaConstants.KAFKA_URL));
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put("linger.ms", 1000);
props.put("acks", "all");
// props.put("request.timeout.ms",30000);
props.put("retries", 3);
props.put("retry.backoff.ms", 1000);
props.put("max.in.flight.requests.per.connection", 1); // if its greater than 1, it can change the order or records. Maximum no. of unacknowledge request a client can send.
props.put("enable.idempotence", true);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"Transaction8");
Ниже фрагмент кода отвечает за совершение транзакций:-
public boolean send(ProducerRecordImpl record) {
try {
producer.beginTransaction();
for (int i = 0; i < 10; i++) {
Future<RecordMetadata> futureResult = producer
.send(new ProducerRecord<String, String>(record.getTopic(), record.getPayload()));
/*
* It will wait till the thread execution completes and return true.
*/
//RecordMetadata ack = futureResult.get();
//log.debug("RecordMetadta offset {} and partiton {} ", ack.offset(), ack.partition());
}
producer.commitTransaction();
log.info("Commited");
return true;
Я не могу понять, происходит ли коммит со стороны производителя должным образом, что приводит к тому, что Kafka Consumer не может прочитать его с транзакционной семантикой, или проблема существует со стороны Kafka Consumer.
Любая помощь будет оценена.
2 ответа
Сначала вам нужно вызвать файл sample.initTransactions(). В противном случае ваш производитель не публикует транзакционные сообщения.
Из https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
Должен вызываться перед любыми другими методами, если в конфигурации задан файл транзакции. Этот метод выполняет следующие действия: 1. Гарантирует, что все транзакции, инициированные предыдущими экземплярами производителя с тем же транзакцией.id, завершены. Если предыдущий экземпляр потерпел неудачу с проводимой транзакцией, он будет прерван. Если последняя транзакция начала завершение, но еще не завершилась, этот метод ожидает ее завершения. 2. Получает внутренний идентификатор и эпоху производителя, используемые во всех будущих транзакционных сообщениях, выпущенных производителем.
У меня была такая же проблема при тестировании транзакций на Кафке. Проблема была в операционной системе. Я использовал Windows 10 для запуска брокеров Kafka, и потребитель не мог видеть никаких подтвержденных транзакций, когда они были настроены как "read_committed", после того как я переместил брокеров в Linux, транзакция (и потребители) начала работать. Кстати, Кафка не показывал мне ошибок в логах. Надеюсь, поможет.