Как объединить несколько входов JDBC в конфигурации logstash на основе условия одного столбца?
У меня есть две таблицы в SQL Server, то есть AppDetails и AppBranchDetails. Я хочу прочитать все строки этих двух таблиц и объединить в зависимости от условия.
Ниже приведены два запроса, которые я хочу выполнить:
- выберите id как colg_id, name, sirname из порядка AppDetails по id
- выберите идентификатор в виде идентификатора филиала, имени филиала, филиала в порядке AppBranchDetails по идентификатору
Из приведенных выше двух запросов "id" является первичным ключом, который одинаков для обеих таблиц.
Вывод будет выглядеть как показано ниже для id == 1:
{
"name": "ram",
"sirname": "patil",
"id": 1,
"BRANCH": [
{
"id": 1,
"branch_name": "EE",
"branch_add": "IND"
},
{
"id": 1,
"branch_name": "ME",
"branch_add": "IND"
}
]
}
Вывод будет выглядеть следующим образом для id == 2:
{
"name": "sham",
"sirname": "bhosle",
"id": 2,
"BRANCH": [
{
"id": 2,
"branch_name": "SE",
"branch_add": "US"
},
{
"id": 2,
"branch_name": "FE",
"branch_add": "US"
}
]
}
Я пытаюсь с конфигурацией ниже (app.conf):
input {
jdbc {
jdbc_connection_string => "jdbc:sqlserver://x.x.x.x:1433;databaseName=AAA;"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_user => "sa"
jdbc_password => "sa@111"
statement => "select id as colg_id, name, sirname from AppDetails order by id"
tracking_column => "colg_id"
use_column_value => true
type => "college"
}
jdbc {
jdbc_connection_string => "jdbc:sqlserver://x.x.x.x:1433;databaseName=AAA;"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_user => "sa"
jdbc_password => "sa@111"
statement => "select id as branch_id, branch_name, branch_add from AppBranchDetails order by id"
tracking_column => "branch_id"
use_column_value => true
type => "branch"
}
}
filter {
if [type] == "branch" {
aggregate {
task_id => "%{branch_id}"
code => "
map['BRANCH'] ||= []
map['BRANCH'] << event.get('branch_id')
map['BRANCH'] << event.get('branch_name')
map['BRANCH'] << event.get('branch_add')
event.cancel()
"
push_previous_map_as_event => true
timeout => 5
}
mutate {
remove_field => [ "@version" , "@timestamp" ]
}
}
}
output {
stdout { codec => json_lines }
}
Кто-нибудь может подсказать, пожалуйста, как я могу добиться результата, о котором я упоминал выше.