fluent-bit не может анализировать журналы Kubernetes

Я хотел бы пересылать журналы Kubernetes из fluent-bit в elasticsearch через fluentd, но fluent-bit не может правильно анализировать журналы Kubernetes. Для установки Fluent-bit и Fluentd я использую диаграммы Helm. Я пробовал как stable / fluentbit, так и fluent / fluentbit и столкнулся с той же проблемой:

#0 dump an error event: error_class=Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError error="400 - Rejected by Elasticsearch [error type]: mapper_parsing_exception [reason]: 'Could not dynamically add mapping for field [app.kubernetes.io/component]. Existing mapping for [kubernetes.labels.app] must be of type object but found [text].'"

Я поместил следующие строки в файл значений fluent-bit, как показано здесь

  remapMetadataKeysFilter:
    enabled: true
    match: kube.*

    ## List of the respective patterns and replacements for metadata keys replacements
    ## Pattern must satisfy the Lua spec (see https://www.lua.org/pil/20.2.html)
    ## Replacement is a plain symbol to replace with
    replaceMap:
      - pattern: "[/.]"
        replacement: "_"

... ничего не изменилось, перечислены те же ошибки.

Есть ли способ избавиться от этой ошибки?

мои values.yaml здесь:

    # Default values for fluent-bit.

# kind -- DaemonSet or Deployment
kind: DaemonSet

# replicaCount -- Only applicable if kind=Deployment
replicaCount: 1

image:
  repository: fluent/fluent-bit
  pullPolicy: Always
  # tag:

imagePullSecrets: []
nameOverride: ""
fullnameOverride: ""

serviceAccount:
  create: true
  annotations: {}
  name:

rbac:
  create: true

podSecurityPolicy:
  create: false

podSecurityContext:
  {}
  # fsGroup: 2000

securityContext:
  {}
  # capabilities:
  #   drop:
  #   - ALL
  # readOnlyRootFilesystem: true
  # runAsNonRoot: true
  # runAsUser: 1000

service:
  type: ClusterIP
  port: 2020
  annotations:
    prometheus.io/path: "/api/v1/metrics/prometheus"
    prometheus.io/port: "2020"
    prometheus.io/scrape: "true"

serviceMonitor:
  enabled: true
  namespace: monitoring
  interval: 10s
  scrapeTimeout: 10s
  # selector:
  #  prometheus: my-prometheus

resources:
  {}
  # limits:
  #   cpu: 100m
  #   memory: 128Mi
  # requests:
  #   cpu: 100m
  #   memory: 128Mi

nodeSelector: {}

tolerations: []

affinity: {}

podAnnotations: {}

priorityClassName: ""

env: []

envFrom: []

extraPorts: []
#   - port: 5170
#     containerPort: 5170
#     protocol: TCP
#     name: tcp

extraVolumes: []

extraVolumeMounts: []

