Как вычислить смежные данные с помощью spark/scala

Я хава RDD, то RDD тип Tuple2(value,timestamp)значение равно 1 или 0, временная метка является последовательной, а переменная limitTime=4. Когда я сопоставляю RDDЕсли значение равно 1, выходное значение от текущей метки времени до (timestamp +limitTime) равно 1, в противном случае текущее значение равно 0, я называю это периодом. Но есть особый случай ,, когда значение равно 1, а его отметка времени находится в периоде ,, то оно игнорируется, текущее значение выхода равно 0

input :          (0,0),(1,1),(0,3),(0,5),(0,7),(0,8),(0,10),(1,12),(0,14),(0,15)
expected output :(0,0),(1,1),(1,3),(1,5),(0,7),(0,8),(0,10),(1,12),(1,14),(1,15)

special input2:  (0,0),(1,1),(0,3),(1,5),(0,7),(1,8),(0,10),(1,12),(0,14),(0,15)
expected output2:(0,0),(1,1),(1,3),(1,5),(0,7),(1,8),(1,10),(1,12),(0,14),(0,15)
this is my try:

 var limitTime=4
    var startTime= -limitTime
  val rdd=sc.parallelize(List((0,0),(1,1),(0,3),(1,5),(0,7),(1,8),(0,10),(1,12),(0,14),(0,15)),4)
      val results=rdd.mapPartitions(parIter => {
        var resultIter = new ArrayBuffer[Tuple2[Int,Int]]()
        while (parIter.hasNext) {
          val iter = parIter.next()
          if(iter._1==1){
            if(iter._2<=startTime+limitTime&&iter._2!=0&&iter._2>=startTime){
              resultIter.append(iter)
            }else{
              resultIter.append(iter)
              startTime=iter._2
            }
          }else{
            if(iter._2<=startTime+limitTime&&iter._2!=0&&iter._2>=startTime){
              resultIter.append((1,iter._2))
            }else{
              resultIter.append(iter)
            }
          }
        }
        resultIter.toIterator
      })
    results.collect().foreach(println)

это так неэффективно, как я могу получить тот же результат без массива?

1 ответ

Следующий код должен работать для обоих ваших случаев.

var limitTime=3
var first = true
var previousValue = 0
val rdd=sc.parallelize(List((0,0),(1,1),(0,3),(0,5),(0,7),(0,8),(0,10),(1,12),(0,14),(0,15)), 4)
val tempResult = rdd.collect.map(pair => {
  if(first){
    first = false
    previousValue = pair._1
    (pair._1, pair._2)
  }
  else {
    if ((pair._1 == 1 || previousValue == 1) && limitTime > 0) {
      limitTime -= 1
      previousValue = 1
      (1, pair._2)
    }
    else {
      if (limitTime == 0) limitTime = 3
      previousValue = pair._1
      (pair._1, pair._2)
    }
  }
})
tempResult.foreach(print)

Если это не так, пожалуйста, дайте мне знать

Другие вопросы по тегам