Манипулирование данными с помощью ZIO Stream — компилируется и запускается, но не завершается.

Я создал 3 разные версии мини-конвейера обработки данных. Один с Scala Views, один с FS2 и третий с ZIO Streams. Реализация View и FS2 запускается и завершается довольно быстро (FS2 намного быстрее). Однако моя реализация ZIO компилируется и запускается, но не завершается. Мне было интересно, есть ли у кого-нибудь идеи, почему он не завершается (данные составляют около 900 МБ).

Реализация ZIO (компилируется и запускается, но не завершается):

`

      val zeroTupple = (0.0, 0.0)
val monoid = Monoid[(Double, Double)]

val file = "src/main/FlightData/2018.csv"

val parseLine: String => SpeedRow = str =>
    castToSpeedRow(str.split(",").map(_.trim).toVector) //Used in FS2 implementation

val parseCSV =
    ZPipeline.utf8Decode >>>
        ZPipeline.splitLines >>>
        ZPipeline.drop(1) >>>   //Dropping the header
        ZPipeline.map(parseLine(_))

val filterSpeedRows
    : ZPipeline[Any, CharacterCodingException, SpeedRow, SpeedRow] =
    ZPipeline.filter(p => p.airtime.nonEmpty && p.distance.nonEmpty)

def getStream(filename: String) = ZStream.fromFileName(filename)

val transformationPipe
    : ZPipeline[Any, Throwable, SpeedRow, (Double, Double)] =
    ZPipeline.map(sr => (sr.distance.get, sr.airtime.get))

val groupedStream =
    getStream(file)
        .via(parseCSV >>> filterSpeedRows)
        .groupBy(sr =>
            ZIO.succeed {
                ((sr.origin, sr.destination), sr)
            }
        ) { (k, s) =>
            ZStream.fromZIO {
                s.via(transformationPipe)
                    .runFold(zeroTupple)((sum, value) =>
                        monoid.combine(sum, value)
                    )
                    //.map(values => k -> values)
            }
        }

override def run: ZIO[Any & (ZIOAppArgs & Scope), Any, Any] =
    groupedStream.run(ZSink.collectAll)`

Реализация FS2 для сравнения (работает нормально):

`

      val file = "src/main/FlightData/2018.csv"

def stringToRows(string: String): SpeedRow =
    val result = castToSpeedRow(string.split(",").map(_.trim).toVector)
    result

def getFileStream(path: String): Stream[IO, SpeedRow] =
    val filePath = Path(path)
    Files[IO].readUtf8Lines(filePath).drop(1).dropLast.map(stringToRows)

def processStream(stream: Stream[IO, SpeedRow]) =
    val monoidDD = Monoid[(Double, Double)]
    val filtered =
        stream.filter(p => p.airtime != None && p.distance != None)
    val grouped = filtered.fold(
      Map.empty[(String, String), List[(Double, Double)]]
    )((m, sr) =>
        m + (
          (sr.origin, sr.destination) -> ((
            sr.distance.get,
            sr.airtime.get
          ) :: m
              .getOrElse((sr.origin, sr.destination), Nil))
        )
    )
    val foldedResult = grouped
        .map(m => m.mapValues(e => monoidDD.combineAll(e)))
        .map(m => m.toMap)
    foldedResult

override def run: IO[Unit] =
    val streamOfData =
        processStream(getFileStream(file)).compile.toVector
    streamOfData.myDebug.void`

0 ответов

Другие вопросы по тегам