在信息流中的 java API 中使用 Java 自定义运算符
using Java custom operator within java API in infosphere streams
我已经搜索了一段时间如何使用自定义的 Java 运算符和信息流 Java API
我需要的是在编写如下自定义运算符之后...
public class Test extends AbstractOperator {
private int i;
private int num;
@Override
public synchronized void initialize(OperatorContext context) throws Exception {
super.initialize(context);
i = 0; ....
我想像下面这样使用它....
Topology topology = new Topology("toplogy_test");
TStream<String> inDataFileName = ...
//call the "Test" operator here
您可以通过执行以下操作从拓扑 API 调用 Java 运算符/C++ 运算符:
添加Java算子的工具包:
SPL.addToolkit(拓扑,新建文件("/home/streamsadmin/myTk"));
将传入流转换为 SPL 流:
StreamSchema rstringSchema = Type.Factory.getStreamSchema("tuple<rstring rstring_attr_name>");
SPLStream splInputStream = SPLStreams.convertStream(inDataFileName, new BiFunction<String, OutputTuple, OutputTuple>(){
@Override
public OutputTuple apply(String input_string, OutputTuple output_rstring) {
output_rstring.setString("rstring_attr_name", input_string);
return output_rstring;
}}, rstringSchema);
调用运算符:
SPLStream splOutputStream = SPL.invokeOperator("OperatorNamespace::YourOperatorName", splInputStream, rstringSchema, new HashMap());
您可以在此处找到更多相关信息:
附带说明一下,如果您正在考虑使用拓扑 API 编写 Streams 拓扑,那么编写常规 Java class 并调用它会更容易直接来自 Topology API.
例如:
MyJavaCode someObj = new MyJavaCode();
Topology topology = new Topology("MyTopology");
TStream<String> inDataFileName = ...
inDataFileName.transform(new Function<String, String>(){
@Override
public String apply(String word) {
return someObj.someFunction(word);
}
});
这里唯一的要求是你的 Java class 需要实现 Serializable。
我已经搜索了一段时间如何使用自定义的 Java 运算符和信息流 Java API
我需要的是在编写如下自定义运算符之后...
public class Test extends AbstractOperator {
private int i;
private int num;
@Override
public synchronized void initialize(OperatorContext context) throws Exception {
super.initialize(context);
i = 0; ....
我想像下面这样使用它....
Topology topology = new Topology("toplogy_test");
TStream<String> inDataFileName = ...
//call the "Test" operator here
您可以通过执行以下操作从拓扑 API 调用 Java 运算符/C++ 运算符:
添加Java算子的工具包:
SPL.addToolkit(拓扑,新建文件("/home/streamsadmin/myTk"));
将传入流转换为 SPL 流:
StreamSchema rstringSchema = Type.Factory.getStreamSchema("tuple<rstring rstring_attr_name>"); SPLStream splInputStream = SPLStreams.convertStream(inDataFileName, new BiFunction<String, OutputTuple, OutputTuple>(){ @Override public OutputTuple apply(String input_string, OutputTuple output_rstring) { output_rstring.setString("rstring_attr_name", input_string); return output_rstring; }}, rstringSchema);
调用运算符:
SPLStream splOutputStream = SPL.invokeOperator("OperatorNamespace::YourOperatorName", splInputStream, rstringSchema, new HashMap());
您可以在此处找到更多相关信息:
附带说明一下,如果您正在考虑使用拓扑 API 编写 Streams 拓扑,那么编写常规 Java class 并调用它会更容易直接来自 Topology API.
例如:
MyJavaCode someObj = new MyJavaCode();
Topology topology = new Topology("MyTopology");
TStream<String> inDataFileName = ...
inDataFileName.transform(new Function<String, String>(){
@Override
public String apply(String word) {
return someObj.someFunction(word);
}
});
这里唯一的要求是你的 Java class 需要实现 Serializable。