Siddhi QL: внешнее соединение таблицы In-Memory с входным потоком
Я хочу рассчитать% протоколов, появляющихся в сетевом трафике непрерывно, чтобы эти% постоянно обновлялись новыми событиями. Круговая диаграмма создается и обновляется с процентами. Поскольку мне нужны как новые, так и предыдущие данные для расчета, я решил использовать таблицу в памяти, чтобы хранить события в течение более длительного времени (скажем, дня).
Поскольку таблицы событий можно использовать только при объединении с потоками событий, я выбрал внешнее объединение, чтобы получить также старые значения. Меня интересуют только протоколы и их проценты, мне нужно всего два столбца, но я не могу применить статистическую функцию во внешнем соединении. Запрос, который я создал до сих пор:
@Import('MAINInStream:1.0.0')
define stream MAINInStream (ts string, uid string, id_orig_h string, id_orig_p int, id_resp_h string, id_resp_p int, proto string, service string, duration double, orig_bytes long, resp_bytes long, conn_state string, local_orig bool, local_resp bool, missed_bytes long, history string, orig_pkts long, orig_ip_bytes long, resp_pkts long, resp_ip_bytes long, tunnel_parents string, sensorname string);
@Export('ProtocolStream:1.0.0')
define stream ProtocolStream (protocol string, count int);
define table mem_conn_table (timestamp long, id_orig_h string, id_orig_p int, id_resp_h string, id_resp_p int, proto string);
from MAINInStream
select time:timestampInMilliseconds(time:dateAdd(str:replaceAll(ts,'T',' '), 5, 'hour',"yyyy-MM-dd HH:mm:ss"),'yyyy-MM-dd HH:mm') as timestamp, id_orig_h, id_orig_p, id_resp_h, id_resp_p, proto
insert into intermediateStream;
from MAINInStream
select time:timestampInMilliseconds(time:dateAdd(str:replaceAll(ts,'T',' '), 5, 'hour',"yyyy-MM-dd HH:mm:ss"),'yyyy-MM-dd HH:mm') as timestamp, id_orig_h, id_orig_p, id_resp_h, id_resp_p, proto
group by id_resp_p
insert into mem_conn_table;
from intermediateStream#window.externalTimeBatch(timestamp,1min, timestamp, 1min) as i right outer join mem_conn_table[time:dateDiff(time:currentTimestamp(),cast(timestamp,"string"), "yyyy-MM-dd HH:mm:ss", "yyyy-MM-dd HH:mm:ss") == 0] as mc
on i.timestamp == mc.timestamp
SELECT (ifThenElse(mc.id_resp_p == 21,'FTP', ifThenElse(mc.id_resp_p == 22,'SSH', ifThenElse(mc.id_resp_p == 25,'SMTP', ifThenElse(mc.id_resp_p == 445,'SMB','MYSQL'))))) as protocol , cast(count(mc.id_resp_p),'int') as count
insert into ProtocolStream;
Я запускаю окно с одной внешней минутой и затем получаю протоколы и их количество, но это не дает мне никакого вывода.
Какие-либо предложения?
1 ответ
Вы не можете использовать внешние объединения с таблицами в памяти. Если вам нужно, вы можете отправлять события из таблицы в памяти в промежуточный поток и использовать его для объединения ( руководство). Тем не менее, для вашего сценария вы можете использовать externalTime
окно, вместо того, чтобы идти с таблицами событий. Попробуйте что-то похожее на ниже;
@Import('MAINInStream:1.0.0')
define stream MAINInStream (ts string, uid string, id_orig_h string, id_orig_p int, id_resp_h string, id_resp_p int, proto string, service string, duration double, orig_bytes long, resp_bytes long, conn_state string, local_orig bool, local_resp bool, missed_bytes long, history string, orig_pkts long, orig_ip_bytes long, resp_pkts long, resp_ip_bytes long, tunnel_parents string, sensorname string);
@Export('ProtocolStream:1.0.0')
define stream ProtocolStream (protocol string, count long);
@Export('PercentageStream:1.0.0')
define stream PercentageStream (protocol string, count long, percentage double);
from MAINInStream
select
time:timestampInMilliseconds(time:dateAdd(str:replaceAll(ts,'T',' '), 5, 'hour',"yyyy-MM-dd HH:mm:ss"),'yyyy-MM-dd HH:mm') as timestamp,
(ifThenElse(mc.id_resp_p == 21,'FTP', ifThenElse(mc.id_resp_p == 22,'SSH', ifThenElse(mc.id_resp_p == 25,'SMTP', ifThenElse(mc.id_resp_p == 445,'SMB','MYSQL'))))) as protocol
id_orig_h, id_orig_p, id_resp_h, id_resp_p, proto
insert into intermediateStream;
from intermediateStream#window.externalTime(timestamp, 1 day)
select timestamp, count() as totalCount
insert into totalCountStream;
from intermediateStream#window.externalTime(timestamp, 1 day)
select timestamp, protocol, count() as count
group by protocol
insert into perProtocolCountStream;
from perProtocolCountStream
select protocol, count
insert into ProtocolStream;
from totalCountStream#window.time(1 min) as tcs join perProtocolCountStream#window.time(1 min) as pcs
select pcs.protocol, pcs.count as count, ((pcs.count/tcs.totalCount)) * 100 as percentage
on tcs.timestamp == pcs.timestamp
insert into PercentageStream;