apache beam bigtable Итеративная мутация
Я перевожу свой поток данных Google Java Java 1.9 в Beam 2.0 и пытаюсь использовать BigtableIO.Write
....
.apply("", BigtableIO.write()
.withBigtableOptions(bigtableOptions)
.withTableId("twoSecondVitals"));
В ParDo перед BigtableIO я изо всех сил пытаюсь сделать Iterable.
try{
Mutation mutation = Mutation.parseFrom(new ObjectMapper().writeValueAsBytes(v));
Mutation mu[] = {mutation};
Iterable<Mutation> imu = Arrays.asList(mu);
log.severe("imu");
c.output(KV.of(ByteString.copyFromUtf8(rowKey+"_"+v.getEpoch()), imu));
}catch (Exception e){
log.severe(rowKey+"_"+v.getEpoch()+" error:"+e.getMessage());
}
Приведенный выше код вызывает следующее исключение InvalidProtocolBufferException: тег конечной группы сообщения протокола не соответствует ожидаемому тегу
v это список объектов (Vitals.class). API hbase использует метод Put для создания мутации. Как создать мутацию BigTable, которая будет работать с приемником BigtableIO?
1 ответ
Просматривая тесты SDK, я смог найти свой ответ.
Iterable<Mutation> mutations =
ImmutableList.of(Mutation.newBuilder()
.setSetCell(
Mutation.SetCell.newBuilder()
.setValue(ByteString.copyFrom(new ObjectMapper().writeValueAsBytes(v)))
.setFamilyName("vitals")
).build());