Прием вложенных json-файлов Pinot из Kafka
Я пытаюсь перенести данные из темы Kafka в таблицу Pinot. Ниже приведены образцы данных и мои файлы конфигурации.
Kafka Topics data :
{"Ip_type":"0","ip_tp":"2016-05-12 08:13:09.000190",
"ts":"2016-05-12T08:13:16.152000",
"Ptt":"00000001260293140836",
“task”:{
"DATETIME":"20260512081308",
"Id1”:”111111111,
"Id2":null,
“Amt”: 0
}}
schema.json File:
{
"schemaName": “RK,
"dimensionFieldSpecs": [
{
"name": "ip_type",
"dataType": "STRING"
},
{
"name": "ip_tp",
"dataType": "STRING"
},
{
"name": "Ptt",
"dataType": "STRING"
},
{
"dataType": "STRING",
"name": “task_str"
},
{
"dataType": "STRING",
"name": "DATETIME_str",
"singleValueField": false
},
{
"dataType": "STRING",
"name": "id1_str",
"singleValueField": false
},
{
"dataType": "STRING",
"name":"Id2_str",
"singleValueField": false
}
],
"metricFieldSpecs": [{
"name": "Amt",
"dataType": "FLOAT",
"defaultNullValue": 0
}],
"dateTimeFieldSpecs": [{
"name": "ts",
"dataType": "LONG",
"format": "1:MILLISECONDS:EPOCH",
"granularity": "15:MINUTES"
}]
}
Realtime-table-config.json.
{
"tableName": "Try",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "ts",
"timeType": "MILLISECONDS",
"schemaName": “RK”,
"replicasPerPartition": "1"
},
"tenants": {},
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "lowlevel",
"stream.kafka.topic.name": "click_events_2",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.broker.list":"localhost:9092",
"realtime.segment.flush.threshold.size": "0",
"realtime.segment.flush.threshold.time": "24h",
"realtime.segment.flush.desired.size": "50M",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest"
},
"invertedIndexColumns": [],
"noDictionaryColumns": [
"task_str"
],
"jsonIndexColumns": [
"task_str"
]
},
"metadata": {
"customConfigs": {}
},
"ingestionConfig": {
"batchIngestionConfig": {
"segmentIngestionType": "APPEND",
"segmentIngestionFrequency": "DAILY",
"batchConfigMaps": [],
"segmentNameSpec": {},
"pushSpec": {}
},
"transformConfigs": [
{
"columnName": "task_str",
"transformFunction": "jsonFormat(task)"
},
{
"columnName": "DATETIME_str",
"transformFunction": "jsonPathArray(task, '$.[*].DATETIME')"
},
{
"columnName": "id1_str",
"transformFunction": "jsonPathArray(task, '$.[*].id1')"
},
{
"columnName": "id2_str",
"transformFunction": "jsonPathArray(task, '$.[*].id2')"
}
]
}
}
bin/pinot-admin.sh AddTable -schemaFile /..apache-pinot-incubating-0.7.1-bin/examples/stream/airlineStats/schema.json -tableConfigFile /../apache-pinot-incubating- 0.7.1-bin/examples/stream/airlineStats/Realtime-table-config.json.json -exec
Но я не могу видеть данные в таблице Пино в пользовательском интерфейсе. Я могу видеть таблицу, но без данных. Пожалуйста, предложите, есть ли способ сделать это в работе с пино? Я следил за этим приемом вложенного json-кода Pinot и пытался это сделать, но у меня это не работает.