apache spark - проверьте, существует ли файл
Я новичок, чтобы зажечь, и у меня есть вопрос. У меня есть двухэтапный процесс, в котором первый шаг - запись файла SUCCESS.txt в папку на HDFS. Мой второй шаг, который представляет собой искровое задание, должен проверить, существует ли этот файл SUCCESS.txt, прежде чем он начнет обрабатывать данные.
Я проверил API-интерфейс spark и не нашел никакого метода, который проверяет, существует ли файл. Есть идеи, как справиться с этим?
Единственный метод, который я нашел, был sc.textFile(hdfs:///SUCCESS.txt).count(), который вызывал бы исключение, когда файл не существует. Я должен поймать это исключение и написать свою программу соответственно. Мне не очень понравился этот подход. Надеясь найти лучшую альтернативу.
9 ответов
Для файла в HDFS вы можете использовать способ hadoop:
val conf = sc.hadoopConfiguration
val fs = org.apache.hadoop.fs.FileSystem.get(conf)
val exists = fs.exists(new org.apache.hadoop.fs.Path("/path/on/hdfs/to/SUCCESS.txt"))
Для Pyspark вы можете достичь этого, не вызывая подпроцесс, используя что-то вроде:
fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
fs.exists(sc._jvm.org.apache.hadoop.fs.Path("path/to/SUCCESS.txt"))
Я скажу, лучший способ вызвать это через функцию, которая внутренне проверяет наличие файла в традиционной проверке файла hadoop.
object OutputDirCheck {
def dirExists(hdfsDirectory: String): Boolean = {
val hadoopConf = new org.apache.hadoop.conf.Configuration()
val fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)
val exists = fs.exists(new org.apache.hadoop.fs.Path(hdfsDirectory))
return exists
}
}
Используя dbutils:
def path_exists(path):
try:
if len(dbutils.fs.ls(path)) > 0:
return True
except:
return False
Для Spark 2.0 или выше вы можете использовать существующий метод hadoop.fr.FileSystem
:
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession
object Test extends App {
val spark = SparkSession.builder
.master("local[*]")
.appName("BigDataETL - Check if file exists")
.getOrCreate()
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
// This methods returns Boolean (true - if file exists, false - if file doesn't exist
val fileExists = fs.exists(new Path("<parh_to_file>"))
if (fileExists) println("File exists!")
else println("File doesn't exist!")
}
для Spark от 1,6 до 2,0
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}
object Test extends App {
val sparkConf = new SparkConf().setAppName(s"BigDataETL - Check if file exists")
val sc = new SparkContext(sparkConf)
val fs = FileSystem.get(sc.hadoopConfiguration)
val fileExists = fs.exists(new Path("<parh_to_file>"))
if (fileExists) println("File exists!")
else println("File doesn't exist!")
}
Для PySpark:
from py4j.protocol import Py4JJavaError
def path_exist(path):
try:
rdd = sc.textFile(path)
rdd.take(1)
return True
except Py4JJavaError as e:
return False
Для пользователей Pyspark Python:
я не нашел ничего с python или pyspark, поэтому нам нужно выполнить команду hdfs из кода python. Это сработало для меня.
Команда hdfs, чтобы получить, если папка существует: возвращает 0, если истина
hdfs dfs -test -d /folder-path
Команда hdfs, чтобы получить, если файл существует: возвращает 0, если истина
hdfs dfs -test -d /folder-path
Для размещения этого в коде Python я следовал ниже строки кода:
import subprocess
def run_cmd(args_list):
proc = subprocess.Popen(args_list, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
proc.communicate()
return proc.returncode
cmd = ['hdfs', 'dfs', '-test', '-d', "/folder-path"]
code = run_cmd(cmd)
if code == 0:
print('folder exist')
print(code)
Выведите, если папка существует:
папка существует 0
Для Java-кодеров;
SparkConf sparkConf = new SparkConf().setAppName("myClassname");
SparkContext sparky = new SparkContext(sparkConf);
JavaSparkContext context = new JavaSparkContext(sparky);
FileSystem hdfs = org.apache.hadoop.fs.FileSystem.get(context.hadoopConfiguration());
Path path = new Path(sparkConf.get(path_to_File));
if (!hdfs.exists(path)) {
//Path does not exist.
}
else{
//Path exist.
}
Ответ @Nandeesh оценивает все
Py4JJavaError
исключения. Предлагается добавить еще один шаг для оценки сообщения об ошибке исключения Java:
from py4j.protocol import Py4JJavaError
def file_exists(path):
try:
spark.sparkContext.textFile(path).take(1)
except Py4JJavaError as e:
if 'org.apache.hadoop.mapred.InvalidInputException: Input path does not exist' in str(e.java_exception):
return False
else:
return True