KTable-Ktable Join вызывает исключение нулевого указателя
Я столкнулся с исключением нулевого указателя при объединении двух KTables.
final ValueJoiner<topicA, topicB, user> userJoiner = (topicA, topicB) -> {
LOG.debug(LOG_PREFIX + "Begin of join of topicA with topicB");
LOG.trace(LOG_PREFIX + "topicA: {}", topicA);
LOG.trace(LOG_PREFIX + "topicB: {}", topicB);
//Construct a new user object after joining
LOG.debug(LOG_PREFIX + "End of Join");
LOG.trace(LOG_PREFIX + "user : {}", user_tmp);
return user_tmp;
};
final KTable<String, user> userTable = topicA.join(topicB,
topicA::getPWDRIDOWNER,
userJoiner,
Named.as("topicA-join-with-topicB"),
Materialized.<String, user, KeyValueStore<Bytes, byte[]>>as("topicA-join-with-topicB").withKeySerde(Serdes.String()).withValueSerde(userSerde).withCachingDisabled());
final KStream<String, user> userOutputStream = userTable.toStream().selectKey((k,v) -> {
LOG.debug(LOG_PREFIX + "Selecting key for outgoing record");
LOG.trace(LOG_PREFIX + "Selecting key for outgoing record ");
LOG.trace(LOG_PREFIX + "user record : {} ", v);
return v.getFieldA()+"_"+v.getFieldB()+"_"+v.getFieldC();
});
Нулевой указатель появляется, когда запись присутствует в теме A, но не соответствует записи в теме B.
С журналами трассировки я вижу, что следующий журнал печатает нулевое значение.
LOG.trace(LOG_PREFIX + "user record : {} ", v);
Это может означать, что соединение не инициировано, но все же оно продолжается с выполнением топологии. Таким образом, после соединения он начинает выполнение узла selectKey и запускается в NPE.
Насколько я понимаю, если соединение не запускается, то оно должно ждать в топологии.
Пожалуйста, помогите кому-нибудь понять, что может пойти не так