Как выполнить запрос к Elasticsearch, используя PySpark, не запрашивая каждый узел?

Моя конечная цель - использовать PySpark для эффективной индексации большого объема данных в Elasticsearch (ES), а затем выполнить огромное количество запросов к индексу и записать статистику по результатам.

Elasticsearch version 5.6.5
Spark version 2.4.0
Hadoop version 2.7
Elasticsearch-Hadoop python library version: 6.6.0

Рассмотрим следующий код:

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext

# create our Spark Context  
sc_conf = SparkConf().setAll((
    ("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
))
sc_conf.setAppName("PythonSparkStreaming")

sc = SparkContext(conf=sc_conf)

sqlContext = SQLContext(sc)

q ="""{
  "query": {
    "match_all": {}
  }  
}"""

es_live_conf["es.query"] = q

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_live_conf)

sqlContext.createDataFrame(es_rdd).limit(1).collect()

Я просто пытаюсь выполнить сопоставление всех запросов с индексом, и мне нужен только лучший результат. Я попытался выразить ограничение в запросе ES, но, очевидно, Spark игнорирует это, поэтому вместо этого я выразил его с помощью фильтра фрейма данных.

Я настроил Spark следующим образом:

es_live_conf = {

# specify the node that we are sending data to (this should be the master)
"es.nodes" : 'xxx.xxxx.com',

# specify the port in case it is not the default port
"es.port" : ES_PORT,

# specify a resource in the form 'index/doc-type'
"es.resource" : 'xxxxxx/document',

"es.net.http.auth.user" : ES_USERNAME,

"es.net.http.auth.pass" : ES_PASSWORD,

"es.net.ssl":"true",

"es.net.ssl.cert.allow.self.signed": "true",

"es.nodes.discovery": "false",

"es.nodes.wan.only": "true",

"es.index.read.missing.as.empty": "true",

}

Я получаю доступ к кластеру ES за VPC, поэтому у меня есть доступ только к клиентским узлам и ни к одному из внутренних узлов данных и т. Д. Вот почему wan.only установлено в true.

При такой настройке Spark, по-видимому, запрашивает каждый отдельный узел с полным соответствием, а затем в итоге объединяется до единственного результата, который я на самом деле хочу. Он невероятно медленный (50 осколков, 30 миллионов документов) и полностью исключает способность ES эффективно сокращать результаты для каждого узла. Даже если я изменю запрос на поиск по отдельному идентификатору документа, он запускает запрос для каждого отдельного сегмента через главный узел, указывая конкретный идентификатор сегмента при каждом вызове. Я пытался установить es.nodes.client.only правда, но это жалуется, что настройка конфликтует с wan.only, Если я включу client.only и отключить wan.only Я больше не могу подключиться к кластеру, потому что он пытается напрямую подключиться к каждому узлу, который недоступен.

Что я здесь не так делаю? Как использовать PySpark для запуска запроса к ES один раз, а не один раз для каждого шарда. Далее, как я могу использовать такие вещи, как from, size а также rescore в моих запросах, если PySpark пытается выполнить полный запрос для каждого сегмента в любом случае, а затем, по-видимому, после обработки результатов?

0 ответов

Я не мог найти способ решить эту проблему с помощью библиотеки ES Hadoop. Похоже, что он больше подходит для использования Spark, когда вам нужно выполнить очень длинный и очень сложный шаг сокращения по результатам, возвращаемым из одного запроса Elasticsearch, а не выполнять миллионы быстрых запросов ES и агрегировать результаты. Чтобы решить эту проблему, я использовал этот плагин: https://github.com/sourav-mazumder/Data-Science-Extensions/tree/master/spark-datasource-rest

Я развил его дальше, чтобы каждое ядро ​​могло использовать несколько потоков для параллельного выполнения еще большего количества запросов. Он не только позволяет выполнять DDOS-атаки для вашего кластера ES, но и для любой спокойной конечной точки для любой платформы, которая может потребоваться и агрегировать большой объем запросов.

Если я все проясню, я опубликую многопоточную версию, которую я создал, также на github.

Другие вопросы по тегам