Накопить результат в мгновение ока

Я пытаюсь узнать некоторые веса для линейного онлайн-классификатора, используя Flink. Итак, у меня изначально вектор весов инициализирован до нуля. Для каждого нового экземпляра я хочу обновить этот вес. Я читал код для Flink GradientDescent и MultipleLinearRegression, чтобы попытаться найти лучший способ сделать это.

Пока что то, что я пробовал, не работает:

val dimensionsDS = input.map(_.vector.size).reduce((_, b) => b)

val initialWeights = dimensionsDS.map {
    dimension =>
      val values = Array.fill(dimension)(0.0)
      WeightVector(DenseVector(values), .0)
}

val finalWeights = initialWeights.iterate(1) { weightVectorDS =>
    input.mapWithBcVariable(weightVectorDS) { (data, wv) =>
      import Breeze._

      val vector = data.vector.asBreeze
      val w = wv.weights.asBreeze

      val pred = vector dot w

      if (pred * data.label <= 1) {
        vector :*= eta * data.label
        val wt = w + vector
        wt :*= math.min(1.0, 1 / (math.sqrt(lambda) * norm(wt)))

        // Truncate
        if (wt.toArray.count(_ != 0) > nFeatures) {
          val topN = wt.toArray.zipWithIndex.sortBy(-_._1)
          for (i <- nFeatures + 1 until wt.size)
            wt(topN(i)._2) = 0
          WeightVector(DenseVector(wt.toArray), 0.0)
        } else wv
      } else {
        wv
      }
    }
  }

Проблема с этим кодом в том, что wvначальные веса всегда одинаковы, и он не обновляется для каждого нового экземпляра map функция. Это имеет смысл, так как mapWithBcVariable только передает переменную всем узлам, но не обновляет ее.

Самый простой способ - сделать начальные веса изменяемыми, но я не думаю, что это хорошая идея.

Кто-нибудь знает, как я могу обновить веса для каждого экземпляра input?

Обновить

Это изменчивая версия:

val values = Array.fill(dimensionsDS)(0.0)
var w0 = WeightVector(DenseVector(values), .0).weights.asBreeze

val finalWeights = input.map { data =>
  val vector = data.vector.asBreeze
  val pred = vector dot w0

  if (pred * data.label <= 1) {
      vector :*= eta * data.label
      w0 = w0 + vector
      w0 :*= math.min(1.0, 1 / (math.sqrt(lambda) * norm(w0)))

      // Truncate
      if (w0.toArray.count(_ != 0) > nFeatures) {
        val topN = w0.toArray.zipWithIndex.sortBy(-_._1)
        for (i <- nFeatures + 1 until w0.size)
          w0(topN(i)._2) = 0
      }
    }
    w0
  }

0 ответов

Другие вопросы по тегам