Массовая вставка Elasticsearch с NEST возвращает es_rejected_execution_exception
Я пытаюсь сделать массовую вставку, используя .Net API
в Elasticsearch
и это ошибка, которую я получаю при выполнении операции;
Error {Type: es_rejected_execution_exception Reason: "rejected execution of org.elasticsearch.transport.TransportService$6@604b47a4 on EsThreadPoolExecutor[bulk, queue capacity = 50, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@51f4f734[Running, pool size = 4, active threads = 4, queued tasks = 50, completed tasks = 164]]" CausedBy: ""} Nest.BulkError
Это связано с нехваткой места в моей системе или сама функция массовой вставки не работает? мой NEST
версия 5.0
а также Elasticsearch
версия также 5.0
,
Код логики массовой вставки;
public void bulkInsert(List<BaseData> recordList, List<String> listOfIndexName) {
BulkDescriptor descriptor = new BulkDescriptor();
foreach (var j in Enumerable.Range(0, recordList.Count)) {
descriptor.Index<BaseData>(op => op.Document(recordList[j])
.Index(listOfIndexName[j]));
}
var result = clientConnection.Bulk(descriptor);
}
1 ответ
Как сказал Вэл в комментариях, вы, вероятно, отправляете за один раз больше данных, чем может обработать ваш кластер. Похоже, вы пытаетесь отправить все свои документы в одном массовом запросе, который для многих документов или больших документов может не сработать.
С _bulk
вам необходимо отправить данные в кластер в виде нескольких массовых запросов и найти оптимальное количество документов, которые вы можете отправить в каждом массовом запросе, в дополнение к количеству массовых запросов, которые вы можете отправлять одновременно в ваш кластер.
Здесь нет жестких и быстрых правил для оптимального размера, потому что он может варьироваться в зависимости от сложности ваших документов, того, как они анализируются, аппаратного обеспечения кластера, настроек кластера, настроек индекса и т. Д.
Лучше всего начать с разумного числа, скажем, 500 документов (или некоторого числа, которое имеет смысл в вашем контексте) в одном запросе, а затем перейти оттуда. Вычисление общего размера в байтах каждого массового запроса также является хорошим подходом. Если производительность и пропускная способность недостаточны, увеличьте количество документов, размер запроса в байтах или одновременных запросов, пока не начнете видеть es_rejected_execution_exception
,
NEST 5.x поставляется с удобным помощником для упрощения массовых запросов, используя IObservable<T>
и наблюдаемый шаблон дизайна
void Main()
{
var client = new ElasticClient();
// can cancel the operation by calling .Cancel() on this
var cancellationTokenSource = new CancellationTokenSource();
// set up the bulk all observable
var bulkAllObservable = client.BulkAll(GetDocuments(), ba => ba
// number of concurrent requests
.MaxDegreeOfParallelism(8)
// in case of 429 response, how long we should wait before retrying
.BackOffTime(TimeSpan.FromSeconds(5))
// in case of 429 response, how many times to retry before failing
.BackOffRetries(2)
// number of documents to send in each request
.Size(500)
.Index("index-name")
.RefreshOnCompleted(),
cancellationTokenSource.Token
);
var waitHandle = new ManualResetEvent(false);
Exception ex = null;
// what to do on each call, when an exception is thrown, and
// when the bulk all completes
var bulkAllObserver = new BulkAllObserver(
onNext: bulkAllResponse =>
{
// do something after each bulk request
},
onError: exception =>
{
// do something with exception thrown
ex = exception;
waitHandle.Set();
},
onCompleted: () =>
{
// do something when all bulk operations complete
waitHandle.Set();
});
bulkAllObservable.Subscribe(bulkAllObserver);
// wait for handle to be set.
waitHandle.WaitOne();
if (ex != null)
{
throw ex;
}
}
// Getting documents should be lazily enumerated collection ideally
public static IEnumerable<Document> GetDocuments()
{
return Enumerable.Range(1, 10000).Select(x =>
new Document
{
Id = x,
Name = $"Document {x}"
}
);
}
public class Document
{
public int Id { get; set; }
public string Name { get; set; }
}