ConcurrentModificationException в Flink
Я пытался получить данные из базы данных и вести подсчет. Если число равно 1, тогда обновление должно произойти, если его ноль, то вставьте. До этого, когда я получаю dbData.count(), я получаю исключение ConcurrentModificationException. Metric_id - это неграмотные значения, полученные из набора данных finaloutput.
DataSet<Tuple7<String, String, String, String, String, String, String>> method1=finaloutput.map(new Writents(fhir_resource));
method1.printToErr();
логическая часть класса Writents выглядит следующим образом:
String sourcequery="SELECT id,topic,from,to,metric_id from metrics where (topic='"+nt+"') AND (metric_id=('"+metric_id+"')) and (date(from)=(('"+StartDate+"')::date)) and (date(to) = (('"+EndDate+"')::date))";
dbData =env.createInput(JDBCInputFormat.buildJDBCInputFormat()
.setDrivername(Utils.properties_fetch("drivername"))
.setDBUrl(Utils.properties_fetch("dbURL"))`
.setUsername(Utils.properties_fetch("username"))
.setPassword(Utils.properties_fetch("password"))
.setQuery(sourcequery)
.setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.DATE_TYPE_INFO,BasicTypeInfo.DATE_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO))
.finish());
return new Tuple7<String, String, String, String, String, String, String>(nt,metric_id.toString(),StartDate.toString(),EndDate.toString(),ExecutionDate.toString(),topic,(existance=(dbData.count()==0)?"NO DATA THERE":"DATA THERE"));
Когда я делаю отладку, я не получаю ошибки, но когда я запускаю программу, я получаю исключение ConcurrentModificationException, и задание не выполняется.
Исключение, которое я получаю, заключается в следующем:
java.util.ConcurrentModificationException
at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
at java.util.ArrayList$Itr.next(ArrayList.java:859)
at org.apache.flink.api.java.operators.OperatorTranslation.translateToPlan(OperatorTranslation.java:51)
at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:954)
at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:921)
at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:85)
at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:815)
at org.apache.flink.api.java.DataSet.count(DataSet.java:398)
at com.cm.analytics.nts.Writents.map(Writents.java:177)
at com.cm.analytics.nts.Writents.map(Writents.java:1)
at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:103)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)