Индексирование на Amazon Elasticsearch Service - массовая вставка

У меня есть экземпляр Amazon Elasticsearch, который активен, и я могу подключаться и выполнять операторы через "Sense" из Chrome. Но когда я пытаюсь выполнить массовую вставку, появляется ошибка "timeout". Я пробовал и Python (массовый помощник), и модуль logstash, получая одинаковую ошибку в обоих направлениях.

Ниже приведен код

import psycopg2
from elasticsearch import Elasticsearch, helpers
import time

connection = psycopg2.connect(database='dbname', user='username', password='password', host='abc.def.com', port=5432)
es = Elasticsearch('elasticsearchinstance.amazonaws.com', max_retries=3, retry_on_timeout=True, request_timeout='10m')
cursor = connection.cursor()

query = """
select column1,column2,column3 from table
"""
cursor.execute(query)
rows = cursor.fetchall()
dict_list = []
for i in range(len(rows)):
    dict_list.append({'_type':'doc', '_index':'es_index', '_id':rows[i][0], 'column2':rows[i][1], 'column3':rows[i][2]})

print len(dict_list)

es.indices.delete(index='es_index', ignore=[400, 404])

time.sleep(2)

mapping = "{\"settings\" : {\"analysis\" : { \"analyzer\" : { \"my_ngram_analyzer\" : { \"tokenizer\" : \"my_ngram_tokenizer\" }},\"tokenizer\" : {\"my_ngram_tokenizer\" : {\"type\" : \"nGram\" , \"min_gram\" : \"2\" , \"max_gram\" : \"50\" }}}}, \"mappings\": { \"doc\": { \"_id\" : { \"path\" : \"id\" }, \"properties\": { \"column2\": { \"type\": \"string\", \"analyzer\": \"my_ngram_analyzer\" }, \"id\": { \"type\": \"long\" }, \"column3\": { \"type\": \"integer\" }}}}}"
es.indices.create(index='es_index', ignore=400, body=mapping)

helpers.bulk(es, dict_list)

Ошибка, полученная через Python Bulk helper, выглядит следующим образом

Traceback (most recent call last):
File "D:\Python\refresh_data.py", line 21, in <module>
es.indices.delete(index='es_index', ignore=[400, 404])
File "C:\Python27\lib\site-packages\elasticsearch\client\utils.py", line 69, in _wrapped
return func(*args, params=params, **kwargs)
File "C:\Python27\lib\site-packages\elasticsearch\client\indices.py", line 198, in delete
params=params)
File "C:\Python27\lib\site-packages\elasticsearch\transport.py", line 307, in perform_request
status, headers, data = connection.perform_request(method, url, params, body, ignore=ignore, timeout=timeout)
File "C:\Python27\lib\site-packages\elasticsearch\connection\http_urllib3.py", line 89, in perform_request
raise ConnectionError('N/A', str(e), e)

elasticsearch.exceptions.ConnectionError:
ConnectionError((<urllib3.connection.HTTPConnection object at 0x0000000002C91898>, u'Connection to elasticsearchinstance.amazonaws.com timed out. (connect timeout=10)')) 
caused by:
ConnectTimeoutError((<urllib3.connection.HTTPConnection object at 0x0000000002C91898>, u'Connection to elasticsearchinstance.amazonaws.com timed out. (connect timeout=10)'))

Аналогичная ошибка тайм-аута с Logstash (для массовой вставки), а также (будет редактировать и обновлять ошибку logstash при необходимости).

Требуется помощь для решения этой проблемы с сервисом Amazon Elasticsearch.

Заранее спасибо.

Редактировать:

Вот ошибка, которую я получаю с Logstash, когда я выполняю массовую вставку в Amazon ES

C:\logstash-1.5.4\bin>logstash agent -f feed_load_amazon_es.conf
io/console not supported; tty will not be manipulated
←[31mFailed to install template: connect timed out {:level=>:error}←[0m
Logstash startup completed
←[31mGot error to send bulk of actions: connect timed out {:level=>:error}←[0m
←[33mFailed to flush outgoing items {:outgoing_count=>3, :exception=>"Manticore::ConnectTimeout", 
:backtrace=>["C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/manticore-0.4.4-java/lib/manticore/response.rb:35:in `initialize'", 
"org/jruby/RubyProc.java:271:in `call'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/manticore-0.4.4-java/lib/manticore/response.rb:70:in `call'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/manticore-0.4.4-java/lib/manticore/response.rb:245:in `call_once'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/manticore-0.4.4-java/lib/manticore/response.rb:148:in `code'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/elasticsearch-transport-1.0.12/lib/elasticsearch/transport/transport/http/manticore.rb:71:in `perform_request'", 
"org/jruby/RubyProc.java:271:in `call'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/elasticsearch-transport-1.0.12/lib/elasticsearch/transport/transport/base.rb:190:in `perform_request'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/elasticsearch-transport-1.0.12/lib/elasticsearch/transport/transport/http/manticore.rb:54:in `perform_request'",
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/elasticsearch-transport-1.0.12/lib/elasticsearch/transport/client.rb:119:in `perform_request'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/elasticsearch-api-1.0.12/lib/elasticsearch/api/actions/bulk.rb:80:in `bulk'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-1.0.7-java/lib/logstash/outputs/elasticsearch/protocol.rb:104:in `bulk'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-1.0.7-java/lib/logstash/outputs/elasticsearch.rb:542:in `submit'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-1.0.7-java/lib/logstash/outputs/elasticsearch.rb:566:in `flush'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/stud-0.0.21/lib/stud/buffer.rb:219:in `buffer_flush'", 
"org/jruby/RubyHash.java:1341:in `each'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/stud-0.0.21/lib/stud/buffer.rb:216:in `buffer_flush'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-1.0.7-java/lib/logstash/outputs/elasticsearch.rb:600:in `teardown'", 
"org/jruby/RubyArray.java:1613:in `each'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/pipeline.rb:248:in `outputworker'", 
"org/jruby/RubyArray.java:1613:in `each'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/pipeline.rb:247:in `outputworker'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/pipeline.rb:166:in `start_outputs'"], :level=>:warn}←[0m

1 ответ

Вы делаете это неправильно, я думаю.

Массовый запрос представляет собой комбинацию из 2 строк в поле "body" метода объемного запроса.

{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
{ "field1" : "value1" }

Вот что вы должны иметь в поле своего тела.

Первая строка содержит тип запроса, индекс, по которому вы набираете объем, и множество других параметров, которые вы можете устанавливать или не устанавливать (см. Документацию). Добавьте \ r \ n в конце первой строки.

Вторая строка должна содержать то, что вы пытаетесь вставить.

Если вы проверите, что вы помещаете в dict_list, вы забыли вызов метода index.

Неправильная структура:

dict_list.append({'_type':'doc', '_index':'es_index', '_id':rows[i][0], 'column2':rows[i][1], 'column3':rows[i][2]})

Правильная структура:

{ "index" : {'_type':'doc', '_index':'es_index', '_id':rows[i][0]} }

А затем добавьте свой документ во второй строке.

Другие вопросы по тегам