Конфигурационный файл Morphline, не индексирующий австро-данные

Я создаю индекс для моих AVR-данных в Solr. Индекс генерируется только для элементов данных, которые находятся на корневом уровне и не являются вложенными. Ниже приведен пример схемы (не включая все)

Моя схема Avro, как показано ниже.

{
  "type" : "record",
  "name" : "abcd",
  "namespace" : "xyz",
  "doc" : "Schema Definition for Low Fare Search Shopping Request/Response Data",
  "fields" : [ {
    "name" : "ShopID",
    "type" : "string"
  }, {
    "name" : "RqSysTimestamp",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "RqTimestamp",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "RsSysTimestamp",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "RsTimestamp",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "Request",
    "type" : {
      "type" : "record",
      "name" : "RequestStruct",
      "fields" : [ {
        "name" : "TransactionID",
        "type" : [ "string", "null" ]
      }, {
        "name" : "AgentSine",
        "type" : [ "string", "null" ]
      }, {
        "name" : "CabinPref",
        "type" : [ {
          "type" : "array",
          "items" : {
            "type" : "record",
            "name" : "CabinStruct",
            "fields" : [ {
              "name" : "Cabin",
              "type" : [ "string", "null" ]
            }, {
              "name" : "PrefLevel",
              "type" : [ "string", "null" ]
            } ]
          }
        }, "null" ]
      }, {
        "name" : "CountryCode",
        "type" : [ "string", "null" ]
      }, 
        "name" : "PassengerStatus",
        "type" : [ "string", "null" ]
      }, {
}

Как мне ссылаться на "TransactionID" в моем файле конфигурации morphline. Я перепробовал все варианты, но он не генерирует индекс для вложенных элементов данных.

Ниже приведен пример моего файла конфигурации morphline.

extractAvroPaths {
          flatten : true
          paths : { 
        ShopID : /ShopID
                RqSysTimestamp : /RqSysTimestamp
                RqTimestamp : /RqTimestamp
                RsSysTimestamp :/RsSysTimestamp
                RsTimestamp : /RsTimestamp
                TransactionID : "/Request/RequestStruct/TransactionID"
                AgentSine : "/Request/RequestStruct/AgentSine"
                Cabin :/Cabin
                PrefLevel :/PrefLevel
                CountryCode :/CountryCode
                FrequentFlyerStatus :/FrequentFlyerStatus

1 ответ

Команда toAvro ожидает java.util.Map в качестве входных данных при преобразовании во вложенную запись Avro. Так что это мое решение.

morphlines: [
  {
    id: convertJsonToAvro
    importCommands: [ "org.kitesdk.**" ]
    commands: [
      # read the JSON blob
      { readJson: {} }
      
      # java code
      {
              java { 
                    imports : """
                      import com.fasterxml.jackson.databind.JsonNode;
                      import com.fasterxml.jackson.databind.ObjectMapper;
                      import org.kitesdk.morphline.base.Fields;
                      import java.io.IOException;
                      import java.util.Set;
                      import java.util.ArrayList;
                      import java.util.Iterator;
                      import java.util.List;
                      import java.util.Map;
                    """

                    code : """
                      String jsonStr = record.getFirstValue(Fields.ATTACHMENT_BODY).toString();
                      ObjectMapper mapper = new ObjectMapper();
                      Map<String, Object> map = null;
                      try {
                          map = (Map<String, Object>)mapper.readValue(jsonStr, Map.class);
                      } catch (IOException e) {
                          e.printStackTrace();
                      }
                      Set<String> keySet = map.keySet();
                      for (String o : keySet) {
                          record.put(o, map.get(o));
                      }
                      return child.process(record);                   
                    """

              }
      }               
      
      # convert the extracted fields to an avro object
      # described by the schema in this field
      { toAvro {
        schemaFile: /etc/flume/conf/a1/like_user_event_realtime.avsc
      } }
      
      #{ logInfo { format : "loginfo: {}", args : ["@{}"] } }
  
      # serialize the object as avro
      { writeAvroToByteArray: {
        format: containerlessBinary
      } }
  
    ]
  }
]

Другие вопросы по тегам