Конфигурационный файл Morphline, не индексирующий австро-данные
Я создаю индекс для моих AVR-данных в Solr. Индекс генерируется только для элементов данных, которые находятся на корневом уровне и не являются вложенными. Ниже приведен пример схемы (не включая все)
Моя схема Avro, как показано ниже.
{
"type" : "record",
"name" : "abcd",
"namespace" : "xyz",
"doc" : "Schema Definition for Low Fare Search Shopping Request/Response Data",
"fields" : [ {
"name" : "ShopID",
"type" : "string"
}, {
"name" : "RqSysTimestamp",
"type" : [ "null", "string" ],
"default" : null
}, {
"name" : "RqTimestamp",
"type" : [ "null", "string" ],
"default" : null
}, {
"name" : "RsSysTimestamp",
"type" : [ "null", "string" ],
"default" : null
}, {
"name" : "RsTimestamp",
"type" : [ "null", "string" ],
"default" : null
}, {
"name" : "Request",
"type" : {
"type" : "record",
"name" : "RequestStruct",
"fields" : [ {
"name" : "TransactionID",
"type" : [ "string", "null" ]
}, {
"name" : "AgentSine",
"type" : [ "string", "null" ]
}, {
"name" : "CabinPref",
"type" : [ {
"type" : "array",
"items" : {
"type" : "record",
"name" : "CabinStruct",
"fields" : [ {
"name" : "Cabin",
"type" : [ "string", "null" ]
}, {
"name" : "PrefLevel",
"type" : [ "string", "null" ]
} ]
}
}, "null" ]
}, {
"name" : "CountryCode",
"type" : [ "string", "null" ]
},
"name" : "PassengerStatus",
"type" : [ "string", "null" ]
}, {
}
Как мне ссылаться на "TransactionID" в моем файле конфигурации morphline. Я перепробовал все варианты, но он не генерирует индекс для вложенных элементов данных.
Ниже приведен пример моего файла конфигурации morphline.
extractAvroPaths {
flatten : true
paths : {
ShopID : /ShopID
RqSysTimestamp : /RqSysTimestamp
RqTimestamp : /RqTimestamp
RsSysTimestamp :/RsSysTimestamp
RsTimestamp : /RsTimestamp
TransactionID : "/Request/RequestStruct/TransactionID"
AgentSine : "/Request/RequestStruct/AgentSine"
Cabin :/Cabin
PrefLevel :/PrefLevel
CountryCode :/CountryCode
FrequentFlyerStatus :/FrequentFlyerStatus
1 ответ
Команда toAvro ожидает java.util.Map в качестве входных данных при преобразовании во вложенную запись Avro. Так что это мое решение.
morphlines: [
{
id: convertJsonToAvro
importCommands: [ "org.kitesdk.**" ]
commands: [
# read the JSON blob
{ readJson: {} }
# java code
{
java {
imports : """
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.kitesdk.morphline.base.Fields;
import java.io.IOException;
import java.util.Set;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
"""
code : """
String jsonStr = record.getFirstValue(Fields.ATTACHMENT_BODY).toString();
ObjectMapper mapper = new ObjectMapper();
Map<String, Object> map = null;
try {
map = (Map<String, Object>)mapper.readValue(jsonStr, Map.class);
} catch (IOException e) {
e.printStackTrace();
}
Set<String> keySet = map.keySet();
for (String o : keySet) {
record.put(o, map.get(o));
}
return child.process(record);
"""
}
}
# convert the extracted fields to an avro object
# described by the schema in this field
{ toAvro {
schemaFile: /etc/flume/conf/a1/like_user_event_realtime.avsc
} }
#{ logInfo { format : "loginfo: {}", args : ["@{}"] } }
# serialize the object as avro
{ writeAvroToByteArray: {
format: containerlessBinary
} }
]
}
]