Как сделать размер на основе удушения с помощью Акка ФСМ?

У меня есть сценарий использования, который я должен обработать запрос, используя Акка FSM, как только количество запросов достигает указанного значения.

sealed trait State
case object Idle extends State
case object Active extends State

sealed trait Data
case object Uninitialized extends Data
case object QuickStart extends Data
case class A(a: Int) extends Data

class RequestHandlers extends FSM[State, Data] {
  val queue = mutable.Queue[A]()
  startWith(Idle, Uninitialized)

  when(Idle) {
    case Event(_, Uninitialized) =>
      println("At Idle")
      //      self ! QuickStart
      goto(Active) using QuickStart
  }

  when(Active) {
    case Event(_, request: A) =>
      println("At Active")
      queue.take(2).map{x => println("request---  " + x.a  + "processing")
      queue.dequeue()

}

      Thread.sleep(2000L)
      goto(Active) using Uninitialized
  }


  whenUnhandled {
    case Event(update: A, QuickStart) =>
      queue += update
      if(queue.size >= 2) {
        println(s"At unhandled + ${update}" + "--" + queue)
        goto(Active) using update
      }
      else {
        println("size has not reached")
        goto(Active) using Uninitialized
      }
    case Event(update: A, Uninitialized) =>
      queue += update
      println(s"At unhandled - Uninitialised + $update")
      goto(Active) using QuickStart
  }

  initialize()

}

object demo extends App  {

  val actorSystem = ActorSystem("system")
  val actor = actorSystem.actorOf(Props(classOf[RequestHandlers]))

  val list = (1 to 10).toList
  list.foreach { abc =>

    actor ! Uninitialized
    actor ! A(abc)
    println("Sent")
  }

}

Я пытался использовать изменяемую очередь, где я добавляю свой запрос. После того, как размер очереди достигает определенного значения, т.е. 2 обрабатывают эти запросы одновременно. После обработки я снимаю с него. Если я отправлю 10 запросов, он обработает 8 запросов, но за последние 2 он никогда не перейдет в активное состояние. Я не понимаю, где я делаю ошибку при переходе.

Любая помощь будет оценена!

1 ответ

Решение

Я думаю, что минимальный пример того, что вы делаете, выглядит так:

// The only type of incoming message
case class Msg(a: Int)

// States
sealed trait State
case object Waiting extends State
case object Active extends State

// StateData is shared between states
case class StateData(queue: immutable.Queue[Msg])
object StateData {
  val empty = StateData(immutable.Queue.empty)

  def single(msg: Msg) = StateData(immutable.Queue(msg))
}


class RequestHandlers extends FSM[State, StateData] {
  val startTime = System.currentTimeMillis()

  def curTime = {
    val time = (System.currentTimeMillis() - startTime) / 1000f
    f"[$time%3.2f]"
  }

  startWith(Waiting, StateData.empty)

  onTransition {
    case Waiting -> Active =>
      //use nextStateData rather than stateData !
      nextStateData match {
        case StateData(queue) =>
          queue.foreach(x => println(s"$curTime processing ${x.a} "))
          Thread.sleep(2000L)
      }
  }

  when(Active) {
    case Event(msg: Msg, _) =>
      println(s"$curTime at Active $msg")
      // we've just processed old data
      // drop the old queue and create a new one with the new message
      goto(Waiting) using StateData.single(msg)
  }
  when(Waiting) {
    case Event(msg: Msg, StateData(oldQueue)) =>
      // add an event to the queue and check if it is time to process
      val newQueue = oldQueue :+ msg
      println(s"$curTime at Idle $msg, newQueue = $newQueue")
      if (newQueue.size == 2) {
        goto(Active) using StateData(newQueue)
      }
      else {
        stay using StateData(newQueue)
      }
  }

  initialize()
}

и тестовая программа

object demo extends App  {

    val actorSystem = ActorSystem("system")
    val actor = actorSystem.actorOf(Props(classOf[RequestHandlers]))

    (1 to 10).toList.foreach { i =>
      println(s"Send $i")
      actor ! Msg(i)
    }

}

Логика RequestHandlers является то, что он накапливает поступающие запросы в очереди, хранящейся в StateData объект (который имеет только один тип, который является общим для обоих состояний). Есть два состояния Waiting а также Active, Обработка на самом деле происходит на переходе Waiting -> Active, Вероятно, самым сложным моментом является не забывать, что когда FSM находится в Active состояние, новые сообщения будут поступать и должны быть обработаны путем добавления в очередь (или, скорее, начиная новую очередь с данными из этого сообщения).

PS Ну, этот пример, вероятно, не настолько минимален. На самом деле вы можете иметь только одно состояние и выполнять обработку внутри if (newQueue.size == 2) но это было бы довольно странно.

Другие вопросы по тегам