Почему `collect` заставляет наблюдателя не получать уведомлений?
Я использую rxscala и обнаружил очень тонкую проблему, и мой код упрощается следующим образом:
import rx.lang.scala.Observable
import rx.lang.scala.subjects.PublishSubject
object SubtleBug extends App {
case class Projects(projects: List[Project] = Nil)
case class Project(name: String, docs: List[Doc] = Nil)
case class Doc(path: String, baseContent: String)
sealed trait ServerEvent
case class ProjectNames(projects: Seq[String]) extends ServerEvent
case class NewDocument(projectName: String, path: String, version: Int, content: String) extends ServerEvent
val receivedEvents = PublishSubject[ServerEvent]
new Thread(new Runnable {
override def run(): Unit = {
val events = Seq(
new ProjectNames(Seq("p1", "p2")),
NewDocument("p1", "/aaa", 1, "my-content"),
NewDocument("p1", "/bbb", 1, "my-content")
)
events.foreach { event =>
receivedEvents.onNext(event)
Thread.sleep(200)
}
}
}).start()
lazy val projects: Observable[Option[Projects]] = receivedEvents.scan(Option.empty[Projects]) {
case (_, ProjectNames(names)) => {
Some(Projects(names.map(name => Project(name)).toList))
}
case (Some(ps), NewDocument(projectName, docPath, version, content)) => {
val doc = Doc(docPath, content)
val newPs = ps.copy(projects = ps.projects.map {
case project if project.name == projectName => project.copy(docs = project.docs ::: List(doc))
case p => p
})
Some(newPs)
}
case _ => None
}.collect({
case Some(p) => Some(p)
})
projects.foreach(ps => {
println("### 111: " + ps.map(_.projects))
projects.foreach(x => println("### 222: " + ps.map(_.projects))) // !!!(2)
})
Thread.sleep(2000)
}
Ключевым моментом является !!!(2)
линия, которая находится внутри projects
сам.
Он печатает следующее:
### 111: Some(List(Project(p1,List()), Project(p2,List())))
### 111: Some(List(Project(p1,List(Doc(/aaa,my-content))), Project(p2,List())))
### 111: Some(List(Project(p1,List(Doc(/aaa,my-content), Doc(/bbb,my-content))), Project(p2,List())))
Проблема там нет ### 222:
линия!
Но если я изменю collect
часть, и добавление None
дело:
.collect({
case Some(p) => Some(p)
case None => None // added case
})
Это напечатает ### 222:
линии, как я ожидал:
### 111: None
### 111: Some(List(Project(p1,List()), Project(p2,List())))
### 111: Some(List(Project(p1,List(Doc(/aaa,my-content))), Project(p2,List())))
### 222: None
### 222: None
### 222: Some(List(Project(p1,List()), Project(p2,List())))
### 222: Some(List(Project(p1,List()), Project(p2,List())))
### 111: Some(List(Project(p1,List(Doc(/aaa,my-content), Doc(/bbb,my-content))), Project(p2,List())))
### 222: None
### 222: Some(List(Project(p1,List()), Project(p2,List())))
### 222: Some(List(Project(p1,List(Doc(/aaa,my-content))), Project(p2,List())))
### 222: Some(List(Project(p1,List(Doc(/aaa,my-content))), Project(p2,List())))
Я не могу понять почему.
PS: Вы можете клонировать код здесь: https://github.com/freewind/rxscala-test/blob/master/src/main/scala/myrx/SubtleBug.scala
1 ответ
В ваших кодах есть условие гонки. PublishSubject
будет удалять любые события, прежде чем подписаться на него. Так что если receivedEvents.onNext(event)
работает до projects.foreach
событие будет отброшено. И в scan
Если первое событие отброшено, сопоставление с образцом больше не будет работать.
Ты можешь использовать ReplaySubject
починить это.