如何实现消息中带有 repeated 字段的 gRPC/Proto 端点(即请求和响应中的嵌套类型)

How to implement a gRPC/Proto endpoint with repeated fields in a message (i.e. nested types in request and response)

目标:我想编写一个微服务,公开一个接收并响应带有 repeated 消息的端点。我尝试应用从 Proto 官方指南(official guide)中学到的知识,编写了下面这个 proto 文件:

// Schema for the transactions endpoint: one service with a single RPC and the
// request/reply messages it exchanges.
syntax = "proto3";

// Java/Kotlin code generation options.
option java_multiple_files = true;
option java_package = "com.mybank.endpoint";
option java_outer_classname = "TransactionsProto";
option objc_class_prefix = "HLW";

package com.mybank.endpoint;

// NOTE(review): no google.protobuf wrapper type is referenced in this file, so
// this import looks unused - confirm before removing.
import "google/protobuf/wrappers.proto";

service TransactionsService {
  // No `stream` keyword on either side: one TransactionsRequest in, one
  // TransactionsReply out.
  rpc PostTransactions(TransactionsRequest) returns (TransactionsReply);
}

// Request: a description plus a list of nested Transaction messages.
message TransactionsRequest {
  string transactionDesc = 1;
  // `repeated` makes this field a list of Transaction messages inside the
  // single request.
  repeated Transaction transactions = 2;
}
// One entry of TransactionsRequest.transactions.
message Transaction {
  string id = 1;
  string name = 2;
  string description = 3;
}

// Reply: a single text message.
message TransactionsReply {
  string message = 1;
}

我可以用 Gradle 成功构建,并得到了这个自动生成的 TransactionsServiceGrpcKt:

package com.mybank.endpoint

import com.mybank.endpoint.TransactionsServiceGrpc.getServiceDescriptor
import io.grpc.CallOptions
import io.grpc.CallOptions.DEFAULT
import io.grpc.Channel
import io.grpc.Metadata
import io.grpc.MethodDescriptor
import io.grpc.ServerServiceDefinition
import io.grpc.ServerServiceDefinition.builder
import io.grpc.ServiceDescriptor
import io.grpc.Status.UNIMPLEMENTED
import io.grpc.StatusException
import io.grpc.kotlin.AbstractCoroutineServerImpl
import io.grpc.kotlin.AbstractCoroutineStub
import io.grpc.kotlin.ClientCalls.unaryRpc
import io.grpc.kotlin.ServerCalls.unaryServerMethodDefinition
import io.grpc.kotlin.StubFor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.jvm.JvmOverloads
import kotlin.jvm.JvmStatic

/**
 * Groups the coroutine-based client stub and server base class generated for
 * com.mybank.endpoint.TransactionsService.
 */
object TransactionsServiceGrpcKt {
  /** Service descriptor, obtained from the Java-generated [TransactionsServiceGrpc]. */
  @JvmStatic
  val serviceDescriptor: ServiceDescriptor
    get() {
      return TransactionsServiceGrpc.getServiceDescriptor()
    }

  /** Method descriptor of the PostTransactions RPC, obtained from [TransactionsServiceGrpc]. */
  val postTransactionsMethod: MethodDescriptor<TransactionsRequest, TransactionsReply>
    @JvmStatic
    get() {
      return TransactionsServiceGrpc.getPostTransactionsMethod()
    }

  /**
   * Client-side stub that issues RPCs to a com.mybank.endpoint.TransactionsService service as
   * suspending functions.
   */
  @StubFor(TransactionsServiceGrpc::class)
  class TransactionsServiceCoroutineStub @JvmOverloads constructor(
    channel: Channel,
    callOptions: CallOptions = CallOptions.DEFAULT
  ) : AbstractCoroutineStub<TransactionsServiceCoroutineStub>(channel, callOptions) {
    /** Creates a new stub of the same type bound to the given [channel] and [callOptions]. */
    override fun build(
      channel: Channel,
      callOptions: CallOptions
    ): TransactionsServiceCoroutineStub {
      return TransactionsServiceCoroutineStub(channel, callOptions)
    }

    /**
     * Sends [request] as a unary PostTransactions call and suspends until the reply arrives.
     *
     * A completion with any status other than `Status.OK` surfaces as a [StatusException].
     * Cancelling the calling coroutine cancels the RPC as well.
     *
     * @param request The request message to send to the server.
     *
     * @return The single response from the server.
     */
    suspend fun postTransactions(request: TransactionsRequest): TransactionsReply {
      return unaryRpc(
        channel,
        TransactionsServiceGrpc.getPostTransactionsMethod(),
        request,
        callOptions,
        Metadata()
      )
    }
  }

