Рекурсивный вызов метода в Apache Spark

Я строю семейное дерево из базы данных на Apache Spark, используя рекурсивный поиск, чтобы найти конечного родителя (то есть человека на вершине семейного дерева) для каждого человека в БД. Для этого предполагается, что первый человек, вернувшийся при поиске своего идентификатора, является правильным родителем.

val peopleById = peopleRDD.keyBy(f => f.id)
def findUltimateParentId(personId: String) : String = {

    if((personId == null) || (personId.length() == 0))
        return "-1"

    val personSeq = peopleById.lookup(personId)
    val person = personSeq(0)
    if(person.personId == "0 "|| person.id == person.parentId) {

        return person.id

    }
    else {

        return findUltimateParentId(person.parentId)

    }

}

val ultimateParentIds = peopleRDD.foreach(f => f.findUltimateParentId(f.parentId))

Это приводит к следующей ошибке "Причины: org.apache.spark.SparkException: преобразования и действия RDD могут вызываться только драйвером, но не внутри других преобразований; например, rdd1.map(x => rdd2.values.count() * x) недопустимо, поскольку преобразование значений и действие count не могут быть выполнены внутри преобразования rdd1.map. Для получения дополнительной информации см. SPARK-5063."

Из других похожих вопросов я понимаю, что проблема в том, что я вызываю findUltimateParent Id из цикла foreach, и если я вызываю метод из оболочки с идентификатором человека, он возвращает правильный конечный идентификатор родителя.

Тем не менее, ни одно из других предложенных решений не работает для меня, или, по крайней мере, я не вижу, как реализовать их в моей программе, кто-нибудь может помочь?

2 ответа

Решение

Если я вас правильно понял - вот решение, которое будет работать для любого размера ввода (хотя производительность может быть не очень высокой) - оно выполняет N итераций над RDD, где N - "самая глубокая семья" (наибольшее расстояние от предка до ребенка) в вход:

// representation of input: each person has an ID and an optional parent ID
case class Person(id: Int, parentId: Option[Int])

// representation of result: each person is optionally attached its "ultimate" ancestor,
// or none if it had no parent id in the first place
case class WithAncestor(person: Person, ancestor: Option[Person]) {
  def hasGrandparent: Boolean = ancestor.exists(_.parentId.isDefined)
}

object RecursiveParentLookup {
  // requested method
  def findUltimateParent(rdd: RDD[Person]): RDD[WithAncestor] = {

    // all persons keyed by id
    def byId = rdd.keyBy(_.id).cache()

    // recursive function that "climbs" one generation at each iteration
    def climbOneGeneration(persons: RDD[WithAncestor]): RDD[WithAncestor] = {
      val cached = persons.cache()
      // find which persons can climb further up family tree
      val haveGrandparents = cached.filter(_.hasGrandparent)

      if (haveGrandparents.isEmpty()) {
        cached // we're done, return result
      } else {
        val done = cached.filter(!_.hasGrandparent) // these are done, we'll return them as-is
        // for those who can - join with persons to find the grandparent and attach it instead of parent
        val withGrandparents = haveGrandparents
          .keyBy(_.ancestor.get.parentId.get) // grandparent id
          .join(byId)
          .values
          .map({ case (withAncestor, grandparent) => WithAncestor(withAncestor.person, Some(grandparent)) })
        // call this method recursively on the result
        done ++ climbOneGeneration(withGrandparents)
      }
    }

    // call recursive method - start by assuming each person is its own parent, if it has one:
    climbOneGeneration(rdd.map(p => WithAncestor(p, p.parentId.map(i => p))))
  }

}

Вот тест, чтобы лучше понять, как это работает:

/**
  *     Example input tree:
  *
  *            1             5
  *            |             |
  *      ----- 2 -----       6
  *      |           |
  *      3           4
  *
  */

val person1 = Person(1, None)
val person2 = Person(2, Some(1))
val person3 = Person(3, Some(2))
val person4 = Person(4, Some(2))
val person5 = Person(5, None)
val person6 = Person(6, Some(5))

test("find ultimate parent") {
  val input = sc.parallelize(Seq(person1, person2, person3, person4, person5, person6))
  val result = RecursiveParentLookup.findUltimateParent(input).collect()
  result should contain theSameElementsAs Seq(
    WithAncestor(person1, None),
    WithAncestor(person2, Some(person1)),
    WithAncestor(person3, Some(person1)),
    WithAncestor(person4, Some(person1)),
    WithAncestor(person5, None),
    WithAncestor(person6, Some(person5))
  )
}

Должно быть легко отобразить ваш вклад в эти Person объекты и отобразить вывод WithAncestor объекты в то, что вам нужно. Обратите внимание, что этот код предполагает, что если у любого человека есть parentId X - другой человек с таким идентификатором действительно существует во входных данных.

Исправили это с помощью SparkContext.broadcast:

val peopleById = peopleRDD.keyBy(f => f.id)
val broadcastedPeople = sc.broadcast(peopleById.collectAsMap())

def findUltimateParentId(personId: String) : String = {

    if((personId == null) || (personId.length() == 0))
        return "-1"

    val personOption = broadcastedPeople.value.get(personId)
    if(personOption.isEmpty) {

        return "0";

    }
    val person = personOption.get
    if(person.personId == 0 || person.orgId == person.personId) {

        return person.id

    }
    else {

        return findUltimateParentId(person.parentId)

    }

}

val ultimateParentIds = peopleRDD.foreach(f => f.findUltimateParentId(f.parentId))

отлично работает сейчас!

Другие вопросы по тегам