Задержка ACK в весеннем Кафке
Я использую Spring и Spring Kafka для пакетной службы, которая собирает данные из Kafka до тех пор, пока не будут выполнены определенные условия, а затем создает дамп данных.
Я хочу подтвердить коммиты, когда данные покидают мой сервис, но они могут потенциально находиться в памяти в течение 5-10 минут.
Учитывая, что реализации Acknowledgement в Spring Kafka держатся за исходную запись (записи), кажется неразумным удерживать их до тех пор, пока я не дам свои данные, учитывая, что это будет делать с использованием памяти.
Есть ли другой способ подтвердить / зафиксировать смещения из Spring Kafka, учитывая только информацию о разделе / смещении?
1 ответ
Вы могли бы использовать AckMode.TIME
или же AckMode.COUNT
с невероятно большим ackTime
или же ackCount
поэтому контейнер никогда не будет подтвержден.
Затем передайте Consumer<?, ?>
в ваш метод слушателя и сделайте смещение зафиксировать себя.
Однако обратите внимание, что потребитель не является потокобезопасным, поэтому вы должны выполнить фиксацию в потоке слушателя.
Кроме того, имейте в виду, что записи не являются индивидуальными, а просто смещением. Вы не можете подтвердить "не в порядке".
Кроме того, вам, вероятно, потребуется увеличить max.poll.interval.ms
по умолчанию (5 минут), чтобы избежать перебалансировки разделов.