  /**
   * Coroutine-based server skeleton for com.mybank.endpoint.TransactionsService; extend it and
   * override the RPC methods to implement the service.
   */
  abstract class TransactionsServiceCoroutineImplBase(
    coroutineContext: CoroutineContext = EmptyCoroutineContext
  ) : AbstractCoroutineServerImpl(coroutineContext) {
    /**
     * Handles com.mybank.endpoint.TransactionsService.PostTransactions.
     *
     * The default implementation always fails with `UNIMPLEMENTED`. In an override, throwing a
     * [StatusException] fails the RPC with that status, a
     * [java.util.concurrent.CancellationException] fails it with `Status.CANCELLED`, and any
     * other failure is reported as `Status.UNKNOWN` with the exception as a cause.
     *
     * @param request The request from the client.
     */
    open suspend fun postTransactions(request: TransactionsRequest): TransactionsReply {
      val unimplemented = UNIMPLEMENTED.withDescription(
        "Method com.mybank.endpoint.TransactionsService.PostTransactions is unimplemented"
      )
      throw StatusException(unimplemented)
    }

    /** Builds the service definition, wiring PostTransactions to [postTransactions]. */
    final override fun bindService(): ServerServiceDefinition {
      val definitionBuilder = builder(getServiceDescriptor())
      val postTransactionsDefinition = unaryServerMethodDefinition(
        context = this.context,
        descriptor = TransactionsServiceGrpc.getPostTransactionsMethod(),
        implementation = ::postTransactions
      )
      return definitionBuilder.addMethod(postTransactionsDefinition).build()
    }
  }
}

到目前为止一切顺利。现在我想实现它,却完全卡住了。

以下是我的三次尝试及其对应的错误:

package com.mybank.endpoint

import io.grpc.stub.StreamObserver
import javax.inject.Singleton

/**
 * Endpoint class for TransactionsService, extending the StreamObserver-based base class
 * [TransactionsServiceGrpc.TransactionsServiceImplBase].
 *
 * All three override attempts below are commented out, each with the compiler error it
 * produced, so the class body currently declares nothing.
 */
@Singleton
class TransactionsEndpoint : TransactionsServiceGrpc.TransactionsServiceImplBase(){

    // First attempt
    // The compiler reports "'postTransactions' overrides nothing", and IntelliJ suggests the
    // signature used in the second attempt instead.
    //override fun postTransactions(request: TransactionsRequest?) : TransactionsReply {

    // Second attempt
//    override fun postTransactions(request: TransactionsRequest?, responseObserver: StreamObserver<TransactionsReply>?) {
//        // The compiler reports: Type mismatch... Found: TransactionsReply
//        return TransactionsReply.newBuilder().setMessage("teste").build()
//    }

    // Third attempt
    // The compiler reports:
    // Return type is 'TransactionsReply', which is not a subtype of overridden public open
    // fun postTransactions(request: TransactionsRequest!, responseObserver: StreamObserver<TransactionsReply!>!):
    // Unit defined in com.mybank.endpoint.TransactionsServiceGrpc.TransactionsServiceImplBase
    //override fun postTransactions(request: TransactionsRequest?, responseObserver: StreamObserver<TransactionsReply>?) : TransactionsReply {
//        return TransactionsReply.newBuilder().setMessage("teste").build()
//    }
}

build.gradle

// Build plugins: Kotlin JVM, kapt and all-open (all pinned to 1.3.72), the
// application plugin, and the protobuf plugin.
plugins {
    id "org.jetbrains.kotlin.jvm" version "1.3.72"
    id "org.jetbrains.kotlin.kapt" version "1.3.72"
    id "org.jetbrains.kotlin.plugin.allopen" version "1.3.72"
    id "application"
    id 'com.google.protobuf' version '0.8.13'
}

// Project coordinates.
version "0.2"
group "account-control"

// Dependencies are resolved from the local Maven repository first, then JCenter.
repositories {
    mavenLocal()
    jcenter()
}

configurations {
    // for dependencies that are needed for development only
    developmentOnly
}

