Как использовать скрипты Elasticsearch для частичного обновления 500 миллионов документов как можно быстрее

Я веду индекс примерно с 500 миллионами документов. Среди прочего, каждый документ имеет строковое поле, которое содержит от 1 до 10 слов. Я хотел бы проанализировать это поле в каждом документе с точки зрения количества слов и сохранить результат в соответствующем документе в поле "wordCount".

Я знаю, что здесь есть функция partal_update: документация ES для Part_update.

Интересно, можно ли использовать сценарий part_update (возможно, с продвинутым скриптом Groovy), чтобы значительно увеличить скорость выполнения вышеуказанной задачи. И если да, может кто-нибудь подсказать как начать?

В настоящее время я использую приведенный ниже скрипт на python, но он чертовски медленный (с точки зрения больших данных, из-за большого количества сетевых обращений и размеров полезной нагрузки)

#!/usr/bin/env python
#-*- coding: utf-8 -*-

import elasticsearch
from elasticsearch import helpers
import pyes
from unidecode import unidecode
from datetime import datetime


def getKeywordLength(text):
    text = text.strip()
    return text.count(" ")+1

indices = ["corpus"]

uri2 = "%s:%d" % ("http://localhost", 9200)
connection2 = pyes.ES([uri2], timeout=2000000)
es = elasticsearch.Elasticsearch(timeout=2000000)

def start():
    elasticSearchIndexName = index

    ###build search query to iterate over all records
    squery ='{"sort": [{"timestampUpdated": {"order": "asc","ignore_unmapped": true}}],"query": {"filtered": {"query": {"bool": {"should": [{"query_string": {"query": "*"}}]}}}}}'

    ###fetch a scrolling handle over all records
    items = helpers.scan(es,query=squery.encode('utf8'),index=elasticSearchIndexName,scroll='360s', size='1000', timeout=2000000)

    ###iterate over all records
    for i in items:
        try:
            indexName = i["_index"]
            timestamp = datetime.now().isoformat()
            keyword = i["_source"]["keyword"]
            i["_source"]["keywordLength"] = getKeywordLength(keyword)
            i["_source"]["timestampUpdated"] =  timestamp
            result = connection2.index(i["_source"], indexName, "items", id=i['_id'])
            print result
        except:
            start()
            return
start()

2 ответа

Решение

То, что я обычно делаю, когда у меня достаточно данных для массового обновления миллионов документов и я не могу себе позволить использовать двустороннюю подписку, использует плагин обновления по запросу. Принцип очень прост, он позволяет вам выполнить запрос с DSL-запросом и на всех соответствующих документах запустить скрипт, чтобы делать все что угодно.

В вашем случае это будет выглядеть так:

curl -XPOST localhost:9200/corpus/update_by_query -d '{
    "query": {
        "match_all": {}
    }, 
    "script": "ctx._source.keywordLength = ctx._source.keyword.split(\" \").size() + 1; ctx._source.timestampUpdated = new Date().format(\"yyyy-MM-dd\");"
}'

Также обратите внимание, что для того, чтобы запустить это, вам нужно включить сценарии в вашем elasticsearch.yml файл:

# before ES 1.6
script.disable_dynamic: false

# since ES 1.6
script.inline: on

Я нашел только небольшую информацию о контексте, предоставленную скрипту Groovy, работающему в ElasticSearch.

Исходя из этого, вот Groovy-эквивалент установки / обновления двух полей:

ctx._source.keywordLength = ctx._source.keyword.split(' ').size()
ctx._source.timestampUpdated = new Date().format('yyyy-MM-dd')

Я не мог понять, как поиск и итерация вступают в игру.

Это также может помочь.

Другие вопросы по тегам