Манипулирование данными с помощью ZIO Stream — компилируется и запускается, но не завершается.
Я создал 3 разные версии мини-конвейера обработки данных. Один с Scala Views, один с FS2 и третий с ZIO Streams. Реализация View и FS2 запускается и завершается довольно быстро (FS2 намного быстрее). Однако моя реализация ZIO компилируется и запускается, но не завершается. Мне было интересно, есть ли у кого-нибудь идеи, почему он не завершается (данные составляют около 900 МБ).
Реализация ZIO (компилируется и запускается, но не завершается):
`
val zeroTupple = (0.0, 0.0)
val monoid = Monoid[(Double, Double)]
val file = "src/main/FlightData/2018.csv"
val parseLine: String => SpeedRow = str =>
castToSpeedRow(str.split(",").map(_.trim).toVector) //Used in FS2 implementation
val parseCSV =
ZPipeline.utf8Decode >>>
ZPipeline.splitLines >>>
ZPipeline.drop(1) >>> //Dropping the header
ZPipeline.map(parseLine(_))
val filterSpeedRows
: ZPipeline[Any, CharacterCodingException, SpeedRow, SpeedRow] =
ZPipeline.filter(p => p.airtime.nonEmpty && p.distance.nonEmpty)
def getStream(filename: String) = ZStream.fromFileName(filename)
val transformationPipe
: ZPipeline[Any, Throwable, SpeedRow, (Double, Double)] =
ZPipeline.map(sr => (sr.distance.get, sr.airtime.get))
val groupedStream =
getStream(file)
.via(parseCSV >>> filterSpeedRows)
.groupBy(sr =>
ZIO.succeed {
((sr.origin, sr.destination), sr)
}
) { (k, s) =>
ZStream.fromZIO {
s.via(transformationPipe)
.runFold(zeroTupple)((sum, value) =>
monoid.combine(sum, value)
)
//.map(values => k -> values)
}
}
override def run: ZIO[Any & (ZIOAppArgs & Scope), Any, Any] =
groupedStream.run(ZSink.collectAll)`
Реализация FS2 для сравнения (работает нормально):
`
val file = "src/main/FlightData/2018.csv"
def stringToRows(string: String): SpeedRow =
val result = castToSpeedRow(string.split(",").map(_.trim).toVector)
result
def getFileStream(path: String): Stream[IO, SpeedRow] =
val filePath = Path(path)
Files[IO].readUtf8Lines(filePath).drop(1).dropLast.map(stringToRows)
def processStream(stream: Stream[IO, SpeedRow]) =
val monoidDD = Monoid[(Double, Double)]
val filtered =
stream.filter(p => p.airtime != None && p.distance != None)
val grouped = filtered.fold(
Map.empty[(String, String), List[(Double, Double)]]
)((m, sr) =>
m + (
(sr.origin, sr.destination) -> ((
sr.distance.get,
sr.airtime.get
) :: m
.getOrElse((sr.origin, sr.destination), Nil))
)
)
val foldedResult = grouped
.map(m => m.mapValues(e => monoidDD.combineAll(e)))
.map(m => m.toMap)
foldedResult
override def run: IO[Unit] =
val streamOfData =
processStream(getFileStream(file)).compile.toVector
streamOfData.myDebug.void`