Как реализовать KStream-Ktable leftJoin, используя подход с несколькими ключами (соединение pk-fk и fk-fk) в потоках Kafka?

Я работаю над объединением потока и таблиц в kafka, используя подход левого соединения, поскольку я получаю значения для некоторых соединений, потому что в этом случае соединение реализовано как внешний ключ для первичного ключа, что является рабочим случаем.

      Below is the implemented code for join. (foreignKey-primaryKey) 

In this case id is primary key for PassengerTable

 .selectKey((id, stop) -> {
        if (stop.getPassenger() != null && stop.getPassenger().getId() != null) {
            return stop.getPassenger().getId();
        }
        return 0L;
    })
    .leftJoin(table, (stop, passenger) -> {
        topic.Value scA3 = null;
        if (passenger!= null) {
            scA3 = topic.Value
                .newBuilder()
                .setId(passenger.getId())
                .build();
        }

        stop.setPassenger(scA3);
        return stop;
    })

И для некоторых сценариев соединение не реализовано и получает нулевое значение для объекта, потому что это соединение реализовано как первичный ключ к внешнему ключу.

      Below is implemented code for pk-fk join.

In this case PassengerId is foreign-key of payment table and join is implementing between passenger table id (Pk) and paymentTable passengerId(fk).(join failed)

.selectKey((id, stop) -> {
            if (stop.getPayment() != null && stop.getPayment().getPassengerId() != null) {             
 return stop.getPayment().getPassengerId()
            }
            return 0L;
        })
        .leftJoin(paymentTable, (stop, payment) -> {
         logger.info("object  in left join payment : " + payment); // getting null value of payment obj because by default it is implenting join with id of payment table instead of passengerId this join is failed to implement.
            topic.Value paymentA3 = null;
            if (payment != null) {
                PaymentA3 = topic.Value
                    .newBuilder()
                    .getPassengerId(payment.getPassengerId())
                    .setId(payment.getId())
                    .build();
            }
            stop.setPayment(PaymentA3);
            return stop;
})

I have three scenarios:-
1) fk-pk join --> working
2) pk-fk join --> not working in my case
3) fk-fk join --> not working in my case

получение нулевых значений объекта во 2-м и 3-м сценариях, и соединение в этих случаях не реализуется.

Как я могу указать конкретный внешний ключ для реализации этих объединений (pk-fk и fk-fk)?Как связать ключ для любого соединения (leftKey или rightKey) для реализации соединений в kafka?

Итак, мой вопрос в том, как я могу реализовать соединение для этих сценариев (pk-fk и fk-fk)?

Если у кого-нибудь есть идеи об этом типе соединений, пожалуйста, дайте мне знать, как я могу продолжить реализацию этого соединения?

0 ответов

Другие вопросы по тегам