// Project dependencies. The $micronautVersion, $kotlinVersion, $kotlinxCoroutinesVersion,
// $micronautGrpcVersion and $grpcKotlinVersion values are not defined in this script
// (presumably gradle.properties - confirm).
dependencies {
    // Annotation processing, aligned to the Micronaut BOM.
    kapt(enforcedPlatform("io.micronaut:micronaut-bom:$micronautVersion"))
    kapt("io.micronaut:micronaut-inject-java")
    kapt("io.micronaut:micronaut-validation")

    implementation(enforcedPlatform("io.micronaut:micronaut-bom:$micronautVersion"))
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8:${kotlinVersion}")
    implementation("org.jetbrains.kotlin:kotlin-reflect:${kotlinVersion}")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinxCoroutinesVersion")
    implementation("io.micronaut:micronaut-runtime")
//    implementation("io.micronaut.grpc:micronaut-grpc-runtime")
    // gRPC server and client runtimes, plus the grpc-kotlin stub library.
    implementation("io.micronaut.grpc:micronaut-grpc-server-runtime:$micronautGrpcVersion")
    implementation("io.micronaut.grpc:micronaut-grpc-client-runtime:$micronautGrpcVersion")
    implementation("io.grpc:grpc-kotlin-stub:${grpcKotlinVersion}")

    //Kafka
    implementation("io.micronaut.kafka:micronaut-kafka")

    runtimeOnly("ch.qos.logback:logback-classic:1.2.3")
    runtimeOnly("com.fasterxml.jackson.module:jackson-module-kotlin:2.9.8")

    kaptTest("io.micronaut:micronaut-inject-java")

    testImplementation enforcedPlatform("io.micronaut:micronaut-bom:$micronautVersion")
    testImplementation("org.junit.jupiter:junit-jupiter-api:5.3.0")
    testImplementation("io.micronaut.test:micronaut-test-junit5")
    testImplementation("org.mockito:mockito-junit-jupiter:2.22.0")

    // testRuntimeOnly replaces the deprecated testRuntime configuration (removed in Gradle 7).
    testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.3.0")
    testRuntimeOnly("org.jetbrains.spek:spek-junit-platform-engine:1.1.5")
}

// Put the developmentOnly configuration on the test classpath as well.
test.classpath += configurations.developmentOnly

// NOTE(review): "account-control" contains a hyphen, which is not a legal package
// identifier in Java/Kotlin source - confirm the real fully-qualified main class.
mainClassName = "account-control.Application"

// Run tests on the JUnit Platform.
test {
    useJUnitPlatform()
}

// Classes annotated with io.micronaut.aop.Around are opened by the all-open plugin.
allOpen {
    annotation("io.micronaut.aop.Around")
}

compileKotlin {
    kotlinOptions {
        jvmTarget = '11' 
        //Will retain parameter names for Java reflection
        javaParameters = true 
    }
}
//compileKotlin.dependsOn(generateProto)

// Same compiler options for the test sources.
compileTestKotlin {
    kotlinOptions {
        jvmTarget = '11' 
        javaParameters = true 
    }
}

// Every JavaExec task gets the developmentOnly classpath and these JVM flags.
tasks.withType(JavaExec) {
    classpath += configurations.developmentOnly
    jvmArgs('-XX:TieredStopAtLevel=1', '-Dcom.sun.management.jmxremote')
}

// Register the three generated-source directories (grpc, grpckt, java) as main sources.
sourceSets {
    main {
        java {
            srcDirs 'build/generated/source/proto/main/grpc'
            srcDirs 'build/generated/source/proto/main/grpckt'
            srcDirs 'build/generated/source/proto/main/java'
        }
    }
}

// Protobuf code generation: protoc plus the grpc-java and grpc-kotlin plugins.
// ${protocVersion}, ${grpcVersion} and ${grpcKotlinVersion} are not defined in this
// script (presumably gradle.properties - confirm).
protobuf {
    protoc { artifact = "com.google.protobuf:protoc:${protocVersion}" }
    plugins {
        grpc { artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}" }
        grpckt { artifact = "io.grpc:protoc-gen-grpc-kotlin:${grpcKotlinVersion}" }
    }
    generateProtoTasks {
        // Enable both plugins on every generateProto task.
        all()*.plugins {
            grpc {}
            grpckt {}
        }
    }
}

我已经成功地创建了我的第一个 grpc 端点,它具有基于字符串的非常基本的 request/reply,现在我想通过创建消息列表来继续前进。作为类比,假设我想要一个包含实体列表的 DTO/Pojo。

老实说,我完全被卡住了。所以,我的主要问题是:如何实现带有 repeated 消息的 proto 服务?

