ConcurrentModificationException в Flink

Я пытался получить данные из базы данных и вести подсчет. Если число равно 1, тогда обновление должно произойти, если его ноль, то вставьте. До этого, когда я получаю dbData.count(), я получаю исключение ConcurrentModificationException. Metric_id - это неграмотные значения, полученные из набора данных finaloutput.

DataSet<Tuple7<String, String, String, String, String, String, String>> method1=finaloutput.map(new Writents(fhir_resource)); method1.printToErr();

логическая часть класса Writents выглядит следующим образом:

String sourcequery="SELECT id,topic,from,to,metric_id from metrics where (topic='"+nt+"') AND (metric_id=('"+metric_id+"')) and (date(from)=(('"+StartDate+"')::date)) and (date(to) = (('"+EndDate+"')::date))";

    dbData =env.createInput(JDBCInputFormat.buildJDBCInputFormat()
                                           .setDrivername(Utils.properties_fetch("drivername"))
                                           .setDBUrl(Utils.properties_fetch("dbURL"))`
                                           .setUsername(Utils.properties_fetch("username"))
                                           .setPassword(Utils.properties_fetch("password"))
                                           .setQuery(sourcequery)                                                                           
                                           .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.DATE_TYPE_INFO,BasicTypeInfo.DATE_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO))
                                           .finish());  
    return new Tuple7<String, String, String, String, String, String, String>(nt,metric_id.toString(),StartDate.toString(),EndDate.toString(),ExecutionDate.toString(),topic,(existance=(dbData.count()==0)?"NO DATA THERE":"DATA THERE"));

Когда я делаю отладку, я не получаю ошибки, но когда я запускаю программу, я получаю исключение ConcurrentModificationException, и задание не выполняется.

Исключение, которое я получаю, заключается в следующем:

java.util.ConcurrentModificationException at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909) at java.util.ArrayList$Itr.next(ArrayList.java:859) at org.apache.flink.api.java.operators.OperatorTranslation.translateToPlan(OperatorTranslation.java:51) at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:954) at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:921) at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:85) at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:815) at org.apache.flink.api.java.DataSet.count(DataSet.java:398) at com.cm.analytics.nts.Writents.map(Writents.java:177) at com.cm.analytics.nts.Writents.map(Writents.java:1) at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:103) at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490) at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) at java.lang.Thread.run(Thread.java:748)

0 ответов

Другие вопросы по тегам