Weirdness ensues when running integration tests with Spring Cloud Stream, Spring Data Cassandra and Cassandra-Unit-Spring
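I'm using Cassandra as the event store for my application: Spring Cloud Stream receives the events and Spring Data Cassandra persists them.
To listen for different events, I created a ClientEvent with two user-defined types, Client and Product (both classes are annotated with @UserDefinedType), like this: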
@Table
public class ClientEvent {
    @PrimaryKey
    @CassandraType(type = DataType.Name.UUID)
    private String id; // created with UUIDs.timeBased().toString()

    @CassandraType(type = DataType.Name.UDT, userTypeName = "client")
    private Client client; // has an email field

    @CassandraType(type = DataType.Name.UDT, userTypeName = "product")
    private Product product; // has name and cost fields
}
My EventConsumer looks like this, and it works for the different events:
@EnableBinding(Sink.class)
public class EventConsumer {
    @Inject
    private EventRepository eventRepository;

    @Override
    @StreamListener(Sink.INPUT)
    public void clientEvent(ClientEvent event) {
        eventRepository.save(event);
    }
}
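I also have a controller that returns all events for a given client (its code is pretty standard, so I won't post it here).
To test all of this, I set up some integration tests using spring-cloud-contract-stub-runner and cassandra-unit-spring to run an embedded Cassandra. The only configuration I have is the following, in my application.yml: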
spring:
  profiles: test
  data:
    cassandra:
      keyspace-name: testkeyspace
      contact-points: localhost
      port: 9142
Here is the Spock test:
@SpringBootTest(classes = EventsApplication.class, webEnvironment = RANDOM_PORT)
@AutoConfigureStubRunner(ids = ["example.com:clients-service",
        "example.com:products-service"], stubsMode = StubsMode.LOCAL)
@TestExecutionListeners(listeners = [
        CassandraUnitDependencyInjectionTestExecutionListener.class,
        DependencyInjectionTestExecutionListener.class
])
@CassandraDataSet(keyspace = "testkeyspace", value = "dataset.cql")
@EmbeddedCassandra(timeout = 60000L)
@ActiveProfiles("test")
class EventsIntegrationTests extends Specification {
    @Inject
    StubTrigger stubTrigger

    @Inject
    private TestRestTemplate restTemplate

    def "client created"() {
        given: "that a client has been created"
        stubTrigger.trigger("client_created") // stub defined in clients-service

        when: "I fetch the events for the current client"
        def response = restTemplate.getForEntity("/", ClientEvent[])

        then: "it should be a clientCreated event"
        ClientEvent event = response.body[0]
        event.name == "clientCreated"
        event.client == new Client("test") // hashCode and equals implemented
    }

    def "product bought"() {
        given: "that a product has been bought"
        stubTrigger.trigger("product_bought") // stub defined in products-service

        when: "I fetch the events for the current client"
        def response = restTemplate.getForEntity("/", ClientEvent[])

        then: "it should be a productBought event"
        ClientEvent event = response.body[0]
        event.name == "productBought"
        event.client == new Client("test") // hashCode and equals implemented
        event.product == new Product("Lamp", 100D)
    }
}
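The dataset.cql file creates the test keyspace, the client and product types, and then the ClientEvent table. So far, so good. The problem is that, while the first test runs fine, the second one gives me this NullPointerException when the event is saved in the EventConsumer: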
org.springframework.messaging.MessagingException: Exception thrown while invoking com.example.events.consumers.EventConsumer#clientEvent[1 args]; nested exception is java.lang.NullPointerException
[...]
Caused by: java.lang.NullPointerException: null
at org.springframework.data.cassandra.core.mapping.SimpleUserTypeResolver.resolveType(SimpleUserTypeResolver.java:63) ~[spring-data-cassandra-2.0.6.RELEASE.jar:2.0.6.RELEASE]
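I found that if I debug, or wait a second between tests (using Thread.sleep(1000) in the setup() method), my tests run fine! I noticed that between tests cassandra-unit-spring drops and recreates the database. It looks to me as if the second test runs before the user-defined types have been recreated, which is why it throws the NullPointerException: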
2018-05-31 22:43:43.552 INFO 14976 --- [port-Requests-1] o.a.cassandra.service.MigrationManager : Drop Keyspace 'testkeyspace'
2018-05-31 22:43:53.445 INFO 14976 --- [igrationStage:1] o.a.cassandra.utils.memory.BufferPool : Global buffer pool is enabled, when pool is exhausted (max is 512.000MiB) it will allocate on heap
2018-05-31 22:43:53.511 INFO 14976 --- [port-Requests-3] o.a.cassandra.service.MigrationManager : Create new Keyspace: KeyspaceMetadata{name=testkeyspace, params=KeyspaceParams{durable_writes=false, replication=ReplicationParams{class=org.apache.cassandra.locator.SimpleStrategy, replication_factor=1}}, tables=[], views=[], functions=[], types=[]}
2018-05-31 22:43:53.603 INFO 14976 --- [port-Requests-2] o.a.cassandra.service.MigrationManager : Create new table: org.apache.cassandra.config.CFMetaData@1f5471ca[cfId=3d9ed530-653d-11e8-b838-27af117b0453,ksName=testkeyspace,cfName=clientevent,flags=[COMPOUND],params=TableParams{comment=, read_repair_chance=0.0, dclocal_read_repair_chance=0.1, bloom_filter_fp_chance=0.01, crc_check_chance=1.0, gc_grace_seconds=864000, default_time_to_live=0, memtable_flush_period_in_ms=0, min_index_interval=128, max_index_interval=2048, speculative_retry=99PERCENTILE, caching={'keys' : 'ALL', 'rows_per_partition' : 'NONE'}, compaction=CompactionParams{class=org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy, options={min_threshold=4, max_threshold=32}}, compression=org.apache.cassandra.schema.CompressionParams@57a6e29, extensions={}, cdc=false},comparator=comparator(),partitionColumns=[[] | [client createdat name product]],partitionKeyColumns=[id],clusteringColumns=[],keyValidator=org.apache.cassandra.db.marshal.TimeUUIDType,columnMetadata=[product, id, client, name, createdat],droppedColumns={},triggers=[],indexes=[]]
2018-05-31 22:43:53.619 INFO 14976 --- [igrationStage:1] o.apache.cassandra.db.ColumnFamilyStore : Initializing testkeyspace.clientevent
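Am I missing something in my configuration? How can I avoid waiting a second between tests?
OK, I made some changes following this post on CassandraUnit, which not only solved my problem but also made my tests run much faster, because the database is created only once!
I made the following changes to EventsIntegrationTests: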
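If some waiting turns out to be unavoidable, a bounded poll is usually less brittle than a fixed Thread.sleep(1000): it returns as soon as the condition holds and fails visibly when it never does. Below is a minimal sketch of that idea in plain Java. The class name `Await` and its `until` method are hypothetical helpers written for illustration; they are not part of Spock, Spring or cassandra-unit.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical helper (not a library class): polls a condition with a deadline.
final class Await {
    private Await() {}

    /**
     * Re-checks the condition every pollInterval until it is true or the
     * timeout has elapsed. Returns true if the condition became true,
     * false on timeout or if the waiting thread was interrupted.
     */
    static boolean until(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out without the condition ever holding
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
    }
}
```

In setup(), the condition would be whatever check shows that the schema is back, for example that the keyspace and its user-defined types are visible again. Which check is sufficient here is an assumption I have not verified.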
@SpringBootTest(classes = EventsApplication.class, webEnvironment = RANDOM_PORT)
@AutoConfigureStubRunner(ids = ["example.com:clients-service",
        "example.com:products-service"], stubsMode = StubsMode.LOCAL)
// no more TestExecutionListeners and cassandra annotations
@ActiveProfiles("test")
class EventsIntegrationTests extends Specification {
    @Shared
    private static Cluster cluster

    @Shared
    private static Session session

    @Inject
    StubTrigger stubTrigger

    @Inject
    private TestRestTemplate restTemplate

    // runs only once before all tests, like JUnit 5's @BeforeAll
    def setupSpec() {
        // the same code from the post, creates the DB
        if (session == null) {
            try {
                EmbeddedCassandraServerHelper.startEmbeddedCassandra()
                cluster = new Cluster.Builder().addContactPoint("localhost").withPort(9142).build()
                session = cluster.connect()
                CQLDataLoader loader = new CQLDataLoader(session)
                ClassPathCQLDataSet dataSet = new ClassPathCQLDataSet("dataset.cql", true, true, "testkeyspace")
                loader.load(dataSet)
            } catch (Exception e) {
                throw new RuntimeException("Could not start cassandra server or obtain a valid session.", e)
            }
        }
    }

    // same tests

    // runs after every test, like JUnit 5's @AfterEach
    def cleanup() {
        // truncates the tables
        Collection<TableMetadata> tables = cluster.getMetadata().getKeyspace("testkeyspace").getTables()
        // Groovy closure, slightly different syntax from a Java lambda
        tables.forEach({ session.execute(QueryBuilder.truncate(it)) })
    }
}
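Of course, since I only have one integration test, all the code lives in that class. If you have several integration tests that involve Cassandra, though, I'd suggest creating a base class with these methods.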
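With a shared base class, the startup in setupSpec() still has to run only once per JVM even though every spec inherits it. Here is a minimal sketch of such a guard in plain Java. `SharedResource` is a hypothetical name, and the supplier passed to it merely stands in for the embedded Cassandra startup shown above; nothing here comes from cassandra-unit itself.

```java
import java.util.function.Supplier;

// Hypothetical holder (not a library class): runs an expensive starter at most
// once and hands the same instance to every caller afterwards.
final class SharedResource<T> {
    private final Supplier<T> starter;
    private T instance;

    SharedResource(Supplier<T> starter) {
        this.starter = starter;
    }

    // synchronized so that concurrently running specs cannot start it twice
    synchronized T get() {
        if (instance == null) {
            // a starter that returns null would be invoked again on the next call
            instance = starter.get();
        }
        return instance;
    }
}
```

A base class could keep one static SharedResource for the session and call get() from setupSpec(), which is the same idea as the `if (session == null)` check above, just safe to share between classes.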