Количество входных строк в искровой структурированной потоковой передаче с пользовательским приемником

Я использую пользовательский приемник в структурированном потоке (spark 2.2.0) и заметил, что spark создает неверные метрики для количества входных строк - он всегда равен нулю.

Моя конструкция потока:

StreamingQuery writeStream = session
            .readStream()
            .schema(RecordSchema.fromClass(TestRecord.class))
            .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
            .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
            .csv(s3Path.toString())
            .as(Encoders.bean(TestRecord.class))
            .flatMap(
                ((FlatMapFunction<TestRecord, TestOutputRecord>) (u) -> {
                    List<TestOutputRecord> list = new ArrayList<>();
                    try {
                        TestOutputRecord result = transformer.convert(u);
                        list.add(result);
                    } catch (Throwable t) {
                        System.err.println("Failed to convert a record");
                        t.printStackTrace();
                    }

                    return list.iterator();
                }),
                Encoders.bean(TestOutputRecord.class))
        .map(new DataReinforcementMapFunction<>(), Encoders.bean(TestOutputRecord.clazz))
        .writeStream()
        .trigger(Trigger.ProcessingTime(WRITE_FREQUENCY, TimeUnit.SECONDS))
        .format(MY_WRITER_FORMAT)
        .outputMode(OutputMode.Append())
        .queryName("custom-sink-stream")
        .start();

        writeStream.processAllAvailable();
        writeStream.stop();

Журналы:

Streaming query made progress: {
  "id" : "a8a7fbc2-0f06-4197-a99a-114abae24964",
  "runId" : "bebc8a0c-d3b2-4fd6-8710-78223a88edc7",
  "name" : "custom-sink-stream",
  "timestamp" : "2018-01-25T18:39:52.949Z",
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 781,
    "triggerExecution" : 781
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "FileStreamSource[s3n://test-bucket/test]",
    "startOffset" : {
      "logOffset" : 0
    },
    "endOffset" : {
      "logOffset" : 0
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "com.mycompany.spark.MySink@f82a99"
  }
}

Нужно ли заполнять какие-либо показатели в моем пользовательском приемнике, чтобы можно было отслеживать прогресс? Или это может быть проблема в FileStreamSource, когда он читает из корзины s3?

1 ответ

Решение

Проблема была связана с использованием dataset.rdd в моем пользовательском приемнике, который создает новый план, так что StreamExecution не знает об этом и, следовательно, не может получить метрики.

Замена data.rdd.mapPartitions с data.queryExecution.toRdd.mapPartitions устраняет проблему

Другие вопросы по тегам