Кассандра с Spark, логирование ошибок

Есть ли способ узнать / распечатать запись, вызвавшую исключение? и поймать это исключение и пропустить его?

Теперь это вызвало ошибку, и я не могу увидеть полную запись, даже если в СДР много записей (что нормально, я бы не знал, какая из них была неправильной).

Единственная информация, которую я получаю, это такие вещи, как:

Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1, localhost): java.lang.NumberFormatException: For input string: "aaa"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:580)
    at java.lang.Integer.parseInt(Integer.java:615)
    at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:273)
    at scala.collection.immutable.StringOps.toInt(StringOps.scala:29)
    at com.datastax.spark.connector.types.TypeConverter$IntConverter$$anonfun$convertPF$6.applyOrElse(TypeConverter.scala:179)
    at scala.PartialFunction$AndThen.applyOrElse(PartialFunction.scala:189)
    at com.datastax.spark.connector.types.TypeConverter$class.convert(TypeConverter.scala:42)
    at com.datastax.spark.connector.types.TypeConverter$JavaIntConverter$.com$datastax$spark$connector$types$NullableTypeConverter$$super$convert(TypeConverter.scala:187)
    at com.datastax.spark.connector.types.NullableTypeConverter$class.convert(TypeConverter.scala:55)
    at com.datastax.spark.connector.types.TypeConverter$JavaIntConverter$.convert(TypeConverter.scala:187)
    at com.datastax.spark.connector.types.TypeConverter$OptionToNullConverter$$anonfun$convertPF$35.applyOrElse(TypeConverter.scala:878)
    at com.datastax.spark.connector.types.TypeConverter$class.convert(TypeConverter.scala:42)
    at com.datastax.spark.connector.types.TypeConverter$OptionToNullConverter.com$datastax$spark$connector$types$NullableTypeConverter$$super$convert(TypeConverter.scala:864)
    at com.datastax.spark.connector.types.NullableTypeConverter$class.convert(TypeConverter.scala:55)
    at com.datastax.spark.connector.types.TypeConverter$OptionToNullConverter.convert(TypeConverter.scala:864)
    at com.datastax.spark.connector.writer.MappedToGettableDataConverter$$anon$1$$anonfun$convertPF$1$$anonfun$applyOrElse$1.apply$mcVI$sp(MappedToGettableDataConverter.scala:205)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at com.datastax.spark.connector.writer.MappedToGettableDataConverter$$anon$1$$anonfun$convertPF$1.applyOrElse(MappedToGettableDataConverter.scala:204)
    at com.datastax.spark.connector.types.TypeConverter$class.convert(TypeConverter.scala:42)
    at com.datastax.spark.connector.writer.MappedToGettableDataConverter$$anon$1.convert(MappedToGettableDataConverter.scala:28)
    at com.datastax.spark.connector.writer.DefaultRowWriter.readColumnValues(DefaultRowWriter.scala:21)
    at com.datastax.spark.connector.writer.BoundStatementBuilder.bind(BoundStatementBuilder.scala:99)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:106)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:31)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:233)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:210)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:145)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
    at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:210)
    at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:197)
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:183)

Но я не знал, какой ряд провалился. Я включил журналы отладки на стороне Cassandra, и он не показывает никаких журналов об этом, я думаю, что ошибка фильтруется драйвером.

Код выглядит так:

def saveToCassandra(upserts: DStream[TableRef]) = {
    val rddCassandraUpsert = upserts.map {
      record =>
        TableRef(Some("aaa"),
                          record.ms_nb_tecnologia,
                          record.op_ts, DateTime.now())
    }
      rddCassandraUpsert.saveToCassandra(keyspace, table, SomeColumns(colsClass: _*))
  }

"aaa" должно быть целым числом, и я вынуждаю ошибку попытаться ее перехватить.

0 ответов

Другие вопросы по тегам