Как мы можем запустить несколько потребителей кафки через командную строку?
Я тестирую производительность kafka с помощью сценария оболочки, который они уже предоставили в пакете kafka. Я создал тему с 10 разделами и перекачкой данных, как показано ниже:
./bin/kafka-producer-perf-test.sh --topic test-topic --num-records 9000000 --record-size 300 --throughput 250000 --producer-props bootstrap.servers=110.17.14.302:9092 acks=1 max.in.flight.requests.per.connection=1 batch.size=5000
Теперь я хочу использовать данные, которые я выкачиваю, как показано выше, от нескольких потребителей, а не только от одного. Итак, я начал использовать kafka-consumer-perf-test.sh
, Это то, что я делал:
./bin/kafka-consumer-perf-test.sh --zookeeper localhost:2181 --topic test-topic --group test1
Есть ли способ, с помощью которого мы можем запустить несколько потребителей kafka в одной группе потребителей через командную строку, и каждый из этих потребителей, работающих на разных разделах, использует kafka-consumer-perf-test.sh
? Я работаю с кафкой версией 0.10.1.0
Я видел этот пост, но там не сказано, где настраивать, сколько потребителей мы хотим запустить и на каком разделе они будут работать?
Обновить:
Это ошибка, которую я увидел:
./bin/kafka-consumer-perf-test.sh --zookeeper 110.27.14.10:2181 --messages 50 --topic test-topic --threads 1
[2017-01-11 22:34:09,785] WARN [ConsumerFetcherThread-perf-consumer-14195_kafka-cluster-3098529006-zeidk-1484174043509-46a51434-2-0], Error in fetch kafka.consumer.ConsumerFetcherThread$FetchRequest@54fb48b6 (kafka.consumer.ConsumerFetcherThread)
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93)
at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:99)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130)
at kafka.consumer.ConsumerFetcherThread.fetch(ConsumerFetcherThread.scala:109)
at kafka.consumer.ConsumerFetcherThread.fetch(ConsumerFetcherThread.scala:29)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
1 ответ
Просто запустите ту же команду (т.е. ./bin/kafka-consumer-perf-test.sh
) несколько раз в разных консолях.
О назначении раздела: Кафка сделает это автоматически для вас. Если вы используете группы потребителей.
Если вы хотите назначить разделы вручную, вы не можете использовать группы потребителей. Для этого вы не можете использовать kafka-consumer-perf-test.sh, но вам нужно написать свой собственный.
Прочитайте JavaDoc здесь: https://kafka.apache.org/0101/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html