Распакуйте список, разбейте на пары и рассчитайте разницу
Я хочу рассчитать разницу во времени между входом и выходом для каждого идентификатора. Данные в формате:
String,Long,String,List[String]
======================================
in, time0, door1, [id1, id2, id3, id4]
out, time1, door1, [id1, id2, id3]
out, time2, door1, [id4, id5]
В конце это должно закончиться с парами ключ-значение как:
{(id1, #time1-time0), (id2, #time1-time0), (id3, #time1-time0), (id4, #time2-time0), (id5, N/A)}
Что было бы хорошим подходом для решения этой проблемы?
РЕДАКТИРОВАТЬ: я пробовал следующее.
case class Data(direction: String, time:Long, door:String, ids: List[String])
val data = sc.parallelize(Seq(Data("in", 5, "d1", List("id1", "id2", "id3", "id4")),Data("out", 20, "d1", List("id1", "id2", "id3")), Data("out",50, "d1", List("id4", "id5"))))
data.flatMap(x => (x.ids, x))
1 ответ
scala> case class Data( direction: String, time: Long, door: String, ids: List[ String ] )
defined class Data
scala> val data = sc.parallelize( Seq( Data( "in", 5, "d1", List( "id1", "id2", "id3", "id4" ) ), Data( "out", 20, "d1", List( "id1", "id2", "id3" ) ), Data( "out",50, "d1", List( "id4", "id5" ) ) ) )
data: org.apache.spark.rdd.RDD[Data] = ParallelCollectionRDD[0] at parallelize at <console>:14
// Get an RDD entry for each ( id, data ) pair
scala> data.flatMap( x => x.ids.map( id => ( id, x ) ) )
res0: org.apache.spark.rdd.RDD[(String, Data)] = FlatMappedRDD[1] at flatMap at <console>:17
// group by id to get data's with same id's
scala> res0.groupBy( { case ( id, data ) => id } )
res1: org.apache.spark.rdd.RDD[(String, Iterable[(String, Data)])] = ShuffledRDD[3] at groupBy at <console>:19
// convert Iterable[(String, Data)] to List[Data]
scala> res1.map( { case ( id, iter ) => ( id, iter.toList.map( { case ( i, d ) => d } ) ) } )
res2: org.apache.spark.rdd.RDD[(String, List[Data])] = MappedRDD[4] at map at <console>:21
// sort list of data's by data.time
res2.map( { case ( id, list ) => ( id, list.sortBy( d => d.time ) ) } )
res3: org.apache.spark.rdd.RDD[(String, List[Data])] = MappedRDD[5] at map at <console>:23
// get the time diff by doing lastData.time - firstData.time for each id
scala> :paste
// Entering paste mode (ctrl-D to finish)
res3.map( { case ( id, list ) => {
list match {
case d :: Nil => ( id, None )
case d :: tail => ( id, Some( list.last.time - d.time ) )
case _ => ( id, None )
}
} } )
// Exiting paste mode, now interpreting.
res6: org.apache.spark.rdd.RDD[(String, Option[Long])] = MappedRDD[7] at map at <console>:25
Сейчас, res6
имеет желаемые данные.
Кроме того... я не был уверен, как вы хотели использовать direction
поэтому я не использовал его, изменив часть кода, чтобы получить то, что вы хотите (я думаю, только последний res3
вещь должна немного измениться) или вы можете объяснить это здесь, и, может быть, я дам вам ответ. Если у вас есть другие сомнения... спросите.
Это также может быть достигнуто в более сжатой форме... но это будет трудно понять. Вот почему я предоставил подробный и более простой код.