Ошибка при загрузке объемных данных в Elasticsearch
Я использую Elasticsearch в Python. У меня есть данные во фрейме pandas (3 столбца), затем я добавил два столбца _index и _type и преобразовал данные в json для каждой записи, используя встроенный метод pandas.
data = data.to_json(orient='records')
Это мои данные тогда,
[{"op_key":99140046678,"employee_key":991400459,"Revenue Results":6625.76480192,"_index":"revenueindex","_type":"revenuetype"},
{"op_key":99140045489,"employee_key":9914004258,"Revenue Results":6691.05435536,"_index":"revenueindex","_type":"revenuetype"},
......
}]
Мое отображение:
user_mapping = {
"settings" : {
"number_of_shards": 3,
"number_of_replicas": 2
},
'mappings': {
'revenuetype': {
'properties': {
'op_key':{'type':'string'},
'employee_key':{'type':'string'},
'Revenue Results':{'type':'float','index':'not_analyzed'},
}
}
}
}
Затем столкнулся с этой ошибкой при использовании helpers.bulk(es,data):
Traceback (most recent call last):
File "/Users/adaggula/Documents/workspace/ElSearchPython/sample.py", line 59, in <module>
res = helpers.bulk(client,data)
File "/Users/adaggula/workspace/python/pve/lib/python2.7/site-packages/elasticsearch/helpers/__init__.py", line 188, in bulk
for ok, item in streaming_bulk(client, actions, **kwargs):
File "/Users/adaggula/workspace/python/pve/lib/python2.7/site-packages/elasticsearch/helpers/__init__.py", line 160, in streaming_bulk
for result in _process_bulk_chunk(client, bulk_actions, raise_on_exception, raise_on_error, **kwargs):
File "/Users/adaggula/workspace/python/pve/lib/python2.7/site-packages/elasticsearch/helpers/__init__.py", line 89, in _process_bulk_chunk
raise e
elasticsearch.exceptions.RequestError: TransportError(400, u'action_request_validation_exception', u'Validation Failed: 1: index is
missing;2: type is missing;3: index is missing;4: type is missing;5: index is
missing;6: ....... type is missing;999: index is missing;1000: type is missing;')
Похоже, что для каждого объекта json отсутствуют индекс и тип. Как это побороть?
1 ответ
Pandas Data frame to json - это уловка, которая разрешила проблему.
data = data.to_json(orient='records')
data= json.loads(data)