Надежно обрабатывать сообщения в определенное время
Давайте предположим, что у меня есть приложение чата.
Клиент отправляет сообщение в чат, что приводит к некоторой команде для какого-то Актера. Теперь я хочу обработать то, что он написал, и сделать его доступным для других пользователей в этом чате, поэтому я обрабатываю эту команду. В то же время я хочу сказать себе (актеру), что мне нужно хранить это сообщение в базе данных истории чата, НО не прямо сейчас. Сохранение в базу данных должно происходить каждые 2 минуты. И если произошел сбой, я все равно смогу сохранить базу данных.
Я предполагаю, что рабочий процесс будет таким:
- Пользователь отправил сообщение
- Актер чата получил команду с этим сообщением
- Мы транслируем это сообщение всем и добавляем это сообщение в какую-то очередь, чтобы сохранить его в базе данных истории чата.
- Некоторые постоянные команды выполняются, когда истекло 2 минуты. Он собирает все входящие сообщения чата, которые еще не были сохранены в порядке их поступления
- Запустите транзакцию со всеми сообщениями, а затем удалите их из очереди.
- Если где-то после 3 произошел сбой и сообщения не были сохранены, то я должен попытаться сохранить их снова. Если бы они были настойчивы, я бы никогда больше не пытался их настаивать.
Как построить что-то подобное в Акке? Какие функции я должен использовать / какие шаблоны?
1 ответ
Вам может понадобиться два участника: один (координатор) будет отправлять уведомления о командах чата клиентам. Другой (троттлер) - будет отправлять данные в базу данных каждые 2 минуты. Ваша очередь будет просто внутренним состоянием дросселя:
class Coordinator extends Actor {
def receive = {
case command: Record =>
broadcast(command)
throttler ! command
}
}
class Throttler extends Actor {
import system.dispatcher
val queue = mutable.List[Record] //it may be a cache instead
def schedule = system.scheduler.scheduleOnce(2 minutes, self, Tick) // http://doc.akka.io/docs/akka/snapshot/scala/scheduler.html
def receive = {
case Start => schedule
case command: Record =>
queue ++= command
case Tick =>
schedule
try {
//---open transaction here---
for (r <- queue) push(r)
//---close transaction here---
queue.clear //will not be cleared in case of exception
} catch {...}
}
}
Вы также можете использовать реализацию на основе FSM, как сказал @abatyuk.
Если вам нужно уменьшить нагрузку на почтовые ящики - вы можете попробовать некоторые шаблоны противодавления / балансировки нагрузки, такие как Akka Work Pulling.
Если вы хотите защитить сам узел (чтобы восстановить состояние очереди в случае сбоя некоторых узлов вашего сервера) - вы можете использовать кластер Akka для репликации (вручную) состояния очереди. В этом случае координатором должен быть Cluster Singleton и он должен сам отправлять тики случайному субъекту (вы можете использовать для этого шину) и поддерживать свои успехи и неудачи в качестве супервизора. Обратите внимание, что состояние супервизора может быть утеряно, поэтому вы также должны реплицировать его через узлы (и объединять состояние между ними каждые 2 минуты, поэтому лучше использовать SortedSet
для очередей, потому что слияние будет что-то вроде sets.reduce(_ ++ _)
).
Такие хранилища, как Riak, уже предоставляют простой способ решения проблемы кластеризации, поэтому вы можете использовать их в качестве очередей (и координатор, и троттлер будут синглтонами без состояния). В случае Riak вы можете настроить его как "Доступно + разделение" (см. Теорему CAP), потому что слияние данных здесь не проблема - ваша история чата имеет тип данных CRDT(бесконфликтный).
Другое решение - это кэш с режимом WriteBehind (настроенным для запуска каждые 2 минуты) в качестве регулятора.
Источник событий может также защитить состояние вашего актера, но он более полезен, когда вам нужно повторить все действия после восстановления (вам это не нужно - он все повторно отправит в базу данных). Вы можете использовать моментальные снимки (это почти то же самое, что использовать кэш-память напрямую), но лучше сохранить их в кэш-памяти (с помощью SnapshotStore) вместо локальной FS, если вы заботитесь о доступности. Обратите внимание, что