Сведение событий к временным интервалам
Сценарий: у меня есть служба, которая регистрирует события, как в этом примере CSV:
#TimeStamp, Name, ColorOfPullover
TimeStamp01, Peter, Green
TimeStamp02, Bob, Blue
TimeStamp03, Peter, Green
TimeStamp04, Peter, Red
TimeStamp05, Peter, Green
События, которые, например, Питер носит Грин, будут происходить очень часто подряд.
У меня две цели:
- Держите данные как можно меньше
- Храните все соответствующие данные
Соответствующее означает: мне нужно знать, в какое время человек носил какой цвет. Например:
#StartTime, EndTime, Name, ColorOfPullover
TimeStamp01, TimeStamp03, Peter, Green
TimeStamp02, TimeStamp02, Bob, Blue
TimeStamp03, TimeStamp03, Peter, Green
TimeStamp04, TimeStamp04, Peter, Red
TimeStamp05, TimeStamp05, Peter, Green
В этом формате я могу отвечать на такие вопросы, как: какого цвета был Питер во время TimeStamp02? (Я могу с уверенностью предположить, что каждый человек носит один и тот же цвет между двумя зарегистрированными событиями для одного и того же цвета.)
Основной вопрос: могу ли я использовать уже существующую технологию для этого? Т.е. я могу снабдить его непрерывным потоком событий, а он извлекает и хранит соответствующие данные?
Чтобы быть точным, мне нужно реализовать алгоритм, подобный этому (псевдокод). OnNewEvent
метод вызывается для каждой строки примера CSV. Где параметр event
уже содержит данные из строки в качестве переменных-членов.
def OnNewEvent(even)
entry = Database.getLatestEntryFor(event.personName)
if (entry.pulloverColor == event.pulloverColor)
entry.setIntervalEndDate(event.date)
Database.store(entry)
else
newEntry = new Entry
newEntry.setIntervalStartDate(event.date)
newEntry.setIntervalEndDate(event.date)
newEntry.setPulloverColor(event.pulloverColor))
newEntry.setName(event.personName)
Database.createNewEntry(newEntry)
end
end
2 ответа
This is typical scenario of any streaming architecture.
There are multiple existing technologies which work in tandem to get what you want.
1. NoSql Database (Hbase, Aerospike, Cassandra)
2. streaming jobs Like Spark streaming(micro batch), Storm
3. Run mapreduce in micro batch to insert into NoSql Database.
4. Kafka Distriuted queue
The end to end flow.
Data -> streaming framework -> NoSql Database.
OR
Data -> Kafka -> streaming framework -> NoSql Database.
IN NoSql database there are two ways to model your data.
1. Key by "Name" and for every event for that given key, insert into Database.
While fetching u get back all events corresponding to that key.
2. Key by "name", every time a event for key is there, do a UPSERT into a existing blob(Object saved as binary), Inside the blob you maintain the time range and color seen.
Code sample to read and write to Hbase and Aerospike
Hbase: http://bytepadding.com/hbase/
Aerospike: http://bytepadding.com/aerospike/
Один из способов сделать это - использовать HiveMQ. HiveMQ - это технология очереди сообщений, основанная на MQTT. Приятно то, что вы можете написать собственные плагины для обработки входящего сообщения. Чтобы получить последнюю запись о событии для человека, хеш-таблица в плагине HiveMQ будет работать нормально. Если количество разных людей очень велико, я бы подумал об использовании кеша типа Redis для кеширования последнего события для каждого человека.
Ваш сервис публикует события в HiveMQ. Плагин HiveMQ обрабатывает входящие события и обновляет вашу базу данных.