Надежно обрабатывать сообщения в определенное время

Давайте предположим, что у меня есть приложение чата.

Клиент отправляет сообщение в чат, что приводит к некоторой команде для какого-то Актера. Теперь я хочу обработать то, что он написал, и сделать его доступным для других пользователей в этом чате, поэтому я обрабатываю эту команду. В то же время я хочу сказать себе (актеру), что мне нужно хранить это сообщение в базе данных истории чата, НО не прямо сейчас. Сохранение в базу данных должно происходить каждые 2 минуты. И если произошел сбой, я все равно смогу сохранить базу данных.

Я предполагаю, что рабочий процесс будет таким:

  1. Пользователь отправил сообщение
  2. Актер чата получил команду с этим сообщением
  3. Мы транслируем это сообщение всем и добавляем это сообщение в какую-то очередь, чтобы сохранить его в базе данных истории чата.
  4. Некоторые постоянные команды выполняются, когда истекло 2 минуты. Он собирает все входящие сообщения чата, которые еще не были сохранены в порядке их поступления
  5. Запустите транзакцию со всеми сообщениями, а затем удалите их из очереди.
  6. Если где-то после 3 произошел сбой и сообщения не были сохранены, то я должен попытаться сохранить их снова. Если бы они были настойчивы, я бы никогда больше не пытался их настаивать.

Как построить что-то подобное в Акке? Какие функции я должен использовать / какие шаблоны?

1 ответ

Решение

Вам может понадобиться два участника: один (координатор) будет отправлять уведомления о командах чата клиентам. Другой (троттлер) - будет отправлять данные в базу данных каждые 2 минуты. Ваша очередь будет просто внутренним состоянием дросселя:

class Coordinator extends Actor {
   def receive = {
     case command: Record => 
          broadcast(command)
          throttler ! command
   }
}


class Throttler extends Actor {

  import system.dispatcher

  val queue = mutable.List[Record] //it may be a cache instead

  def schedule = system.scheduler.scheduleOnce(2 minutes, self, Tick) // http://doc.akka.io/docs/akka/snapshot/scala/scheduler.html


  def receive = {
       case Start => schedule
       case command: Record =>
           queue ++= command
       case Tick => 
          schedule
          try {
            //---open transaction here---
            for (r <- queue) push(r)
            //---close transaction here---
            queue.clear //will not be cleared in case of exception
          } catch {...}
  }
}

Вы также можете использовать реализацию на основе FSM, как сказал @abatyuk.

Если вам нужно уменьшить нагрузку на почтовые ящики - вы можете попробовать некоторые шаблоны противодавления / балансировки нагрузки, такие как Akka Work Pulling.

Если вы хотите защитить сам узел (чтобы восстановить состояние очереди в случае сбоя некоторых узлов вашего сервера) - вы можете использовать кластер Akka для репликации (вручную) состояния очереди. В этом случае координатором должен быть Cluster Singleton и он должен сам отправлять тики случайному субъекту (вы можете использовать для этого шину) и поддерживать свои успехи и неудачи в качестве супервизора. Обратите внимание, что состояние супервизора может быть утеряно, поэтому вы также должны реплицировать его через узлы (и объединять состояние между ними каждые 2 минуты, поэтому лучше использовать SortedSet для очередей, потому что слияние будет что-то вроде sets.reduce(_ ++ _)).

Такие хранилища, как Riak, уже предоставляют простой способ решения проблемы кластеризации, поэтому вы можете использовать их в качестве очередей (и координатор, и троттлер будут синглтонами без состояния). В случае Riak вы можете настроить его как "Доступно + разделение" (см. Теорему CAP), потому что слияние данных здесь не проблема - ваша история чата имеет тип данных CRDT(бесконфликтный).

Другое решение - это кэш с режимом WriteBehind (настроенным для запуска каждые 2 минуты) в качестве регулятора.

Источник событий может также защитить состояние вашего актера, но он более полезен, когда вам нужно повторить все действия после восстановления (вам это не нужно - он все повторно отправит в базу данных). Вы можете использовать моментальные снимки (это почти то же самое, что использовать кэш-память напрямую), но лучше сохранить их в кэш-памяти (с помощью SnapshotStore) вместо локальной FS, если вы заботитесь о доступности. Обратите внимание, что

Другие вопросы по тегам