Как глобально упорядочить несколько упорядоченных наблюдаемых в Monix

Предположим, у меня есть несколько итераторов, которые упорядочены. Если бы я хотел объединить эти итераторы при глобальном их упорядочении (например, [(1,3,4), (2,4,5)] -> [1,2,3,4,4,5]) используя monix, как бы я это сделал?

3 ответа

Не использует Monix, но я не уверен, что это актуально

import scala.collection.BufferedIterator


def merge[A:Ordering](xs: Seq[Iterator[A]]) = 
  new Iterator[A] {
    val its = xs.map(_.buffered)
    def hasNext = its.exists(_.hasNext)
    def next = its.filter{ _.hasNext}
                  .minBy(_.head)
                  .next
  }


val ys = merge(Seq(List(1,3,5).toIterator, List(2,4,6).toIterator, List(10,11).toIterator))

ys.toList  //> res0: List[Int] = List(1, 2, 3, 4, 5, 6, 10, 11)

Поскольку наблюдаемое - это поток элементов, его можно разделить на два типа:

  • Конечные потоки
  • Бесконечные потоки

Обратите внимание, что для правильной сортировки вам понадобятся все элементы. Итак, нет простого способа сделать это.

Для конечных потоков вам придется собрать все элементы, а затем отсортировать их. Вы можете превратить это обратно в наблюдаемое с помощьюObservable.fromIterable.

val items = List((1,3,4), (2,4,5))

val sortedList = Observable
  .fromIterable(items)
  .flatMap(item => Observable.fromIterable(List(item._1, item._2, item._3)))
  .toListL // Flatten to an Observable[Int]
  .map(_.sorted) 

Для бесконечных потоков единственное, что вы можете сделать, - это буферизовать элементы до определенного времени или размера. Я не вижу другого пути, потому что вы не знаете, когда закончится поток.

Например,

val itemsStream: Observable[(Int, Int, Int)]= ???

itemsStream
  .bufferIntrospective(10)
  .flatMap((itemList: List[(Int, Int, Int)]) => // You'll have to sort this
     ???
  )

Немного поздно, но мне нужны были наблюдаемые monix для сортировки слиянием, и я не смог найти решение, вот как я его решил.

Идея состоит в том, чтобы использоватьbufferWhileна двоихheadсписков, чтобы вы могли держать под рукой фактические значения и проверять, какое из них меньше

        private def mergeSortInternal(aHead: Observable[Int], bHead: Observable[Int], aTail: Observable[Int], bTail: Observable[Int]): Observable[Int] = {
    (aHead.isEmpty ++ bHead.isEmpty).bufferWhile(_ => true).map {
      isEmptyPair: Seq[Boolean] =>
        val Seq(aIsEmpty, bIsEmpty) = isEmptyPair
        if (aIsEmpty) {
          bHead ++ bTail
        }
        else if (bIsEmpty) {
          aHead ++ aTail
        }
        else {
          {
            (aHead ++ bHead).bufferWhile(_ => true).map((heads: Seq[Int]) => {
              val Seq(aHeadValue, bHeadValue) = heads
              if (aHeadValue < bHeadValue) {
                Observable(aHeadValue) ++ mergeSortInternal(aTail.head, bHead, aTail.tail, bTail)
              }
              else {
                Observable(bHeadValue) ++ mergeSortInternal(aHead, bTail.head, aTail, bTail.tail)
              }
            }).flatten
          }

        }
    }.flatten
  }
  
  private def mergeSort(a: Observable[Int], b: Observable[Int]) = {
    mergeSortInternal(a.head, b.head, a.tail, b.tail)
  }

Другие вопросы по тегам