Ошибка при использовании Side Input - с помощью метода SideInputs, не принимающего тип KV в качестве ввода
Я сталкиваюсь со следующей ошибкой при использовании боковых входов.
Со следующим кодом модели:
PCollectionView<Map<String, String>> view1= information
.apply(View.<String, String>asMap());
PCollection<KV<String, Position>> FileData;
FileData.apply("populate",
ParDo.of(new DoFn<KV<String, Position>, KV<String, Position>>() {
@ProcessElement
public void processElement(ProcessContext c) {
}.withSideInputs(view1));
Ошибка возникает при вызове метода withSideInputs. Параметр withsideinput не принимает значение типа KV в качестве ввода. Не могли бы вы сказать, что мне не хватает.
Сообщение об ошибке:
java.lang.ClassCastException: org.apache.beam.sdk.values.KV cannot be cast to java.lang.Iterable
at org.apache.beam.runners.core.SideInputHandler.addSideInputValue(SideInputHandler.java:142)
at org.apache.beam.runners.apex.translation.operators.ApexParDoOperator$2.process(ApexParDoOperator.java:225)
at org.apache.beam.runners.apex.translation.operators.ApexParDoOperator$2.process(ApexParDoOperator.java:207)
at com.datatorrent.api.DefaultInputPort.put(DefaultInputPort.java:79)
at com.datatorrent.stram.engine.AbstractReservoir$SpscArrayBlockingQueueReservoir.sweep(AbstractReservoir.java:413)
at com.datatorrent.stram.engine.GenericNode.run(GenericNode.java:269)
at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1428)
Пример кода для воспроизведения вопроса:
public void testMapAsEntrySetSideInput() {
final PCollectionView<Map<String, Integer>> view =
pipeline.apply("CreateSideInput", Create.of(KV.of("a", 1), KV.of("b", 3)))
.apply(View.<String, Integer>asMap());
PCollection<KV<String, Integer>> output =
pipeline.apply("CreateMainInput", Create.of(2 /* size */))
.apply(
"OutputSideInputs",
ParDo.of(new DoFn<Integer, KV<String, Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) {
assertEquals((int) c.element(), c.sideInput(view).size());
assertEquals((int) c.element(), c.sideInput(view).entrySet().size());
for (Entry<String, Integer> entry : c.sideInput(view).entrySet()) {
c.output(KV.of(entry.getKey(), entry.getValue()));
}
}
}).withSideInputs(view));
PAssert.that(output).containsInAnyOrder(
KV.of("a", 1), KV.of("b", 3));
pipeline.run();
}
1 ответ
Это была ошибка в Apex Runner, и она была исправлена в Beam версии 2.3.0.