SparkStreaming - пустая широковещательная переменная при обновлении на определенный промежуток времени

Требование: обновлять широковещательную переменную каждую минуту из базы данных MySQL.

Процесс - для каждого нового rdd проверяется переменная широковещания. если его значение не равно нулю и интервал времени меньше минуты, то получение данных из широковещательной передачи в противном случае происходит их извлечение из базы данных MySQL и обновление широковещательной передачи.

Проблема - получение нулевой трансляции каждый раз, однако она должна возвращать ноль только в первый раз.

код:

      public Broadcast<Map<String, R>> updateAndGetCache(SparkContext sc,Constants dbConfig) throws Exception{
            Date currentDate = Calendar.getInstance().getTime();
            long diff = currentDate.getTime()-lastUpdatedAt.getTime();
            if (broadcastVal == null || diff > 60000) {  // every time flow hit this condition
                if (broadcastVal != null) broadcastVal.unpersist(); //don't touch this block
                lastUpdatedAt = new Date(System.currentTimeMillis());
               return getSparkContext(sc).broadcast(DBReaders.getCacheData(dbConfig));
           }
           return broadcastVal;
       }

    private static Function<JavaRDD<D>, JavaRDD<A>> actionModelBuilder = new Function<JavaRDD<D>, JavaRDD<A>>() {
            public JavaRDD<A> call(JavaRDD<D> v1) throws Exception {
                Broadcast<Map<String, R>> broadcastCacheData = BroadcastWrapper.getInstance()
                        .updateAndGetCache(v1.context(), getBroadcastConstants().value());
    //further processing
    }
    }

incommingData.transform(actionModelBuilder);

0 ответов

Другие вопросы по тегам