org.apache.spark.SparkException: не удалось получить broadcast_25_piece0 broadcast_25 при попытке чтения широковещательной переменной, обновленной через регулярный интервал

Я использую приложение для потокового воспроизведения, которое обрабатывает события в реальном времени, полученные от kafka.

Я использую org.apache.spark.streaming.util.RecurringTimer, который запускает код, скажем, каждые 10 секунд, который отвечает за создание объекта Java, скажем CollegeInfo, и его трансляцию.

ниже приведен код, который запускается каждые 10 секунд и транслирует объект CollegeInfo

class JavaObjectBroadCastTimerFun extends AbstractFunction1<Object, BoxedUnit> implements Serializable
        {

            Broadcast<CollegeInfo> broadCastVariable = null;

            public BoxedUnit apply(Object s)
            {
                CollegeInfo collInfo = new CollegeInfo();
                collInfo.setUniversityName("university");
                collInfo.setCollegeName("college");
                collInfo.setCollegeCode("1");

                broadCastVariable = sc.broadcast(collInfo);

                return null;
            }
        };

Существует код, который запускается на узле-исполнителе, когда он получает событие и получает доступ к широковещательной переменной.

JavaObjectBroadCastTimerFun javaObjectBroadCastTimerFun = new JavaObjectBroadCastTimerFun();

        if (javaObjectBroadCastTimerFun.broadCastVariable != null
            && javaObjectBroadCastTimerFun.broadCastVariable.value() != null)
        {
            CollegeInfo coll = javaObjectBroadCastTimerFun.broadCastVariable.value();
            System.out.println(coll.toString());
        }

Ожидается, что я смогу получать транслируемую информацию о колледже каждый раз, когда читаю значение широковещательной переменной. Но иногда я получаю исключение ниже Spark

rg.apache.spark.SparkException: Failed to get broadcast_25_piece0 of broadcast_25
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:137)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:120)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:175)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1198)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:330)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:330)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:928)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:928)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1883)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1883)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:68)
at org.apache.spark.scheduler.Task.run(Task.scala:90)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:229)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Я использую спарк 1.5.1. Я пытался использовать, чтобы установить spark.broadcast.factory=org.apache.spark.broadcast.HttpBroadcastFactory но это не сработало. Пожалуйста, скажите мне причину этого исключения, и мой код правильный для широковещательных переменных через регулярные промежутки времени. Ранний ответ высоко ценится

заранее спасибо

0 ответов

Другие вопросы по тегам