如何将 Trident/Storm 中的值存储在列表中(使用 Java API)

How to store values from Trident/Storm in a List (using the Java API)

我正在尝试创建一些单元测试来验证我的 Trident 拓扑的某些部分是否正在执行它们应该执行的操作。

我希望能够检索在 运行 拓扑之后产生的所有值并将它们放入列表中,这样我就可以 "see"他们并检查他们的条件。

   FeederBatchSpout feederSpout = new FeederBatchSpout("some_time_field", "foo_id");
   TridentTopology topology = new TridentTopology();
   topology.newStream("spout1", feederSpout)
    .groupBy(new Fields("some_time_field", "foo_id"))
    .aggregate(new Fields("foo_id"), new FooAggregator(),
               new Fields("aggregated_foos"))
    // Soo... how do I retrieve the "aggregated_foos" from here?

我是运行拓扑为TrackedTopology(回复代码来自, thank you @brianghig for asking it and @Thomas Kielbus

这就是我 "launch" 拓扑以及将样本值输入其中的方式:

TrackedTopology tracked = Testing.mkTrackedTopology(cluster, topology.build());
cluster.submitTopology("unit_tests", config, tracked.getTopology());

feederSpout.feed(new Values(MyUtils.makeSampleFoo(1));
feederSpout.feed(new Values(MyUtils.makeSampleFoo(2));

当我这样做时,我可以在日志消息中看到拓扑 运行 正确,并且值计算正确,但我想 "fish" 结果出来进入 List (或任何结构,此时)所以我实际上可以在我的测试中放入一些 Asserts

我一直在尝试 [a s**ton] 不同的方法,但其中 none 有效。

最新的想法是在聚合后添加一个 bolt,这样它将 "persist" 我的值放入列表中:

下面你会看到 class 试图遍历 aggregate 发出的所有元组,并将它们放入我之前初始化的列表中:

class FieldFetcherStateUpdater extends BaseStateUpdater<FieldFetcherState> {
    final List<AggregatedFoo> results;

    public FieldFetcherStateUpdater(List<AggregatedFoo> results) {
        this.results = results;
    }

    @Override
    public void updateState(FieldFetcherState state, List<TridentTuple> tuples,
                            TridentCollector collector) {
        for (TridentTuple tuple : tuples) {
            results.add((AggregatedFoo) tuple.getValue(0));
        }
    }
}

所以现在代码看起来像:

// ...
List<AggregatedFoo> results = new ArrayList();
topology.newStream("spout1", feederSpout)
    .groupBy(new Fields("some_time_field", "foo_id"))
    .aggregate(new Fields("foo_id"), new FooAggregator(),
               new Fields("aggregated_foos"))
    .partitionPersist(new FieldFetcherFactory(),
                        new Fields("aggregated_foos"),
                        new FieldFetcherStateUpdater(results));

     LOGGER.info("Done. Checkpoint results={}", results);

但没有...日志显示 Done. Checkpoint results=[](空列表)

有办法得到吗?我想它一定是可行的,但我一直没能想出办法...

任何提示或 link 页面或类似内容将不胜感激。提前谢谢你。

您需要使用静态成员变量 result。如果您有多个并行任务 运行(即 parallelism_hint > 1),您还需要 synchronizeresult.

的写入权限

在你的例子中,result 将为空,因为 Storm 在内部创建了一个新的螺栓实例(包括 ArrayList 的一个新实例)。使用静态变量确保您可以访问正确的对象(因为在您的螺栓的所有实例中只有一个)。