StreamingContext с фильтром в Scala/Spark

Я успешно использовал StreamingContext из Scala для подсчета слов:

val scc = new StreamingContext(sc,Seconds(1))
val dstream = scc.textFileStream("""file:///pathToDirectoryWindows""");
//dstream is DStream[String] 
val words = dstream.flatMap(line=>line.split(" "));

Но я попытался сделать то же самое с фильтрацией, то есть с учетом только файлов с расширением.txt. Кажется, textFileStream не позволяет фильтровать, поэтому я попытался fileStream:

val fstream=scc.fileStream("""file:///pathToFolderWin""",x=>x.getName().contains(".txt"), true); 

Но на этот раз я не могу разделить, потому что результат не DStream[String], а inputDStream[(Nothing, Nothing)] . Как поступить со строками, кроме фильтрации файлов? Большое спасибо, Леви

2 ответа

Другое решение:

import org.apache.hadoop.fs.Path
val fstream=scc.fileStream("""file:///pathToFolderWin""", (path: Path) => path.getName().endsWith(".txt"), true) 

При работе с StreamingContext.fileStream Вы должны явно указать тип ключа Hadoop, тип значения Hadoop и формат входящего. Например, если тип ключа Long вы получаете Text с форматом ввода TextInputFormat Вы бы написали:

val lines = ssc.fileStream[LongWritable, Text, TextInputFormat]("/path/to/file")
               .map { case (key, text) => (key.toString, text.toString.split(" "))}

Это дало бы DStream[(String, Array[String)], где Array[String] ваша линия после раскола.

Или, если вы хотите только значения, которые вы напишите:

val lines = ssc.fileStream[LongWritable, Text, TextInputFormat]("/path/to/file")
               .map { case (_, text) => text.toString.split(" "))}

И вы получите DStream[Array[String]]

редактировать

Чтобы применить фильтр к расширению файла, который вы можете, вы можете использовать Apache Commons IO - FilenameUtils.getExtension:

val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](
                  "/path/to/file", (file: Path) => 
                     FilenameUtils.getExtension(file.toString).equalsIgnoreCase("txt"))
               .map { case (_, text) => text.toString.split(" "))}
Другие вопросы по тегам