pyspark - ошибка записи dstream в asticsearch

У меня проблема с индексацией данных от потоковой передачи искры (pyspark) до упругого поиска. данные имеют тип dstream, Ниже как это выглядит

(u'01B', 0)
(u'1A5', 1)
....

Вот эластичный индекс, который я использую: индекс = кластер и тип = данные

GET /clus/_mapping/data
{
   "clus": {
      "mappings": {
         "data": {
            "properties": {
               "content": {
                  "type": "text"
               }
            }
         }
      }
   }
}

Вот мой код:

ES_HOST = {
    "host" : "localhost", 
    "port" : 9200
}

INDEX_NAME = 'clus'
TYPE_NAME = 'data'
ID_FIELD = 'responseID' 

# create ES client
es = Elasticsearch(hosts = [ES_HOST])

# some config before sending to elastic     
if not es.indices.exists(INDEX_NAME):
    request_body = {
        "settings" : {
            "number_of_shards": 1,
            "number_of_replicas": 0
        }
    }
    res = es.indices.create(index = INDEX_NAME, body = request_body)
es_write_conf = {
        "es.nodes": "localhost",
        "es.port": "9200",
        "es.resource": INDEX_NAME+"/"+TYPE_NAME
    }
sc = SparkContext(appName="PythonStreamingKafka")
    ssc = StreamingContext(sc, 30)

# .....
#loading data to put in elastic : lines4

    lines4.foreachRDD(lambda rdd: rdd.saveAsNewAPIHadoopFile(
        path='-',
        outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_write_conf))




    ssc.start()
    ssc.awaitTermination()

Вот ошибка:

17/07/25 15:31:31 ОШИБКА Исполнитель: Исключение в задаче 2.0 на этапе 11.0 (TID 23) org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Обнаружена неисправимая ошибка [127.0.0.1:9200] возвратила неверный запрос (400) - не удалось разобрать; Выйти.. в org.elasticsearch.hadoop.rest.RestClient.processBulkResponse(RestClient.java:251) в org.elasticsearch.hadoop.rest.RestClient.bulk(RestClient.java:203) в org.elasticsearch.hadoop..RestRepository.tryFlush(RestRepository.java:220) в org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:242) в org.elasticsearch.hadoop.rest.RestRepository.close(RestRepository.java:267) в org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.doClose(EsOutputFormat.java:214) в org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.close(EsOutputFormat.java:196) в org.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$5.apply$mcV$sp(PairRDDFunctions.scala:1119) в org.apache.spark.util.Utils$.tryWithSAfendFile. в org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1119) в org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1091) в org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) в org.apache.sparkTashler (12)..scala:89) в org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) в java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) в java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) в java.lang.Thread.run(Thread.java:748) 17/07/25 15:31:31 Исполнитель ОШИБКИ: исключение в задании 0.0 на этапе 11.0 (TID 21) org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Обнаружена неисправимая ошибка [127.0.0.1:9200] возвратила неверный запрос (400) - не удалось проанализировать; Выйти.. в org.elasticsearch.hadoop.rest.RestClient.processBulkResponse(RestClient.java:251) в org.elasticsearch.hadoop.rest.RestClient.bulk(RestClient.java:203) в org.elasticsearch.hadoop..RestRepository.tryFlush(RestRepository.java:220) в org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:242) в org.elasticsearch.hadoop.rest.RestRepository.close(RestRepository.java:267) в org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.doClose(EsOutputFormat.java:214) в org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.close(EsOutputFormat.java:196) в org.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$5.apply$mcV$sp(PairRDDFunctions.scala:1119) в org.apache.spark.util.Utils$.tryWithSAfendFile. в org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1119) в org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1091) в org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) в org.apache.sparkTashler (12)..scala:89) в org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) в java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) в java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) в java.lang.Thread.run(Thread.java:748) 17/07/25 15:31:31 Исполнитель ОШИБКИ: исключение в задании 1.0 на этапе 11.0 (TID 22) org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Обнаружена неисправимая ошибка [127.0.0.1:9200] возвратила неверный запрос (400) - не удалось проанализировать; Выйти.. в org.elasticsearch.hadoop.rest.RestClient.processBulkResponse(RestClient.java:251) в org.elasticsearch.hadoop.rest.RestClient.bulk(RestClient.java:203) в org.elasticsearch.hadoop..RestRepository.tryFlush(RestRepository.java:220) в org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:242) в org.elasticsearch.hadoop.rest.RestRepository.close(RestRepository.java:267) в org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.doClose(EsOutputFormat.java:214) в org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.close(EsOutputFormat.java:196) в org.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$5.apply$mcV$sp(PairRDDFunctions.scala:1119) в org.apache.spark.util.Utils$.tryWithSAfendFile. в org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1119) в org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1091) в org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) в org.apache.sparkTashler (12)..scala:89) в org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) в java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) в java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) в java.lang.Thread.run (Thread.java:748)

1 ответ

Похоже, что при создании индекса произошла ошибка. Вам необходимо отправить mappingв body запроса при создании индекса. Вот рабочий пример:

from elasticsearch import Elasticsearch 

es = Elasticsearch(["http://localhost:9200"])
# create index 
index_name = "clus" 
index_mapping = {
   "clus": {
      "mappings": {
         "data": {
            "properties": {
               "content": {
                  "type": "text"
               }
            }
         }
      }
   }
}


if not es.indices.exists(index_name):
        res = es.indices.create(index=index_name, body=index_mapping)
        print res

Вы должны получить это {u'acknowledged': True} в качестве ответа, чтобы подтвердить, что ваш индекс был создан.

Затем вы перебираете свой поток данных с помощью foreachRDD и применяете функцию, которая преобразует данные в структуру json. {"content": str((u'1A5', 1))}и индексировать его следующим образом

doc = {"content": str((u'1A5', 1))}
res = es.index(index="clus", doc_type='data', body=doc)

Как примечание, не рекомендуется индексировать данные в виде списка (u'1A5', 1) вам будет трудно использовать его в другом контексте, например, для визуализации в кибане.

Другие вопросы по тегам