Micronaut 3: How to use PubSubEmulatorContainer
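Update: The link to the repo has been moved to the answer, since the repo has now been updated with the code from the answer below.
Problem description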
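The current code works, but it relies on gcloud beta emulators pubsub from the google/cloud-sdk image for integration tests.
- Integration tests are slow because of the size of the google/cloud-sdk image
- The Pub/Sub emulator has to run on a fixed port, and there seems to be no way to tell Micronaut which port the emulator is running on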
I need to set the following environment variable in the maven-surefire-plugin configuration:
<environmentVariables>
    <PUBSUB_EMULATOR_HOST>localhost:8085</PUBSUB_EMULATOR_HOST>
</environmentVariables>
How it can be done in Spring Boot
According to Testcontainers | GCloud Module, the correct way to implement integration tests with PubSubEmulatorContainer in Spring Boot is this:
https://github.com/saturnism/testcontainers-gcloud-examples/blob/main/springboot/pubsub-example/src/test/java/com/example/springboot/pubsub/PubSubIntegrationTests.java
This brings up the container on a random port, which is possible because of DynamicPropertyRegistry in Spring. Micronaut seems to lack this possibility.
Docs: https://www.testcontainers.org/modules/gcloud/
I am looking for a working example of a JUnit5 or Spock integration test, implemented in Micronaut 3.x, that uses PubSubEmulatorContainer as described in the docs above.
Related docs: https://micronaut-projects.github.io/micronaut-gcp/latest/guide/#emulator
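There are some comments on GitHub about configuring TransportChannelProvider. I am able to inject an instance and inspect it, but I still haven't found out exactly what to do.
These are the closest leads so far:
https://github.com/micronaut-projects/micronaut-gcp/issues/257
https://github.com/micronaut-projects/micronaut-gcp/pull/259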
TL;DR
We need to start the test container first, get the emulator host address, and then call ApplicationContext.run like this:
applicationContext = ApplicationContext.run(
        ["pubsub.emulator.host": emulatorHost])
Small GitHub repo with example code: https://github.com/roar-skinderviken/pubsub-emulator-demo
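As a rough illustration of the wiring described above, the sketch below builds the property map from whatever endpoint the container reports. EmulatorProperties and forEndpoint are hypothetical names, not part of Micronaut or Testcontainers; the only thing taken from the answer is the pubsub.emulator.host key. The endpoint is passed in as a plain string so the snippet runs without Docker.

```groovy
// Hypothetical helper (not a Micronaut or Testcontainers API): turns an
// emulator endpoint of the form "host:port" into the property map that is
// handed to ApplicationContext.run(...).
class EmulatorProperties {

    static Map<String, Object> forEndpoint(String endpoint) {
        // Fail early on anything that is not "<host>:<port>", so a bad value
        // shows up in test setup rather than inside the gRPC client.
        if (endpoint == null || !(endpoint ==~ /[^:\s]+:\d{1,5}/)) {
            throw new IllegalArgumentException("Expected host:port but got '" + endpoint + "'")
        }
        ["pubsub.emulator.host": endpoint]
    }
}
```

Under these assumptions, a call such as ApplicationContext.run(EmulatorProperties.forEndpoint(container.getEmulatorEndpoint())) would start the context against the randomly mapped port, where container stands for an already started PubSubEmulatorContainer.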
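Long answer with code
I finally managed to put together a working solution using Micronaut 3.0.2 and Spock. A related Micronaut PR got me on track, together with this article: Micronaut Testing Best Practices https://objectcomputing.com/files/9815/9259/7089/slide_deck_Micronaut_Testing_Best_Practices_webinar.pdf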
First, a PubSubEmulator class (Groovy)

package no.myproject.testframework.testcontainers

import org.testcontainers.containers.PubSubEmulatorContainer
import org.testcontainers.utility.DockerImageName

class PubSubEmulator {

    // shared by all specs, so the container is started only once per test run
    static PubSubEmulatorContainer pubSubEmulatorContainer

    static void init() {
        if (pubSubEmulatorContainer == null) {
            pubSubEmulatorContainer = new PubSubEmulatorContainer(
                    DockerImageName.parse("gcr.io/google.com/cloudsdktool/cloud-sdk:emulators"))
            pubSubEmulatorContainer.start()
        }
    }
}
Then a fixture for PubSubEmulator (Groovy)

package no.myproject.testframework.testcontainers

trait PubSubEmulatorFixture {

    Map<String, Object> getPubSubConfiguration() {
        if (PubSubEmulator.pubSubEmulatorContainer == null || !PubSubEmulator.pubSubEmulatorContainer.isRunning()) {
            PubSubEmulator.init()
        }
        // "pubsub.emulator-host" is only the key the specification class uses to look up the endpoint
        [
                "pubsub.emulator-host": PubSubEmulator.pubSubEmulatorContainer.getEmulatorEndpoint()
        ]
    }
}
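Then a specification class (Groovy) that starts the container and creates the topic and subscriptions.
The key here is to pass in pubsub.emulator.host as part of the configuration when calling ApplicationContext.run.
The rest of the code is very similar to the Spring Boot example I linked to in my question.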

package no.myproject.testframework

import com.google.api.gax.core.NoCredentialsProvider
import com.google.api.gax.grpc.GrpcTransportChannel
import com.google.api.gax.rpc.AlreadyExistsException
import com.google.api.gax.rpc.FixedTransportChannelProvider
import com.google.cloud.pubsub.v1.SubscriptionAdminClient
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings
import com.google.cloud.pubsub.v1.TopicAdminClient
import com.google.cloud.pubsub.v1.TopicAdminSettings
import com.google.pubsub.v1.ProjectSubscriptionName
import com.google.pubsub.v1.PushConfig
import com.google.pubsub.v1.TopicName
import io.grpc.ManagedChannelBuilder
import io.micronaut.context.ApplicationContext
import io.micronaut.runtime.server.EmbeddedServer
import no.myproject.configuration.GcpConfigProperties
import no.myproject.configuration.PubSubConfigProperties
import no.myproject.testframework.testcontainers.PubSubEmulatorFixture
import spock.lang.AutoCleanup
import spock.lang.Shared
import spock.lang.Specification

// EnvironmentFixture is not shown here; it lives in this package and supplies `environments`
abstract class PubSubSpecification extends Specification
        implements PubSubEmulatorFixture, EnvironmentFixture {

    @AutoCleanup
    @Shared
    EmbeddedServer embeddedServer

    @AutoCleanup
    @Shared
    ApplicationContext applicationContext

    def setupSpec() {
        // start the pubsub emulator
        def emulatorHost = getPubSubConfiguration().get("pubsub.emulator-host")

        // start a temporary applicationContext in order to read config
        // keep any pubsub subscriptions out of context at this stage
        applicationContext = ApplicationContext.run()
        def gcpConfigProperties = applicationContext.getBean(GcpConfigProperties)
        def pubSubConfigProperties = applicationContext.getBean(PubSubConfigProperties)

        // plaintext gRPC channel to the emulator, shared by both admin clients
        def channel = ManagedChannelBuilder.forTarget("dns:///" + emulatorHost)
                .usePlaintext()
                .build()
        def channelProvider =
                FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel))

        // START creating topic
        def topicAdminClient =
                TopicAdminClient.create(
                        TopicAdminSettings.newBuilder()
                                .setCredentialsProvider(NoCredentialsProvider.create())
                                .setTransportChannelProvider(channelProvider)
                                .build())
        def topic = TopicName.of(
                gcpConfigProperties.getProjectId(),
                pubSubConfigProperties.getTopicName())
        try {
            topicAdminClient.createTopic(topic)
        } catch (AlreadyExistsException ignored) {
            // this is fine, already created
            // (Groovy needs a parameter name here; `catch (AlreadyExistsException)` alone
            // would declare an untyped parameter with that name)
            topicAdminClient.getTopic(topic)
        } finally {
            topicAdminClient.close()
        }

        // START creating subscription
        pubSubConfigProperties.getSubscriptionNames().forEach(it -> {
            def subscription =
                    ProjectSubscriptionName.of(gcpConfigProperties.getProjectId(), it)
            def subscriptionAdminClient =
                    SubscriptionAdminClient.create(
                            SubscriptionAdminSettings.newBuilder()
                                    .setTransportChannelProvider(channelProvider)
                                    .setCredentialsProvider(NoCredentialsProvider.create())
                                    .build())
            try {
                subscriptionAdminClient
                        .createSubscription(
                                subscription,
                                topic,
                                PushConfig.getDefaultInstance(),
                                100)
                System.out.println("Subscription created " + subscriptionAdminClient.getSubscription(subscription))
            } catch (AlreadyExistsException ignored) {
                // this is fine, already created
                subscriptionAdminClient.getSubscription(subscription)
            } finally {
                subscriptionAdminClient.close()
            }
        })
        channel.shutdown()

        // stop the temporary applicationContext
        applicationContext.stop()

        // start the actual applicationContext, now pointing at the emulator
        embeddedServer = ApplicationContext.run(
                EmbeddedServer,
                [
                        'spec.name'           : "PubSubEmulatorSpec",
                        "pubsub.emulator.host": emulatorHost
                ],
                environments)
        applicationContext = embeddedServer.applicationContext
    }
}
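
The "create it, or fetch it if it is already there" step used for the topic and the subscriptions can be isolated into a small helper. This is only a sketch: IdempotentSetup, createOrGet and ResourceExistsException are made-up names, and ResourceExistsException stands in for com.google.api.gax.rpc.AlreadyExistsException so that the snippet runs without the Google client libraries.

```groovy
// Stand-in for com.google.api.gax.rpc.AlreadyExistsException (assumption made
// so the sketch has no external dependencies).
class ResourceExistsException extends RuntimeException {
    ResourceExistsException(String message) { super(message) }
}

// Hypothetical helper, not part of any library.
class IdempotentSetup {

    // Runs `create`; if the resource already exists, falls back to `fetch`.
    // Any other exception propagates to the caller.
    static <T> T createOrGet(Closure<T> create, Closure<T> fetch) {
        try {
            return create.call()
        } catch (ResourceExistsException ignored) {
            // The catch parameter has both a type and a name, so only this
            // exception type is swallowed.
            return fetch.call()
        }
    }
}
```

With the real clients, the two closures would presumably wrap createTopic/getTopic or createSubscription/getSubscription, which makes setupSpec safe to run against an emulator that is reused between specs.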
Then a factory class (Groovy) for mocking credentials

package no.myproject.pubsub

import com.google.auth.oauth2.AccessToken
import com.google.auth.oauth2.GoogleCredentials
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Replaces
import io.micronaut.context.annotation.Requires
import javax.inject.Singleton

@Factory
@Requires(property = 'spec.name', value = 'PubSubEmulatorSpec')
class EmptyCredentialsFactory {

    // only active when spec.name is set, i.e. in contexts started by PubSubSpecification
    @Singleton
    @Replaces(GoogleCredentials)
    GoogleCredentials mockCredentials() {
        return GoogleCredentials.create(new AccessToken("", new Date()))
    }
}
And finally, a Spock test spec.

package no.myproject.pubsub

import no.myproject.testframework.PubSubSpecification
import java.util.stream.IntStream

class PubSubIntegrationSpec extends PubSubSpecification {

    static final int NUMBER_OF_MESSAGES_IN_TEST = 5
    static final int DELAY_IN_MILLISECONDS_PER_MSG = 100

    def "when a number of messages is sent, same amount of messages is received"() {
        given:
        def documentPublisher = applicationContext.getBean(DocumentPublisher)
        def listener = applicationContext.getBean(IncomingDocListenerWithAck)
        def initialReceiveCount = listener.getReceiveCount()

        when:
        IntStream.rangeClosed(1, NUMBER_OF_MESSAGES_IN_TEST)
                .forEach(it -> documentPublisher.send("Hello World!"))
        // wait a bit in order to let all messages propagate through the queue
        Thread.sleep(NUMBER_OF_MESSAGES_IN_TEST * DELAY_IN_MILLISECONDS_PER_MSG)

        then:
        NUMBER_OF_MESSAGES_IN_TEST == listener.getReceiveCount() - initialReceiveCount
    }
}
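
The fixed Thread.sleep in the spec above may turn out too short on a slow machine and is longer than needed on a fast one. One possible alternative is to poll until the expected count is reached or a timeout expires. The helper below is a dependency-free sketch; Await and until are hypothetical names, not Spock or Micronaut APIs.

```groovy
// Hypothetical polling helper, plain Groovy with no dependencies.
class Await {

    // Calls `condition` every `intervalMillis` until it returns true or
    // `timeoutMillis` has elapsed. Returns whether the condition was met.
    static boolean until(long timeoutMillis, long intervalMillis, Closure<Boolean> condition) {
        long deadline = System.currentTimeMillis() + timeoutMillis
        while (true) {
            if (condition.call()) {
                return true
            }
            if (System.currentTimeMillis() >= deadline) {
                return false
            }
            Thread.sleep(intervalMillis)
        }
    }
}
```

In the spec, the sleep and the final comparison could then be replaced by a single condition in the then: block, for example Await.until(5000, 50, { listener.getReceiveCount() - initialReceiveCount == NUMBER_OF_MESSAGES_IN_TEST }). Spock also ships spock.util.concurrent.PollingConditions, which covers the same need.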