Получаемые в будущем наблюдаемые обратные вызовы Scala не вызывают

Используя scala 2.11.7, rxscala_2.11 0.25.0, rxjava 1.0.16, мой oddFutures обратные вызовы не вызывают AsyncDisjointedChunkMultiprocessing.process():

package jj.async

import scala.concurrent.Future
import scala.concurrent.ExecutionContext
import rx.lang.scala.Observable
import jj.async.helpers._

/* Problem: How to multi-process records asynchronously in chunks.
   Processing steps: 
    - fetch finite # of records from a repository (10 at-a-time (<= 10 for last batch) because of downstream limitations)
    - process ea. chunk through a filter asynchronously (has 10-record input limit)
    - compute the reverse of the filtered result
    - enrich (also has 10-record input limit) filtered results asynchronously
    - return enriched filtered results once all records are processed
 */
object AsyncDisjointedChunkMultiprocessing {
  private implicit val ec = ExecutionContext.global

  def process(): List[Enriched] = {
    @volatile var oddsBuffer = Set[Int]()
    @volatile var enrichedFutures = Observable just Set[Enriched]()
    oddFutures.foreach(
      odds =>
        if (odds.size + oddsBuffer.size >= chunkSize) {
          val chunkReach = chunkSize - oddsBuffer.size
          val poors = oddsBuffer ++ odds take chunkReach
          enrichedFutures = enrichedFutures + poors
          oddsBuffer = odds drop chunkReach
        } else {
          oddsBuffer ++= odds
        },
      error => throw error,
      () => enrichedFutures + oddsBuffer)
    enrichedFutures.toBlocking.toList.flatten
  }

  private def oddFutures: Observable[Set[Int]] =
    Repository.query(chunkSize) { chunk =>
      evenFuture(chunk) map {
        filtered => chunk -- filtered
      }
    }

  private def evenFuture(chunk: Set[Int]): Future[Set[Int]] = {
    checkSizeLimit(chunk)
    Future { Remote even chunk }
  }
}

class Enriched(i: Int)

object Enriched {
  def apply(i: Int) = new Enriched(i)

  def enrich(poors: Set[Int]): Set[Enriched] = {
    checkSizeLimit(poors);
    Thread.sleep(1000)
    poors map { Enriched(_) }
  }
}

object Repository {
  def query(fetchSize: Int)(f: Set[Int] => Future[Set[Int]]): Observable[Set[Int]] = {
    implicit val ec = ExecutionContext.global
    Observable.from {
      Thread.sleep(20)
      f(Set(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
      Thread.sleep(20)
      f(Set(11, 12, 13, 14, 15, 16, 17, 18, 19, 20))
      Thread.sleep(15)
      f(Set(21, 22, 23, 24, 25))
    }
  }
}

package object helpers {
  val chunkSize = 10

  implicit class EnrichedObservable(enrichedObs: Observable[Set[Enriched]]) {
    def +(poors: Set[Int]): Observable[Set[Enriched]] = {
      enrichedObs merge Observable.just {
        Enriched.enrich(poors)
      }
    }
  }

  def checkSizeLimit(set: Set[_ <: Any]) =
    if (set.size > chunkSize) throw new IllegalArgumentException(s"$chunkSize-element limit violated: ${set.size}")
}

// unmodifiable
object Remote {
  def even = { xs: Set[Int] =>
    Thread.sleep(1500)
    xs filter { _ % 2 == 0 }
  }
}

Что-то не так с тем, как я создаю свой Observable.from(Future) в Repository.query()?

1 ответ

Решение

Проблема в том, что я пытаюсь создать наблюдаемое из нескольких вариантов будущего, но Observable.from(Future) предусматривает только исключительное будущее (компилятор не жаловался, потому что я небрежно пропустил разделительные запятые, тем самым узурпируя не подозревающую перегрузку). Мой Sol'n:

object Repository {
  def query(f: Set[Int] => Future[Set[Int]])(fetchSize: Int = 10): Observable[Future[Set[Int]]] =
    // observable (as opposed to list) because modeling a process 
    // where the total result size is unknown beforehand. 
    // Also, not creating or applying because it blocks the futures
    (1 to 21 by fetchSize).foldLeft(Observable just Future((Set[Int]()))) { (obs, i) =>
      obs + f(DataSource.fetch(i)())
    }
}

object DataSource {
  def fetch(begin: Int)(fetchSize: Int = 10) = {
    val end = begin + fetchSize
    Thread.sleep(200)
    (for {
      i <- begin until end
    } yield i).toSet
  }
}

где:

implicit class FutureObservable(obs: Observable[Future[Set[Int]]]) {
  def +(future: Future[Set[Int]]) =
  obs merge (Observable just future)
}
Другие вопросы по тегам