Как объединить несколько входов JDBC в конфигурации logstash на основе условия одного столбца?

У меня есть две таблицы в SQL Server, то есть AppDetails и AppBranchDetails. Я хочу прочитать все строки этих двух таблиц и объединить в зависимости от условия.

Ниже приведены два запроса, которые я хочу выполнить:

  1. выберите id как colg_id, name, sirname из порядка AppDetails по id
  2. выберите идентификатор в виде идентификатора филиала, имени филиала, филиала в порядке AppBranchDetails по идентификатору

Из приведенных выше двух запросов "id" является первичным ключом, который одинаков для обеих таблиц.

Вывод будет выглядеть как показано ниже для id == 1:

{
  "name": "ram",
  "sirname": "patil",
  "id": 1,
  "BRANCH": [
    {
      "id": 1,
      "branch_name": "EE",
      "branch_add": "IND"
    },
    {
      "id": 1,
      "branch_name": "ME",
      "branch_add": "IND"
    }
  ]
}

Вывод будет выглядеть следующим образом для id == 2:

{
  "name": "sham",
  "sirname": "bhosle",
  "id": 2,
  "BRANCH": [
    {
      "id": 2,
      "branch_name": "SE",
      "branch_add": "US"
    },
    {
      "id": 2,
      "branch_name": "FE",
      "branch_add": "US"
    }
  ]
}

Я пытаюсь с конфигурацией ниже (app.conf):

input {
  jdbc {
    jdbc_connection_string => "jdbc:sqlserver://x.x.x.x:1433;databaseName=AAA;"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_user => "sa"
    jdbc_password => "sa@111"
    statement => "select id as colg_id, name, sirname from AppDetails order by id"
    tracking_column => "colg_id"
    use_column_value => true
    type => "college"
  }
  jdbc {
    jdbc_connection_string => "jdbc:sqlserver://x.x.x.x:1433;databaseName=AAA;"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_user => "sa"
    jdbc_password => "sa@111"
    statement => "select id as branch_id, branch_name, branch_add from AppBranchDetails order by id"
    tracking_column => "branch_id"
    use_column_value => true
    type => "branch"
  }
}

filter {
    if [type] == "branch" {
        aggregate {
        task_id => "%{branch_id}"
        code => "
            map['BRANCH'] ||= []
            map['BRANCH'] << event.get('branch_id')
            map['BRANCH'] << event.get('branch_name')
            map['BRANCH'] << event.get('branch_add')
            event.cancel()
        "
        push_previous_map_as_event => true
        timeout => 5
        }
        mutate {
            remove_field => [ "@version" , "@timestamp" ]
        }
    }
}

output {
  stdout { codec => json_lines }
}

Кто-нибудь может подсказать, пожалуйста, как я могу добиться результата, о котором я упоминал выше.

0 ответов

Другие вопросы по тегам