为什么风暴流的这种显式定义不起作用,而隐式定义却起作用?
Why does this explicit definition of a storm stream not work, while the implicit one does?
给定一个使用 Stream API 的简单 Apache Storm Topology,有两种方法可以初始化一个流:
版本 1 - 隐式声明
StreamBuilder builder = new StreamBuilder();
builder
.newStream(new IntSpout(), new ValueMapper<Integer>(0), 1)
.filter(x -> x > 5)
.print();
结果:这按预期工作,它只打印大于 5 的整数。
版本 2 - 显式声明
Stream<Integer> integerStream = builder.newStream(new IntSpout(), new ValueMapper<Integer>(0), 1);
integerStream.filter(x -> x > 5);
integerStream.print();
结果: 这没有用 - 所有元组都被打印出来,包括小于 5 的整数。
问题:为什么这个显式声明不能正常工作,如何解决?
拓扑是 运行 在本地集群上,其中 IntSpout
只是一个简单的喷口,它使用以下命令发出 运行dom 整数:
StormTopology topo = builder.build();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", new HashMap<>(), topo);
那是因为 integerStream.filter(x -> x > 5);
returns 您忽略了一个新流。
这个有效:
Stream<Integer> integerStream = builder.newStream(new IntSpout(), new ValueMapper<Integer>(0), 1);
Stream<Integer> filteredStream = integerStream.filter(x -> x > 5);
filteredStream.print();
您的第一个示例中也存在语法错误。它在第四行末尾多了一个分号。
StreamBuilder builder = new StreamBuilder();
builder
.newStream(new IntSpout(), new ValueMapper<Integer>(0), 1)
.filter(x -> x > 5) // <= there was a semicolon here
.print();
给定一个使用 Stream API 的简单 Apache Storm Topology,有两种方法可以初始化一个流:
版本 1 - 隐式声明
StreamBuilder builder = new StreamBuilder();
builder
.newStream(new IntSpout(), new ValueMapper<Integer>(0), 1)
.filter(x -> x > 5)
.print();
结果:这按预期工作,它只打印大于 5 的整数。
版本 2 - 显式声明
Stream<Integer> integerStream = builder.newStream(new IntSpout(), new ValueMapper<Integer>(0), 1);
integerStream.filter(x -> x > 5);
integerStream.print();
结果: 这没有用 - 所有元组都被打印出来,包括小于 5 的整数。
问题:为什么这个显式声明不能正常工作,如何解决?
拓扑是 运行 在本地集群上,其中 IntSpout
只是一个简单的喷口,它使用以下命令发出 运行dom 整数:
StormTopology topo = builder.build();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", new HashMap<>(), topo);
那是因为 integerStream.filter(x -> x > 5);
returns 您忽略了一个新流。
这个有效:
Stream<Integer> integerStream = builder.newStream(new IntSpout(), new ValueMapper<Integer>(0), 1);
Stream<Integer> filteredStream = integerStream.filter(x -> x > 5);
filteredStream.print();
您的第一个示例中也存在语法错误。它在第四行末尾多了一个分号。
StreamBuilder builder = new StreamBuilder();
builder
.newStream(new IntSpout(), new ValueMapper<Integer>(0), 1)
.filter(x -> x > 5) // <= there was a semicolon here
.print();