ElasticSearch - требуется больше времени (8 часов) для индексации 23 ГБ PDF (20 000 документов) с использованием Java API
Индексирую PDF используя Java API. Я установил ingest-attachement processor plugin
и из моего кода Java я конвертирую PDF в base64 и индексирую закодированный формат PDF.
На самом деле, файлы PDF доступны на моей машине d: \ drive. Путь к файлу доступен в индексе ElasticSearch с именем как documents_local
, Итак, беру все записи из documents_local
Индекс и получение пути к файлу. Затем я читаю файл PDF и кодирую в base64. Затем их индексируют.
Для этого процесса я использую scrollRequest API для извлечения пути к файлу из индекса, потому что у меня больше 100k
документы. Итак, для индексации 20000
PDF это занимает 8 часов времени с приведенным ниже кодом Java.
Как я могу увеличить пропускную способность или ускорить этот процесс. Это занимает очень много времени для индексации 100k
документы.
public class DocumentIndex {
private final static String INDEX = "documents_local";
private final static String ATTACHMENT = "document_attachment";
private final static String TYPE = "doc";
private static final Logger logger = Logger.getLogger(Thread.currentThread().getStackTrace()[0].getClassName());
private static final int BUFFER_SIZE = 3 * 1024;
public static void main(String args[]) throws IOException {
RestHighLevelClient restHighLevelClient = null;
RestHighLevelClient restHighLevelClient2 = null;
Document doc=new Document();
logger.info("Started Indexing the Document.....");
//Fetching Id, FilePath & FileName from Document Index.
SearchRequest searchRequest = new SearchRequest(INDEX);
searchRequest.types(TYPE);
final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(60L)); //part of Scroll API
searchRequest.scroll(scroll); //part of Scroll API
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
QueryBuilder qb = QueryBuilders.matchAllQuery();
searchSourceBuilder.query(qb);
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = SearchEngineClient.getInstance3().search(searchRequest);
String scrollId = searchResponse.getScrollId(); //part of Scroll API
SearchHit[] searchHits = searchResponse.getHits().getHits();
//part of Scroll API -- Starts
while (searchHits != null && searchHits.length > 0) {
SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
scrollRequest.scroll(scroll);
searchResponse = SearchEngineClient.getInstance3().searchScroll(scrollRequest);
scrollId = searchResponse.getScrollId();
searchHits = searchResponse.getHits().getHits();
File all_files_path = new File("d:\\All_Files_Path.txt");
File available_files = new File("d:\\Available_Files.txt");
File missing_files = new File("d:\\Missing_Files.txt");
int totalFilePath=1;
int totalAvailableFile=1;
int missingFilecount=1;
Map<String, Object> jsonMap ;
for (SearchHit hit : searchHits) {
StringBuilder result = null;
String encodedfile = null;
File file=null;
Map<String, Object> sourceAsMap = hit.getSourceAsMap();
if(sourceAsMap != null) {
doc.setId((int) sourceAsMap.get("id"));
doc.setApp_language(String.valueOf(sourceAsMap.get("app_language")));
}
String filepath=doc.getPath().concat(doc.getFilename());
file = new File(filepath);
if(file.exists() && !file.isDirectory()) {
//base64 conversion Starts
FileInputStream fileInputStreamReader2 = new FileInputStream(file);
try ( BufferedInputStream in = new BufferedInputStream(fileInputStreamReader2, BUFFER_SIZE); ) {
try(PrintWriter out = new PrintWriter(new FileOutputStream(available_files, true)) ){
out.println("Available File Count --->"+totalAvailableFile+":::::::ID---> "+doc.getId()+"File Path --->"+filepath);
totalAvailableFile++;
}
Base64.Encoder encoder = Base64.getEncoder();
result = new StringBuilder();
byte[] chunk = new byte[BUFFER_SIZE];
int len = 0;
while ( (len = in.read(chunk)) == BUFFER_SIZE ) {
result.append( encoder.encodeToString(chunk) );
}
if ( len > 0 ) {
chunk = Arrays.copyOf(chunk,len);
result.append( encoder.encodeToString(chunk) );
}
}
//base64 conversion Ends
}
jsonMap = new HashMap<>();
jsonMap.put("id", doc.getId());
jsonMap.put("app_language", doc.getApp_language());
jsonMap.put("fileContent", result);
String id=Long.toString(doc.getId());
IndexRequest request = new IndexRequest(ATTACHMENT, "doc", id )
.source(jsonMap)
.setPipeline(ATTACHMENT);
PrintStream printStream = new PrintStream(new File("d:\\exception.txt"));
try {
IndexResponse response = SearchEngineClient.getInstance3().index(request);
} catch(ElasticsearchException e) {
if (e.status() == RestStatus.CONFLICT) {
}
e.printStackTrace(printStream);
}
totalFilePath++;
}
}
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(scrollId);
ClearScrollResponse clearScrollResponse = SearchEngineClient.getInstance3().clearScroll(clearScrollRequest);
boolean succeeded = clearScrollResponse.isSucceeded();
////part of Scroll API -- Ends
logger.info("Indexing done.....");
}
}