Как ограничить количество записей в Кафке-потребителе
Я использую слитный продукт Kafka-rest, чтобы использовать записи из темы. Мое намерение состоит в том, чтобы потреблять только первые 100 записей из темы. Я использую следующий API REST для получения записей
GET /consumers/testgroup/instances/my_consumer/records
Как этого добиться? Любая идея?
2 ответа
Насколько я знаю, в настоящее время это невозможно. Как упоминалось в другом ответе, вы можете указать максимальный размер в байтах (хотя в некоторых случаях это может быть проигнорировано посредниками), но вы не можете указать желаемое количество сообщений.
Однако такая функция может быть легко реализована в вашем клиентском коде. Вы можете угадать приблизительный размер, запросить REST API и посмотреть, сколько сообщений вы получили. Если оно меньше 100, запросите его снова, чтобы получить следующие несколько сообщений, пока не достигнете 100.
Можно использовать собственность ConsumerConfig.MAX_POLL_RECORDS_CONFIG
для настройки вашего KafkaConsumer
, Пожалуйста, смотрите документ
Если вы пытаетесь использовать новые пакеты из 100 сообщений из вашей группы потребителей, вы должны установить для max_bytes значение, которое для вашей модели данных всегда будет возвращать примерно 100 записей. Вы можете иметь более консервативную логику (получить меньше, а затем получить немного больше, пока не достигнете 100), или вы можете получить всегда больше, а затем игнорировать. В обоих случаях вы должны принять ручное управление смещением для вашей группы потребителей.
GET /consumers/testgroup/instances/my_consumer/records?max_bytes=300000
Если вы получаете более 100 сообщений и по какой-то причине игнорируете их, вы не будете получать их снова в этой группе потребителей, если включена автоматическая фиксация смещения (это определяется при создании вашего потребителя). Вы, вероятно, не хотите, чтобы это произошло!
Если вы вручную фиксируете смещения, вы можете игнорировать все, что захотите, если затем зафиксируете правильные смещения, чтобы гарантировать, что вы не потеряете ни одно сообщение. Вы можете вручную зафиксировать свои смещения следующим образом:
POST /consumers/testgroup/instances/my_consumer/offsets HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Content-Type: application/vnd.kafka.v2+json
{
"offsets": [
{
"topic": "test",
"partition": 0,
"offset": <calculated offset ending where you stopped consuming for this partition>
},
{
"topic": "test",
"partition": 1,
"offset": <calculated offset ending where you stopped consuming for this partition>
}
]
}
Если вы пытаетесь получить ровно первые 100 записей по этой теме, вам необходимо сбросить смещения групп потребителей для этой темы и каждого раздела, прежде чем использовать их один раз. Вы можете сделать это так ( взято из слияния):
POST /consumers/testgroup/instances/my_consumer/offsets HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Content-Type: application/vnd.kafka.v2+json
{
"offsets": [
{
"topic": "test",
"partition": 0,
"offset": 0
},
{
"topic": "test",
"partition": 1,
"offset": 0
}
]
}