Что делает "row +: SavedState.toSeq" в StateStoreRestoreExec.doExecute?
Мы можем видеть StateStoreRestoreExec следующим образом.
case class StateStoreRestoreExec(
keyExpressions: Seq[Attribute],
stateId: Option[OperatorStateId],
child: SparkPlan)
extends UnaryExecNode with StateStoreReader {
override protected def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
child.execute().mapPartitionsWithStateStore(
getStateId.checkpointLocation,
operatorId = getStateId.operatorId,
storeVersion = getStateId.batchId,
keyExpressions.toStructType,
child.output.toStructType,
sqlContext.sessionState,
Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iter) =>
val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
iter.flatMap { row =>
val key = getKey(row)
val savedState = store.get(key)
numOutputRows += 1
row +: savedState.toSeq
}
}
Здесь мне интересно значение row +: savedState.toSeq
, Я думаю, что row является экземпляром UnsafeRow, а saveState.toSeq является экземпляром Seq. Так как мы можем управлять ими с +:
, С другой стороны, я думаю, что SavedState является экземпляром UnsafeRow, а toSeq не является членом UnsaveRow, так как же savedState.toSeq
Работа?
1 ответ
Решение
row
является примером InternalRow
, а также savedState
является Option[UnsafeRow]
, который расширяет InternalRow
, Здесь происходит то, что сохраненное состояние преобразуется из Option[UnsafeRow]
к Seq[UnsafeRow]
а затем row
Экземпляр добавляется к этой последовательности.
Когда ты flatMap
над этим UnsafeRow
объекты, вы получите обратно Iterator[UnsafeRow]
,