Как рассчитать агрегации в окне, когда показания датчика не отправляются, если они не изменились с момента последнего события?

Как я могу вычислить агрегации на окне от датчика, когда новые события отправляются, только если значение датчика изменилось с момента последнего события? Показания датчика снимаются в фиксированное время, например каждые 5 секунд, но они передаются только в том случае, если показания изменяются с момента последнего считывания.

Итак, если я хотел бы создать среднее значение signal_stength для каждого устройства:

eventsDF = ... 
avgSignalDF = eventsDF.groupBy("deviceId").avg("signal_strength")

Например, события, отправленные устройством за одноминутное окно:

event_time  device_id  signal_strength
12:00:00    1          5
12:00:05    1          4
12:00:30    1          5
12:00:45    1          6
12:00:55    1          5

Тот же набор данных с событиями, которые на самом деле не отправлены, заполнены:

event_time  device_id  signal_strength
12:00:00    1          5
12:00:05    1          4
12:00:10    1          4
12:00:15    1          4
12:00:20    1          4
12:00:25    1          4
12:00:30    1          5
12:00:35    1          5
12:00:40    1          5
12:00:45    1          6
12:00:50    1          6
12:00:55    1          5

Сигнал силы sum является 57 и avg является 57/12

Как эти недостающие данные могут быть выведены с помощью искровой структурированной потоковой передачи и среднего значения, рассчитанного по выведенным значениям?

Примечание. Я использовал среднее значение в качестве примера агрегации, но решение должно работать для любой функции агрегирования.

2 ответа

Решение

Редакция:

Я изменил логику для вычисления среднего только из отфильтрованного dataframe, так что это устраняет пробелы.

//input structure
case class StreamInput(event_time: Long, device_id: Int, signal_strength: Int)
//columns for which we want to maintain state
case class StreamState(prevSum: Int, prevRowCount: Int, prevTime: Long, prevSignalStrength: Int, currentTime: Long, totalRow: Int, totalSum: Int, avg: Double)
//final result structure
case class StreamResult(event_time: Long, device_id: Int, signal_strength: Int, avg: Double)

val filteredDF = ???  //get input(filtered rows only)

val interval = 5  // event_time interval

// using .mapGroupsWithState to maintain state for runningSum & total row count till now

// you need to set the timeout threshold to indicate how long you wish to maintain the state
val avgDF = filteredDF.groupByKey(_.device_id)
  .mapGroupsWithState[StreamState, StreamResult](GroupStateTimeout.NoTimeout()) {

  case (id: Int, eventIter: Iterator[StreamInput], state: GroupState[StreamState]) => {
    val events = eventIter.toSeq

    val updatedSession = if (state.exists) {
      //if state exists update the state with the new values
      val existingState = state.get

      val prevTime = existingState.currentTime
      val currentTime = events.map(x => x.event_time).last
      val currentRowCount = (currentTime - prevTime)/interval
      val rowCount = existingState.rowCount + currentRowCount.toInt
      val currentSignalStength = events.map(x => x.signal_strength).last

      val total_signal_strength = currentSignalStength + 
        (existingState.prevSignalStrength * (currentRowCount -1)) + 
        existingState.total_signal_strength

      StreamState(
        existingState.total_signal_strength,
        existingState.rowCount,
        prevTime,
        currentSignalStength,
        currentTime,
        rowCount,
        total_signal_strength.toInt,
        total_signal_strength/rowCount.toDouble
      )

    } else {
      // if there are no earlier state
      val runningSum = events.map(x => x.signal_strength).sum
      val size = events.size.toDouble
      val currentTime = events.map(x => x.event_time).last
      StreamState(0, 1, 0, runningSum, currentTime, 1, runningSum, runningSum/size)
    }

    //save the updated state
    state.update(updatedSession)
    StreamResult(
      events.map(x => x.event_time).last,
      id,
      events.map(x => x.signal_strength).last,
      updatedSession.avg
    )
  }
}

val result = avgDF
  .writeStream
  .outputMode(OutputMode.Update())
  .format("console")
  .start

Идея состоит в том, чтобы рассчитать две новые колонки:

  1. totalRowCount: промежуточный итог количества строк, которые должны присутствовать, если вы не отфильтровали.
  2. total_signal_strength: промежуточный итог signal_strength до сих пор. (это также включает пропущенные итоги строк).

Его рассчитывается по:

total_signal_strength = 
  current row's signal_strength  +  
  (total_signal_strength of previous row * (rowCount -1)) + 
  //rowCount is the count of missed rows computed by comparing previous and current event_time.
  previous total_signal_strength

формат промежуточного состояния:

+----------+---------+---------------+---------------------+--------+
|event_time|device_id|signal_strength|total_signal_strength|rowCount|
+----------+---------+---------------+---------------------+--------+
|         0|        1|              5|                    5|       1|
|         5|        1|              4|                    9|       2|
|        30|        1|              5|                   30|       7|
|        45|        1|              6|                   46|      10|
|        55|        1|              5|                   57|      12|
+----------+---------+---------------+---------------------+--------+

конечный результат:

+----------+---------+---------------+-----------------+
|event_time|device_id|signal_strength|              avg|
+----------+---------+---------------+-----------------+
|         0|        1|              5|              5.0|
|         5|        1|              4|              4.5|
|        30|        1|              5|4.285714285714286|
|        45|        1|              6|              4.6|
|        55|        1|              5|             4.75|
+----------+---------+---------------+-----------------+

Математически эквивалентно средневзвешенной задаче на основе продолжительности:

avg=(signal_strength*duration)/60

Задача здесь состоит в том, чтобы получить длительность для каждого сигнала, один вариант здесь для каждой микропакеты, собрать результат в драйвере, тогда это все статистические проблемы, чтобы получить длительность, вы можете сделать левое смещение по времени начала, затем вычесть, что-то вроде этот:

window.start.leftShift(1)-window.start

который дал бы вам:

event_time  device_id  signal_strength duration
12:00:00    1          5                  5(5-0)
12:00:05    1          4                  25(30-5)
12:00:30    1          5                  15(45-30)
12:00:45    1          6                  10(55-45)
12:00:55    1          5                  5 (60-55)

(5*5+4*25+5*15+6*10+5*5)/60=57/12

Начиная со структурированной потоковой передачи Spark 2.3.2, вы должны написать свой собственный настроенный приемник, чтобы собирать результаты каждого этапа в драйвер и выполнять математическую работу таким образом.

Другие вопросы по тегам