Одно поле в буферах протокола всегда отсутствует при чтении из SequenceFile
Что-то таинственное происходит со мной:
Что я хотел сделать:
1. Save a Protocol Buffers object as SequenceFile format.
2. Read this SequenceFile text and extract the field that I need.
Таинственная часть: одно поле, которое я хотел получить, всегда равно нулю.
Product_Perf
это поле, которое я хотел извлечь из SequencFiles, которое всегда отсутствует.
Вот моя схема буферов протокола:
message ProductJoin {
Signals signals = 1;
int64 id = 2;
}
message Signals {
ProductPerf product_perf = 1;
}
message ProductPerf {
int64 impressions = 1;
}
Вот как я сохраняю буферы протокола как SequenceFiles:
JavaPairRDD<BytesWritable, BytesWritable> bytesWritableJavaPairRdd =
flattenedPjPairRdd.mapToPair(
new PairFunction<Tuple2<Long, ProductJoin>, BytesWritable, BytesWritable>() {
@Override
public Tuple2<BytesWritable, BytesWritable> call(Tuple2<Long, ProductJoin> longProductJoinTuple2) throws Exception {
return new Tuple2<>(
new BytesWritable(longProductJoinTuple2._2().getId().getBytes()),
new BytesWritable(longProductJoinTuple2._2().toByteArray()));
}
}
//dump SequenceFiles
bytesWritableJavaPairRdd.saveAsHadoopFile(
"/tmp/path/",
BytesWritable.class,
BytesWritable.class,
SequenceFileOutputFormat.class
);
Ниже приведен код, как я читал SequenceFile:
sparkSession.sparkContext()
.sequenceFile("tmp/path", BytesWritable.class, BytesWritable.class)
.toJavaRDD()
.mapToPair(
bytesWritableBytesWritableTuple2 -> {
Method parserMethod = clazz.getDeclaredMethod("parser");
Parser<T> parser = (Parser<T>) parserMethod.invoke(null);
return new Tuple2<>(
Text.decode(bytesWritableBytesWritableTuple2._1().getBytes()),
parser.parseFrom(bytesWritableBytesWritableTuple2._2().getBytes()));
}
);