Akka.Net: как создавать постоянные взгляды через читателя журнала
Согласно документации Akka.Net, PersistentView устарел, и вместо него следует использовать PersistenceQuery. В приложении ASP.Net Core 2.0 Web-API я использую Akka.Net с источником событий. Я использую плагин SQL Server для постоянства, с событиями и снимками. Для постоянных представлений я хочу начать использовать PersistenceQuery. Когда приложение запускается, события воспроизводятся для восстановления состояний актеров.
Я реализовал программу чтения журнала, которая получает события и использует ее для составления представления. Вопрос в том, как узнать, что произошло последнее воспроизведенное событие, чтобы составное представление могло быть сохранено (как своего рода снимок)? Я не хочу сохранять вид после каждого события на этапе восстановления.
Теперь программа чтения журнала запускается при инициализации ActorSystem (вызывается через Startup.cs). Код выглядит так:
private static void InitialiseJournalReader()
{
// Obtain read journal by plugin id.
var readJournal = PersistenceQuery.Get(ActorSystem).ReadJournalFor<SqlReadJournal>("akka.persistence.query.myjournal");
// Materialize stream, consuming events.
var materializer = ActorMaterializer.Create(ActorSystem);
var writer = ActorSystem.ActorOf(CreateViewsActor.GetActorProps(), CreateViewsActor.GetActorName());
// issue query to journal
Source<EventEnvelope, NotUsed> source = readJournal.CurrentEventsByTag("MyEvents");
source.RunForeach(envelope => writer.Ask(envelope.Event), materializer);
}
CreateViewsActor
является актером, который использует сообщения для создания одного или нескольких представлений. Также необходимо сохранить эти представления (в настоящее время в формате JSON в таблицу SQL Server).
К сожалению, до сих пор я не нашел работающего примера создания постоянных представлений через читателя журнала. Но, возможно, я искал не в тех местах. Пока у меня есть следующие вопросы:
- Есть ли рабочие примеры создания постоянных представлений через читателя журнала?
- Как CreateViewsActor (или любой код, ответственный за создание и сохранение представлений) может знать, что все сообщения восстановления были обработаны?
- Как лучше всего инициализировать читателей журнала?
1 ответ
Чтение журналов можно использовать для разных целей. В большинстве случаев для генерации специализированных представлений чтения из событий. Однако это не обязательно означает актеров - вы можете легко преобразовать представления в обновления в таблицах базы данных, чтобы получить материализованные представления.
В случае, если вы хотите объединить акторы с потоками, вы можете использовать Sink.ActorRef или Sink.ActorRefWithAck, в зависимости от того, хотите ли вы включить противодавление в своего актера или работать в режиме полного толчка. Пример:
using (var materializer = system.Materializer())
{
var readJournal = PersistenceQuery.Get(system)
.ReadJournalFor<SqlReadJournal>("akka.persistence.query.my-read-journal");
var writer = asysem.ActorOf(CreateViewsActor.Props(), CreateViewsActor.GetActorName());
readJournal
.CurrentEventsByTag("MyEvents")
.Collect(envelope => envelope.Event as MyEvent)
.RunWith(Sink.ActorRefWithAck<MyEvent>(writer,
onInitMessage: CreateViewsActor.Init.Instance,
ackMessage: CreateViewsActor.Ack.Instance,
onCompleteMessage: CreateViewsActor.Done.Instance), materializer);
}
Здесь сообщения инициатора и подтверждения отправляются от субъекта, чтобы сообщить потоку, когда начинать излучение или просто отправлять следующий элемент субъекту (когда он доступен). Последний параметр приемника (в сообщении о завершении) будет отправлен субъекту после завершения потока.
В случае сомнений вы всегда можете посмотреть официальные тесты Akka.NET (см. 1 и 2).
Что касается инициализации читателя журнала - это объект, привязанный по времени жизни к актерской системе, поэтому он может быть инициализирован и доступен из того же места, что и ваша актерская система.