Как уменьшить скорость приема Kafka Spout и включить противодавление?
Я использую storm-kafka-client 1.1.1 и storm-core 1.1.0.
Я настроил следующие параметры, но не смог включить противодавление и снизить скорость приема кафки-излива.
Носик потребляет 2000 сообщений в секунду.
Для обработки сообщения нисходящему болту требуется 50 мс, т.е. обрабатывается 20 сообщений в секунду.
Отставание между выбрасываемым носиком кортежем и выполненным болтом кортежем увеличивается с течением времени.
** Как я могу заставить Spout читать, скажем, 20 сообщений в секунду и поддерживать его скорость потребления такой же, как скорость выполнения Bolt **
**Topology**
topology.max.spout.pending= **5** ,
topology.message.timeout.secs= **600** ,
topology.executor.send.buffer.size=**64** ,
topology.executor.receive.buffer.size=**64** ,
topology.transfer.buffer.size=**64**
**KafkaSpoutConfig**
setPollTimeoutMs(**200**) ,
setFirstPollOffsetStrategy(latest) ,
setMaxUncommittedOffsets(**2_000_000**) ,
setGroupId(groupName) ,
setProp("fetch.max.wait.ms",**1000**) ,
setProp("max.poll.records", **100**) ,
setMaxPartitionFectchBytes(**512**) ,
setProp("send.buffer.bytes", **512**) ,
setProp("receive.buffer.bytes", **512**) ,
setPartitionRefreshPeriodMs(30_000).setProp("enable.auto.commit", "true") ,
setProp("session.timeout.ms", "**60000**") ,
KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(**50**) ,
KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(**5**) , 1 ,
KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(**1**) ) ;
Я не уверен, какие значения следует установить для TOPOLOGY_SPOUT_WAIT_STRATEGY и BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK
Итак, какая комбинация вышеупомянутых параметров и значений может помочь контролировать скорость приема излива?
Любое предложение будет высоко оценено.
Спасибо Каниска
1 ответ
TOPOLOGY_SPOUT_WAIT_STRATEGY используется только в том случае, если у spout запрашивается новый кортеж, и он ничего не излучает (т. Е. Если не было новых сообщений). Это не должно иметь никакого влияния на противодавление.
Я не слишком знаком с текущей реализацией противодавления, но я уверен, что вам нужно явно включить ее с помощью TOPOLOGY_BACKPRESSURE_ENABLE.
BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK - это отношение, поэтому, если вы установите его, например, 0,9, оно будет душить носик, когда входная очередь болта заполнена на 90%. Документацию по настройкам можно найти в https://github.com/apache/storm/blob/1.1.x-branch/storm-core/src/jvm/org/apache/storm/Config.java, а также по умолчанию. по адресу https://github.com/apache/storm/blob/1.1.x-branch/conf/defaults.yaml
Чтобы избежать слишком большого количества отправляемых кортежей за раз, я думаю, что вы должны просто установить topology.max.spout.pending с некоторым разумным количеством кортежей (может быть, несколько сотен?). Убедитесь, что ваша топология настроена на активацию взлома (т. Е. Установите для topology.enable.message.timeouts значение true). В противном случае ожидание максимального расхода не оказывает влияния.
Не уверен, почему вы меняете размеры буфера исполнителя.
Вам также следует рассмотреть возможность обновления Storm и storm-kafka-client как минимум до версии 1.1.2. В последнее время было внесено множество исправлений для storm-kafka-client, и вам, возможно, будет проще, если вы обновитесь.
Я не уверен, что означают звезды в вашем коде?