Замените объект json другим в потоковом режиме kafka

Я работаю над потоками Кафки. Я сталкиваюсь со следующими проблемами:

Подробности о том, что я сделал до сих пор:

Я создал ниже темы, поток и таблицы:

./kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic bptcus

./kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic address-elasticsearch-sink

Созданные таблицы и поток для вышеупомянутых созданных тем.

CREATE table CUSTOMER_SRC  (customerId VARCHAR,name VARCHAR, age VARCHAR, address VARCHAR) WITH (KAFKA_TOPIC='bptcus', VALUE_FORMAT='JSON', KEY='customerId');

CREATE stream ADDRESS_SRC (addressId VARCHAR, city VARCHAR, state VARCHAR) WITH (KAFKA_TOPIC='address-elasticsearch-sink', VALUE_FORMAT='JSON');

Я могу видеть данные, как показано ниже:

select * from customer_src;  
1528743137610 | Parent-1528743137047 | Ron | 31 | [{"addressId":"1","city":"Fremont","state":"CA"},{"addressId":"2","city":"Dallas","state":"TX"}] 

select * from address_src;  
1528743413826 | Parent-1528743137047 | 1 | Detroit | MI

Создал другой поток, соединив таблицу и созданный выше поток.

CREATE stream CUST_ADDR_SRC as select c.name , c.age , c.address, a.rowkey, a.addressId , a.city , a.state  from  ADDRESS_SRC a left join CUSTOMER_SRC c  on c.rowkey=a.rowkey;

Я могу видеть данные в потоке CUST_ADDR_SRC, как показано ниже:

select * from cust_addr_src;  

1528743413826 | Parent-1528743137047 | Ron | 31 | [{"addressId":"1","city":"Fremont","state":"CA"},{"addressId":"2","city":"Dallas","state":"TX"}] | Parent-1528743137047 | 1 | Detroit | MI  

Мои вопросы:

  1. Теперь я хочу заменить addressId 1(Fremont) на addressId 1 (Детройт). Как я могу это сделать?
  2. Я также попытался распечатать поток ввода на консоль, как указано в билете

Распечатать Кафка Стрим Вводить на консоль?

Вот мой код:

    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cusadd-application");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "10.1.61.125:9092");
    config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "10.1.61.125:2181");
    config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> source = builder.stream("cust_addr_src");
    source.foreach(new ForeachAction<String, String>() {
        public void apply(String key, String value) {
            System.out.println("Stream key values are: " + key + ": " + value);
        }
     });

Я не вижу выход.

Только я могу видеть следующий вывод:

12: 04: 42.145 [StreamThread-1] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Сброс смещения для раздела cust_addr_src-0 до последнего смещения. 12:04:42.145 [StreamThread-1] DEBUG org.apache.kafka.clients.NetworkClient - Инициирование соединения с узлом 0 в hsharma-mbp15.local:9092. 12:04:42.145 [StreamThread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Добавлен датчик с именем node-0.bytes-sent 12:04:42.145 [StreamThread-1] DEBUG org.apache.kafka.common.metrics.Metrics - добавлен датчик с именем node-0.bytes-receive 12: 04: 42.145 [StreamThread-1] DEBUG org.apache.kafka.common.metrics.Metrics - добавлен датчик с именем node-0.latency 12:04:42.145 [StreamThread-1] DEBUG org.apache.kafka.clients.NetworkClient - Завершено подключение к узлу 0 12:04:42.145 [StreamThread-1] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Извлеченное смещение 0 для раздела cust_addr_src-0 12:04:42.676 [StreamThread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Добавлен датчик с именем topic.cust_addr_src.bytes-fetched 12:04:42.680 [StreamThread-1] DEBUG org.apache.kafka.common.metrics.Metrics - добавлен датчик с именем topic.cust_addr_src.records-fetched 12:04:45.150 [StreamThread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Получено успешное hea Ответ rtbeat для группы cusadd-application.

Заранее спасибо.

0 ответов

Я вижу два подхода:

  1. Манипуляции со строкой: столбец адреса в настоящее время является строкой, содержащей объект JSON. Вы можете просто функции манипулирования строками, чтобы заменить нужные биты. Хотя это кажется хакерским.
  2. Манипуляция структурой: переключите оператор CREATE TABLE так, чтобы адрес был ARRAY<STRUCT<addressId STRING, city STRING, state>>тип, а не строка. Затем вы можете использовать элементы массива и поля структуры для создания вывода, например
ARRAY[
  STRUCT(
    addressId := address[0]->addressId,
    city := address_src->city,
    state := address[0]->state
  ),
  ... same for second element
]

Вышеупомянутое создаст массив, содержащий две структуры, с новым набором города.

Конечно, это работает, только если в массиве всегда два элемента. Если есть переменное количество, вам нужно будет использовать длинный оконный оператор CASE, чтобы делать разные вещи в зависимости от размера массива. например

CASE 
   WHEN ARRAY_LENGTH(address) = 1 
    THEN ARRAY[STRUCT(addressId := address[0]->addressId, city := address_src->city, state := address[0]->state)]
   WHEN ARRAY_LENGTH(address) = 2
     THEN ARRAY(... with two elements...)
   WHEN ARRAY_LENGTH(address) = 3
     THEN ARRAY(... with three elements...)
END

И т.п.

Другие вопросы по тегам