fs.rename(новый путь (rawFileName), новый путь (processFileName)) не работает

Я работаю над реализацией Apache Spark на основе Scala для передачи данных из удаленного местоположения в HDFS, а затем - для загрузки данных из HDFS в таблицы Hive.

Используя свое первое искровое задание, я вставил данные / файлы в HDFS в месте, скажем -

hdfs: //sandbox.hortonworks.com: 8020 / data / analytics / raw / folder

Давайте рассмотрим, что после подключения файлов CT_Click_Basic.csv и CT_Click_Basic1.csv.gz у меня есть следующие файлы в HDFS [имена файлов в общем расположении будут здесь именем папки, а ее содержимое будет присутствовать в файлах part-xxxxx]:

[root @ sandbox ~] # hdfs dfs -ls / data / analytics / raw / * / Найдено 3 объекта

-rw-r - r-- 3 chauhan.bhupesh hdfs 0 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic.csv/_SUCCESS

-rw-r - r-- 3 chauhan.bhupesh hdfs 8383 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic.csv/part-00000

-rw-r - r-- 3 chauhan.bhupesh hdfs 8395 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic.csv/part-00001

Найдено 2 предмета

-rw-r - r-- 3 chauhan.bhupesh hdfs 0 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic1.csv.gz/_SUCCESS

-rw-r -r-- 3 chauhan.bhupesh hdfs 16588 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic1.csv.gz/part-00000

Теперь, используя мое другое задание Spark, я хочу переместить эти файлы как есть из папки / raw в / process и, наконец, в папку / archive в HDFS, основываясь на задачах, выполняемых на каждом этапе.

Для этого я сначала извлекаю список всех файлов, представленных в папке / raw, используя следующий код:

    def listAllFilesFolderInDir(filePath:String,recursiveTraverse:Boolean,filePaths: ListBuffer[Path]) : ListBuffer[Path] = {
val files = GlobalContext.hdfs.listStatus(new Path(filePath))
files.foreach { fileStatus => {
           if(!fileStatus.isDirectory()) {
                filePaths+=fileStatus.getPath()      
            }
            else {
                listAllFilesFolderInDir(fileStatus.getPath().toString(), recursiveTraverse, filePaths)
            }
        }
  }   
  filePaths
}

и затем, используя следующую строку кодов, я пытаюсь переименовать / переместить файлы в папку / raw в папку / process:

var inputDir = "/data/analytics/raw"
var outputDir = "/data/analytics/process"
var filePaths = new ListBuffer[Path]()
var pathArray = listAllFilesFolderInDir(inputDir, true, filePaths)
val fs= <Getting hdfs FileSystem Instance Here>
for(path<-pathArray){
   var pathSplit = path.toString().split("/")
   var pathSplitSize = pathSplit.size
   val rawFileName = inputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)
   val processFileName = outputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)
   fs.rename(new Path(rawFileName), new Path(processFileName))
 }

Но я не могу переместить / переименовать эти файлы, используя приведенный выше код. Я попытался отладить код и обнаружил, что fs.rename() возвращает мне "false".

Обратите внимание: я могу добиться переименования / перемещения файла, когда я копирую любой файл вручную в папку / data / analytics / raw из CT.csv [или любого другого файла], а затем запускаю fs.rename(), но он не работает для файлов Part-xxxxx.

Есть что-то, чего мне не хватает?

Любая быстрая помощь будет оценена.

С уважением, Бхупеш

2 ответа

Решение

Наконец, у меня есть проблема. На самом деле я пытался переименовать файл из /data/analytics/raw/folder.csv/part-xxxxx в /data/analytics/process/folder.csv/part-xxxxx, где / data / analytics / process присутствовал в HDFS, но "folder.csv"не было; следовательно это возвращало меня ложное, переименовывая. Я добавил следующую строку в мой код и работал нормально для меня

var inputDir = "/data/analytics/raw"
var outputDir = "/data/analytics/process"
var filePaths = new ListBuffer[Path]()
var pathArray = listAllFilesFolderInDir(inputDir, true, filePaths)
val fs= <Getting hdfs FileSystem Instance Here>
for(path<-pathArray){
   var pathSplit = path.toString().split("/")
   var pathSplitSize = pathSplit.size

   val rawFileName = inputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)

   var processFolderName = outputDir + "/" + pathSplit(pathSplitSize-2)
   var processFolderPath = new Path(processFolderName)
   if(!(fs.exists(processFolderPath)))
         fs.mkdirs(processFolderPath)
   val processFileName = processFolderName + "/" + pathSplit(pathSplitSize-1)
   fs.rename(new Path(rawFileName), new Path(processFileName))
 }

Rename может вернуть false, если новый Path(rawFileName) не существует.
Перед выполнением fs.rename убедитесь, что файл существует:

if (fs.exists(somePath)) {
 fs.rename...
}

Другой причиной может быть тот файл, который вы пытаетесь переименовать, кто-то использует. Или, если вы попытаетесь переименовать каталог, некоторые файлы в нем могут быть использованы кем-то.
Чтобы убедиться, что это основная причина, попробуйте переименовать другие файлы в вашем коде:

var inputDir = "/data/analytics/raw"
var outputDir = "/data/analytics/process"
var filePaths = new ListBuffer[Path]()
var pathArray = listAllFilesFolderInDir(inputDir, true, filePaths)
val fs= <Getting hdfs FileSystem Instance Here>
for(path<-pathArray){
   var pathSplit = path.toString().split("/")
   var pathSplitSize = pathSplit.size
   val rawFileName = inputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)
   val processFileName = outputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)
   fs.rename(new Path("**/TESTDIR1**"), new Path("**/TESTDIR2**"))
 }

Если это переименование будет успешным, основная причина действительно в состоянии гонки.

Другие вопросы по тегам