如何将 Trident/Storm 中的值存储在列表中(使用 Java API)
How to store values from Trident/Storm in a List (using the Java API)
我正在尝试创建一些单元测试来验证我的 Trident 拓扑的某些部分是否正在执行它们应该执行的操作。
我希望能够检索在 运行 拓扑之后产生的所有值并将它们放入列表中,这样我就可以 "see"他们并检查他们的条件。
FeederBatchSpout feederSpout = new FeederBatchSpout("some_time_field", "foo_id");
TridentTopology topology = new TridentTopology();
topology.newStream("spout1", feederSpout)
.groupBy(new Fields("some_time_field", "foo_id"))
.aggregate(new Fields("foo_id"), new FooAggregator(),
new Fields("aggregated_foos"))
// Soo... how do I retrieve the "aggregated_foos" from here?
我是运行拓扑为TrackedTopology
(回复代码来自, thank you @brianghig for asking it and @Thomas Kielbus)
这就是我 "launch" 拓扑以及将样本值输入其中的方式:
TrackedTopology tracked = Testing.mkTrackedTopology(cluster, topology.build());
cluster.submitTopology("unit_tests", config, tracked.getTopology());
feederSpout.feed(new Values(MyUtils.makeSampleFoo(1));
feederSpout.feed(new Values(MyUtils.makeSampleFoo(2));
当我这样做时,我可以在日志消息中看到拓扑 运行 正确,并且值计算正确,但我想 "fish" 结果出来进入 List
(或任何结构,此时)所以我实际上可以在我的测试中放入一些 Asserts
。
我一直在尝试 [a s**ton] 不同的方法,但其中 none 有效。
最新的想法是在聚合后添加一个 bolt,这样它将 "persist" 我的值放入列表中:
下面你会看到 class 试图遍历 aggregate
发出的所有元组,并将它们放入我之前初始化的列表中:
class FieldFetcherStateUpdater extends BaseStateUpdater<FieldFetcherState> {
final List<AggregatedFoo> results;
public FieldFetcherStateUpdater(List<AggregatedFoo> results) {
this.results = results;
}
@Override
public void updateState(FieldFetcherState state, List<TridentTuple> tuples,
TridentCollector collector) {
for (TridentTuple tuple : tuples) {
results.add((AggregatedFoo) tuple.getValue(0));
}
}
}
所以现在代码看起来像:
// ...
List<AggregatedFoo> results = new ArrayList();
topology.newStream("spout1", feederSpout)
.groupBy(new Fields("some_time_field", "foo_id"))
.aggregate(new Fields("foo_id"), new FooAggregator(),
new Fields("aggregated_foos"))
.partitionPersist(new FieldFetcherFactory(),
new Fields("aggregated_foos"),
new FieldFetcherStateUpdater(results));
LOGGER.info("Done. Checkpoint results={}", results);
但没有...日志显示 Done. Checkpoint results=[]
(空列表)
有办法得到吗?我想它一定是可行的,但我一直没能想出办法...
任何提示或 link 页面或类似内容将不胜感激。提前谢谢你。
您需要使用静态成员变量 result
。如果您有多个并行任务 运行(即 parallelism_hint > 1
),您还需要 synchronize
对 result
.
的写入权限
在你的例子中,result
将为空,因为 Storm 在内部创建了一个新的螺栓实例(包括 ArrayList
的一个新实例)。使用静态变量确保您可以访问正确的对象(因为在您的螺栓的所有实例中只有一个)。
我正在尝试创建一些单元测试来验证我的 Trident 拓扑的某些部分是否正在执行它们应该执行的操作。
我希望能够检索在 运行 拓扑之后产生的所有值并将它们放入列表中,这样我就可以 "see"他们并检查他们的条件。
FeederBatchSpout feederSpout = new FeederBatchSpout("some_time_field", "foo_id");
TridentTopology topology = new TridentTopology();
topology.newStream("spout1", feederSpout)
.groupBy(new Fields("some_time_field", "foo_id"))
.aggregate(new Fields("foo_id"), new FooAggregator(),
new Fields("aggregated_foos"))
// Soo... how do I retrieve the "aggregated_foos" from here?
我是运行拓扑为TrackedTopology
(回复代码来自
这就是我 "launch" 拓扑以及将样本值输入其中的方式:
TrackedTopology tracked = Testing.mkTrackedTopology(cluster, topology.build());
cluster.submitTopology("unit_tests", config, tracked.getTopology());
feederSpout.feed(new Values(MyUtils.makeSampleFoo(1));
feederSpout.feed(new Values(MyUtils.makeSampleFoo(2));
当我这样做时,我可以在日志消息中看到拓扑 运行 正确,并且值计算正确,但我想 "fish" 结果出来进入 List
(或任何结构,此时)所以我实际上可以在我的测试中放入一些 Asserts
。
我一直在尝试 [a s**ton] 不同的方法,但其中 none 有效。
最新的想法是在聚合后添加一个 bolt,这样它将 "persist" 我的值放入列表中:
下面你会看到 class 试图遍历 aggregate
发出的所有元组,并将它们放入我之前初始化的列表中:
class FieldFetcherStateUpdater extends BaseStateUpdater<FieldFetcherState> {
final List<AggregatedFoo> results;
public FieldFetcherStateUpdater(List<AggregatedFoo> results) {
this.results = results;
}
@Override
public void updateState(FieldFetcherState state, List<TridentTuple> tuples,
TridentCollector collector) {
for (TridentTuple tuple : tuples) {
results.add((AggregatedFoo) tuple.getValue(0));
}
}
}
所以现在代码看起来像:
// ...
List<AggregatedFoo> results = new ArrayList();
topology.newStream("spout1", feederSpout)
.groupBy(new Fields("some_time_field", "foo_id"))
.aggregate(new Fields("foo_id"), new FooAggregator(),
new Fields("aggregated_foos"))
.partitionPersist(new FieldFetcherFactory(),
new Fields("aggregated_foos"),
new FieldFetcherStateUpdater(results));
LOGGER.info("Done. Checkpoint results={}", results);
但没有...日志显示 Done. Checkpoint results=[]
(空列表)
有办法得到吗?我想它一定是可行的,但我一直没能想出办法...
任何提示或 link 页面或类似内容将不胜感激。提前谢谢你。
您需要使用静态成员变量 result
。如果您有多个并行任务 运行(即 parallelism_hint > 1
),您还需要 synchronize
对 result
.
在你的例子中,result
将为空,因为 Storm 在内部创建了一个新的螺栓实例(包括 ArrayList
的一个新实例)。使用静态变量确保您可以访问正确的对象(因为在您的螺栓的所有实例中只有一个)。