StormCrawler: лучшая топология для кластера

Я использую stormcrawler для сканирования 40k сайтов, с max_depth=2, и я хочу сделать это как можно быстрее. У меня 5 штормовых узлов (с разными статическими ips) и 3 упругих узла. На данный момент моя лучшая топология:

spouts:
  - id: "spout"
    className: "com.digitalpebble.stormcrawler.elasticsearch.persistence.CollapsingSpout"
    parallelism: 10

bolts:
  - id: "partitioner"
    className: "com.digitalpebble.stormcrawler.bolt.URLPartitionerBolt"
    parallelism: 1
  - id: "fetcher"
    className: "com.digitalpebble.stormcrawler.bolt.FetcherBolt"
    parallelism: 5
  - id: "sitemap"
    className: "com.digitalpebble.stormcrawler.bolt.SiteMapParserBolt"
    parallelism: 5
  - id: "parse"
    className: "com.digitalpebble.stormcrawler.bolt.JSoupParserBolt"
    parallelism: 100
  - id: "index"
    className: "com.digitalpebble.stormcrawler.elasticsearch.bolt.IndexerBolt"
    parallelism: 25
  - id: "status"
    className: "com.digitalpebble.stormcrawler.elasticsearch.persistence.StatusUpdaterBolt"
    parallelism: 25
  - id: "status_metrics"
    className: "com.digitalpebble.stormcrawler.elasticsearch.metrics.StatusMetricsBolt"
    parallelism: 5

и конфиг сканера:

config: 
  topology.workers: 5
  topology.message.timeout.secs: 300
  topology.max.spout.pending: 250
  topology.debug: false
  fetcher.threads.number: 500
  worker.heap.memory.mb: 4096

Вопросы: 1) В чем разница между AggreationsSpout или CollapsingSpout? Я попробовал AggregationSpout, но производительность была равна производительности 1 машины с конфигурацией по умолчанию.

2) Правильно ли это сочетание параллелизма?

3) Я обнаружил, что "ОШИБКА ВЫБОРА" увеличилась на ~20%, и многие сайты не были выбраны правильно, когда я перешел с 1 узла на 5 узлов конфигурации. Что может быть причиной?

ОБНОВИТЬ:

эс-conf.yaml:

# configuration for Elasticsearch resources

config:
  # ES indexer bolt
  # adresses can be specified as a full URL
  # if not we assume that the protocol is http and the port 9200
  es.indexer.addresses: "1.1.1.1"
  es.indexer.index.name: "index"
  es.indexer.doc.type: "doc"
  es.indexer.create: false
  es.indexer.settings:
    cluster.name: "webcrawler-cluster"

  # ES metricsConsumer
  es.metrics.addresses: "http://1.1.1.1:9200"
  es.metrics.index.name: "metrics"
  es.metrics.doc.type: "datapoint"
  es.metrics.settings:
    cluster.name: "webcrawler-cluster"

  # ES spout and persistence bolt
  es.status.addresses: "http://1.1.1.1:9200"
  es.status.index.name: "status"
  es.status.doc.type: "status"
  #es.status.user: "USERNAME"
  #es.status.password: "PASSWORD"
  # the routing is done on the value of 'partition.url.mode'
  es.status.routing: true
  # stores the value used for the routing as a separate field
  # needed by the spout implementations
  es.status.routing.fieldname: "metadata.hostname"
  es.status.bulkActions: 500
  es.status.flushInterval: "5s"
  es.status.concurrentRequests: 1
  es.status.settings:
    cluster.name: "webcrawler-cluster"

  ################
  # spout config #
  ################

  # positive or negative filter parsable by the Lucene Query Parser
  # es.status.filterQuery: "-(metadata.hostname:stormcrawler.net)"

  # time in secs for which the URLs will be considered for fetching after a ack of fail
  es.status.ttl.purgatory: 30

  # Min time (in msecs) to allow between 2 successive queries to ES
  es.status.min.delay.queries: 2000

  es.status.max.buckets: 50
  es.status.max.urls.per.bucket: 2
  # field to group the URLs into buckets
  es.status.bucket.field: "metadata.hostname"
  # field to sort the URLs within a bucket
  es.status.bucket.sort.field: "nextFetchDate"
  # field to sort the buckets
  es.status.global.sort.field: "nextFetchDate"

  # Delay since previous query date (in secs) after which the nextFetchDate value will be reset
  es.status.reset.fetchdate.after: -1

  # CollapsingSpout : limits the deep paging by resetting the start offset for the ES query 
  es.status.max.start.offset: 500

  # AggregationSpout : sampling improves the performance on large crawls
  es.status.sample: false

  # AggregationSpout (expert): adds this value in mins to the latest date returned in the results and
  # use it as nextFetchDate
  es.status.recentDate.increase: -1
  es.status.recentDate.min.gap: -1

  topology.metrics.consumer.register:
       - class: "com.digitalpebble.stormcrawler.elasticsearch.metrics.MetricsConsumer"
         parallelism.hint: 1
         #whitelist:
         #  - "fetcher_counter"
         #  - "fetcher_average.bytes_fetched"
         #blacklist:
         #  - "__receive.*"

1 ответ

Решение

1) В чем разница между AggreationsSpout или CollapsingSpout? Я попробовал AggregationSpout, но производительность была равна производительности 1 машины с конфигурацией по умолчанию.

Как следует из названия, AggregationSpout использует агрегаты в качестве механизма группировки URL-адресов по хосту (или домену или IP-адресу или какому-либо другому), тогда как CollapsingSpout использует свертывание. Последнее, вероятно, будет медленнее, если вы настроите его так, чтобы оно имело более 1 URL на одну корзину (es.status.max.urls.per.bucket), так как оно выдает подзапросы для каждой корзины. AggregationSpout должен иметь хорошую производительность, особенно если для es.status.sample задано значение true. CollapsingSpouts являются экспериментальными на данном этапе.

2) Правильна ли эта конфигурация параллелизма?

Это, вероятно, больше JSoupParserBolts, чем необходимо. На практике соотношение 1:4 по сравнению с Fetcherbolts хорошо даже с 500 нитями извлечения. Интерфейс Storm полезен для выявления узких мест и компонентов, которые необходимо масштабировать. Все остальное выглядит хорошо, но практически вы должны взглянуть на пользовательский интерфейс Storm и метрики, чтобы настроить топологию в соответствии с наилучшими настройками сканирования.

3) Я обнаружил, что "ОШИБКА ВЫБОРА" увеличилась на ~20%, и многие сайты не были выбраны правильно, когда я перешел с 1 узла на 5 узлов конфигурации. Что может быть причиной?

Это может указывать на то, что вы насыщаете свое сетевое соединение, но это не должно иметь место при использовании большего количества узлов, напротив. Возможно, проверьте с помощью Storm UI, как FetcherBolts распределяется по узлам. Один рабочий работает на всех экземплярах или все они получают одинаковое число? Посмотрите журналы, чтобы увидеть, что происходит, например, существует ли множество исключений тайм-аута?

Другие вопросы по тегам