什么可以用作 CassandraWriterBolt 的测试存根?
What can be used as a test stub for CassandraWriterBolt?
我从 Kafka 读取了一个 json,FieldExtractionBolt 读取 json 将数据提取到元组值中并将它们传递给 CassandraWriterBolt,后者又在 Cassandra 中写入一条记录,将所有这些元组值写入单独的列。
JSON 关于 Kafka 的消息 -
{"pair":"GBPJPY","bid":134.4563,"ask":134.4354}
FieldExtractionBolt -
String message = tuple.getStringByField("message");
Map values = new Gson().fromJson(message, Map.class);
basicOutputCollector.emit(new Values(values.get("pair"), values.get("bid"), values.get("ask")));
CassandraWriterBolt -
return (CassandraWriterBolt) new CassandraWriterBolt(async(simpleQuery("INSERT INTO currency(pair, ask, bid) VALUES (?, ?, ?);").with(fields("pair", "ask", "bid")))
我尝试根据此处给出的答案编写测试 -
在我的项目中,我在 Spring 配置中定义了所有螺栓、喷口和流。这使得 writing/reading 我的拓扑非常简单。我通过从 ApplicationContext 获取螺栓、喷口和流 bean 来构建拓扑。在我的 Spring 配置中,KafkaSpout 和 CassandraWriterBolt 在 'prod' 配置文件下定义,因此它们仅在产品中使用,而在 'test' 配置文件下,我为 KafkaSpout 和 CassandraWriterBolt 定义了存根。对于 KafkaSpout,我使用了 FixedToupleSpout,对于 CassandraWriterBolt,我使用了 TestWordCounter。
这是我的测试
@Test
public void testTopology(){
StormTopology topology = SpringBasedTopologyBuilder.getInstance().buildStormTopologyUsingApplicationContext(applicationContext);
TestJob COMPLETE_TOPOLOGY_TESTJOB = (cluster) -> {
MockedSources mocked = new MockedSources();
mocked.addMockData("kafkaSpout",
new Values("{\"pair\":\"GBPJPY\",\"bid\":134.4563,\"ask\":134.4354}"),
new Values("{\"pair\":\"GBPUSD\",\"bid\":1.4563,\"ask\":1.4354}"));
Config topoConf = new Config();
topoConf.setNumWorkers(2);
CompleteTopologyParam ctp = new CompleteTopologyParam();
ctp.setMockedSources(mocked);
ctp.setStormConf(topoConf);
Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topology, ctp);
List<List<Object>> cassandraTuples = Testing.readTuples(results, "cassandraWriterBolt");
List<List<Object>> expectedCassandraTuples = Arrays.asList(Arrays.asList("GBPJPY", 1), Arrays.asList("GBPUSD", 1),
Arrays.asList("134.4563", 1), Arrays.asList("1.4563", 1), Arrays.asList("134.4354", 2));
assertTrue(expectedCassandraTuples + " expected, but found " + cassandraTuples,
Testing.multiseteq(expectedCassandraTuples, cassandraTuples));
MkClusterParam param = new MkClusterParam();
param.setSupervisors(4);
Testing.withSimulatedTimeLocalCluster(param, COMPLETE_TOPOLOGY_TESTJOB);
}
@Configuration
@Import(MainApplication.class)
public static class TestConfig
{
@Bean
public IRichSpout kafkaSpout(){
return new FixedTupleSpout(Arrays.asList(new FixedTuple(Arrays.asList("{\"pair\":\"GBPJPY\",\"bid\":134.4563,\"ask\":134.4354"))), new Fields(new String[]{"message"}));
}
@Bean
public IBasicBolt cassandraWriterBolt(){
return new TestWordCounter();
}
}
我得到的结果不是我所期望的。我收到以下错误 -
java.lang.AssertionError: [[GBPJPY, 1], [GBPUSD, 1], [134.4563, 1], [1.4563, 1], [134.4354, 2]] expected, but found [[GBPJPY, 1], [GBPUSD, 1]]
看起来,TestWordCounter 只是将第一个值读取为元组(仅货币对并且跳过出价和要价)。似乎 TestWordCounter 在这里不是一个正确的选择。 CassandraWriterBolt 的正确存根是什么,这样我就可以断言它会收到 2 条记录,一条是 GBPJPY,另一条是 GBPUSD,还有他们的出价和要价?
Testing.readTuples(results, "cassandraWriterBolt")
将 return "cassandraWriterBolt" 发出的元组。那是你要测试的吗?我认为您正在尝试断言 "cassandraWriterBolt" 接收到哪些元组,而不是它发出什么。
你可以在这里做两件事。您可以使用 readTuples
从发射到 Cassandra 螺栓的螺栓中读取,而不是从 Cassandra 螺栓中读取。如果您的拓扑结构很简单(例如,没有很多不同的螺栓写入 Cassandra 螺栓),这是一个不错的解决方案。
更好的解决方案 (IMO) 是编写一个简单的短螺栓来替换 TestWordCounter
。螺栓唯一应该做的就是接收输入元组,确认它,并在新元组中发出值。
execute(Tuple input, BasicOutputCollector collector) {
collector.emit(input.getValues());
}
然后您可以使用 readTuples
读取 bolt 发出的元组,这将与它接收的值相同。
我从 Kafka 读取了一个 json,FieldExtractionBolt 读取 json 将数据提取到元组值中并将它们传递给 CassandraWriterBolt,后者又在 Cassandra 中写入一条记录,将所有这些元组值写入单独的列。
JSON 关于 Kafka 的消息 -
{"pair":"GBPJPY","bid":134.4563,"ask":134.4354}
FieldExtractionBolt -
String message = tuple.getStringByField("message");
Map values = new Gson().fromJson(message, Map.class);
basicOutputCollector.emit(new Values(values.get("pair"), values.get("bid"), values.get("ask")));
CassandraWriterBolt -
return (CassandraWriterBolt) new CassandraWriterBolt(async(simpleQuery("INSERT INTO currency(pair, ask, bid) VALUES (?, ?, ?);").with(fields("pair", "ask", "bid")))
我尝试根据此处给出的答案编写测试 -
在我的项目中,我在 Spring 配置中定义了所有螺栓、喷口和流。这使得 writing/reading 我的拓扑非常简单。我通过从 ApplicationContext 获取螺栓、喷口和流 bean 来构建拓扑。在我的 Spring 配置中,KafkaSpout 和 CassandraWriterBolt 在 'prod' 配置文件下定义,因此它们仅在产品中使用,而在 'test' 配置文件下,我为 KafkaSpout 和 CassandraWriterBolt 定义了存根。对于 KafkaSpout,我使用了 FixedToupleSpout,对于 CassandraWriterBolt,我使用了 TestWordCounter。
这是我的测试
@Test
public void testTopology(){
StormTopology topology = SpringBasedTopologyBuilder.getInstance().buildStormTopologyUsingApplicationContext(applicationContext);
TestJob COMPLETE_TOPOLOGY_TESTJOB = (cluster) -> {
MockedSources mocked = new MockedSources();
mocked.addMockData("kafkaSpout",
new Values("{\"pair\":\"GBPJPY\",\"bid\":134.4563,\"ask\":134.4354}"),
new Values("{\"pair\":\"GBPUSD\",\"bid\":1.4563,\"ask\":1.4354}"));
Config topoConf = new Config();
topoConf.setNumWorkers(2);
CompleteTopologyParam ctp = new CompleteTopologyParam();
ctp.setMockedSources(mocked);
ctp.setStormConf(topoConf);
Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topology, ctp);
List<List<Object>> cassandraTuples = Testing.readTuples(results, "cassandraWriterBolt");
List<List<Object>> expectedCassandraTuples = Arrays.asList(Arrays.asList("GBPJPY", 1), Arrays.asList("GBPUSD", 1),
Arrays.asList("134.4563", 1), Arrays.asList("1.4563", 1), Arrays.asList("134.4354", 2));
assertTrue(expectedCassandraTuples + " expected, but found " + cassandraTuples,
Testing.multiseteq(expectedCassandraTuples, cassandraTuples));
MkClusterParam param = new MkClusterParam();
param.setSupervisors(4);
Testing.withSimulatedTimeLocalCluster(param, COMPLETE_TOPOLOGY_TESTJOB);
}
@Configuration
@Import(MainApplication.class)
public static class TestConfig
{
@Bean
public IRichSpout kafkaSpout(){
return new FixedTupleSpout(Arrays.asList(new FixedTuple(Arrays.asList("{\"pair\":\"GBPJPY\",\"bid\":134.4563,\"ask\":134.4354"))), new Fields(new String[]{"message"}));
}
@Bean
public IBasicBolt cassandraWriterBolt(){
return new TestWordCounter();
}
}
我得到的结果不是我所期望的。我收到以下错误 -
java.lang.AssertionError: [[GBPJPY, 1], [GBPUSD, 1], [134.4563, 1], [1.4563, 1], [134.4354, 2]] expected, but found [[GBPJPY, 1], [GBPUSD, 1]]
看起来,TestWordCounter 只是将第一个值读取为元组(仅货币对并且跳过出价和要价)。似乎 TestWordCounter 在这里不是一个正确的选择。 CassandraWriterBolt 的正确存根是什么,这样我就可以断言它会收到 2 条记录,一条是 GBPJPY,另一条是 GBPUSD,还有他们的出价和要价?
Testing.readTuples(results, "cassandraWriterBolt")
将 return "cassandraWriterBolt" 发出的元组。那是你要测试的吗?我认为您正在尝试断言 "cassandraWriterBolt" 接收到哪些元组,而不是它发出什么。
你可以在这里做两件事。您可以使用 readTuples
从发射到 Cassandra 螺栓的螺栓中读取,而不是从 Cassandra 螺栓中读取。如果您的拓扑结构很简单(例如,没有很多不同的螺栓写入 Cassandra 螺栓),这是一个不错的解决方案。
更好的解决方案 (IMO) 是编写一个简单的短螺栓来替换 TestWordCounter
。螺栓唯一应该做的就是接收输入元组,确认它,并在新元组中发出值。
execute(Tuple input, BasicOutputCollector collector) {
collector.emit(input.getValues());
}
然后您可以使用 readTuples
读取 bolt 发出的元组,这将与它接收的值相同。