Проблемы с производительностью: Kafka + Storm + Trident + OpaqueTridentKafkaSpout

Мы наблюдаем некоторые проблемы с производительностью с Kafka + Storm + Trident + OpaqueTridentKafkaSpout

Упомянутые ниже детали нашей настройки:

Топология шторма:

Broker broker = Broker.fromString("localhost:9092")
    GlobalPartitionInformation info = new GlobalPartitionInformation()
    if(args[4]){
        int partitionCount = args[4].toInteger()
        for(int i =0;i<partitionCount;i++){
            info.addPartition(i, broker)
        }
    }
    StaticHosts hosts = new StaticHosts(info)
    TridentKafkaConfig tridentKafkaConfig = new TridentKafkaConfig(hosts,"test")
    tridentKafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme())


    OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(tridentKafkaConfig)
    TridentTopology topology = new TridentTopology()
    Stream st  = topology.newStream("spout1", kafkaSpout).parallelismHint(args[2].toInteger())
            .each(kafkaSpout.getOutputFields(), new NEO4JTridentFunction(), new Fields("status"))
            .parallelismHint(args[1].toInteger())
    Map conf = new HashMap()
    conf.put(Config.TOPOLOGY_WORKERS, args[3].toInteger())
    conf.put(Config.TOPOLOGY_DEBUG, false)

    if (args[0] == "local") {
        LocalCluster cluster = new LocalCluster()
        cluster.submitTopology("mytopology", conf, topology.build())
    } else {
        StormSubmitter.submitTopology("mytopology", conf, topology.build())
        NEO4JTridentFunction.getGraphDatabaseService().shutdown()
    }

Storm.yaml, который мы используем для Storm, выглядит следующим образом:

########### These MUST be filled in for a storm configuration
storm.zookeeper.servers:
     - "localhost"
#     - "server2"
# 
storm.zookeeper.port : 2999


storm.local.dir: "/opt/mphrx/neo4j/stormdatadir"

nimbus.childopts: "-Xms2048m"
ui.childopts: "-Xms1024m"
logviewer.childopts: "-Xmx512m"
supervisor.childopts: "-Xms1024m"
worker.childopts: "-Xms2600m -Xss256k -XX:MaxPermSize=128m -XX:PermSize=96m
    -XX:NewSize=1000m -XX:MaxNewSize=1000m -XX:MaxTenuringThreshold=1 -XX:SurvivorRatio=6
    -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled
    -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly
    -server -XX:+AggressiveOpts -XX:+UseCompressedOops -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true
    -Xloggc:logs/gc-worker-%ID%.log -verbose:gc
    -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1m
    -XX:+PrintGCDetails -XX:+PrintHeapAtGC -XX:+PrintGCTimeStamps -XX:+PrintClassHistogram
    -XX:+PrintTenuringDistribution -XX:-PrintGCApplicationStoppedTime -XX:-PrintGCApplicationConcurrentTime
    -XX:+PrintCommandLineFlags -XX:+PrintFlagsFinal"

java.library.path: "/usr/lib/jvm/jdk1.7.0_25"

supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703

topology.trident.batch.emit.interval.millis: 100
topology.message.timeout.secs: 300
#topology.max.spout.pending: 10000
  • Размер каждого сообщения, создаваемого в Kafka: 11 КБ
  • Время выполнения каждого болта (NEO4JTridentFunction) для обработки данных: 500 мс
  • Количество работников шторма: 1
  • Подсказка параллелизма для Spout(OpaqueTridentKafkaSpout): 1
  • Подсказка параллельности для болта / функции (NEO4JTridentFunction): 50

  • Мы видим пропускную способность около 12 мсг / сек из Spout.

  • Скорость сообщений, производимых в Kafka: 150msgs/sec

И Storm, и Kafka - это развертывание с одним узлом. Мы читали о гораздо более высокой пропускной способности от Storm, но не можем произвести то же самое. Подскажите, пожалуйста, как настроить конфигурацию Storm+ Kafka + OpaqueTridentKafkaSpout для достижения более высокой пропускной способности. Любая помощь в этом отношении поможет нам безмерно.

Спасибо,

3 ответа

Вы должны установить параллелизм носика так же, как и количество разделов для указанных тем. По умолчанию, трезубец принимает одну партию для каждого выполнения, вы должны увеличить этот счет, изменив topology.max.spout.pending имущество. Поскольку Trident форсирует упорядоченное управление транзакциями, ваш метод выполнения (NEO4JTridentFunction) должен быть быстрым, чтобы достичь желаемого решения.

Кроме того, вы можете играть с "tridentConfig.fetchSizeBytes"изменив его, вы можете получать больше данных для каждого нового вызова emit в своем изливе.

Также вы должны проверить свой журнал сбора мусора, он даст вам подсказку о реальной точке зрения.

Вы можете включить журнал сбора мусора, добавив "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -verbose:gc -Xloggc:{path}/gc-storm-worker-%ID%.log", в настройках worker.childopts в вашем рабочем конфиге.

И последнее, но не менее важное: вы можете использовать G1GC, если коэффициент молодости выше нормы.

Мои расчеты: если 8 ядер и 500 мс на болт -> ~16 сообщений / секесли вы оптимизируете болт, то вы увидите улучшения.

также для болтов, связанных с процессором, попробуйте Parallelism hint = 'totals cores' и увеличьте topology.trident.batch.emit.interval.millis на количество времени, необходимое для обработки всего пакета, деленное на 2. set topology.max.spout.eding к 1.

Пожалуйста, установите ваш worker.childopts в зависимости от конфигурации вашей системы. Используйте SpoutConfig.fetchSizeBytes, чтобы увеличить количество байтов, загружаемых в топологию. Увеличьте свой намек на параллельность.

Другие вопросы по тегам