Передача большого набора данных обратно в процессор ExecuteStreamCommand
В настоящее время я анализирую большой файл данных COBOL, используя библиотеку JRecord на Java, и конвертирую его в JSON. У меня есть функция, написанная на Java, которая выполняется ExecuteCommandProcessor в NIFI для возврата объекта JSON. Поскольку файл данных COBOL состоит из более чем 1 миллиона записей, я хотел бы обрабатывать их пакетно и передавать данные порциями обратно в файл потока ExecuteStreamCommand. Я попробовал метод OutPutStream.flush для возврата частичного набора результатов после итерации по 10000 записей, но результаты не возвращаются в файл потока NIFI. Ниже приведен пример кода. Какие изменения мне нужно внести для потоковой передачи данных обратно в файл потока ExecuteStreamCommand порциями по 10000 записей за раз.
public static void main(String[] args)
{
AbstractLine line;
int lineNum = 0;
String currentStatNum = null;
String currentSeqNum = null;
JSONObject result = new JSONObject();
JSONArray dafieldArray = new JSONArray();
JSONArray dafldspcArray = new JSONArray();
try {
ICobolIOBuilder iob = JRecordInterface1.COBOL
.newIOBuilder(copyBookFile)
//.setFont("") // Think about specifying an encoding !!!
.setFileOrganization(Constants.IO_BIN_TEXT)
.setSplitCopybook(CopybookLoader.SPLIT_01_LEVEL)
.setRecordSelection(
"DAFIELD",
net.sf.JRecord.ExternalRecordSelection.ExternalFieldSelection.newFieldSelection(
false, "RRC-TAPE-RECORD-ID", "=", "03")
)
.setRecordSelection(
"DAFLDSPC",
net.sf.JRecord.ExternalRecordSelection.ExternalFieldSelection.newFieldSelection(
false, "RRC-TAPE-RECORD-ID", "=", "04") );
FieldNames.RecordDafield rDafield = FieldNames.RECORD_DAFIELD;
FieldNames.RecordDafldspc rDafldspc = FieldNames.RECORD_DAFLDSPC;
OutputStream out = new BufferedOutputStream(System.out);
AbstractLineReader reader = iob.newReader(dataFile);
while ((line = reader.read()) != null) {
lineNum += 1;
if ("03".equals(line.getFieldValue(rDafield.rrcTapeRecordId).asString().trim())) {
JSONObject dafield = new JSONObject();
dafield.put("rrcTapeRecordId",line.getFieldValue(rDafield.rrcTapeRecordId).asString().trim());
dafield.put("daFieldNumber",line.getFieldValue(rDafield.daFieldNumber).asString().trim());
dafield.put("daFieldApplicationWellCode",line.getFieldValue(rDafield.daFieldApplicationWellCode).asString().trim());
dafield.put("da1995FieldApplWellCode",line.getFieldValue(rDafield.da1995FieldApplWellCode).asString().trim());
dafield.put("daFieldRule38Flag",line.getFieldValue(rDafield.daFieldRule38Flag).asString().trim());
dafield.put("rrcTapeFiller",line.getFieldValue(rDafield.rrcTapeFiller).asString().trim());
dafieldArray.put(dafield);
}
if ("04".equals(line.getFieldValue(rDafldspc.rrcTapeRecordId).asString().trim())) {
JSONObject dafldspc = new JSONObject();
dafldspc.put("rrcTapeRecordId",line.getFieldValue(rDafldspc.rrcTapeRecordId).asString().trim());
dafldspc.put("daFieldDistrict",line.getFieldValue(rDafldspc.daFieldDistrict).asString().trim());
dafldspc.put("daFieldLeaseName",line.getFieldValue(rDafldspc.daFieldLeaseName).asString().trim());
dafldspc.put("daFieldTotalDepth",line.getFieldValue(rDafldspc.daFieldTotalDepth).asString().trim());
dafldspc.put("daFieldWellNumber",line.getFieldValue(rDafldspc.daFieldWellNumber).asString().trim());
dafldspc.put("daFieldAcres",line.getFieldValue(rDafldspc.daFieldAcres).asString().trim());
dafldspc.put("rrcTapeFiller",line.getFieldValue(rDafldspc.rrcTapeFiller).asString().trim());
result.put("DAFIELD", dafieldArray);
result.put("DAFLDSPC",dafldspcArray);
output.write(result.toString().getBytes());
out.flush();
dafieldArray.clear();
dafldspcArray.clear();
} catch (Exception e)
{
e.printStackTrace();
}
}