运行 Storm TrackedTopology 单元测试中的三叉戟拓扑
Running Trident Topology in Storm TrackedTopology Unit Test
如何运行 Trident 拓扑的 JUnit 测试允许元组在拓扑中流动,同时测试和验证每个阶段的输出?我已经在 Storm 的测试框架中尝试了 运行ning,但它无法实现 Trident 的验证和一致执行。
这是一个拓扑示例,其中包含一些我遇到最多问题的内嵌注释。
import static org.junit.Assert.assertEquals;
import java.util.Arrays;
import java.util.List;
import org.junit.Test;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Count;
import storm.trident.testing.MemoryMapState;
import storm.trident.testing.Split;
import backtype.storm.Config;
import backtype.storm.ILocalCluster;
import backtype.storm.Testing;
import backtype.storm.testing.FeederSpout;
import backtype.storm.testing.TestJob;
import backtype.storm.testing.TrackedTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
public class WordCountTopologyTest {
@Test
public void testWordCountTopology() throws Exception {
Testing.withTrackedCluster(new WordCountTestJob());
}
public class WordCountTestJob implements TestJob {
@Override
public void run(ILocalCluster cluster) throws Exception {
// Create the test topology to submit
TridentTopology termCountTopology = new TridentTopology();
FeederSpout feeder = new FeederSpout(new Fields("text", "author"));
TridentState tridentState = termCountTopology.newStream("inputSpout", feeder)
.each(new Fields("text"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.name("counter-output")
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
.parallelismHint(6);
TrackedTopology tracked = Testing.mkTrackedTopology(cluster, termCountTopology.build());
// Feed some random data into the topology
feeder.feed(Arrays.asList("Nearly all men can stand adversity, but if you want to test a man's character, give him power.", "Abraham Lincoln"));
feeder.feed(Arrays.asList("No man has a good enough memory to be a successful liar.", "Abraham Lincoln"));
feeder.feed(Arrays.asList("Either write something worth reading or do something worth writing.", "Benjamin Franklin"));
feeder.feed(Arrays.asList("Well done is better than well said.", "Benjamin Franklin"));
cluster.submitTopology("word-count-testing", new Config(), tracked.getTopology());
// (!!) Runs, but bad to sleep for any time when may run faster or slower on other systems
// Utils.sleep(5000);
// (!!) Fails with 5000ms Topology timeout
// Testing.trackedWait(tracked, 3);
/*
* (!!) Always 0. Trident creates the streams and bolts internally with
* different names, so how can we read them to verify?
*/
List outputTuples = Testing.readTuples(tracked, "counter-output");
assertEquals(0, outputTuples.size());
}
}
}
除此之外,我尝试编写自己的 BaseFilter 以标记到存储所有元组的流的末尾,但似乎必须有更好的方法。此外,这并没有解决 运行 以受控方式设置拓扑的问题。这是 Trident 支持的东西吗?
使用 class FeederBatchSpout(用于 Trident)而不是 FeederSpout。 FeederBatchSpout 自己阻塞,不需要使用 Testing.trackedWait() 或类似的东西。
来源:https://groups.google.com/forum/#!topic/storm-user/CrAdQEXo5OU
如何运行 Trident 拓扑的 JUnit 测试允许元组在拓扑中流动,同时测试和验证每个阶段的输出?我已经在 Storm 的测试框架中尝试了 运行ning,但它无法实现 Trident 的验证和一致执行。
这是一个拓扑示例,其中包含一些我遇到最多问题的内嵌注释。
import static org.junit.Assert.assertEquals;
import java.util.Arrays;
import java.util.List;
import org.junit.Test;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Count;
import storm.trident.testing.MemoryMapState;
import storm.trident.testing.Split;
import backtype.storm.Config;
import backtype.storm.ILocalCluster;
import backtype.storm.Testing;
import backtype.storm.testing.FeederSpout;
import backtype.storm.testing.TestJob;
import backtype.storm.testing.TrackedTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
public class WordCountTopologyTest {
@Test
public void testWordCountTopology() throws Exception {
Testing.withTrackedCluster(new WordCountTestJob());
}
public class WordCountTestJob implements TestJob {
@Override
public void run(ILocalCluster cluster) throws Exception {
// Create the test topology to submit
TridentTopology termCountTopology = new TridentTopology();
FeederSpout feeder = new FeederSpout(new Fields("text", "author"));
TridentState tridentState = termCountTopology.newStream("inputSpout", feeder)
.each(new Fields("text"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.name("counter-output")
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
.parallelismHint(6);
TrackedTopology tracked = Testing.mkTrackedTopology(cluster, termCountTopology.build());
// Feed some random data into the topology
feeder.feed(Arrays.asList("Nearly all men can stand adversity, but if you want to test a man's character, give him power.", "Abraham Lincoln"));
feeder.feed(Arrays.asList("No man has a good enough memory to be a successful liar.", "Abraham Lincoln"));
feeder.feed(Arrays.asList("Either write something worth reading or do something worth writing.", "Benjamin Franklin"));
feeder.feed(Arrays.asList("Well done is better than well said.", "Benjamin Franklin"));
cluster.submitTopology("word-count-testing", new Config(), tracked.getTopology());
// (!!) Runs, but bad to sleep for any time when may run faster or slower on other systems
// Utils.sleep(5000);
// (!!) Fails with 5000ms Topology timeout
// Testing.trackedWait(tracked, 3);
/*
* (!!) Always 0. Trident creates the streams and bolts internally with
* different names, so how can we read them to verify?
*/
List outputTuples = Testing.readTuples(tracked, "counter-output");
assertEquals(0, outputTuples.size());
}
}
}
除此之外,我尝试编写自己的 BaseFilter 以标记到存储所有元组的流的末尾,但似乎必须有更好的方法。此外,这并没有解决 运行 以受控方式设置拓扑的问题。这是 Trident 支持的东西吗?
使用 class FeederBatchSpout(用于 Trident)而不是 FeederSpout。 FeederBatchSpout 自己阻塞,不需要使用 Testing.trackedWait() 或类似的东西。
来源:https://groups.google.com/forum/#!topic/storm-user/CrAdQEXo5OU