apache spark - проверьте, существует ли файл

Я новичок, чтобы зажечь, и у меня есть вопрос. У меня есть двухэтапный процесс, в котором первый шаг - запись файла SUCCESS.txt в папку на HDFS. Мой второй шаг, который представляет собой искровое задание, должен проверить, существует ли этот файл SUCCESS.txt, прежде чем он начнет обрабатывать данные.

Я проверил API-интерфейс spark и не нашел никакого метода, который проверяет, существует ли файл. Есть идеи, как справиться с этим?

Единственный метод, который я нашел, был sc.textFile(hdfs:///SUCCESS.txt).count(), который вызывал бы исключение, когда файл не существует. Я должен поймать это исключение и написать свою программу соответственно. Мне не очень понравился этот подход. Надеясь найти лучшую альтернативу.

9 ответов

Для файла в HDFS вы можете использовать способ hadoop:

val conf = sc.hadoopConfiguration
val fs = org.apache.hadoop.fs.FileSystem.get(conf)
val exists = fs.exists(new org.apache.hadoop.fs.Path("/path/on/hdfs/to/SUCCESS.txt"))

Для Pyspark вы можете достичь этого, не вызывая подпроцесс, используя что-то вроде:

fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
fs.exists(sc._jvm.org.apache.hadoop.fs.Path("path/to/SUCCESS.txt"))

Я скажу, лучший способ вызвать это через функцию, которая внутренне проверяет наличие файла в традиционной проверке файла hadoop.

object OutputDirCheck {
  def dirExists(hdfsDirectory: String): Boolean = {
    val hadoopConf = new org.apache.hadoop.conf.Configuration()
    val fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)
    val exists = fs.exists(new org.apache.hadoop.fs.Path(hdfsDirectory))
    return exists
  }
}

Используя dbutils:

def path_exists(path):
  try:
    if len(dbutils.fs.ls(path)) > 0:
      return True
  except:
    return False

Для Spark 2.0 или выше вы можете использовать существующий метод hadoop.fr.FileSystem
:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

object Test extends App {
  val spark = SparkSession.builder
    .master("local[*]")
    .appName("BigDataETL - Check if file exists")
    .getOrCreate()

  val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
  // This methods returns Boolean (true - if file exists, false - if file doesn't exist
  val fileExists = fs.exists(new Path("<parh_to_file>"))
  if (fileExists) println("File exists!")
  else println("File doesn't exist!")
}

для Spark от 1,6 до 2,0

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}

object Test extends App {
  val sparkConf = new SparkConf().setAppName(s"BigDataETL - Check if file exists")
  val sc = new SparkContext(sparkConf)
  val fs = FileSystem.get(sc.hadoopConfiguration)
  val fileExists = fs.exists(new Path("<parh_to_file>"))
  if (fileExists) println("File exists!")
  else println("File doesn't exist!")
}

Для PySpark:

from py4j.protocol import Py4JJavaError
def path_exist(path):
    try:
        rdd = sc.textFile(path)
        rdd.take(1)
        return True
    except Py4JJavaError as e:
        return False

Для пользователей Pyspark Python:

я не нашел ничего с python или pyspark, поэтому нам нужно выполнить команду hdfs из кода python. Это сработало для меня.

Команда hdfs, чтобы получить, если папка существует: возвращает 0, если истина

hdfs dfs -test -d /folder-path

Команда hdfs, чтобы получить, если файл существует: возвращает 0, если истина

hdfs dfs -test -d /folder-path 

Для размещения этого в коде Python я следовал ниже строки кода:

import subprocess

def run_cmd(args_list):
    proc = subprocess.Popen(args_list, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
    proc.communicate()
    return proc.returncode

cmd = ['hdfs', 'dfs', '-test', '-d', "/folder-path"]
            code = run_cmd(cmd)
if code == 0:
    print('folder exist')
    print(code) 

Выведите, если папка существует:

папка существует 0

Для Java-кодеров;

 SparkConf sparkConf = new SparkConf().setAppName("myClassname");
        SparkContext sparky = new SparkContext(sparkConf);       
        JavaSparkContext context = new JavaSparkContext(sparky);

     FileSystem hdfs = org.apache.hadoop.fs.FileSystem.get(context.hadoopConfiguration());
            Path path = new Path(sparkConf.get(path_to_File));

            if (!hdfs.exists(path)) {
                 //Path does not exist.
            } 
         else{
               //Path exist.
           }

Ответ @Nandeesh оценивает все Py4JJavaErrorисключения. Предлагается добавить еще один шаг для оценки сообщения об ошибке исключения Java:

      from py4j.protocol import Py4JJavaError


def file_exists(path):
    try:
        spark.sparkContext.textFile(path).take(1)
    except Py4JJavaError as e:
        if 'org.apache.hadoop.mapred.InvalidInputException: Input path does not exist' in str(e.java_exception):
            return False
        else:
            return True
Другие вопросы по тегам