Передача большого набора данных обратно в процессор ExecuteStreamCommand

В настоящее время я анализирую большой файл данных COBOL, используя библиотеку JRecord на Java, и конвертирую его в JSON. У меня есть функция, написанная на Java, которая выполняется ExecuteCommandProcessor в NIFI для возврата объекта JSON. Поскольку файл данных COBOL состоит из более чем 1 миллиона записей, я хотел бы обрабатывать их пакетно и передавать данные порциями обратно в файл потока ExecuteStreamCommand. Я попробовал метод OutPutStream.flush для возврата частичного набора результатов после итерации по 10000 записей, но результаты не возвращаются в файл потока NIFI. Ниже приведен пример кода. Какие изменения мне нужно внести для потоковой передачи данных обратно в файл потока ExecuteStreamCommand порциями по 10000 записей за раз.

        
        public static void main(String[] args) 
        
        {
             AbstractLine line;
            int lineNum = 0;
          String currentStatNum = null;
          String currentSeqNum = null;
            JSONObject result = new JSONObject();
            JSONArray dafieldArray = new JSONArray();
            JSONArray dafldspcArray = new JSONArray();
         
            try {
                ICobolIOBuilder iob = JRecordInterface1.COBOL
                                    .newIOBuilder(copyBookFile)
                                       //.setFont("") // Think about specifying an encoding !!!
                                       .setFileOrganization(Constants.IO_BIN_TEXT)
                                       .setSplitCopybook(CopybookLoader.SPLIT_01_LEVEL)
                                     
            
               .setRecordSelection(
                          "DAFIELD", 
                           net.sf.JRecord.ExternalRecordSelection.ExternalFieldSelection.newFieldSelection(
             false, "RRC-TAPE-RECORD-ID", "=", "03")

                      )
                      .setRecordSelection(
                          "DAFLDSPC", 
                           net.sf.JRecord.ExternalRecordSelection.ExternalFieldSelection.newFieldSelection(
             false, "RRC-TAPE-RECORD-ID", "=", "04") );
             
             
                FieldNames.RecordDafield rDafield = FieldNames.RECORD_DAFIELD;
                FieldNames.RecordDafldspc rDafldspc = FieldNames.RECORD_DAFLDSPC;
                
                 OutputStream out = new BufferedOutputStream(System.out);
                
                 AbstractLineReader reader = iob.newReader(dataFile);
                while ((line = reader.read()) != null) {
                    lineNum += 1;
                    
                   
                    
                    
                    if ("03".equals(line.getFieldValue(rDafield.rrcTapeRecordId).asString().trim())) {
                         JSONObject dafield = new JSONObject();
        
                         dafield.put("rrcTapeRecordId",line.getFieldValue(rDafield.rrcTapeRecordId).asString().trim());
                         dafield.put("daFieldNumber",line.getFieldValue(rDafield.daFieldNumber).asString().trim());
                         dafield.put("daFieldApplicationWellCode",line.getFieldValue(rDafield.daFieldApplicationWellCode).asString().trim());
                         dafield.put("da1995FieldApplWellCode",line.getFieldValue(rDafield.da1995FieldApplWellCode).asString().trim());
                         dafield.put("daFieldRule38Flag",line.getFieldValue(rDafield.daFieldRule38Flag).asString().trim());
                         dafield.put("rrcTapeFiller",line.getFieldValue(rDafield.rrcTapeFiller).asString().trim());
                        dafieldArray.put(dafield);
                        }
                    
                    
                    if ("04".equals(line.getFieldValue(rDafldspc.rrcTapeRecordId).asString().trim())) {
                        
                        JSONObject dafldspc = new JSONObject();
                        dafldspc.put("rrcTapeRecordId",line.getFieldValue(rDafldspc.rrcTapeRecordId).asString().trim());
                      dafldspc.put("daFieldDistrict",line.getFieldValue(rDafldspc.daFieldDistrict).asString().trim());
                      dafldspc.put("daFieldLeaseName",line.getFieldValue(rDafldspc.daFieldLeaseName).asString().trim());
                      dafldspc.put("daFieldTotalDepth",line.getFieldValue(rDafldspc.daFieldTotalDepth).asString().trim());
                      dafldspc.put("daFieldWellNumber",line.getFieldValue(rDafldspc.daFieldWellNumber).asString().trim());
                      dafldspc.put("daFieldAcres",line.getFieldValue(rDafldspc.daFieldAcres).asString().trim());
                      dafldspc.put("rrcTapeFiller",line.getFieldValue(rDafldspc.rrcTapeFiller).asString().trim());
                        
                    
                      result.put("DAFIELD", dafieldArray);
                      result.put("DAFLDSPC",dafldspcArray);
                      output.write(result.toString().getBytes());
                      out.flush();
                      
                     
                      dafieldArray.clear();
                      dafldspcArray.clear();
                      
                        } catch (Exception e) 
            {
        e.printStackTrace();
        }

        }

0 ответов

Другие вопросы по тегам