Потребитель PipelineDB от KAFKA JSON с массивом

Мой кафка отправляю следующий json

'{
"eventSummaryList": [
    {
        "customer": 1,
        "data": "{\"cliente\":\"52264\",\"data_posicao\":\"1484250682\",\"gps_valido\":\"1\",\"horimetro\":\"0\",\"ibuttonPart1\":\"0\",\"ibuttonPart2\":\"0\",\"id_evento\":\"null\",\"id_motorista\":\"0\",\"ignicao\":\"0\",\"latitude\":\"-25.5385123\",\"longitude\":\"-49.1995068\",\"odometro\":\"0\",\"pos_memoria\":\"0\",\"veiculo\":\"103970\",\"velocidade\":\"0\"}",
        "identifierRule": 1770,
        "identifierSummary": 17,
        "rule": "rota_fora",
        "status": 1,
        "vehicle": 103970
    },
    {
        "customer": 2,
        "data": "{\"cliente\":\"52264\",\"data_posicao\":\"1484250682\",\"gps_valido\":\"1\",\"horimetro\":\"0\",\"ibuttonPart1\":\"0\",\"ibuttonPart2\":\"0\",\"id_evento\":\"null\",\"id_motorista\":\"0\",\"ignicao\":\"0\",\"latitude\":\"-25.5385123\",\"longitude\":\"-49.1995068\",\"odometro\":\"0\",\"pos_memoria\":\"0\",\"veiculo\":\"103970\",\"velocidade\":\"0\"}",
        "identifierRule": 8,
        "identifierSummary": 7,
        "rule": "velocidade_maior",
        "status": 1,
        "vehicle": 103970
    }
]

}"

Я создал этот непрерывный трансфомр

CREATE CONTINUOUS TRANSFORM sensor_event_process_transform AS 

                 SELECT   cast ( cast(pack ->>'eventSummaryList' as json)->>'customer'  as bigint ) as customer
                 FROM   pipeline_kafka.sensor_event_process_stream  

ТОГДА ВЫПОЛНИТЬ процедуру update_sensor_event_process_t();

но мой журнал pipeDB возвращает это...

КОНТЕКСТ: данные JSON, строка 1: { COPY sensor_event_process_stream, строка 1, пакет столбцов: "{" LOG: [pipe_kafka] sensor_event_process_stream<- topicNotificationProcess (PID 25201): не удалось обработать пакет, отброшено 8 сообщений ОШИБКА: неверный синтаксис ввода для типа json DETAIL: строка ввода неожиданно завершилась.

Как пройти через массив json и получить только содержимое столбца customer?

1 ответ

Решение

Привет, дорогие, я решил свою проблему, я пользуюсь функцией json_array_elements, остался вот так...

  CREATE CONTINUOUS TRANSFORM sensor_event_process_transform AS 
SELECT Cast(value::json ->> 'identifierRule' AS               BIGINT) AS id_regra, 
       Cast(value::json ->> 'rule' AS                         VARCHAR) AS regra, 
       Cast((value::json ->'data')::json->> 'veiculo' AS      BIGINT) AS id_veiculo,
       Cast(value::json ->> 'customer' AS bigint) AS id_cliente , 
       cast((value::json ->'data')::json->> 'velocidade' AS   int)    AS velocidade, 
       cast((value::json ->'data')::json->> 'odometro' AS     int)    AS odometro, 
       cast((value::json ->'data')::json->> 'data_posicao' AS bigint) AS data_posicao, 
       cast((value::json ->'data')::json->> 'id_motorista' AS bigint) AS id_motorista, 
       cast((value::json ->'data')::json->> 'latitude' AS float)      AS latitude, 
       cast((value::json ->'data')::json->> 'longitude' AS float)     AS longitude, 
       cast(value::json ->> 'status' AS boolean)                      AS status 
FROM   ( 
              SELECT (json_array_elements(pack->'eventSummaryList'))::json AS value 
              FROM   pipeline_kafka.sensor_event_process_stream ) b
THEN EXECUTE procedure update_sensor_event_process_t();

Важная деталь: не пропускайте json с пробелами и переносами строк, конвейер не принимается.

Другие вопросы по тегам