StreamingContext с фильтром в Scala/Spark
Я успешно использовал StreamingContext из Scala для подсчета слов:
val scc = new StreamingContext(sc,Seconds(1))
val dstream = scc.textFileStream("""file:///pathToDirectoryWindows""");
//dstream is DStream[String]
val words = dstream.flatMap(line=>line.split(" "));
Но я попытался сделать то же самое с фильтрацией, то есть с учетом только файлов с расширением.txt. Кажется, textFileStream не позволяет фильтровать, поэтому я попытался fileStream:
val fstream=scc.fileStream("""file:///pathToFolderWin""",x=>x.getName().contains(".txt"), true);
Но на этот раз я не могу разделить, потому что результат не DStream[String], а inputDStream[(Nothing, Nothing)] . Как поступить со строками, кроме фильтрации файлов? Большое спасибо, Леви
2 ответа
Другое решение:
import org.apache.hadoop.fs.Path
val fstream=scc.fileStream("""file:///pathToFolderWin""", (path: Path) => path.getName().endsWith(".txt"), true)
При работе с StreamingContext.fileStream
Вы должны явно указать тип ключа Hadoop, тип значения Hadoop и формат входящего. Например, если тип ключа Long
вы получаете Text
с форматом ввода TextInputFormat
Вы бы написали:
val lines = ssc.fileStream[LongWritable, Text, TextInputFormat]("/path/to/file")
.map { case (key, text) => (key.toString, text.toString.split(" "))}
Это дало бы DStream[(String, Array[String)]
, где Array[String]
ваша линия после раскола.
Или, если вы хотите только значения, которые вы напишите:
val lines = ssc.fileStream[LongWritable, Text, TextInputFormat]("/path/to/file")
.map { case (_, text) => text.toString.split(" "))}
И вы получите DStream[Array[String]]
редактировать
Чтобы применить фильтр к расширению файла, который вы можете, вы можете использовать Apache Commons IO - FilenameUtils.getExtension
:
val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](
"/path/to/file", (file: Path) =>
FilenameUtils.getExtension(file.toString).equalsIgnoreCase("txt"))
.map { case (_, text) => text.toString.split(" "))}