Работа с большими файлами с определенным расширением с помощью оператора сканирования каталогов

У меня есть файл размером более 1 ГБ, поступающий в мой каталог из MQ, для полной передачи файла требуется некоторое время, но файл будет создан в этом каталоге, даже если он не полный. Боюсь, мой оператор directoryScan подберет неполный файл. Кроме того, я не могу добавить начальную задержку, потому что я не уверен, сколько времени займет передача файла.

PS: я где-то читал, что некоторые протоколы передачи файлов позаботятся об этом, добавив другое расширение к файлу, пока оно не будет завершено. Скажем, мой оператор directoryScan ожидает любой файл с расширением.txt, поэтому этот протокол передачи файлов будет создавать файл с расширением.abc до тех пор, пока передача не будет завершена.

Как я должен идти вперед с этим?

1 ответ

Если вы собираетесь использовать маршрут с регулярным выражением, вот пример вызова оператора для чтения только файлов с определенным расширением:

   // DirectoryScan operator with an absolute file argument and a file name pattern         
stream<rstring name> Dir2 = DirectoryScan()                                        
{                                                                                        
  param                                                                                  
    directory : "/tmp/work";                                                             
   pattern : "\\.txt$";                                                                 
}   

Если это не сработает, возможно ли настроить MQ для записи файла в другой каталог, а затем переместить его в целевой каталог после его завершения?

Если вы знаете размер файла, вы можете использовать Size() функция, чтобы игнорировать файл, пока он не станет нужного размера. Этот фрагмент использует Custom Оператор ожидает, пока размер файла не станет меньше 2000 байт.

graph
        stream <rstring filename, uint64 size> DirScanOutput = DirectoryScan() {
            param
                directory: "test1";
                sleepTime: 10.0; //wait 10s between scans
                pattern: ".*\\.txt";

          output DirScanOutput : size= Size();
        } 


        stream<rstring file> FileNameStream= Custom(DirScanOutput as In){
            logic
                onTuple In:{
                    if (size < 2000ul){
                        printStringLn("Required size not met yet.");
                    } else {
                        printStringLn("Size of file reached.");
                        submit({file=filename}, FileNameStream);

                    }
                }
        }

        stream <cityData> CityDataRecord = FileSource(FileNameStream) {
            param
                format: csv;
        } 

Я надеюсь, что одно из этих предложений работает для вас.

Другие вопросы по тегам