org.apache.spark.SparkException: не удалось получить broadcast_25_piece0 broadcast_25 при попытке чтения широковещательной переменной, обновленной через регулярный интервал
Я использую приложение для потокового воспроизведения, которое обрабатывает события в реальном времени, полученные от kafka.
Я использую org.apache.spark.streaming.util.RecurringTimer, который запускает код, скажем, каждые 10 секунд, который отвечает за создание объекта Java, скажем CollegeInfo, и его трансляцию.
ниже приведен код, который запускается каждые 10 секунд и транслирует объект CollegeInfo
class JavaObjectBroadCastTimerFun extends AbstractFunction1<Object, BoxedUnit> implements Serializable
{
Broadcast<CollegeInfo> broadCastVariable = null;
public BoxedUnit apply(Object s)
{
CollegeInfo collInfo = new CollegeInfo();
collInfo.setUniversityName("university");
collInfo.setCollegeName("college");
collInfo.setCollegeCode("1");
broadCastVariable = sc.broadcast(collInfo);
return null;
}
};
Существует код, который запускается на узле-исполнителе, когда он получает событие и получает доступ к широковещательной переменной.
JavaObjectBroadCastTimerFun javaObjectBroadCastTimerFun = new JavaObjectBroadCastTimerFun();
if (javaObjectBroadCastTimerFun.broadCastVariable != null
&& javaObjectBroadCastTimerFun.broadCastVariable.value() != null)
{
CollegeInfo coll = javaObjectBroadCastTimerFun.broadCastVariable.value();
System.out.println(coll.toString());
}
Ожидается, что я смогу получать транслируемую информацию о колледже каждый раз, когда читаю значение широковещательной переменной. Но иногда я получаю исключение ниже Spark
rg.apache.spark.SparkException: Failed to get broadcast_25_piece0 of broadcast_25
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:137)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:120)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:175)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1198)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:330)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:330)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:928)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:928)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1883)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1883)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:68)
at org.apache.spark.scheduler.Task.run(Task.scala:90)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:229)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Я использую спарк 1.5.1. Я пытался использовать, чтобы установить spark.broadcast.factory=org.apache.spark.broadcast.HttpBroadcastFactory
но это не сработало. Пожалуйста, скажите мне причину этого исключения, и мой код правильный для широковещательных переменных через регулярные промежутки времени. Ранний ответ высоко ценится
заранее спасибо