fs.rename(новый путь (rawFileName), новый путь (processFileName)) не работает
Я работаю над реализацией Apache Spark на основе Scala для передачи данных из удаленного местоположения в HDFS, а затем - для загрузки данных из HDFS в таблицы Hive.
Используя свое первое искровое задание, я вставил данные / файлы в HDFS в месте, скажем -
hdfs: //sandbox.hortonworks.com: 8020 / data / analytics / raw / folder
Давайте рассмотрим, что после подключения файлов CT_Click_Basic.csv и CT_Click_Basic1.csv.gz у меня есть следующие файлы в HDFS [имена файлов в общем расположении будут здесь именем папки, а ее содержимое будет присутствовать в файлах part-xxxxx]:
[root @ sandbox ~] # hdfs dfs -ls / data / analytics / raw / * / Найдено 3 объекта
-rw-r - r-- 3 chauhan.bhupesh hdfs 0 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic.csv/_SUCCESS
-rw-r - r-- 3 chauhan.bhupesh hdfs 8383 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic.csv/part-00000
-rw-r - r-- 3 chauhan.bhupesh hdfs 8395 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic.csv/part-00001
Найдено 2 предмета
-rw-r - r-- 3 chauhan.bhupesh hdfs 0 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic1.csv.gz/_SUCCESS
-rw-r -r-- 3 chauhan.bhupesh hdfs 16588 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic1.csv.gz/part-00000
Теперь, используя мое другое задание Spark, я хочу переместить эти файлы как есть из папки / raw в / process и, наконец, в папку / archive в HDFS, основываясь на задачах, выполняемых на каждом этапе.
Для этого я сначала извлекаю список всех файлов, представленных в папке / raw, используя следующий код:
def listAllFilesFolderInDir(filePath:String,recursiveTraverse:Boolean,filePaths: ListBuffer[Path]) : ListBuffer[Path] = {
val files = GlobalContext.hdfs.listStatus(new Path(filePath))
files.foreach { fileStatus => {
if(!fileStatus.isDirectory()) {
filePaths+=fileStatus.getPath()
}
else {
listAllFilesFolderInDir(fileStatus.getPath().toString(), recursiveTraverse, filePaths)
}
}
}
filePaths
}
и затем, используя следующую строку кодов, я пытаюсь переименовать / переместить файлы в папку / raw в папку / process:
var inputDir = "/data/analytics/raw"
var outputDir = "/data/analytics/process"
var filePaths = new ListBuffer[Path]()
var pathArray = listAllFilesFolderInDir(inputDir, true, filePaths)
val fs= <Getting hdfs FileSystem Instance Here>
for(path<-pathArray){
var pathSplit = path.toString().split("/")
var pathSplitSize = pathSplit.size
val rawFileName = inputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)
val processFileName = outputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)
fs.rename(new Path(rawFileName), new Path(processFileName))
}
Но я не могу переместить / переименовать эти файлы, используя приведенный выше код. Я попытался отладить код и обнаружил, что fs.rename() возвращает мне "false".
Обратите внимание: я могу добиться переименования / перемещения файла, когда я копирую любой файл вручную в папку / data / analytics / raw из CT.csv [или любого другого файла], а затем запускаю fs.rename(), но он не работает для файлов Part-xxxxx.
Есть что-то, чего мне не хватает?
Любая быстрая помощь будет оценена.
С уважением, Бхупеш
2 ответа
Наконец, у меня есть проблема. На самом деле я пытался переименовать файл из /data/analytics/raw/folder.csv/part-xxxxx в /data/analytics/process/folder.csv/part-xxxxx, где / data / analytics / process присутствовал в HDFS, но "folder.csv"не было; следовательно это возвращало меня ложное, переименовывая. Я добавил следующую строку в мой код и работал нормально для меня
var inputDir = "/data/analytics/raw"
var outputDir = "/data/analytics/process"
var filePaths = new ListBuffer[Path]()
var pathArray = listAllFilesFolderInDir(inputDir, true, filePaths)
val fs= <Getting hdfs FileSystem Instance Here>
for(path<-pathArray){
var pathSplit = path.toString().split("/")
var pathSplitSize = pathSplit.size
val rawFileName = inputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)
var processFolderName = outputDir + "/" + pathSplit(pathSplitSize-2)
var processFolderPath = new Path(processFolderName)
if(!(fs.exists(processFolderPath)))
fs.mkdirs(processFolderPath)
val processFileName = processFolderName + "/" + pathSplit(pathSplitSize-1)
fs.rename(new Path(rawFileName), new Path(processFileName))
}
Rename может вернуть false, если новый Path(rawFileName) не существует.
Перед выполнением fs.rename убедитесь, что файл существует:
if (fs.exists(somePath)) {
fs.rename...
}
Другой причиной может быть тот файл, который вы пытаетесь переименовать, кто-то использует. Или, если вы попытаетесь переименовать каталог, некоторые файлы в нем могут быть использованы кем-то.
Чтобы убедиться, что это основная причина, попробуйте переименовать другие файлы в вашем коде:
var inputDir = "/data/analytics/raw"
var outputDir = "/data/analytics/process"
var filePaths = new ListBuffer[Path]()
var pathArray = listAllFilesFolderInDir(inputDir, true, filePaths)
val fs= <Getting hdfs FileSystem Instance Here>
for(path<-pathArray){
var pathSplit = path.toString().split("/")
var pathSplitSize = pathSplit.size
val rawFileName = inputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)
val processFileName = outputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)
fs.rename(new Path("**/TESTDIR1**"), new Path("**/TESTDIR2**"))
}
Если это переименование будет успешным, основная причина действительно в состоянии гонки.