SparkStreaming - пустая широковещательная переменная при обновлении на определенный промежуток времени
Требование: обновлять широковещательную переменную каждую минуту из базы данных MySQL.
Процесс - для каждого нового rdd проверяется переменная широковещания. если его значение не равно нулю и интервал времени меньше минуты, то получение данных из широковещательной передачи в противном случае происходит их извлечение из базы данных MySQL и обновление широковещательной передачи.
Проблема - получение нулевой трансляции каждый раз, однако она должна возвращать ноль только в первый раз.
код:
public Broadcast<Map<String, R>> updateAndGetCache(SparkContext sc,Constants dbConfig) throws Exception{
Date currentDate = Calendar.getInstance().getTime();
long diff = currentDate.getTime()-lastUpdatedAt.getTime();
if (broadcastVal == null || diff > 60000) { // every time flow hit this condition
if (broadcastVal != null) broadcastVal.unpersist(); //don't touch this block
lastUpdatedAt = new Date(System.currentTimeMillis());
return getSparkContext(sc).broadcast(DBReaders.getCacheData(dbConfig));
}
return broadcastVal;
}
private static Function<JavaRDD<D>, JavaRDD<A>> actionModelBuilder = new Function<JavaRDD<D>, JavaRDD<A>>() {
public JavaRDD<A> call(JavaRDD<D> v1) throws Exception {
Broadcast<Map<String, R>> broadcastCacheData = BroadcastWrapper.getInstance()
.updateAndGetCache(v1.context(), getBroadcastConstants().value());
//further processing
}
}
incommingData.transform(actionModelBuilder);