Мойка для построчного файла IO с противодавлением

У меня есть задание по обработке файлов, которое в настоящее время использует акторы akka с управляемым вручную противодавлением для обработки конвейера обработки, но мне никогда не удавалось успешно справиться с противодавлением на этапе чтения входного файла.

Это задание принимает входной файл и группирует строки по идентификационному номеру, присутствующему в начале каждой строки, а затем, как только он попадает в строку с новым идентификационным номером, он отправляет сгруппированные строки актору обработки через сообщение, а затем продолжает с новый идентификационный номер, вплоть до конца файла.

Похоже, это было бы хорошим вариантом использования Akka Streams, использующего Файл в качестве приемника, но я все еще не уверен в трех вещах:

1) Как я могу прочитать файл построчно?

2) Как сгруппировать по идентификатору, присутствующему в каждой строке? В настоящее время я использую очень императивную обработку для этого, и я не думаю, что у меня будет такая же способность в потоковом конвейере.

3) Как я могу применить противодавление, чтобы я не держал строки в памяти быстрее, чем могу обработать данные в нисходящем направлении?

2 ответа

Решение

Акка ручьи groupBy это один подход. Но у группы есть maxSubstreams параметр, который потребовал бы, чтобы вы знали, что максимальное число идентификаторов находится в начале. Итак: решение ниже использует scan идентифицировать блоки с одинаковыми идентификаторами и splitWhen разделить на подпотоки:

object Main extends App {
  implicit val system = ActorSystem("system")
  implicit val materializer = ActorMaterializer()

  def extractId(s: String) = {
    val a = s.split(",")
    a(0) -> a(1)
  }

  val file = new File("/tmp/example.csv")

  private val lineByLineSource = FileIO.fromFile(file)
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024))
    .map(_.utf8String)

  val future: Future[Done] = lineByLineSource
    .map(extractId)
    .scan( (false,"","") )( (l,r) => (l._2 != r._1, r._1, r._2) )
    .drop(1)
    .splitWhen(_._1)
    .fold( ("",Seq[String]()) )( (l,r) => (r._2, l._2 ++ Seq(r._3) ))
    .concatSubstreams
    .runForeach(println)

  private val reply = Await.result(future, 10 seconds)
  println(s"Received $reply")
  Await.ready(system.terminate(), 10 seconds)
}

extractId разбивает строки на id -> кортежи данных. scan prepends id -> кортежи данных с флагом начала ID диапазона. drop бросает элемент грунтовки в scan, splitwhen запускает новый подпоток для каждого начала диапазона. fold объединяет подпотоки в списки и удаляет логическое значение диапазона начала идентификатора, так что каждый подпоток создает один элемент. Вместо сгиба вы, вероятно, хотите кастом SubFlow который обрабатывает потоки строк для одного идентификатора и генерирует некоторый результат для диапазона идентификаторов. concatSubstreams объединяет подпотоки для диапазона идентификаторов, создаваемые splitWhen, обратно в один поток, который печатается runForEach,

Бежать с:

$ cat /tmp/example.csv
ID1,some input
ID1,some more input
ID1,last of ID1
ID2,one line of ID2
ID3,2nd before eof
ID3,eof

Выход:

(ID1,List(some input, some more input, last of ID1))
(ID2,List(one line of ID2))
(ID3,List(2nd before eof, eof))

Похоже, что самый простой способ добавить "обратное давление" в вашу систему без внесения огромных изменений - это просто изменить тип почтового ящика групп ввода, потребляющих Actor, на BoundedMailbox.

  1. Измените тип актера, который потребляет ваши линии на BoundedMailbox с высоким mailbox-push-timeout-time:

    bounded-mailbox {
      mailbox-type = "akka.dispatch.BoundedDequeBasedMailbox"
      mailbox-capacity = 1
      mailbox-push-timeout-time = 1h
    }
    
    val actor = system.actorOf(Props(classOf[InputGroupsConsumingActor]).withMailbox("bounded-mailbox"))
    
  2. Создайте итератор из вашего файла, создайте сгруппированный (по идентификатору) итератор из этого итератора. Затем просто перебирайте данные, отправляя группы потребителю Actor. Обратите внимание, что отправка будет блокироваться в этом случае, когда почтовый ящик Актера заполнится.

    def iterGroupBy[A, K](iter: Iterator[A])(keyFun: A => K): Iterator[Seq[A]] = {
      def rec(s: Stream[A]): Stream[Seq[A]] =
        if (s.isEmpty) Stream.empty else {
          s.span(keyFun(s.head) == keyFun(_)) match {
          case (prefix, suffix) => prefix.toList #:: rec(suffix)
        }
      }
      rec(iter.toStream).toIterator
    }
    
    val lines = Source.fromFile("input.file").getLines()
    
    iterGroupBy(lines){l => l.headOption}.foreach {
      lines:Seq[String] =>
          actor.tell(lines, ActorRef.noSender)
    }
    

Это оно! Возможно, вы захотите переместить материал для чтения файлов в отдельный поток, так как он собирается блокироваться. Также, регулируя mailbox-capacity Вы можете регулировать количество потребляемой памяти. Но если чтение пакета из файла всегда быстрее, чем обработка, кажется разумным сохранить небольшую емкость, например 1 или 2.

обн iterGroupBy реализовано с Streamпроверено не производить Stackru,

Другие вопросы по тегам