Kafka Consumer не работает с транзакционной семантикой (изоляция. Level = read_commited)

Kafka Consumer не может использовать ни одно сообщение, если у него есть транзакционная семантика в свойствах. Но когда я удаляю это свойство или обновляю это свойство до read_uncommited, его сообщения потребляют.

Ниже приведены мои потребительские свойства Kafka:-

Properties props = new Properties();
    props.put("bootstrap.servers", "10.2.200.15:9092");
    String consumeGroup = "cg3";
    props.put("group.id", consumeGroup);
    // Below is a key setting to turn off the auto commit.
    props.put("enable.auto.commit", "false");
    props.put("heartbeat.interval.ms", "2000");
    props.put("session.timeout.ms", "6001");
    // Control maximum data on each poll, make sure this value is bigger than the
    // maximum // single message size
    props.put("max.partition.fetch.bytes", "140");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("isolation.level","read_committed");

Kafka Producer имеет идентификатор Transactional в своих свойствах и после нажатия некоторых сообщений он совершает транзакцию в целом. Ниже приведены свойства производителя Kafka:-

log.info ("Инициализирующие свойства"); Свойства реквизита = новые свойства ();

    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, System.getenv(KafkaConstants.KAFKA_URL));
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
    props.put("linger.ms", 1000);
    props.put("acks", "all");
    // props.put("request.timeout.ms",30000);
    props.put("retries", 3);
    props.put("retry.backoff.ms", 1000);
    props.put("max.in.flight.requests.per.connection", 1); // if its greater than 1, it can change the order or records. Maximum no. of unacknowledge request a client can send.
    props.put("enable.idempotence", true);
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"Transaction8");

Ниже фрагмент кода отвечает за совершение транзакций:-

public boolean send(ProducerRecordImpl record) {
    try {
        producer.beginTransaction();
        for (int i = 0; i < 10; i++) {


            Future<RecordMetadata> futureResult = producer
                    .send(new ProducerRecord<String, String>(record.getTopic(), record.getPayload()));
            /*
             * It will wait till the thread execution completes and return true.
             */
            //RecordMetadata ack = futureResult.get();
            //log.debug("RecordMetadta offset {} and partiton {} ", ack.offset(), ack.partition());
        }
        producer.commitTransaction();
        log.info("Commited");


        return true;

Я не могу понять, происходит ли коммит со стороны производителя должным образом, что приводит к тому, что Kafka Consumer не может прочитать его с транзакционной семантикой, или проблема существует со стороны Kafka Consumer.

Любая помощь будет оценена.

2 ответа

Сначала вам нужно вызвать файл sample.initTransactions(). В противном случае ваш производитель не публикует транзакционные сообщения.

Из https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html

Должен вызываться перед любыми другими методами, если в конфигурации задан файл транзакции. Этот метод выполняет следующие действия: 1. Гарантирует, что все транзакции, инициированные предыдущими экземплярами производителя с тем же транзакцией.id, завершены. Если предыдущий экземпляр потерпел неудачу с проводимой транзакцией, он будет прерван. Если последняя транзакция начала завершение, но еще не завершилась, этот метод ожидает ее завершения. 2. Получает внутренний идентификатор и эпоху производителя, используемые во всех будущих транзакционных сообщениях, выпущенных производителем.

У меня была такая же проблема при тестировании транзакций на Кафке. Проблема была в операционной системе. Я использовал Windows 10 для запуска брокеров Kafka, и потребитель не мог видеть никаких подтвержденных транзакций, когда они были настроены как "read_committed", после того как я переместил брокеров в Linux, транзакция (и потребители) начала работать. Кстати, Кафка не показывал мне ошибок в логах. Надеюсь, поможет.

Другие вопросы по тегам