Ошибка при использовании Side Input - с помощью метода SideInputs, не принимающего тип KV в качестве ввода

Я сталкиваюсь со следующей ошибкой при использовании боковых входов.

Со следующим кодом модели:

PCollectionView<Map<String, String>> view1= information
                .apply(View.<String, String>asMap());

PCollection<KV<String, Position>> FileData;

FileData.apply("populate",
ParDo.of(new DoFn<KV<String, Position>, KV<String, Position>>() {
                    @ProcessElement
public void processElement(ProcessContext c) {

}.withSideInputs(view1));

Ошибка возникает при вызове метода withSideInputs. Параметр withsideinput не принимает значение типа KV в качестве ввода. Не могли бы вы сказать, что мне не хватает.

Сообщение об ошибке:

java.lang.ClassCastException: org.apache.beam.sdk.values.KV cannot be cast to java.lang.Iterable
        at org.apache.beam.runners.core.SideInputHandler.addSideInputValue(SideInputHandler.java:142)
        at org.apache.beam.runners.apex.translation.operators.ApexParDoOperator$2.process(ApexParDoOperator.java:225)
        at org.apache.beam.runners.apex.translation.operators.ApexParDoOperator$2.process(ApexParDoOperator.java:207)
        at com.datatorrent.api.DefaultInputPort.put(DefaultInputPort.java:79)
        at com.datatorrent.stram.engine.AbstractReservoir$SpscArrayBlockingQueueReservoir.sweep(AbstractReservoir.java:413)
        at com.datatorrent.stram.engine.GenericNode.run(GenericNode.java:269)
        at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1428)

Пример кода для воспроизведения вопроса:

public void testMapAsEntrySetSideInput() {

    final PCollectionView<Map<String, Integer>> view =
        pipeline.apply("CreateSideInput", Create.of(KV.of("a", 1), KV.of("b", 3)))
            .apply(View.<String, Integer>asMap());

    PCollection<KV<String, Integer>> output =
        pipeline.apply("CreateMainInput", Create.of(2 /* size */))
            .apply(
                "OutputSideInputs",
                ParDo.of(new DoFn<Integer, KV<String, Integer>>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    assertEquals((int) c.element(), c.sideInput(view).size());
                    assertEquals((int) c.element(), c.sideInput(view).entrySet().size());
                    for (Entry<String, Integer> entry : c.sideInput(view).entrySet()) {
                      c.output(KV.of(entry.getKey(), entry.getValue()));
                    }
                  }
                }).withSideInputs(view));

    PAssert.that(output).containsInAnyOrder(
        KV.of("a", 1), KV.of("b", 3));

    pipeline.run();
  }

1 ответ

Это была ошибка в Apex Runner, и она была исправлена ​​в Beam версии 2.3.0.

Другие вопросы по тегам