Как обрабатывать новые файлы в каталоге HDFS, когда их запись в конце концов закончена?

В моем случае у меня есть файлы CSV, постоянно загружаемые в HDFS.

Как только новый файл будет загружен, я хотел бы обработать новый файл с помощью Spark SQL (например, вычислить максимум поля в файле, преобразовать файл в parquet). т.е. у меня есть взаимно-однозначное сопоставление между каждым входным файлом и преобразованным / обработанным выходным файлом.

Я оценивал Spark Streaming для прослушивания каталога HDFS, а затем для обработки "потокового файла" с помощью Spark.

Однако, чтобы обработать весь файл, мне нужно знать, когда завершится "поток файлов". Я хотел бы применить преобразование ко всему файлу, чтобы сохранить сквозное однозначное сопоставление между файлами.

Как я могу преобразовать весь файл, а не его микропакеты?

Насколько я знаю, Spark Streaming может применять преобразование только к пакетам (DStreams сопоставлены с RDDs) и не ко всему файлу сразу (когда его конечный поток завершен).

Это верно? Если так, какую альтернативу я должен рассмотреть для моего сценария?

2 ответа

Решение

Возможно, я неправильно понял ваш вопрос с первой попытки...

Насколько я знаю, Spark Streaming может применять преобразование только к пакетам (DStreams, сопоставленным с RDD), а не ко всему файлу сразу (когда его конечный поток завершен).

Это верно?

Нет, это не правильно.

Spark Streaming будет применять преобразование ко всему файлу сразу, как это было записано в HDFS в то время, когда интервал между партиями Spark Streaming истек.

Spark Streaming примет текущее содержимое файла и начнет его обработку.


Как только новый файл загружен, мне нужно обработать новый файл с помощью Spark/SparkSQL

С Spark это практически невозможно из-за его архитектуры, которая занимает некоторое время с момента "загрузки" и обработки Spark.

Вы должны рассмотреть возможность использования совершенно новой и блестящей структурированной потоковой передачи или (скоро устаревшей) потоковой передачи Spark.

Оба решения поддерживают просмотр каталога для новых файлов и запускают задание Spark после загрузки нового файла (что является именно вашим вариантом использования).

Цитирование источников ввода структурированного потока:

В Spark 2.0 есть несколько встроенных источников.

  • Источник файла - читает файлы, записанные в каталоге, как поток данных. Поддерживаемые форматы файлов: текст, CSV, JSON, паркет. См. Документы интерфейса DataStreamReader для более актуального списка и поддерживаемых опций для каждого формата файла. Обратите внимание, что файлы должны быть атомарно размещены в заданном каталоге, что в большинстве файловых систем может быть достигнуто с помощью операций перемещения файлов.

Смотрите также Основные источники Spark Streaming:

Помимо сокетов, StreamingContext API предоставляет методы для создания DStreams из файлов в качестве входных источников.

Файловые потоки. Для чтения данных из файлов любой файловой системы, совместимой с API HDFS (то есть HDFS, S3, NFS и т. Д.), DStream может быть создан как:

streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)

Spark Streaming будет отслеживать каталог dataDirectory и обрабатывать любые файлы, созданные в этом каталоге (файлы, записанные во вложенных каталогах, не поддерживаются).

Одно предостережение, учитывая ваше требование:

Мне нужно знать, когда завершится "поток файлов".

Не делай этого с Спарк.

Снова процитируем основные источники Spark Streaming:

  • Файлы должны быть созданы в dataDirectory путем атомарного перемещения или переименования их в каталог данных.

  • После перемещения файлы не должны быть изменены. Таким образом, если файлы постоянно добавляются, новые данные не будут прочитаны.

Подводя итоги... вы должны перемещать файлы в каталог, который Spark наблюдает, когда файлы завершены и готовы к обработке с помощью Spark. Это выходит за рамки Spark.

Вы можете использовать DFSInotifyEventInputStream для просмотра каталога Hadoop, а затем программно выполнять задание Spark при создании файла.

Смотрите это сообщение: HDFS File Watcher

Другие вопросы по тегам