PairDStreamFunctions.mapWithState завершается ошибкой, если задано время ожидания java.util.NoSuchElementException: None.get
Привет, я использую mapwithstate api с функциональностью тайм-аута
я использую пример, расположенный здесь по этому https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
но некоторые изменения сделаны: 1. Класс org.apache.spark.api.java.Optional недоступен в 1.6, поэтому я использую библиотеку гуавы для Факультативного 2. Я использовал функцию тайм-аута
ниже часть кода:
JavaPairDStream<String, Integer> wordsDstream = words.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<>(s, 1);
}
});
**// Update the cumulative count function
Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> call(String word, Optional<Integer> one, State<Integer> state) {
int sum = one.or(0) + (state.exists() ? state.get() : 0);
Tuple2<String, Integer> output = new Tuple2<>(word, sum);
state.update(sum);
return output;
}
};
// DStream made of get cumulative counts that get updated in every batch
JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
wordsDstream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD).timeout(new Duration(1000) ));**
когда я запускал вышеупомянутый код, я получал ниже упомянутого исключения
16/02/25 11:41:33 ERROR Executor: Exception in task 0.0 in stage 157.0 (TID 22)
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:313)
at scala.None$.get(Option.scala:311)
at org.apache.spark.streaming.StateSpec$$anonfun$3.apply(StateSpec.scala:222)
at org.apache.spark.streaming.StateSpec$$anonfun$3.apply(StateSpec.scala:221)
at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:179)
at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$2.apply(MapWithStateRDD.scala:71)
at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$2.apply(MapWithStateRDD.scala:69)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:69)
at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
16/02/25 11:41:33 WARN TaskSetManager: Lost task 0.0 in stage 157.0 (TID 22, localhost): java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:313)
at scala.None$.get(Option.scala:311)
at org.apache.spark.streaming.StateSpec$$anonfun$3.apply(StateSpec.scala:222)
at org.apache.spark.streaming.StateSpec$$anonfun$3.apply(StateSpec.scala:221)
at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:179)
at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$2.apply(MapWithStateRDD.scala:71)
at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$2.apply(MapWithStateRDD.scala:69)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:69)
at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
16/02/25 11:41:33 ERROR TaskSetManager: Task 0 in stage 157.0 failed 1 times; aborting job
16/02/25 11:41:33 ERROR JobScheduler: Error running job streaming job 1456380693000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 157.0 failed 1 times, most recent failure: Lost task 0.0 in stage 157.0 (TID 22, localhost): java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:313)
at scala.None$.get(Option.scala:311)
at org.apache.spark.streaming.StateSpec$$anonfun$3.apply(StateSpec.scala:222)
at org.apache.spark.streaming.StateSpec$$anonfun$3.apply(StateSpec.scala:221)
at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:179)
at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$2.apply(MapWithStateRDD.scala:71)
at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$2.apply(MapWithStateRDD.scala:69)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:69)
at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
2 ответа
Извините, я непосредственно использовал функцию тайм-аута, функция тайм-аута требует некоторых изменений в функции отображения, которая передается в mapWithState ниже - это функция отображения, которую нам нужно использовать
Function4<Time, String, Optional<Integer>, State<Integer>, Optional<Tuple2<String, Integer>>> mappingFunc=
new Function4<Time, String, Optional<Integer>, State<Integer>, Optional<Tuple2<String, Integer>>>() {
@Override
public Optional<Tuple2<String, Integer>> call(Time arg0,
String word, Optional<Integer> one,
State<Integer> state) throws Exception {
// TODO Auto-generated method stub
int sum = one.or(0) + (state.exists() ? state.get() : 0);
Tuple2<String, Integer> output = new Tuple2<>(word, sum);
if(state.isTimingOut())
{
return Optional.of(output);
}
else
{
state.update(sum);
}
return Optional.of(output);
}
};
Я думаю, что ваша проблема должна быть решена, см. https://github.com/apache/spark/pull/11081
Поэтому вы можете попробовать версию, содержащую это исправление, которую вы можете получить путем клонирования и построения текущей версии spark в Branch-1.6 - https://github.com/apache/spark/tree/branch-1.6