Задержка ACK в весеннем Кафке

Я использую Spring и Spring Kafka для пакетной службы, которая собирает данные из Kafka до тех пор, пока не будут выполнены определенные условия, а затем создает дамп данных.

Я хочу подтвердить коммиты, когда данные покидают мой сервис, но они могут потенциально находиться в памяти в течение 5-10 минут.

Учитывая, что реализации Acknowledgement в Spring Kafka держатся за исходную запись (записи), кажется неразумным удерживать их до тех пор, пока я не дам свои данные, учитывая, что это будет делать с использованием памяти.

Есть ли другой способ подтвердить / зафиксировать смещения из Spring Kafka, учитывая только информацию о разделе / ​​смещении?

1 ответ

Вы могли бы использовать AckMode.TIME или же AckMode.COUNT с невероятно большим ackTime или же ackCount поэтому контейнер никогда не будет подтвержден.

Затем передайте Consumer<?, ?> в ваш метод слушателя и сделайте смещение зафиксировать себя.

Однако обратите внимание, что потребитель не является потокобезопасным, поэтому вы должны выполнить фиксацию в потоке слушателя.

Кроме того, имейте в виду, что записи не являются индивидуальными, а просто смещением. Вы не можете подтвердить "не в порядке".

Кроме того, вам, вероятно, потребуется увеличить max.poll.interval.ms по умолчанию (5 минут), чтобы избежать перебалансировки разделов.

Другие вопросы по тегам