Накопить результат в мгновение ока
Я пытаюсь узнать некоторые веса для линейного онлайн-классификатора, используя Flink. Итак, у меня изначально вектор весов инициализирован до нуля. Для каждого нового экземпляра я хочу обновить этот вес. Я читал код для Flink GradientDescent и MultipleLinearRegression, чтобы попытаться найти лучший способ сделать это.
Пока что то, что я пробовал, не работает:
val dimensionsDS = input.map(_.vector.size).reduce((_, b) => b)
val initialWeights = dimensionsDS.map {
dimension =>
val values = Array.fill(dimension)(0.0)
WeightVector(DenseVector(values), .0)
}
val finalWeights = initialWeights.iterate(1) { weightVectorDS =>
input.mapWithBcVariable(weightVectorDS) { (data, wv) =>
import Breeze._
val vector = data.vector.asBreeze
val w = wv.weights.asBreeze
val pred = vector dot w
if (pred * data.label <= 1) {
vector :*= eta * data.label
val wt = w + vector
wt :*= math.min(1.0, 1 / (math.sqrt(lambda) * norm(wt)))
// Truncate
if (wt.toArray.count(_ != 0) > nFeatures) {
val topN = wt.toArray.zipWithIndex.sortBy(-_._1)
for (i <- nFeatures + 1 until wt.size)
wt(topN(i)._2) = 0
WeightVector(DenseVector(wt.toArray), 0.0)
} else wv
} else {
wv
}
}
}
Проблема с этим кодом в том, что wv
начальные веса всегда одинаковы, и он не обновляется для каждого нового экземпляра map
функция. Это имеет смысл, так как mapWithBcVariable
только передает переменную всем узлам, но не обновляет ее.
Самый простой способ - сделать начальные веса изменяемыми, но я не думаю, что это хорошая идея.
Кто-нибудь знает, как я могу обновить веса для каждого экземпляра input
?
Обновить
Это изменчивая версия:
val values = Array.fill(dimensionsDS)(0.0)
var w0 = WeightVector(DenseVector(values), .0).weights.asBreeze
val finalWeights = input.map { data =>
val vector = data.vector.asBreeze
val pred = vector dot w0
if (pred * data.label <= 1) {
vector :*= eta * data.label
w0 = w0 + vector
w0 :*= math.min(1.0, 1 / (math.sqrt(lambda) * norm(w0)))
// Truncate
if (w0.toArray.count(_ != 0) > nFeatures) {
val topN = w0.toArray.zipWithIndex.sortBy(-_._1)
for (i <- nFeatures + 1 until w0.size)
w0(topN(i)._2) = 0
}
}
w0
}