HTTP POST эластичный поток событий поиска

У меня есть программа node.js, которая использует потоки для чтения файла ( поток событий nodejs устанавливает переменную для каждого потока)

Я хотел бы использовать ту же программу для записи этих данных в эластичный поиск. Я написал небольшую функцию записи

var writeFunction = function(data) {
    //console.log(data);
    var client = request.newClient("http://localhost:9200");
    client.post('/newtest3/1',data,function(err,res,body) {
        return console.log(res.statusCode);
    });
};

и подключил это с потоковым

var processMyFile = function(file) {
    var stream = getStream(file);
    var nodeName = stream.nodeName;
    stream
        .pipe(es.split())
        .on('end',endFunction)
        .pipe(es.map(function(data,cb) {
            processFunction(nodeName,data,cb);
        }))
        .pipe(es.map(function(data,cb) {
            writeFunction(data);
        }));

}

Вышеприведенное работает асинхронно, как и ожидалось, и записывает данные, за исключением того, что это занимает много времени. Похоже, что оно также работает как буфер, поскольку запись занимает намного больше времени, чем чтение.(Преимущество использования канала) Я знаю, что есть основной интерфейс в упругом поиске, и я могу импортировать с помощью этого. Пример shakesphere.json в руководстве по началу работы в Kibana ( http://www.elasticsearch.org/guide/en/kibana/current/using-kibana-for-the-first-time.html)

Это означает, что мне нужно создать файл в формате, необходимом для массового импорта, а затем запустить программу curl и т. Д. Я бы хотел избежать создания временного файла.

Существует ли более простой способ быстрее импортировать данные в эластичный поиск в рамках процесса потоковой передачи

1 ответ

asticsearch-streams поможет вам использовать массовый интерфейс с потоковой передачей, без необходимости сначала записывать файл json.

Я считаю, что ваш код будет более или менее так:

var TransformToBulk = require('elasticsearch-streams').TransformToBulk
var WritableBulk = require('elasticsearch-streams').WritableBulk;
var client = new require('elasticsearch').Client();

var bulkExec = function(bulkCmds, callback) {
  client.bulk({
    index : 'newtest3',
    type  : '1',
    body  : bulkCmds
  }, callback);
};

var ws = new WritableBulk(bulkExec);
var toBulk = new TransformToBulk(function getIndexTypeId(doc) { return { _id: doc.id }; });

var processMyFile = function(file) {
  var stream = getStream(file);

  stream
    .pipe(toBulk)
    .pipe(ws)
    .on('close', endFunction)
    .on('err', endFunction);
}
Другие вопросы по тегам