为什么风暴流的这种显式定义不起作用,而隐式定义却起作用?

Why does this explicit definition of a storm stream not work, while the implicit one does?

给定一个使用 Stream API 的简单 Apache Storm Topology,有两种方法可以初始化一个流:


版本 1 - 隐式声明

    StreamBuilder builder = new StreamBuilder();
    builder
        .newStream(new IntSpout(), new ValueMapper<Integer>(0), 1)
        .filter(x -> x > 5)
        .print();

结果:这按预期工作,它只打印大于 5 的整数。


版本 2 - 显式声明

    Stream<Integer> integerStream = builder.newStream(new IntSpout(), new ValueMapper<Integer>(0), 1);
    integerStream.filter(x -> x > 5);
    integerStream.print();

结果: 这没有用 - 所有元组都被打印出来,包括小于 5 的整数。


问题:为什么这个显式声明不能正常工作,如何解决?


拓扑是 运行 在本地集群上,其中 IntSpout 只是一个简单的喷口,它使用以下命令发出 运行dom 整数:

    StormTopology topo = builder.build();
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("test", new HashMap<>(), topo);

那是因为 integerStream.filter(x -> x > 5); returns 您忽略了一个新流。

这个有效:

    Stream<Integer> integerStream = builder.newStream(new IntSpout(), new ValueMapper<Integer>(0), 1);
    Stream<Integer> filteredStream = integerStream.filter(x -> x > 5);
    filteredStream.print();

您的第一个示例中也存在语法错误。它在第四行末尾多了一个分号。

StreamBuilder builder = new StreamBuilder();
builder
    .newStream(new IntSpout(), new ValueMapper<Integer>(0), 1)
    .filter(x -> x > 5) // <= there was a semicolon here 
    .print();