KTable-Ktable Join вызывает исключение нулевого указателя

Я столкнулся с исключением нулевого указателя при объединении двух KTables.

      
final ValueJoiner<topicA, topicB, user> userJoiner = (topicA, topicB) -> {
            LOG.debug(LOG_PREFIX + "Begin of join of topicA with topicB");
            LOG.trace(LOG_PREFIX + "topicA: {}", topicA);
            LOG.trace(LOG_PREFIX + "topicB: {}", topicB);
            //Construct a new user object after joining
            LOG.debug(LOG_PREFIX + "End of Join");
            LOG.trace(LOG_PREFIX + "user : {}", user_tmp);
            return user_tmp;
        };

        final KTable<String, user> userTable = topicA.join(topicB,
                topicA::getPWDRIDOWNER,
                userJoiner,
                Named.as("topicA-join-with-topicB"),
                Materialized.<String, user, KeyValueStore<Bytes, byte[]>>as("topicA-join-with-topicB").withKeySerde(Serdes.String()).withValueSerde(userSerde).withCachingDisabled());

        final KStream<String, user> userOutputStream = userTable.toStream().selectKey((k,v) -> {
            LOG.debug(LOG_PREFIX + "Selecting key for outgoing record");
            LOG.trace(LOG_PREFIX + "Selecting key for outgoing record ");
            LOG.trace(LOG_PREFIX + "user record : {} ", v);
            return v.getFieldA()+"_"+v.getFieldB()+"_"+v.getFieldC();
        });

Нулевой указатель появляется, когда запись присутствует в теме A, но не соответствует записи в теме B.

С журналами трассировки я вижу, что следующий журнал печатает нулевое значение.

      LOG.trace(LOG_PREFIX + "user record : {} ", v);

Это может означать, что соединение не инициировано, но все же оно продолжается с выполнением топологии. Таким образом, после соединения он начинает выполнение узла selectKey и запускается в NPE.

Насколько я понимаю, если соединение не запускается, то оно должно ждать в топологии.

Пожалуйста, помогите кому-нибудь понять, что может пойти не так

0 ответов

Другие вопросы по тегам