Количество записанных записей в искровой структурированной потоковой передаче
Я пытаюсь получить ряд прочитанных и записанных записей в структурированном потоке. я использую SparkListener.onTaskEnd()
для захвата входных и выходных метрик, но кажется, что количество записанных записей (taskEnd.taskMetrics().outputMetrics().recordsWritten()
) ноль, но это не так.
Вот моя конструкция потока:
StreamingQuery writeStream = session
.readStream()
.schema(RecordSchema.fromClass(IntegTestRecord.class))
.option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
.option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
.csv(inputFolder.getRoot().toPath().toString())
.as(Encoders.bean(IntegTestRecord.class))
.flatMap(
((FlatMapFunction<IntegTestRecord, IntegTestVendingRecord>) (u) -> {
List<IntegTestVendingRecord> resultIterable = new ArrayList<>();
try {
IntegTestVendingRecord result = transformer.convert(u);
resultIterable.add(result);
} catch (Throwable t) {
System.err.println("Ooops");
t.printStackTrace();
}
return resultIterable.iterator();
}),
Encoders.bean(IntegTestVendingRecord.class))
.writeStream()
.outputMode(OutputMode.Append())
.format("parquet")
.option("path", outputFolder.getRoot().toPath().toString())
.option("checkpointLocation", checkpointFolder.getRoot().toPath().toString())
.start();
writeStream.processAllAvailable();
writeStream.stop();
Протестировал его с одной хорошей и одной плохой (непарсируемой) входной записью и вот что получилось:
(TestMain.java:onTaskEnd(75)) - -----------recordsWritten--> 0
(TestMain.java:onTaskEnd(76)) - -----------recordsRead-----> 2
(TestMain.java:onTaskEnd(83)) - taskEnd.taskInfo().accumulables():
(TestMain.java:onTaskEnd(84)) - duration total (min, med, max) = 323
(TestMain.java:onTaskEnd(84)) - number of output rows = 2
(TestMain.java:onTaskEnd(84)) - duration total (min, med, max) = 364
(TestMain.java:onTaskEnd(84)) - internal.metrics.input.recordsRead = 2
(TestMain.java:onTaskEnd(84)) - internal.metrics.input.bytesRead = 157
(TestMain.java:onTaskEnd(84)) - internal.metrics.resultSerializationTime = 3
(TestMain.java:onTaskEnd(84)) - internal.metrics.resultSize = 2396
(TestMain.java:onTaskEnd(84)) - internal.metrics.executorCpuTime = 633807000
(TestMain.java:onTaskEnd(84)) - internal.metrics.executorRunTime = 683
(TestMain.java:onTaskEnd(84)) - internal.metrics.executorDeserializeCpuTime = 55662000
(TestMain.java:onTaskEnd(84)) - internal.metrics.executorDeserializeTime = 58
(TestMain.java:onTaskEnd(89)) - input records = 2
Streaming query made progress: {
"id" : "1231f9cb-b2e8-4d10-804d-73d7826c1cb5",
"runId" : "bd23b60c-93f9-4e17-b3bc-55403edce4e7",
"name" : null,
"timestamp" : "2018-01-26T14:44:05.362Z",
"numInputRows" : 2,
"processedRowsPerSecond" : 0.8163265306122448,
"durationMs" : {
"addBatch" : 1994,
"getBatch" : 126,
"getOffset" : 52,
"queryPlanning" : 220,
"triggerExecution" : 2450,
"walCommit" : 41
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "FileStreamSource[file:/var/folders/4w/zks_kfls2s3glmrj3f725p7hllyb5_/T/junit3661035412295337071]",
"startOffset" : null,
"endOffset" : {
"logOffset" : 0
},
"numInputRows" : 2,
"processedRowsPerSecond" : 0.8163265306122448
} ],
"sink" : {
"description" : "FileSink[/var/folders/4w/zks_kfls2s3glmrj3f725p7hllyb5_/T/junit3785605384928624065]"
}
}
Количество входов является правильным, но количество письменных записей, взятых из taskEnd.taskMetrics().outputMetrics().recordsWritten()
это ноль. Даже у накопителей нет правильного значения - должно быть 1, но оно показывает 2 "количество выходных строк"
Это ошибка? Или мой поток не построен правильно? Есть ли надежный способ подсчета количества прочитанных и записанных записей?
Обновить
Открытая проблема со Spark относительно количества письменных записей: https://issues.apache.org/jira/browse/SPARK-23288