Написание эффективного кода в спарк и EMR
Мне нужно сделать геозону на больших данных в AWS EMR. У меня есть CSV локаций и огромная информация о логах, которые хранятся в S3 от avro fromat. Мой текущий подход заключается в чтении журналов данных из файлов S3 и csv в фрейм данных, циклический переход по файлу csv и с помощью UDF я добавляю новый столбец в лог-фрейм данных как расстояние от моего местоположения и каждого экземпляра в журнале. Было предложено подсчитать количество случаев, которые произошли на расстоянии менее определенного диапазона (25 м, 50 м,...200 м), и я просто отфильтровываю фрейм данных по вычисляемому столбцу, просчитываю их и добавляю в список. Наконец, в конце цикла я добавляю результат в dataframe, сохраняю его и повторяю для другого места в csv. На данный момент этот код очень медленный, несмотря на то, что количество филиалов невелико (всего 50 случаев), однако размер журнала очень велик, и я думаю, что мое решение не оптимально. Я буду признателен, если вы поможете мне понять, как лучше всего решать такие проблемы в Spark.
val distances = List.range(25, 225, 25) //different distances
case class Output(dealer: String, d25: Double, d50: Double, d75: Double, d100: Double, d125: Double, d150: Double,
d175: Double, d200: Double) //structure for converting list to dataframe
val diff = udf((x1: Double, y1:Double, x2:Double, y2:Double) => distance(x1, y1, x2, y2))
val address_prefix = Map("30d"->"11")
val file_location = "s3://csv_files/branch_locations.csv"
for(k<-address_prefix.keys) {
val inputS3Location = "s3://data/2017" + address_prefix(k) + "*.avro"
//reading Avrofiles
val inputData = spark.sparkContext.newAPIHadoopFile(inputS3Location,
classOf[AvroKeyInputFormat[DeviceBrqData]], classOf[AvroKey[DeviceBrqData]], classOf[NullWritable], job.getConfiguration)
.map(x => x._1.datum())
val rows = inputData.map(x => getRowFromBrqData(x))
val cleanRows = rows.filter { x => nonEmptyRow(x) }.filter{y=> nonEmptyLoc(y)}.sample(false, 0.1) //get 10% of data
val df = spark.sqlContext.createDataFrame(cleanRows, schema)
var result_list = List[List[Any]]()
val branch_csv = spark.read.csv(file_location)
val branch_rows= branch_csv.collect()
for(i<-1 to branch_rows.length-1 ) {
var long = branch_rows(i).get(8).toString.toDouble
var lat = branch_rows(i).get(7).toString.toDouble
val test = df.withColumn("Diss", diff(lit(lat), lit(long), df("latitude"), df("longitude")))
val test_grouped = test.groupBy(test("id")).agg(min(test("Diss")) as "min_dist") //drop duplicate ids and consider least distance for each id
var temp = List[Any]() //to save results
temp = branch_rows(i).get(1).toString :: temp //save branchname in list
distances.foreach { j=>
val value = test_grouped.filter(test_grouped("min_dist")<= j).count()
temp = temp ::: List(value.toDouble)
}
result_list = temp :: result_list
}
import spark.implicits._
val list = result_list.map(x => Output(x(0).toString, x(1).toString.toDouble, x(2).toString.toDouble,x(3).toString.toDouble,
x(4).toString.toDouble,x(5).toString.toDouble,x(6).toString.toDouble,x(7).toString.toDouble,x(8).toString.toDouble))
val result_rdd = spark.sparkContext.parallelize(list).repartition(1)
val outputPath = "S3://"+k+"_processed_branch.csv"
result_rdd.toDF("branch_name","d25","d50","d75","d100","d125","d150","d175","d200").write.format("csv").option("header", true).save(outputPath)