Как мне обработать входящее событие и, основываясь на поле в событии, записать в разные потоки, используя wso2?
Я пытаюсь использовать один поток, обрабатывать входящий формат JSON и записывать в разные потоки на основе атрибута в событии. Например, если входной поток состоит из чего-то вроде этого:
{ "event_type" : "temperature",
"json" : {
"type": "Temperature",
"DeviceID":"xyz",
"temperature": "32",
"timestamp" : "2019-03-19T12:37:43.356119Z"
}
}
Другое событие выглядит так:
{ "event_type" : "location",
"json" : {
"type": "GPS",
"DeviceID":"xyz",
"location": {"coordinates": [-73.856077, 40.848447]},
"timestamp" : "2019-09-22T00:00:00+05:30"
}
}
Оба события отправляются на одну конечную точку http (это ограничение, с которым я сталкиваюсь)
Как я могу использовать один исходный поток http, обрабатывать эти события и если event_type
является temperature
вставить в temperature_collection
в монго дБ и если event_type
является location
вставить в location_collection в Монго БД?
Возможно ли это сделать с помощью одного потока?
Если нет, как я могу избежать записи нескольких конечных точек, по одной для каждого типа событий?
1 ответ
Да, можно определить только один поток и направить каждый поток с помощью фильтров Сиддхи,
@source(type='http' , receiver.url='http://localhost:8000/SensorStream',
@map(type='json', fail.on.missing.attribute='false',
@attributes(eventType='$.event_type', type='$.json.type',deviceID='$.json.DeviceID', temperature='$.json.temperature', location='$.json.location', timestamp='$.json.timestamp' ) ) )
define stream SensorStream(eventType string, type string, deviceID string, temperature string, location string, timestamp string);
from SensorStream[eventType=='temperature']
select deviceID, temperature, timestamp
insert into TemperatureStream;
from SensorStream[eventType=='location']
select deviceID, location, timestamp
insert into LocationStream;
Как видно из приведенного выше примера, свойство карты источника 'fail.on.missing.attribute' используется, чтобы гарантировать, что различные форматы могут быть отображены в один поток вместе с настраиваемым отображением. После того, как события достигли конечной точки, поток делится на основе значения атрибута с использованием фильтров.