Google Pub/Sub emulator constantly pushes the same messages?

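I am building a testing library that abstracts away some third-party resources, such as Google Pub/Sub.

The problem is that after I create a topic and a subscription, the same messages keep arriving. Example output: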

ack_id: "projects/test-project-123/subscriptions/my-subscription:2" message { data: "My message!-1" message_id: "2" publish_time { seconds: 1614597484 } }

ack_id: "projects/test-project-123/subscriptions/my-subscription:4" message { data: "My message!-1" message_id: "2" publish_time { seconds: 1614597484 } }

Strangely, the publish time is the same.

The code that pulls the messages:

fun poll(size: Int, subscriptionId: String): List<String> {
    val subscriberStubSettings: SubscriberStubSettings = SubscriberStubSettings.newBuilder()
        .setTransportChannelProvider(channelProvider)
        .setCredentialsProvider(credentialsProvider)
        .build()
    GrpcSubscriberStub.create(subscriberStubSettings).use { subscriber ->
        val pullRequest: PullRequest = PullRequest.newBuilder()
            .setMaxMessages(size)
            .setSubscription(ProjectSubscriptionName.format(projectId, subscriptionId))
            .build()
        val pullResponse: PullResponse = subscriber.pullCallable().call(pullRequest)


        val acknowledgeRequest = AcknowledgeRequest.newBuilder()
            .setSubscription(ProjectSubscriptionName.format(projectId, subscriptionId))
            .addAllAckIds(
                pullResponse.receivedMessagesList
                    .stream()
                    .map { it.ackId }.toList()
            ).build()

        return pullResponse.receivedMessagesList
            .map { it.message.data.toStringUtf8() }
            .toList()
    }
}

I am trying to pull the messages from each subscription, one subscription at a time:

fun purge() {
    for (subscription in listSubscriptionIds()) {
        var messages = poll(MESSAGE_BATCH_SIZE, subscription)
        while (messages.isNotEmpty()) {
            messages = poll(MESSAGE_BATCH_SIZE, subscription)
        }
    }
}

Additional functions:

private val channelProvider: TransportChannelProvider
    get() {
        return FixedTransportChannelProvider
            .create(
                GrpcTransportChannel.create(channel())
            )
    }

private fun channel(): ManagedChannel {
    return if (channels.isEmpty()) {
        val endpoint = emulator.emulatorEndpoint
        val channel = ManagedChannelBuilder
            .forTarget(endpoint)
            .usePlaintext()
            .build()
        channels.add(channel)
        channel
    } else {
        channels.first()
    }
}

var emulator: PubSubEmulatorContainer = PubSubEmulatorContainer(
        DockerImageName.parse("gcr.io/google.com/cloudsdktool/cloud-sdk:latest")
    )

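How can I overcome this? Is it a bug, or am I doing something wrong?

Your code should acknowledge each message it has received (and presumably processed):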

Once a message is sent to a subscriber, the subscriber should acknowledge the message. A message is considered outstanding once it has been sent out for delivery and before a subscriber acknowledges it. Pub/Sub will repeatedly attempt to deliver any message that has not been acknowledged. (ref)

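Those repeated delivery attempts are what you are seeing here, which is also why message_id and publish_time stay the same while the ack_id changes on each delivery. Your poll function builds an AcknowledgeRequest but never sends it, so no acknowledgment happens. Try making the Acknowledge RPC call.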
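
For illustration, here is a minimal sketch of the question's poll function with the acknowledge call actually sent. It is an assumption about how the fix could look, not a tested drop-in replacement: projectId, channelProvider, and credentialsProvider are passed in as parameters only to keep the sketch self-contained, and the guard against an empty batch is my own addition, since an Acknowledge request with no ack IDs may be rejected.

```kotlin
import com.google.api.gax.core.CredentialsProvider
import com.google.api.gax.rpc.TransportChannelProvider
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings
import com.google.pubsub.v1.AcknowledgeRequest
import com.google.pubsub.v1.ProjectSubscriptionName
import com.google.pubsub.v1.PullRequest

// Pulls up to `size` messages, acknowledges them, and returns their payloads.
// The extra parameters are hypothetical; in the question they are class members.
fun poll(
    size: Int,
    projectId: String,
    subscriptionId: String,
    channelProvider: TransportChannelProvider,
    credentialsProvider: CredentialsProvider
): List<String> {
    val settings = SubscriberStubSettings.newBuilder()
        .setTransportChannelProvider(channelProvider)
        .setCredentialsProvider(credentialsProvider)
        .build()
    val subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId)

    return GrpcSubscriberStub.create(settings).use { subscriber ->
        val pullRequest = PullRequest.newBuilder()
            .setMaxMessages(size)
            .setSubscription(subscriptionName)
            .build()
        val received = subscriber.pullCallable().call(pullRequest).receivedMessagesList

        // Skip the RPC when nothing was pulled (assumed guard, see lead-in).
        if (received.isNotEmpty()) {
            val ackRequest = AcknowledgeRequest.newBuilder()
                .setSubscription(subscriptionName)
                .addAllAckIds(received.map { it.ackId })
                .build()
            // This is the call missing from the question: without it,
            // Pub/Sub keeps redelivering the same messages.
            subscriber.acknowledgeCallable().call(ackRequest)
        }

        received.map { it.message.data.toStringUtf8() }
    }
}
```

With the acknowledgment sent, the purge loop from the question should eventually see an empty batch and stop, because acknowledged messages are no longer redelivered.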