Как выполнить запрос к Elasticsearch, используя PySpark, не запрашивая каждый узел?
Моя конечная цель - использовать PySpark для эффективной индексации большого объема данных в Elasticsearch (ES), а затем выполнить огромное количество запросов к индексу и записать статистику по результатам.
Elasticsearch version 5.6.5
Spark version 2.4.0
Hadoop version 2.7
Elasticsearch-Hadoop python library version: 6.6.0
Рассмотрим следующий код:
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext
# create our Spark Context
sc_conf = SparkConf().setAll((
("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
))
sc_conf.setAppName("PythonSparkStreaming")
sc = SparkContext(conf=sc_conf)
sqlContext = SQLContext(sc)
q ="""{
"query": {
"match_all": {}
}
}"""
es_live_conf["es.query"] = q
es_rdd = sc.newAPIHadoopRDD(
inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_live_conf)
sqlContext.createDataFrame(es_rdd).limit(1).collect()
Я просто пытаюсь выполнить сопоставление всех запросов с индексом, и мне нужен только лучший результат. Я попытался выразить ограничение в запросе ES, но, очевидно, Spark игнорирует это, поэтому вместо этого я выразил его с помощью фильтра фрейма данных.
Я настроил Spark следующим образом:
es_live_conf = {
# specify the node that we are sending data to (this should be the master)
"es.nodes" : 'xxx.xxxx.com',
# specify the port in case it is not the default port
"es.port" : ES_PORT,
# specify a resource in the form 'index/doc-type'
"es.resource" : 'xxxxxx/document',
"es.net.http.auth.user" : ES_USERNAME,
"es.net.http.auth.pass" : ES_PASSWORD,
"es.net.ssl":"true",
"es.net.ssl.cert.allow.self.signed": "true",
"es.nodes.discovery": "false",
"es.nodes.wan.only": "true",
"es.index.read.missing.as.empty": "true",
}
Я получаю доступ к кластеру ES за VPC, поэтому у меня есть доступ только к клиентским узлам и ни к одному из внутренних узлов данных и т. Д. Вот почему wan.only
установлено в true.
При такой настройке Spark, по-видимому, запрашивает каждый отдельный узел с полным соответствием, а затем в итоге объединяется до единственного результата, который я на самом деле хочу. Он невероятно медленный (50 осколков, 30 миллионов документов) и полностью исключает способность ES эффективно сокращать результаты для каждого узла. Даже если я изменю запрос на поиск по отдельному идентификатору документа, он запускает запрос для каждого отдельного сегмента через главный узел, указывая конкретный идентификатор сегмента при каждом вызове. Я пытался установить es.nodes.client.only
правда, но это жалуется, что настройка конфликтует с wan.only
, Если я включу client.only
и отключить wan.only
Я больше не могу подключиться к кластеру, потому что он пытается напрямую подключиться к каждому узлу, который недоступен.
Что я здесь не так делаю? Как использовать PySpark для запуска запроса к ES один раз, а не один раз для каждого шарда. Далее, как я могу использовать такие вещи, как from
, size
а также rescore
в моих запросах, если PySpark пытается выполнить полный запрос для каждого сегмента в любом случае, а затем, по-видимому, после обработки результатов?
0 ответов
Я не мог найти способ решить эту проблему с помощью библиотеки ES Hadoop. Похоже, что он больше подходит для использования Spark, когда вам нужно выполнить очень длинный и очень сложный шаг сокращения по результатам, возвращаемым из одного запроса Elasticsearch, а не выполнять миллионы быстрых запросов ES и агрегировать результаты. Чтобы решить эту проблему, я использовал этот плагин: https://github.com/sourav-mazumder/Data-Science-Extensions/tree/master/spark-datasource-rest
Я развил его дальше, чтобы каждое ядро могло использовать несколько потоков для параллельного выполнения еще большего количества запросов. Он не только позволяет выполнять DDOS-атаки для вашего кластера ES, но и для любой спокойной конечной точки для любой платформы, которая может потребоваться и агрегировать большой объем запросов.
Если я все проясню, я опубликую многопоточную версию, которую я создал, также на github.