Как обрабатывать новые файлы в каталоге HDFS, когда их запись в конце концов закончена?
В моем случае у меня есть файлы CSV, постоянно загружаемые в HDFS.
Как только новый файл будет загружен, я хотел бы обработать новый файл с помощью Spark SQL (например, вычислить максимум поля в файле, преобразовать файл в parquet
). т.е. у меня есть взаимно-однозначное сопоставление между каждым входным файлом и преобразованным / обработанным выходным файлом.
Я оценивал Spark Streaming для прослушивания каталога HDFS, а затем для обработки "потокового файла" с помощью Spark.
Однако, чтобы обработать весь файл, мне нужно знать, когда завершится "поток файлов". Я хотел бы применить преобразование ко всему файлу, чтобы сохранить сквозное однозначное сопоставление между файлами.
Как я могу преобразовать весь файл, а не его микропакеты?
Насколько я знаю, Spark Streaming может применять преобразование только к пакетам (DStreams
сопоставлены с RDDs
) и не ко всему файлу сразу (когда его конечный поток завершен).
Это верно? Если так, какую альтернативу я должен рассмотреть для моего сценария?
2 ответа
Возможно, я неправильно понял ваш вопрос с первой попытки...
Насколько я знаю, Spark Streaming может применять преобразование только к пакетам (DStreams, сопоставленным с RDD), а не ко всему файлу сразу (когда его конечный поток завершен).
Это верно?
Нет, это не правильно.
Spark Streaming будет применять преобразование ко всему файлу сразу, как это было записано в HDFS в то время, когда интервал между партиями Spark Streaming истек.
Spark Streaming примет текущее содержимое файла и начнет его обработку.
Как только новый файл загружен, мне нужно обработать новый файл с помощью Spark/SparkSQL
С Spark это практически невозможно из-за его архитектуры, которая занимает некоторое время с момента "загрузки" и обработки Spark.
Вы должны рассмотреть возможность использования совершенно новой и блестящей структурированной потоковой передачи или (скоро устаревшей) потоковой передачи Spark.
Оба решения поддерживают просмотр каталога для новых файлов и запускают задание Spark после загрузки нового файла (что является именно вашим вариантом использования).
Цитирование источников ввода структурированного потока:
В Spark 2.0 есть несколько встроенных источников.
- Источник файла - читает файлы, записанные в каталоге, как поток данных. Поддерживаемые форматы файлов: текст, CSV, JSON, паркет. См. Документы интерфейса DataStreamReader для более актуального списка и поддерживаемых опций для каждого формата файла. Обратите внимание, что файлы должны быть атомарно размещены в заданном каталоге, что в большинстве файловых систем может быть достигнуто с помощью операций перемещения файлов.
Смотрите также Основные источники Spark Streaming:
Помимо сокетов, StreamingContext API предоставляет методы для создания DStreams из файлов в качестве входных источников.
Файловые потоки. Для чтения данных из файлов любой файловой системы, совместимой с API HDFS (то есть HDFS, S3, NFS и т. Д.), DStream может быть создан как:
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
Spark Streaming будет отслеживать каталог dataDirectory и обрабатывать любые файлы, созданные в этом каталоге (файлы, записанные во вложенных каталогах, не поддерживаются).
Одно предостережение, учитывая ваше требование:
Мне нужно знать, когда завершится "поток файлов".
Не делай этого с Спарк.
Снова процитируем основные источники Spark Streaming:
Файлы должны быть созданы в dataDirectory путем атомарного перемещения или переименования их в каталог данных.
После перемещения файлы не должны быть изменены. Таким образом, если файлы постоянно добавляются, новые данные не будут прочитаны.
Подводя итоги... вы должны перемещать файлы в каталог, который Spark наблюдает, когда файлы завершены и готовы к обработке с помощью Spark. Это выходит за рамки Spark.
Вы можете использовать DFSInotifyEventInputStream для просмотра каталога Hadoop, а затем программно выполнять задание Spark при создании файла.
Смотрите это сообщение: HDFS File Watcher