在信息流中的 java API 中使用 Java 自定义运算符

using Java custom operator within java API in infosphere streams

我已经搜索了一段时间如何使用自定义的 Java 运算符和信息流 Java API

我需要的是在编写如下自定义运算符之后...

public class Test extends AbstractOperator {
private int i;
private int num;
@Override
public synchronized void initialize(OperatorContext context) throws Exception {
super.initialize(context);
i = 0; ....

我想像下面这样使用它....

        Topology topology = new Topology("toplogy_test");
        TStream<String> inDataFileName = ...
//call the "Test" operator here

您可以通过执行以下操作从拓扑 API 调用 Java 运算符/C++ 运算符:

  1. 添加Java算子的工具包:

    SPL.addToolkit(拓扑,新建文件("/home/streamsadmin/myTk"));

  2. 将传入流转换为 SPL 流:

    StreamSchema rstringSchema = Type.Factory.getStreamSchema("tuple<rstring rstring_attr_name>");
    
    SPLStream splInputStream = SPLStreams.convertStream(inDataFileName, new BiFunction<String, OutputTuple, OutputTuple>(){
    
      @Override
      public OutputTuple apply(String input_string, OutputTuple output_rstring) {
        output_rstring.setString("rstring_attr_name", input_string);
        return output_rstring;
      }}, rstringSchema);
    
  3. 调用运算符:

    SPLStream splOutputStream = SPL.invokeOperator("OperatorNamespace::YourOperatorName", splInputStream, rstringSchema, new HashMap());

您可以在此处找到更多相关信息:

http://ibmstreams.github.io/streamsx.documentation/docs/4.2/java/java-appapi-devguide/#integrating-spl-operators-with-the-java-application-api

附带说明一下,如果您正在考虑使用拓扑 API 编写 Streams 拓扑,那么编写常规 Java class 并调用它会更容易直接来自 Topology API.

例如:

MyJavaCode someObj = new MyJavaCode();

Topology topology = new Topology("MyTopology");
TStream<String> inDataFileName = ...
inDataFileName.transform(new Function<String, String>(){
        @Override
        public String apply(String word) {
            return someObj.someFunction(word);
        }
    });

这里唯一的要求是你的 Java class 需要实现 Serializable。