EmbeddedKafka AdminClient 在 Spring 应用程序启动测试之前关闭
EmbeddedKafka AdminClient shuts down before Spring app starts for tests
我正在尝试为 Spring Kafka 应用程序(Spring Boot 2.0.6,Spring Kafka 2.1.10)编写集成测试,我看到了很多实例INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x166e432ebec0001 type:create cxid:0x5e zxid:0x24 txntype:-1 reqpath:n/a Error Path:/brokers/topics/my-topic/partitions Error:KeeperErrorCode = NoNode for /brokers/topics/my-topic/partitions
和 Spring 应用程序启动前在日志中显示的各种路径(/brokers
、/brokers/topics
等)。然后 AdminClient 关闭并记录此消息:
DEBUG org.apache.kafka.common.network.Selector - [SocketServer brokerId=0] Connection with /127.0.0.1 disconnected
java.io.EOFException: null
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:124)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:93)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:235)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:196)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:547)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:483)
at org.apache.kafka.common.network.Selector.poll(Selector.java:412)
at kafka.network.Processor.poll(SocketServer.scala:575)
at kafka.network.Processor.run(SocketServer.scala:492)
at java.lang.Thread.run(Thread.java:748)
我在测试中使用 @ClassRule 启动选项,如下所示:
@ClassRule
@Shared
private KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 'my-topic')
,自动装配 KafkaTemplate
,并根据嵌入的 Kafka 值设置连接的 Spring 属性:
def setupSpec() {
System.setProperty('spring.kafka.bootstrap-servers', embeddedKafka.getBrokersAsString());
System.setProperty('spring.cloud.stream.kafka.binder.zkNodes', embeddedKafka.getZookeeperConnectionString());
}
Spring 应用程序启动后,我可以再次看到用户级 KeeperException 消息的实例:o.a.z.server.PrepRequestProcessor : Got user-level KeeperException when processing sessionid:0x166e445836d0001 type:setData cxid:0x6b zxid:0x2b txntype:-1 reqpath:n/a Error Path:/config/topics/__consumer_offsets Error:KeeperErrorCode = NoNode for /config/topics/__consumer_offsets
。
知道我哪里出错了吗?我可以提供其他设置信息和日志消息,但只是对最初可能最有帮助的内容进行了有根据的猜测。
我不熟悉 Spock,但我知道 @KafkaListener
方法是在它自己的线程上调用的,因此你不能直接在 then:
块中断言它。
您需要以某种方式确保测试用例中的阻塞等待。
我尝试使用 BlockingVariable
来对抗真正的服务而不是模拟,我在日志中看到你的 println(message)
。但是 BlockingVariable
仍然对我不起作用:
@DirtiesContext
@SpringBootTest(classes = [KafkaIntTestApplication.class])
@ActiveProfiles('test')
class CustomListenerSpec extends Specification {
@ClassRule
@Shared
public KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, false, 'my-topic')
@Autowired
private KafkaTemplate<String, String> template
@SpyBean
private SimpleService service
final def TOPIC_NAME = 'my-topic'
def setupSpec() {
System.setProperty('spring.kafka.bootstrapServers', embeddedKafka.getBrokersAsString());
}
def 'Sample test'() {
given:
def testMessagePayload = "Test message"
def message = MessageBuilder.withPayload(testMessagePayload).setHeader(KafkaHeaders.TOPIC, TOPIC_NAME).build()
def result = new BlockingVariable<Boolean>(5)
service.handleMessage(_) >> {
result.set(true)
}
when: 'We put a message on the topic'
template.send(message)
then: 'the service should be called'
result.get()
}
}
日志是这样的:
2018-11-05 13:38:51.089 INFO 8888 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [my-topic-0, my-topic-1]
Test message
BlockingVariable.get() timed out after 5,00 seconds
at spock.util.concurrent.BlockingVariable.get(BlockingVariable.java:113)
at com.example.CustomListenerSpec.Sample test(CustomListenerSpec.groovy:54)
2018-11-05 13:38:55.917 INFO 8888 --- [ main] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@11ebb1b6: startup date [Mon Nov 05 13:38:49 EST 2018]; root of context hierarchy
我还必须添加此依赖项:
testImplementation "org.hamcrest:hamcrest-core"
更新
好的。真正的问题是 MockConfig
对于测试上下文配置不可见,而 @Import(MockConfig.class)
可以解决问题。其中 @Primary
还为我们提供了额外的信号,即在测试 class.
中选择什么 bean 进行注入
@ArtemBilan 的回复让我走上了正确的道路,所以感谢他的插话,我在查看其他 BlockingVariable
文章和示例后能够弄明白。我在模拟的响应中使用 BlockingVariable
而不是作为回调。当调用模拟的响应时,将值设置为 true,then
块将执行 result.get()
并且测试通过。
@DirtiesContext
@ActiveProfiles('test')
@SpringBootTest
@Import(MockConfig.class)
class CustomListenerSpec extends TestSpecBase {
@ClassRule
@Shared
private KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, false, TOPIC_NAME)
@Autowired
private KafkaTemplate<String, String> template
@Autowired
private SimpleService service
final def TOPIC_NAME = 'my-topic'
def setupSpec() {
System.setProperty('spring.kafka.bootstrap-servers', embeddedKafka.getBrokersAsString());
}
def 'Sample test'() {
def testMessagePayload = "Test message"
def message = MessageBuilder.withPayload(testMessagePayload).setHeader(KafkaHeaders.TOPIC, TOPIC_NAME).build()
def result = new BlockingVariable<Boolean>(5)
service.handleMessage(_ as String) >> {
result.set(true)
}
when: 'We put a message on the topic'
template.send(message)
then: 'the service should be called'
result.get()
}
}
我正在尝试为 Spring Kafka 应用程序(Spring Boot 2.0.6,Spring Kafka 2.1.10)编写集成测试,我看到了很多实例INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x166e432ebec0001 type:create cxid:0x5e zxid:0x24 txntype:-1 reqpath:n/a Error Path:/brokers/topics/my-topic/partitions Error:KeeperErrorCode = NoNode for /brokers/topics/my-topic/partitions
和 Spring 应用程序启动前在日志中显示的各种路径(/brokers
、/brokers/topics
等)。然后 AdminClient 关闭并记录此消息:
DEBUG org.apache.kafka.common.network.Selector - [SocketServer brokerId=0] Connection with /127.0.0.1 disconnected
java.io.EOFException: null
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:124)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:93)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:235)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:196)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:547)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:483)
at org.apache.kafka.common.network.Selector.poll(Selector.java:412)
at kafka.network.Processor.poll(SocketServer.scala:575)
at kafka.network.Processor.run(SocketServer.scala:492)
at java.lang.Thread.run(Thread.java:748)
我在测试中使用 @ClassRule 启动选项,如下所示:
@ClassRule
@Shared
private KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 'my-topic')
,自动装配 KafkaTemplate
,并根据嵌入的 Kafka 值设置连接的 Spring 属性:
def setupSpec() {
System.setProperty('spring.kafka.bootstrap-servers', embeddedKafka.getBrokersAsString());
System.setProperty('spring.cloud.stream.kafka.binder.zkNodes', embeddedKafka.getZookeeperConnectionString());
}
Spring 应用程序启动后,我可以再次看到用户级 KeeperException 消息的实例:o.a.z.server.PrepRequestProcessor : Got user-level KeeperException when processing sessionid:0x166e445836d0001 type:setData cxid:0x6b zxid:0x2b txntype:-1 reqpath:n/a Error Path:/config/topics/__consumer_offsets Error:KeeperErrorCode = NoNode for /config/topics/__consumer_offsets
。
知道我哪里出错了吗?我可以提供其他设置信息和日志消息,但只是对最初可能最有帮助的内容进行了有根据的猜测。
我不熟悉 Spock,但我知道 @KafkaListener
方法是在它自己的线程上调用的,因此你不能直接在 then:
块中断言它。
您需要以某种方式确保测试用例中的阻塞等待。
我尝试使用 BlockingVariable
来对抗真正的服务而不是模拟,我在日志中看到你的 println(message)
。但是 BlockingVariable
仍然对我不起作用:
@DirtiesContext
@SpringBootTest(classes = [KafkaIntTestApplication.class])
@ActiveProfiles('test')
class CustomListenerSpec extends Specification {
@ClassRule
@Shared
public KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, false, 'my-topic')
@Autowired
private KafkaTemplate<String, String> template
@SpyBean
private SimpleService service
final def TOPIC_NAME = 'my-topic'
def setupSpec() {
System.setProperty('spring.kafka.bootstrapServers', embeddedKafka.getBrokersAsString());
}
def 'Sample test'() {
given:
def testMessagePayload = "Test message"
def message = MessageBuilder.withPayload(testMessagePayload).setHeader(KafkaHeaders.TOPIC, TOPIC_NAME).build()
def result = new BlockingVariable<Boolean>(5)
service.handleMessage(_) >> {
result.set(true)
}
when: 'We put a message on the topic'
template.send(message)
then: 'the service should be called'
result.get()
}
}
日志是这样的:
2018-11-05 13:38:51.089 INFO 8888 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [my-topic-0, my-topic-1]
Test message
BlockingVariable.get() timed out after 5,00 seconds
at spock.util.concurrent.BlockingVariable.get(BlockingVariable.java:113)
at com.example.CustomListenerSpec.Sample test(CustomListenerSpec.groovy:54)
2018-11-05 13:38:55.917 INFO 8888 --- [ main] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@11ebb1b6: startup date [Mon Nov 05 13:38:49 EST 2018]; root of context hierarchy
我还必须添加此依赖项:
testImplementation "org.hamcrest:hamcrest-core"
更新
好的。真正的问题是 MockConfig
对于测试上下文配置不可见,而 @Import(MockConfig.class)
可以解决问题。其中 @Primary
还为我们提供了额外的信号,即在测试 class.
@ArtemBilan 的回复让我走上了正确的道路,所以感谢他的插话,我在查看其他 BlockingVariable
文章和示例后能够弄明白。我在模拟的响应中使用 BlockingVariable
而不是作为回调。当调用模拟的响应时,将值设置为 true,then
块将执行 result.get()
并且测试通过。
@DirtiesContext
@ActiveProfiles('test')
@SpringBootTest
@Import(MockConfig.class)
class CustomListenerSpec extends TestSpecBase {
@ClassRule
@Shared
private KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, false, TOPIC_NAME)
@Autowired
private KafkaTemplate<String, String> template
@Autowired
private SimpleService service
final def TOPIC_NAME = 'my-topic'
def setupSpec() {
System.setProperty('spring.kafka.bootstrap-servers', embeddedKafka.getBrokersAsString());
}
def 'Sample test'() {
def testMessagePayload = "Test message"
def message = MessageBuilder.withPayload(testMessagePayload).setHeader(KafkaHeaders.TOPIC, TOPIC_NAME).build()
def result = new BlockingVariable<Boolean>(5)
service.handleMessage(_ as String) >> {
result.set(true)
}
when: 'We put a message on the topic'
template.send(message)
then: 'the service should be called'
result.get()
}
}