Как объединить события из двух систем Event Sourcing
Мне нужно объединить события, поступающие из двух разных систем поиска событий, которые обрабатывает модуль Akka.Net Persistence. Слияние должно сортировать события на основе их метки времени, и я нашел оператор MergeSorted в Akka.Stream, который делает именно то, что мне нужно (пробовал с двумя списками чисел — для событий я написал собственный EventEnvelopComparer).
В моем решении у меня есть система акторов (readsystem1) для чтения из db1 и вторая система акторов (readysystem2) для чтения из db2, обе созданы для передачи правильной строки подключения в db (база данных PostGres).
Проблема в том, что когда я использую оператор MergeSorted, мне нужно передать экземпляр ActorMaterializer, и если материализатор актора создается в системе акторов readsystem1, то загружаются (и сливаются сами с собой) только события из db1; наоборот, если я создам материализатор актера в файле readsystem2. Мне нужно загрузить их обоих.
Вот пример кода (запись временных меток в файл, просто для проверки):
var actorMaterializer1 = ActorMaterializer.Create(
readSystem1,
ActorMaterializerSettings.Create(readSystem1).WithDebugLogging(true));
var readJournal1 = PersistenceQuery.Get(readSystem1)
.ReadJournalFor<SqlReadJournal>(SqlReadJournal.Identifier);
var source1 = readJournal1.CurrentEventsByPersistenceId("mypersistenceId", 0L, long.MaxValue);
await source1
.Select(x => ByteString.FromString($"{x.Timestamp.ToString()}{Environment.NewLine}"))
.RunWith(FileIO.ToFile(new FileInfo("c:\\tmp\\timestamps1.txt")), actorMaterializer1);
// just creating the materializer changes the events loaded by the source!!!
var actorMaterializer2 = ActorMaterializer.Create(
readSystem2,
ActorMaterializerSettings.Create(readSystem1).WithDebugLogging(true));
var readJournal2 = PersistenceQuery.Get(readSystem2)
.ReadJournalFor<SqlReadJournal>(SqlReadJournal.Identifier);
var source2 = readJournal2.CurrentEventsByPersistenceId("mypersistenceId", 0L, long.MaxValue);
await source2
.Select(x => ByteString.FromString($"{x.Timestamp.ToString()}{Environment.NewLine}"))
.RunWith(FileIO.ToFile(new FileInfo("c:\\tmp\\timestamps2.txt")), actorMaterializer2);
// RunWith receives actorMaterializer1, so only events coming from db1
// will be loaded and merged with themselves
var source = source1.MergeSorted(source2, new EventEnvelopComparer());
await source
.Select(x => ByteString.FromString($"{x.Timestamp.ToString()}{Environment.NewLine}"))
.RunWith(FileIO.ToFile(new FileInfo("c:\\tmp\\timestamps.txt")), actorMaterializer1);
Как я могу это сделать? Можно ли прочитать 2 разные таблицы источников событий из одной и той же системы акторов, в одной и той же или в разных БД? Есть ли что-то в ActorMaterializer, что может решить мою проблему? Мой подход совершенно неверен?
1 ответ
Чтобы использовать события из двух разных ActorSystems, я думаю, вам нужно использовать StreamRefs. Но здесь вы можете настроить два ReadJournalId, каждый из которых указывает на другой файл *.db. Таким образом, вы можете использовать одну ActorSystem и материализатор.
var source1 = PersistenceQuery.Get(actorSystem).ReadJournalFor<SqlReadJournal>("read-journal-1")
.CurrentEventsByPersistenceId("sample-id-1", 0L, long.MaxValue);
var source2 = PersistenceQuery.Get(actorSystem).ReadJournalFor<SqlReadJournal>("read-journal-2")
.CurrentEventsByPersistenceId("sample-id-1", 0L, long.MaxValue);
var source = source1.MergeSorted(source2, new EventEnvelopComparer())
.RunForeach(x => System.Console.WriteLine($"EVENT: {x.Timestamp}"), actorSystem.Materializer());