spark 2.2.0, при вызове cache() получаются разные результаты

Я озадачен этим фрагментом кода, и мне интересно, если это ошибка искры. Мне удалось сузить его до этого теста:

val path = tempFolder
Seq(1, 2).toDS.write.parquet(path)

val loaded = spark.read.parquet(path).as[Int]
val filtered = loaded.filter(_ < 2)
  .cache() // this line creates the problem. Remove it and filtered.toSet is Set(1)

Seq(-1,-2).toDS.write.mode(SaveMode.Append).parquet(path)

// this should be the correct value
//    filtered.collect.toSet should be(Set(1))

// but instead we get this incorrect value
filtered.collect.toSet should be(Set(-1,-2,1))

Итак, как вы можете видеть, фильтр читает добавленные данные. Кто-то может сказать, что это нормально, потому что искра всегда ленится. Но это не должно иметь место, потому что отфильтрованный - это преобразование загруженного, которое должно содержать только [1,2]. Кроме того, если я удаляю.cache(), тогда фильтруется действительно только [1], что, на мой взгляд, является правильным значением и соответствует тому, что обычно я ожидаю получить от spark.

Как вы думаете, это ошибка?

0 ответов

Другие вопросы по тегам