Google pubsub 模拟器不断推送相同的消息?
Google pubsub emulator constantly pushes the same messages?
我正在构建一个测试库,它抽象出一些第 3 方资源,例如 Google PubSub。
问题是当我创建主题和订阅时,消息不断到达。示例输出:
ack_id: "projects/test-project-123/subscriptions/my-subscription:2"
message { data: "My message!-1" message_id: "2" publish_time {
seconds: 1614597484 } }
ack_id: "projects/test-project-123/subscriptions/my-subscription:4"
message { data: "My message!-1" message_id: "2" publish_time {
seconds: 1614597484 } }
奇怪的是,发布时间是一样的。
拉取消息的代码:
fun poll(size: Int, subscriptionId: String): List<String> {
val subscriberStubSettings: SubscriberStubSettings = SubscriberStubSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.build()
GrpcSubscriberStub.create(subscriberStubSettings).use { subscriber ->
val pullRequest: PullRequest = PullRequest.newBuilder()
.setMaxMessages(size)
.setSubscription(ProjectSubscriptionName.format(projectId, subscriptionId))
.build()
val pullResponse: PullResponse = subscriber.pullCallable().call(pullRequest)
val acknowledgeRequest = AcknowledgeRequest.newBuilder()
.setSubscription(ProjectSubscriptionName.format(projectId, subscriptionId))
.addAllAckIds(
pullResponse.receivedMessagesList
.stream()
.map { it.ackId }.toList()
).build()
return pullResponse.receivedMessagesList
.map { it.message.data.toStringUtf8() }
.toList()
}
}
我正在尝试从每个订阅中一条一条地提取消息:
fun purge() {
for (subscription in listSubscriptionIds()) {
var messages = poll(MESSAGE_BATCH_SIZE, subscription)
while (messages.isNotEmpty()) {
messages = poll(MESSAGE_BATCH_SIZE, subscription)
}
}
}
额外功能:
private val channelProvider: TransportChannelProvider
get() {
return FixedTransportChannelProvider
.create(
GrpcTransportChannel.create(channel())
)
}
private fun channel(): ManagedChannel {
return if (channels.isEmpty()) {
val endpoint = emulator.emulatorEndpoint
val channel = ManagedChannelBuilder
.forTarget(endpoint)
.usePlaintext()
.build()
channels.add(channel)
channel
} else {
channels.first()
}
}
var emulator: PubSubEmulatorContainer = PubSubEmulatorContainer(
DockerImageName.parse("gcr.io/google.com/cloudsdktool/cloud-sdk:latest")
)
我该如何克服呢?是bug还是我哪里做错了?
您的代码应确认您已收到(并且可能已处理)每条消息:
Once a message is sent to a subscriber, the subscriber should acknowledge the message. A message is considered outstanding once it has been sent out for delivery and before a subscriber acknowledges it. Pub/Sub will repeatedly attempt to deliver any message that has not been acknowledged. (ref)
那些重复的尝试就是你在这里看到的,所以看起来确认没有发生。尝试 Acknowledge RPC 调用。
我正在构建一个测试库,它抽象出一些第 3 方资源,例如 Google PubSub。
问题是当我创建主题和订阅时,消息不断到达。示例输出:
ack_id: "projects/test-project-123/subscriptions/my-subscription:2" message { data: "My message!-1" message_id: "2" publish_time { seconds: 1614597484 } }
ack_id: "projects/test-project-123/subscriptions/my-subscription:4" message { data: "My message!-1" message_id: "2" publish_time { seconds: 1614597484 } }
奇怪的是,发布时间是一样的。
拉取消息的代码:
fun poll(size: Int, subscriptionId: String): List<String> {
val subscriberStubSettings: SubscriberStubSettings = SubscriberStubSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.build()
GrpcSubscriberStub.create(subscriberStubSettings).use { subscriber ->
val pullRequest: PullRequest = PullRequest.newBuilder()
.setMaxMessages(size)
.setSubscription(ProjectSubscriptionName.format(projectId, subscriptionId))
.build()
val pullResponse: PullResponse = subscriber.pullCallable().call(pullRequest)
val acknowledgeRequest = AcknowledgeRequest.newBuilder()
.setSubscription(ProjectSubscriptionName.format(projectId, subscriptionId))
.addAllAckIds(
pullResponse.receivedMessagesList
.stream()
.map { it.ackId }.toList()
).build()
return pullResponse.receivedMessagesList
.map { it.message.data.toStringUtf8() }
.toList()
}
}
我正在尝试从每个订阅中一条一条地提取消息:
fun purge() {
for (subscription in listSubscriptionIds()) {
var messages = poll(MESSAGE_BATCH_SIZE, subscription)
while (messages.isNotEmpty()) {
messages = poll(MESSAGE_BATCH_SIZE, subscription)
}
}
}
额外功能:
private val channelProvider: TransportChannelProvider
get() {
return FixedTransportChannelProvider
.create(
GrpcTransportChannel.create(channel())
)
}
private fun channel(): ManagedChannel {
return if (channels.isEmpty()) {
val endpoint = emulator.emulatorEndpoint
val channel = ManagedChannelBuilder
.forTarget(endpoint)
.usePlaintext()
.build()
channels.add(channel)
channel
} else {
channels.first()
}
}
var emulator: PubSubEmulatorContainer = PubSubEmulatorContainer(
DockerImageName.parse("gcr.io/google.com/cloudsdktool/cloud-sdk:latest")
)
我该如何克服呢?是bug还是我哪里做错了?
您的代码应确认您已收到(并且可能已处理)每条消息:
Once a message is sent to a subscriber, the subscriber should acknowledge the message. A message is considered outstanding once it has been sent out for delivery and before a subscriber acknowledges it. Pub/Sub will repeatedly attempt to deliver any message that has not been acknowledged. (ref)
那些重复的尝试就是你在这里看到的,所以看起来确认没有发生。尝试 Acknowledge RPC 调用。