## https://docs.fluentbit.io/manual/administration/configuring-fluent-bit
config:
  ## https://docs.fluentbit.io/manual/service
  service: |
    [SERVICE]
        Flush 1
        Daemon Off
        Log_Level info
        Parsers_File parsers.conf
        Parsers_File custom_parsers.conf
        HTTP_Server On
        HTTP_Listen 0.0.0.0
        HTTP_Port 2020

  ## https://docs.fluentbit.io/manual/pipeline/inputs
  inputs: |
    [INPUT]
        Name tail
        Path /var/log/containers/*.log
        Parser docker
        Tag kube.*
        Mem_Buf_Limit 5MB
        Skip_Long_Lines On

    [INPUT]
        Name systemd
        Tag host.*
        Systemd_Filter _SYSTEMD_UNIT=kubelet.service
        Read_From_Tail On

  ## https://docs.fluentbit.io/manual/pipeline/filters
  filters: |
    [FILTER]
        Name                kubernetes
        Match               kube.*
        Kube_URL            https://kubernetes.default.svc:443
        Kube_CA_File        /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
        Kube_Token_File     /var/run/secrets/kubernetes.io/serviceaccount/token
        Kube_Tag_Prefix     kube.var.log.containers.
        Merge_Log           On
        Merge_Log_Key       log_processed
        K8S-Logging.Parser  On
        K8S-Logging.Exclude Off

    [FILTER]
        Name    lua
        Match   kube.*
        script  /fluent-bit/etc/functions.lua
        call    dedot
        
  ## https://docs.fluentbit.io/manual/pipeline/outputs
  outputs: |
    [OUTPUT]
        Name          forward
        Match         *
        Host          fluentd-in-forward.elastic-system.svc.cluster.local
        Port          24224
        tls           off
        tls.verify    off

  ## https://docs.fluentbit.io/manual/pipeline/parsers
  customParsers: |
    [PARSER]
        Name docker_no_time
        Format json
        Time_Keep Off
        Time_Key time
        Time_Format %Y-%m-%dT%H:%M:%S.%L

3 ответа

У меня была та же проблема, вызванная несколькими метками, которые преобразуются в json. Я переименовал конфликтующие ключи, чтобы они соответствовали новому формату рекомендуемых меток:

      <filter **>
  @type rename_key
  rename_rule1 ^app$ app.kubernetes.io/name
  rename_rule2 ^chart$ helm.sh/chart
  rename_rule3 ^version$ app.kubernetes.io/version
  rename_rule4 ^component$ app.kubernetes.io/component
  rename_rule5 ^istio$ istio.io/name
</filter>

Я думаю, что ваша проблема не в kubernetes, не в диаграмме fluentbit / fluentd, ваша проблема в elasticsearch, конкретно в отображении.

В elsticsearch версии 7.x вы не можете использовать разные типы (строка, int и т. Д.) Для одного и того же поля.

Для решения этой проблемы я использую "ignore_malformed": true в шаблоне индекса, который я использую для журналов kubernetes.

https://www.elastic.co/guide/en/elasticsearch/reference/current/ignore-malformed.html

Неправильно сформированное поле не индексируется, но другие поля в документе обрабатываются нормально.

Я столкнулся с той же проблемой, я использовал >

Fluent-operator для развертывания fluent-bit (v2.0.11) opster/оператор opensearch для развертывания opensearch(v2.3.0).

Я использовал сценарий Lua ниже -->

         function dedot(tag, timestamp, record)
    if record["kubernetes"] == nil then
        return 0, 0, 0
    end
    if (record["kubernetes"]["labels"]["app"] ~= nil and type(record["kubernetes"]["labels"]["app"]) ~= "table") then
      record["kubernetes"]["labels"]["app"] = {appName = record["kubernetes"]["labels"]["app"]}
    end

    dedot_keys(record["kubernetes"]["annotations"])
    dedot_keys(record["kubernetes"]["labels"])
    
    return 1, timestamp, record
end

function dedot_keys(map)
    if map == nil then
        return
    end
    local new_map = {}
    local changed_keys = {}
    for k, v in pairs(map) do
        local deslashed = string.gsub(k, "%/", "_")
        local dedotted = string.gsub(deslashed, "%.", "_")
        if dedotted ~= k then
            new_map[dedotted] = v
            changed_keys[k] = true
        end
    end
    for k in pairs(changed_keys) do
        map[k] = nil
    end
    for k, v in pairs(new_map) do
        map[k] = v
    end
end

Если вы присмотритесь, я добавил фрагмент перед удалением клавиш, который помещает приложение в качестве объекта.

         if (record["kubernetes"]["labels"]["app"] ~= nil and type(record["kubernetes"] 
   ["labels"]["app"]) ~= "table") then
      record["kubernetes"]["labels"]["app"] = {appName = record["kubernetes"] 
     ["labels"]["app"]}
   end

Я создал фильтр Lua с помощью приведенного выше сценария Lua.

А также я изменил конфигурацию вывода opensearch, чтобы иметь следующие настройки -->

      writeOperation: upsert
generateID: true
replaceDots: true

Вот и все, ошибка исчезла, и я вижу, что все мои журналы успешно доставлены в opensearch.

Другие вопросы по тегам