Прием вложенных json-файлов Pinot из Kafka

Я пытаюсь перенести данные из темы Kafka в таблицу Pinot. Ниже приведены образцы данных и мои файлы конфигурации.

          Kafka Topics data : 

     {"Ip_type":"0","ip_tp":"2016-05-12 08:13:09.000190",
    "ts":"2016-05-12T08:13:16.152000",
    "Ptt":"00000001260293140836",
    “task”:{
    "DATETIME":"20260512081308",
    "Id1”:”111111111,
    "Id2":null,
    “Amt”: 0
    }}

    schema.json File:

   {
   "schemaName": “RK,

   "dimensionFieldSpecs": [
   {
   "name": "ip_type",
   "dataType": "STRING"
   },
   {
   "name": "ip_tp",
   "dataType": "STRING"
   },
  {
  "name": "Ptt",
  "dataType": "STRING"
  },

 {
 "dataType": "STRING",
 "name": “task_str"
 },
{
"dataType": "STRING",
"name": "DATETIME_str",
"singleValueField": false
},
{
"dataType": "STRING",
"name": "id1_str",
"singleValueField": false
},
{
"dataType": "STRING",
"name":"Id2_str",
"singleValueField": false
}
],

"metricFieldSpecs": [{
"name": "Amt",
"dataType": "FLOAT",
"defaultNullValue": 0
}],
"dateTimeFieldSpecs": [{
"name": "ts",
"dataType": "LONG",
"format": "1:MILLISECONDS:EPOCH",
"granularity": "15:MINUTES"
   }]
   }

Realtime-table-config.json.


{
"tableName": "Try",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "ts",
"timeType": "MILLISECONDS",
"schemaName": “RK”,
"replicasPerPartition": "1"
},
"tenants": {},
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "lowlevel",
"stream.kafka.topic.name": "click_events_2",
"stream.kafka.decoder.class.name":        "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
  "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
  "stream.kafka.broker.list":"localhost:9092",
  "realtime.segment.flush.threshold.size": "0",
  "realtime.segment.flush.threshold.time": "24h",
 "realtime.segment.flush.desired.size": "50M",
  "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
},
    "invertedIndexColumns": [],
    "noDictionaryColumns": [
        "task_str"
    ],
    "jsonIndexColumns": [
        "task_str"
    ]
},
"metadata": {
    "customConfigs": {}
},
"ingestionConfig": {
    "batchIngestionConfig": {
        "segmentIngestionType": "APPEND",
        "segmentIngestionFrequency": "DAILY",
        "batchConfigMaps": [],
        "segmentNameSpec": {},
        "pushSpec": {}
    },
    "transformConfigs": [
        {
            "columnName": "task_str",
            "transformFunction": "jsonFormat(task)"
        },
        {
            "columnName": "DATETIME_str",
            "transformFunction": "jsonPathArray(task, '$.[*].DATETIME')"
        },
        {
            "columnName": "id1_str",
            "transformFunction": "jsonPathArray(task, '$.[*].id1')"
        },
         {
            "columnName": "id2_str",
            "transformFunction": "jsonPathArray(task, '$.[*].id2')"
        }
    ]
 }
}

    bin/pinot-admin.sh AddTable -schemaFile /..apache-pinot-incubating-0.7.1-bin/examples/stream/airlineStats/schema.json    -tableConfigFile /../apache-pinot-incubating- 0.7.1-bin/examples/stream/airlineStats/Realtime-table-config.json.json -exec

Но я не могу видеть данные в таблице Пино в пользовательском интерфейсе. Я могу видеть таблицу, но без данных. Пожалуйста, предложите, есть ли способ сделать это в работе с пино? Я следил за этим приемом вложенного json-кода Pinot и пытался это сделать, но у меня это не работает.

0 ответов

Другие вопросы по тегам