如果有人能顺便解释一下就更好了:为什么我在自动生成的存根中看到的是一个需要用“..., responseObserver: StreamObserver?”来实现的方法,而不是我在 proto 文件中明确指定的简单的“TransactionsReply”?StreamObserver 和 repeated 消息有什么关系?

整个项目在我的 GitHub develop 分支上。

您会发现两个 proto 文件:一个使用简单的 request/reply,已经成功实现;另一个如上所述失败了。

*** 在 Louis 的第一个回答后编辑

我很困惑。使用如下这个简单的 proto 文件:

...
// The simple request/reply service: one DebitRequest in, one DebitReply out.
service Account {
  rpc SendDebit (DebitRequest) returns (DebitReply) {}
}

// Request carrying a single string field.
message DebitRequest {
  string name = 1;
}

// Reply carrying a single string field.
message DebitReply {
  string message = 1;
}

我可以这样实现:

/** Answers every [DebitRequest] with a [DebitReply] whose message is the fixed text "teste". */
override suspend fun sendDebit(request: DebitRequest): DebitReply =
    DebitReply.newBuilder()
        .setMessage("teste")
        .build()

然而,对于下面的 proto 定义:

...
// No `stream` keyword on either side: one TransactionsRequest in, one
// TransactionsReply out.
service TransactionsService {
  rpc PostTransactions(TransactionsRequest) returns (TransactionsReply);
}

// Request: a description plus a list of nested Transaction messages.
message TransactionsRequest {
  string transactionDesc = 1;
  // `repeated` makes this field a list of Transaction messages inside the
  // single request.
  repeated Transaction transactions = 2;
}
// One entry of TransactionsRequest.transactions.
message Transaction {
  string id = 1;
  string name = 2;
  string description = 3;
}

// Reply: a single text message.
message TransactionsReply {
  string message = 1;
}

我却无法用同样的方式(直接返回回复对象)来重写该方法(注意两者的回复消息定义完全一样)。

IntelliJ 不接受下面这种实现:

// Rejected attempt: the signature declares no return type (i.e. Unit), yet the body
// returns a TransactionsReply, which is a type mismatch.
override fun postTransactions(request: TransactionsRequest?, responseObserver: StreamObserver<TransactionsReply>?) {
        super.postTransactions(request, responseObserver)
        return TransactionsReply.newBuilder().setMessage("testReply").build()
    }

也不接受下面这种声明了返回类型的类似写法:

// Rejected attempt: the overridden two-argument (request, responseObserver) method
// returns Unit, so an override declaring TransactionsReply as its return type is
// not accepted.
override fun postTransactions(request: TransactionsRequest?, responseObserver: StreamObserver<TransactionsReply>?) :TransactionsReply {
    super.postTransactions(request, responseObserver)
    return TransactionsReply.newBuilder().setMessage("testReply").build()
}

最重要的是,如果回复完全相同,为什么在我的第一种方法中我没有从 IntelliJ 获得 StreamObserver?

*** 最终解决方案(感谢 Louis 的帮助):我之前继承了错误的抽象类(abstract class)。

/** Answers every [TransactionsRequest] with a reply whose message is the fixed text "testReply". */
override suspend fun postTransactions(request: TransactionsRequest): TransactionsReply =
    TransactionsReply.newBuilder()
        .setMessage("testReply")
        .build()

您要找的是:

// Sketch of the fix: extend the coroutine-based base class from TransactionsServiceGrpcKt
// and override the suspend function, which takes a non-nullable request and returns the
// reply directly. The body is elided.
class TransactionsEndpoint : TransactionsServiceGrpcKt.TransactionsServiceCoroutineImplBase(){
   override suspend fun postTransactions(request: TransactionsRequest) : TransactionsReply {
      ...
   }
}

……也就是说:方法使用 suspend 修饰符,请求参数类型不带 ?,并且继承基于协程(Coroutine)的 TransactionsServiceCoroutineImplBase。

请注意,这与 repeated 消息无关。你的 RPC 只有单个(single)输入 proto 消息,它只发送一次——这很可能就是你想要的;除非你想要的是流式(stream)请求,在这种情况下,你的 proto 文件应该类似于:

// Streaming-request variant: `stream` on the request type; the reply stays a
// single message.
service TransactionsService {
  rpc PostTransactions(stream TransactionsRequest) returns (TransactionsReply);
}

...在这种情况下,Kotlin 生成的代码看起来会有所不同。