spark 2.2.0, при вызове cache() получаются разные результаты
Я озадачен этим фрагментом кода, и мне интересно, если это ошибка искры. Мне удалось сузить его до этого теста:
val path = tempFolder
Seq(1, 2).toDS.write.parquet(path)
val loaded = spark.read.parquet(path).as[Int]
val filtered = loaded.filter(_ < 2)
.cache() // this line creates the problem. Remove it and filtered.toSet is Set(1)
Seq(-1,-2).toDS.write.mode(SaveMode.Append).parquet(path)
// this should be the correct value
// filtered.collect.toSet should be(Set(1))
// but instead we get this incorrect value
filtered.collect.toSet should be(Set(-1,-2,1))
Итак, как вы можете видеть, фильтр читает добавленные данные. Кто-то может сказать, что это нормально, потому что искра всегда ленится. Но это не должно иметь место, потому что отфильтрованный - это преобразование загруженного, которое должно содержать только [1,2]. Кроме того, если я удаляю.cache(), тогда фильтруется действительно только [1], что, на мой взгляд, является правильным значением и соответствует тому, что обычно я ожидаю получить от spark.
Как вы думаете, это ошибка?