Использование пользовательского оператора Java в API Java в потоках инфосферы

Некоторое время я искал, как использовать настраиваемый оператор Java с инфосферными потоками Java API

Что мне нужно, так это после написания настраиваемого оператора, как показано ниже...

public class Test extends AbstractOperator {
private int i;
private int num;
@Override
public synchronized void initialize(OperatorContext context) throws Exception {
super.initialize(context);
i = 0; ....

Я хочу использовать его, как показано ниже....

        Topology topology = new Topology("toplogy_test");
        TStream<String> inDataFileName = ...
//call the "Test" operator here

1 ответ

Решение

Вы можете вызвать оператор Java / оператор C++ из API топологии, выполнив следующие действия:

  1. Добавьте инструментарий оператора Java:

    SPL.addToolkit (топология, новый файл ("/home/streamsadmin/myTk"));

  2. Преобразовать входящий поток в поток SPL:

    StreamSchema rstringSchema = Type.Factory.getStreamSchema("tuple<rstring rstring_attr_name>");
    
    SPLStream splInputStream = SPLStreams.convertStream(inDataFileName, new BiFunction<String, OutputTuple, OutputTuple>(){
    
      @Override
      public OutputTuple apply(String input_string, OutputTuple output_rstring) {
        output_rstring.setString("rstring_attr_name", input_string);
        return output_rstring;
      }}, rstringSchema);
    
  3. Вызвать оператора:

    SPLStream splOutputStream = SPL.invokeOperator ("OperatorNamespace:: YourOperatorName", splInputStream, rstringSchema, new HashMap ());

Вы можете найти больше информации об этом здесь:

http://ibmstreams.github.io/streamsx.documentation/docs/4.2/java/java-appapi-devguide/

Кстати, если вы думаете об использовании API-интерфейса Topology для написания топологии Streams, тогда проще написать обычный класс Java и вызывать его непосредственно из API-интерфейса Topology.

Например:

MyJavaCode someObj = new MyJavaCode();

Topology topology = new Topology("MyTopology");
TStream<String> inDataFileName = ...
inDataFileName.transform(new Function<String, String>(){
        @Override
        public String apply(String word) {
            return someObj.someFunction(word);
        }
    });

Единственным требованием здесь является то, что ваш класс Java должен реализовывать Serializable.

Другие вопросы по тегам