Что делает "row +: SavedState.toSeq" в StateStoreRestoreExec.doExecute?

Мы можем видеть StateStoreRestoreExec следующим образом.

case class StateStoreRestoreExec(
    keyExpressions: Seq[Attribute],
    stateId: Option[OperatorStateId],
    child: SparkPlan)
  extends UnaryExecNode with StateStoreReader {

  override protected def doExecute(): RDD[InternalRow] = {
    val numOutputRows = longMetric("numOutputRows")

  child.execute().mapPartitionsWithStateStore(
    getStateId.checkpointLocation,
    operatorId = getStateId.operatorId,
    storeVersion = getStateId.batchId,
    keyExpressions.toStructType,
    child.output.toStructType,
    sqlContext.sessionState,
    Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iter) =>
      val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
      iter.flatMap { row =>
        val key = getKey(row)
        val savedState = store.get(key)
        numOutputRows += 1
        row +: savedState.toSeq
      }
}

Здесь мне интересно значение row +: savedState.toSeq, Я думаю, что row является экземпляром UnsafeRow, а saveState.toSeq является экземпляром Seq. Так как мы можем управлять ими с +:, С другой стороны, я думаю, что SavedState является экземпляром UnsafeRow, а toSeq не является членом UnsaveRow, так как же savedState.toSeq Работа?

1 ответ

Решение

row является примером InternalRow, а также savedState является Option[UnsafeRow], который расширяет InternalRow, Здесь происходит то, что сохраненное состояние преобразуется из Option[UnsafeRow] к Seq[UnsafeRow] а затем row Экземпляр добавляется к этой последовательности.

Когда ты flatMap над этим UnsafeRow объекты, вы получите обратно Iterator[UnsafeRow],

Другие вопросы по тегам