Использование пользовательского оператора Java в API Java в потоках инфосферы
Некоторое время я искал, как использовать настраиваемый оператор Java с инфосферными потоками Java API
Что мне нужно, так это после написания настраиваемого оператора, как показано ниже...
public class Test extends AbstractOperator {
private int i;
private int num;
@Override
public synchronized void initialize(OperatorContext context) throws Exception {
super.initialize(context);
i = 0; ....
Я хочу использовать его, как показано ниже....
Topology topology = new Topology("toplogy_test");
TStream<String> inDataFileName = ...
//call the "Test" operator here
1 ответ
Вы можете вызвать оператор Java / оператор C++ из API топологии, выполнив следующие действия:
Добавьте инструментарий оператора Java:
SPL.addToolkit (топология, новый файл ("/home/streamsadmin/myTk"));
Преобразовать входящий поток в поток SPL:
StreamSchema rstringSchema = Type.Factory.getStreamSchema("tuple<rstring rstring_attr_name>"); SPLStream splInputStream = SPLStreams.convertStream(inDataFileName, new BiFunction<String, OutputTuple, OutputTuple>(){ @Override public OutputTuple apply(String input_string, OutputTuple output_rstring) { output_rstring.setString("rstring_attr_name", input_string); return output_rstring; }}, rstringSchema);
Вызвать оператора:
SPLStream splOutputStream = SPL.invokeOperator ("OperatorNamespace:: YourOperatorName", splInputStream, rstringSchema, new HashMap ());
Вы можете найти больше информации об этом здесь:
http://ibmstreams.github.io/streamsx.documentation/docs/4.2/java/java-appapi-devguide/
Кстати, если вы думаете об использовании API-интерфейса Topology для написания топологии Streams, тогда проще написать обычный класс Java и вызывать его непосредственно из API-интерфейса Topology.
Например:
MyJavaCode someObj = new MyJavaCode();
Topology topology = new Topology("MyTopology");
TStream<String> inDataFileName = ...
inDataFileName.transform(new Function<String, String>(){
@Override
public String apply(String word) {
return someObj.someFunction(word);
}
});
Единственным требованием здесь является то, что ваш класс Java должен реализовывать Serializable.