Распакуйте список, разбейте на пары и рассчитайте разницу

Я хочу рассчитать разницу во времени между входом и выходом для каждого идентификатора. Данные в формате:

String,Long,String,List[String]
======================================
 in, time0, door1, [id1, id2, id3, id4]
out, time1, door1, [id1, id2, id3]
out, time2, door1, [id4, id5]

В конце это должно закончиться с парами ключ-значение как:

{(id1, #time1-time0), (id2, #time1-time0), (id3, #time1-time0), (id4, #time2-time0), (id5, N/A)}

Что было бы хорошим подходом для решения этой проблемы?

РЕДАКТИРОВАТЬ: я пробовал следующее.

case class Data(direction: String, time:Long, door:String, ids:  List[String])
val data = sc.parallelize(Seq(Data("in", 5, "d1", List("id1", "id2", "id3", "id4")),Data("out", 20, "d1", List("id1", "id2", "id3")), Data("out",50, "d1", List("id4", "id5"))))
data.flatMap(x => (x.ids, x))

1 ответ

Решение
scala> case class Data( direction: String, time: Long, door: String, ids: List[ String ] )
defined class Data

scala> val data = sc.parallelize( Seq( Data( "in", 5, "d1", List( "id1", "id2", "id3", "id4" ) ), Data( "out", 20, "d1", List( "id1", "id2", "id3" ) ), Data( "out",50, "d1", List( "id4", "id5" ) ) ) )
data: org.apache.spark.rdd.RDD[Data] = ParallelCollectionRDD[0] at parallelize at <console>:14

// Get an RDD entry for each ( id, data ) pair
scala> data.flatMap( x => x.ids.map( id => ( id, x ) ) )
res0: org.apache.spark.rdd.RDD[(String, Data)] = FlatMappedRDD[1] at flatMap at <console>:17

// group by id to get data's with same id's
scala> res0.groupBy( { case ( id, data ) => id } )
res1: org.apache.spark.rdd.RDD[(String, Iterable[(String, Data)])] = ShuffledRDD[3] at groupBy at <console>:19

// convert Iterable[(String, Data)] to List[Data]
scala> res1.map( { case ( id, iter ) => ( id, iter.toList.map( { case ( i, d ) => d } ) ) } )
res2: org.apache.spark.rdd.RDD[(String, List[Data])] = MappedRDD[4] at map at <console>:21

// sort list of data's by data.time
res2.map( { case ( id, list ) => ( id, list.sortBy( d => d.time ) ) } )
res3: org.apache.spark.rdd.RDD[(String, List[Data])] = MappedRDD[5] at map at <console>:23

// get the time diff by doing lastData.time - firstData.time for each id
scala> :paste
// Entering paste mode (ctrl-D to finish)

res3.map( { case ( id, list ) => {
    list match {
        case d :: Nil => ( id, None )
        case d :: tail => ( id, Some( list.last.time - d.time ) )
        case _ => ( id, None )
    }
} } )

// Exiting paste mode, now interpreting.

res6: org.apache.spark.rdd.RDD[(String, Option[Long])] = MappedRDD[7] at map at <console>:25

Сейчас, res6 имеет желаемые данные.

Кроме того... я не был уверен, как вы хотели использовать direction поэтому я не использовал его, изменив часть кода, чтобы получить то, что вы хотите (я думаю, только последний res3 вещь должна немного измениться) или вы можете объяснить это здесь, и, может быть, я дам вам ответ. Если у вас есть другие сомнения... спросите.

Это также может быть достигнуто в более сжатой форме... но это будет трудно понять. Вот почему я предоставил подробный и более простой код.

Другие вопросы по тегам