Можно ли записать в динамически созданный индекс Elasticsearch с отформатированной датой использование asticsearch-hadoop/spark?
В рамках отдельной искры я пытаюсь записать данные из фрейма в Elasticsearch. Хотя я могу заставить это работать, я не могу понять, как записать в динамически именованный индекс, отформатированный как "index_name-{ts_col:{YYYY-mm-dd}}", где "ts_col" является поле datetime в наборе данных.
Я видел все виды сообщений о том, что этот тип синтаксиса должен работать, но когда я пытаюсь это сделать, я получаю ошибки, включенные внизу. Кажется, сначала проверяется, существует ли индекс перед его созданием, но ему передается неформатированное имя индекса, а не динамически созданное. Сначала я попытался создать индекс с тем же синтаксисом, используя модуль pythonasticsearch, но он не может обрабатывать имена динамических индексов.
Есть ли какое-либо решение, доступное мне, или мне нужно циклически проходить через мой набор данных в пределах spark, чтобы найти каждую из представленных дат, создать нужные мне индексы и затем записать в каждый индекс по одному за раз? Я что-то упускаю из виду? Logstash делает это легко, я не понимаю, почему я не могу заставить его работать в Spark.
Вот команда write, которую я использую (тоже пробовал разные варианты):
df.write.format("org.elasticsearch.spark.sql")
.option('es.index.auto.create', 'true')
.option('es.resource', 'index_name-{ts_col:{YYYY.mm.dd}}/type_name')
.option('es.mapping.id', 'es_id')
.save()
Вот банка, которую я использую:
elasticsearch-hadoop-5.0.0/dist/elasticsearch-spark-20_2.11-5.0.0.jar
Вот ошибка, которую я получаю, когда использую команду записи выше:
ОШИБКА NetworkClient: Node [##.##.##.##:9200] не удалось (недопустимый целевой заголовок URI @null/index_name-{ts_col:{YYYY.mm.dd}}/type_name); выбран следующий узел [##. ##. ##. ##: 9200]
...
...
Py4JJavaError: Произошла ошибка при вызове o114.save.: org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: ошибка соединения (проверьте настройки сети и / или прокси-сервера)- все узлы вышли из строя;
И если я установлю перезапись на True, я получу:
Py4JJavaError: Произошла ошибка при вызове o58.save.: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: такой индекс не имеет значения null в org.elasticsearch.hadoop.rest.RestClient.checkResponse(RestClient.java:488) в org.elasticsearch.hadoop.rest.RestClient.lientjj (:446) в org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:436) в org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:363) в org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:92) в org.elasticsearch.hadoop.rest.RestRepository.delete(RestRepository.java:455) в org.elasticsearch.spark.sql.ElasticsearchRelation.insert(DefaultSource.scala:500) в org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:94) в org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:442) в org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211) в org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:194) в sun.reflect.NativeMethodAccessorImpl.invoke0(собственный метод) в su n.reflect..MethodInvoker.invoke(MethodInvoker.java:237) в py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) в py4j.Gateway.invoke(Gateway.java:280) в py4j.commands.AbstractCommand.intoke. Java:132) в py4j.commands.CallCommand.execute(CallCommand.java:79) в py4j.GatewayConnection.run(GatewayConnection.java:214) в java.lang.Thread.run(Thread.java:745)
И если я пытаюсь использовать Python-клиент Elasticsearch для создания индекса заранее, я получаю:
RequestError: TransportError (400, u'invalid_index_name_exception ', u'Неверное имя индекса [index_name-{ts_col:YYYY.MM.dd}], должно быть в нижнем регистре')
1 ответ
Вам не нужно снова помещать формат даты в фигурные скобки. Вы можете прочитать об этом больше здесь
.option('es.resource', 'index_name-{ts_col:{YYYY.mm.dd}}/type_name')
Измените выше, как показано ниже:
.option('es.resource', 'index_name-{ts_col:YYYY.mm.dd}/type_name')
Примечание: убедитесь, что ваш ts_col
поле имеет правильный формат даты.