如何实现消息中带有 repeated 字段的 gRPC/Proto 端点(即请求和响应中包含嵌套类型)
How to implement a gRPC/Proto endpoint with repeated fields in a message (i.e. nested types in request and response)
目标:我想编写一个微服务,公开一个接收并返回包含 repeated(重复)字段消息的端点。我尝试运用从 Proto 官方指南(Proto official guide)中学到的知识,编写了下面这个 proto 文件:
// Transactions API: a single unary RPC whose request carries a list of
// Transaction entries.
syntax = "proto3";

package com.mybank.endpoint;

import "google/protobuf/wrappers.proto";

option java_multiple_files = true;
option java_package = "com.mybank.endpoint";
option java_outer_classname = "TransactionsProto";
option objc_class_prefix = "HLW";

// Service exposing the PostTransactions endpoint.
service TransactionsService {
  // Unary call: one TransactionsRequest in, one TransactionsReply out.
  rpc PostTransactions(TransactionsRequest) returns (TransactionsReply);
}

// Request: a description plus zero or more transactions.
message TransactionsRequest {
  string transactionDesc = 1;
  repeated Transaction transactions = 2;
}

// One entry of TransactionsRequest.transactions.
message Transaction {
  string id = 1;
  string name = 2;
  string description = 3;
}

// Reply carrying a single text message.
message TransactionsReply {
  string message = 1;
}
我可以用 Gradle 成功构建,并得到了下面这个自动生成的 TransactionsServiceGrpcKt:
package com.mybank.endpoint
import com.mybank.endpoint.TransactionsServiceGrpc.getServiceDescriptor
import io.grpc.CallOptions
import io.grpc.CallOptions.DEFAULT
import io.grpc.Channel
import io.grpc.Metadata
import io.grpc.MethodDescriptor
import io.grpc.ServerServiceDefinition
import io.grpc.ServerServiceDefinition.builder
import io.grpc.ServiceDescriptor
import io.grpc.Status.UNIMPLEMENTED
import io.grpc.StatusException
import io.grpc.kotlin.AbstractCoroutineServerImpl
import io.grpc.kotlin.AbstractCoroutineStub
import io.grpc.kotlin.ClientCalls.unaryRpc
import io.grpc.kotlin.ServerCalls.unaryServerMethodDefinition
import io.grpc.kotlin.StubFor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.jvm.JvmOverloads
import kotlin.jvm.JvmStatic
/**
 * Groups the coroutine-based client stub and server base class generated for
 * com.mybank.endpoint.TransactionsService.
 */
object TransactionsServiceGrpcKt {
    /** Service descriptor, obtained from the Java-generated [TransactionsServiceGrpc]. */
    @JvmStatic
    val serviceDescriptor: ServiceDescriptor
        get() = TransactionsServiceGrpc.getServiceDescriptor()

    /** Descriptor of the PostTransactions method, obtained from [TransactionsServiceGrpc]. */
    val postTransactionsMethod: MethodDescriptor<TransactionsRequest, TransactionsReply>
        @JvmStatic
        get() = TransactionsServiceGrpc.getPostTransactionsMethod()

    /**
     * Client stub exposing the RPCs of com.mybank.endpoint.TransactionsService as suspending
     * functions.
     */
    @StubFor(TransactionsServiceGrpc::class)
    class TransactionsServiceCoroutineStub @JvmOverloads constructor(
        channel: Channel,
        callOptions: CallOptions = DEFAULT
    ) : AbstractCoroutineStub<TransactionsServiceCoroutineStub>(channel, callOptions) {
        override fun build(channel: Channel, callOptions: CallOptions): TransactionsServiceCoroutineStub {
            return TransactionsServiceCoroutineStub(channel, callOptions)
        }

        /**
         * Performs the PostTransactions RPC, suspending until it completes with
         * [`Status.OK`][io.grpc.Status]. Any other completion status is surfaced as a
         * [StatusException]. Cancelling the calling coroutine cancels the RPC as well, with the
         * corresponding exception as a cause.
         *
         * @param request The request message to send to the server.
         *
         * @return The single response from the server.
         */
        suspend fun postTransactions(request: TransactionsRequest): TransactionsReply {
            return unaryRpc(
                channel,
                TransactionsServiceGrpc.getPostTransactionsMethod(),
                request,
                callOptions,
                Metadata()
            )
        }
    }

    /**
     * Base class for coroutine-based server implementations of
     * com.mybank.endpoint.TransactionsService.
     */
    abstract class TransactionsServiceCoroutineImplBase(
        coroutineContext: CoroutineContext = EmptyCoroutineContext
    ) : AbstractCoroutineServerImpl(coroutineContext) {
        /**
         * Produces the response for com.mybank.endpoint.TransactionsService.PostTransactions.
         * The default implementation reports the method as unimplemented.
         *
         * A [StatusException] thrown here fails the RPC with the corresponding [io.grpc.Status];
         * a [java.util.concurrent.CancellationException] fails it with `Status.CANCELLED`; any
         * other failure results in `Status.UNKNOWN` with the exception as a cause.
         *
         * @param request The request from the client.
         */
        open suspend fun postTransactions(request: TransactionsRequest): TransactionsReply {
            throw StatusException(
                UNIMPLEMENTED.withDescription("Method com.mybank.endpoint.TransactionsService.PostTransactions is unimplemented")
            )
        }

        /** Builds the server-side definition that routes PostTransactions to [postTransactions]. */
        final override fun bindService(): ServerServiceDefinition {
            val definitionBuilder = builder(getServiceDescriptor())
            definitionBuilder.addMethod(
                unaryServerMethodDefinition(
                    context = this.context,
                    descriptor = TransactionsServiceGrpc.getPostTransactionsMethod(),
                    implementation = ::postTransactions
                )
            )
            return definitionBuilder.build()
        }
    }
}
到目前为止一切顺利。现在我想实现它,却完全卡住了。
下面是我的三次尝试以及各自的报错:
package com.mybank.endpoint
import io.grpc.stub.StreamObserver
import javax.inject.Singleton
// gRPC endpoint for TransactionsService, annotated as a javax.inject singleton.
// NOTE(review): this class extends the StreamObserver-based TransactionsServiceImplBase, whose
// postTransactions(request, responseObserver) returns Unit (see the compiler message quoted in
// attempt 3). The suspend signature that returns TransactionsReply is declared on
// TransactionsServiceGrpcKt.TransactionsServiceCoroutineImplBase instead.
@Singleton
class TransactionsEndpoint : TransactionsServiceGrpc.TransactionsServiceImplBase(){
// Attempt 1
// Fails with "'postTransactions' overrides nothing"; IntelliJ then suggests the signature used in attempt 2.
//override fun postTransactions(request: TransactionsRequest?) : TransactionsReply {
// Attempt 2
// override fun postTransactions(request: TransactionsRequest?, responseObserver: StreamObserver<TransactionsReply>?) {
// // Fails with "Type mismatch... Found: TransactionsReply"
// return TransactionsReply.newBuilder().setMessage("teste").build()
// }
// Attempt 3
// Fails with:
// Return type is 'TransactionsReply', which is not a subtype of overridden public open
// fun postTransactions(request: TransactionsRequest!, responseObserver: StreamObserver<TransactionsReply!>!):
// Unit defined in com.mybank.endpoint.TransactionsServiceGrpc.TransactionsServiceImplBase
//override fun postTransactions(request: TransactionsRequest?, responseObserver: StreamObserver<TransactionsReply>?) : TransactionsReply {
// return TransactionsReply.newBuilder().setMessage("teste").build()
// }
}
build.gradle
// Build script for the account-control service: Kotlin, Micronaut, and protobuf/gRPC code generation.
plugins {
    id "org.jetbrains.kotlin.jvm" version "1.3.72"
    id "org.jetbrains.kotlin.kapt" version "1.3.72"
    id "org.jetbrains.kotlin.plugin.allopen" version "1.3.72"
    id "application"
    id 'com.google.protobuf' version '0.8.13'
}

version "0.2"
group "account-control"

repositories {
    mavenLocal()
    // NOTE(review): JCenter has been sunset by JFrog; consider adding mavenCentral() — verify before changing.
    jcenter()
}

configurations {
    // for dependencies that are needed for development only
    developmentOnly
}

dependencies {
    // Annotation processing (kapt) for Micronaut
    kapt(enforcedPlatform("io.micronaut:micronaut-bom:$micronautVersion"))
    kapt("io.micronaut:micronaut-inject-java")
    kapt("io.micronaut:micronaut-validation")

    implementation(enforcedPlatform("io.micronaut:micronaut-bom:$micronautVersion"))
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8:${kotlinVersion}")
    implementation("org.jetbrains.kotlin:kotlin-reflect:${kotlinVersion}")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinxCoroutinesVersion")
    implementation("io.micronaut:micronaut-runtime")

    // gRPC
    // implementation("io.micronaut.grpc:micronaut-grpc-runtime")
    implementation("io.micronaut.grpc:micronaut-grpc-server-runtime:$micronautGrpcVersion")
    implementation("io.micronaut.grpc:micronaut-grpc-client-runtime:$micronautGrpcVersion")
    implementation("io.grpc:grpc-kotlin-stub:${grpcKotlinVersion}")

    //Kafka
    implementation("io.micronaut.kafka:micronaut-kafka")

    runtimeOnly("ch.qos.logback:logback-classic:1.2.3")
    runtimeOnly("com.fasterxml.jackson.module:jackson-module-kotlin:2.9.8")

    kaptTest("io.micronaut:micronaut-inject-java")
    testImplementation enforcedPlatform("io.micronaut:micronaut-bom:$micronautVersion")
    testImplementation("org.junit.jupiter:junit-jupiter-api:5.3.0")
    testImplementation("io.micronaut.test:micronaut-test-junit5")
    testImplementation("org.mockito:mockito-junit-jupiter:2.22.0")
    // testRuntimeOnly replaces the deprecated testRuntime configuration (removed in Gradle 7)
    // and matches the runtimeOnly/implementation configurations used above.
    testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.3.0")
    testRuntimeOnly("org.jetbrains.spek:spek-junit-platform-engine:1.1.5")
}

// Make development-only dependencies visible to tests as well.
test.classpath += configurations.developmentOnly

// NOTE(review): "account-control" contains a hyphen, which is not legal in a Java package name — confirm the real main class.
mainClassName = "account-control.Application"

test {
    useJUnitPlatform()
}

allOpen {
    annotation("io.micronaut.aop.Around")
}

compileKotlin {
    kotlinOptions {
        jvmTarget = '11'
        //Will retain parameter names for Java reflection
        javaParameters = true
    }
}

//compileKotlin.dependsOn(generateProto)

compileTestKotlin {
    kotlinOptions {
        jvmTarget = '11'
        javaParameters = true
    }
}

tasks.withType(JavaExec) {
    classpath += configurations.developmentOnly
    jvmArgs('-XX:TieredStopAtLevel=1', '-Dcom.sun.management.jmxremote')
}

// Register the directories that protoc generates into as source directories.
sourceSets {
    main {
        java {
            srcDirs 'build/generated/source/proto/main/grpc'
            srcDirs 'build/generated/source/proto/main/grpckt'
            srcDirs 'build/generated/source/proto/main/java'
        }
    }
}

// protoc plus the grpc-java ("grpc") and grpc-kotlin ("grpckt") plugins, applied to every generateProto task.
protobuf {
    protoc { artifact = "com.google.protobuf:protoc:${protocVersion}" }
    plugins {
        grpc { artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}" }
        grpckt { artifact = "io.grpc:protoc-gen-grpc-kotlin:${grpcKotlinVersion}" }
    }
    generateProtoTasks {
        all()*.plugins {
            grpc {}
            grpckt {}
        }
    }
}
我已经成功创建了我的第一个 gRPC 端点,它使用基于字符串的非常简单的请求/响应;现在我想更进一步,让消息中包含一个消息列表。打个比方,就像一个包含实体列表的 DTO/POJO。
老实说,我完全卡住了。所以我的主要问题是:如何实现带有 repeated 消息的 proto 服务?
另外,如果有人能解释以下问题会很有帮助:为什么自动生成的存根中要求实现的方法带有“..., responseObserver: StreamObserver?”参数,而不是像我在 proto 中明确指定的那样直接返回“TransactionsReply”?StreamObserver 和 repeated 消息之间有什么关系?
完整的项目在我的 GitHub 仓库的 develop 分支上(in my GitHub develop branch)。
您会在其中看到两个 proto:一个使用简单的请求/响应,实现非常顺利;另一个就是上面所说的失败的那个。
*** 在 Louis 的第一个回答后编辑
我很困惑。
使用像下面这样简单的 proto:
...
// Simple service with one unary RPC and string-only messages.
service Account {
  // Unary call: one DebitRequest in, one DebitReply out.
  rpc SendDebit(DebitRequest) returns (DebitReply);
}

message DebitRequest {
  string name = 1;
}

message DebitReply {
  string message = 1;
}
我可以这样实现:
/** Answers every debit request with a reply whose message is "teste". */
override suspend fun sendDebit(request: DebitRequest): DebitReply =
    DebitReply.newBuilder().setMessage("teste").build()
尽管如此,
...
// Service exposing the PostTransactions endpoint.
service TransactionsService {
  // Unary call: one TransactionsRequest in, one TransactionsReply out.
  rpc PostTransactions(TransactionsRequest) returns (TransactionsReply);
}

// Request: a description plus zero or more transactions.
message TransactionsRequest {
  string transactionDesc = 1;
  repeated Transaction transactions = 2;
}

// One entry of TransactionsRequest.transactions.
message Transaction {
  string id = 1;
  string name = 2;
  string description = 3;
}

// Reply carrying a single text message.
message TransactionsReply {
  string message = 1;
}
我不能用相同类型的回复覆盖(注意回复是完全一样的)
IntelliJ 没有提出此实现
// Does not compile: no return type is declared, so the function returns Unit,
// and returning a TransactionsReply from it is a type mismatch.
override fun postTransactions(request: TransactionsRequest?, responseObserver: StreamObserver<TransactionsReply>?) {
super.postTransactions(request, responseObserver)
return TransactionsReply.newBuilder().setMessage("testReply").build()
}
也没有类似的响应方法
// Does not compile either: the overridden StreamObserver-based method returns Unit,
// so declaring TransactionsReply as the return type is rejected.
override fun postTransactions(request: TransactionsRequest?, responseObserver: StreamObserver<TransactionsReply>?) :TransactionsReply {
super.postTransactions(request, responseObserver)
return TransactionsReply.newBuilder().setMessage("testReply").build()
}
最重要的是,如果回复完全相同,为什么在我的第一种方法中我没有从 IntelliJ 获得 StreamObserver?
*** 感谢 Louis 的帮助,以下是最终解决方案。我之前继承了错误的抽象类(abstract class):
/** Answers every request with a reply whose message is "testReply". */
override suspend fun postTransactions(request: TransactionsRequest): TransactionsReply =
    TransactionsReply.newBuilder().setMessage("testReply").build()
您正在寻找
// Sketch of the working endpoint: it extends the coroutine base class from TransactionsServiceGrpcKt,
// and the override is a suspend function that takes a non-null request and returns the reply directly.
// The "..." below is a placeholder for the actual implementation.
class TransactionsEndpoint : TransactionsServiceGrpcKt.TransactionsServiceCoroutineImplBase(){
override suspend fun postTransactions(request: TransactionsRequest) : TransactionsReply {
...
}
}
……也就是说,加上 suspend 修饰符,请求参数类型不带 ?,并且继承名称中带有 Coroutine 的 TransactionsServiceCoroutineImplBase。
请注意,这与 repeated 消息无关。你的 RPC 只有一个(single)输入 proto 消息,并且只发送一次——这很可能正是你想要的;除非你想要流式(stream)请求,在这种情况下,你的 proto 文件应该类似于:
service TransactionsService {
  // Streaming-request variant: the `stream` keyword makes the request a stream of
  // TransactionsRequest messages, answered by a single TransactionsReply.
  rpc PostTransactions(stream TransactionsRequest) returns (TransactionsReply);
}
...在这种情况下,Kotlin 生成的代码看起来会有所不同。
目标:我想编写一个微服务,公开一个接收并返回包含 repeated(重复)字段消息的端点。我尝试运用从 Proto 官方指南(Proto official guide)中学到的知识,编写了下面这个 proto 文件:
// Transactions API: a single unary RPC whose request carries a list of
// Transaction entries.
syntax = "proto3";

package com.mybank.endpoint;

import "google/protobuf/wrappers.proto";

option java_multiple_files = true;
option java_package = "com.mybank.endpoint";
option java_outer_classname = "TransactionsProto";
option objc_class_prefix = "HLW";

// Service exposing the PostTransactions endpoint.
service TransactionsService {
  // Unary call: one TransactionsRequest in, one TransactionsReply out.
  rpc PostTransactions(TransactionsRequest) returns (TransactionsReply);
}

// Request: a description plus zero or more transactions.
message TransactionsRequest {
  string transactionDesc = 1;
  repeated Transaction transactions = 2;
}

// One entry of TransactionsRequest.transactions.
message Transaction {
  string id = 1;
  string name = 2;
  string description = 3;
}

// Reply carrying a single text message.
message TransactionsReply {
  string message = 1;
}
我可以用 Gradle 成功构建,并得到了下面这个自动生成的 TransactionsServiceGrpcKt:
package com.mybank.endpoint
import com.mybank.endpoint.TransactionsServiceGrpc.getServiceDescriptor
import io.grpc.CallOptions
import io.grpc.CallOptions.DEFAULT
import io.grpc.Channel
import io.grpc.Metadata
import io.grpc.MethodDescriptor
import io.grpc.ServerServiceDefinition
import io.grpc.ServerServiceDefinition.builder
import io.grpc.ServiceDescriptor
import io.grpc.Status.UNIMPLEMENTED
import io.grpc.StatusException
import io.grpc.kotlin.AbstractCoroutineServerImpl
import io.grpc.kotlin.AbstractCoroutineStub
import io.grpc.kotlin.ClientCalls.unaryRpc
import io.grpc.kotlin.ServerCalls.unaryServerMethodDefinition
import io.grpc.kotlin.StubFor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.jvm.JvmOverloads
import kotlin.jvm.JvmStatic
/**
 * Groups the coroutine-based client stub and server base class generated for
 * com.mybank.endpoint.TransactionsService.
 */
object TransactionsServiceGrpcKt {
    /** Service descriptor, obtained from the Java-generated [TransactionsServiceGrpc]. */
    @JvmStatic
    val serviceDescriptor: ServiceDescriptor
        get() = TransactionsServiceGrpc.getServiceDescriptor()

    /** Descriptor of the PostTransactions method, obtained from [TransactionsServiceGrpc]. */
    val postTransactionsMethod: MethodDescriptor<TransactionsRequest, TransactionsReply>
        @JvmStatic
        get() = TransactionsServiceGrpc.getPostTransactionsMethod()

    /**
     * Client stub exposing the RPCs of com.mybank.endpoint.TransactionsService as suspending
     * functions.
     */
    @StubFor(TransactionsServiceGrpc::class)
    class TransactionsServiceCoroutineStub @JvmOverloads constructor(
        channel: Channel,
        callOptions: CallOptions = DEFAULT
    ) : AbstractCoroutineStub<TransactionsServiceCoroutineStub>(channel, callOptions) {
        override fun build(channel: Channel, callOptions: CallOptions): TransactionsServiceCoroutineStub {
            return TransactionsServiceCoroutineStub(channel, callOptions)
        }

        /**
         * Performs the PostTransactions RPC, suspending until it completes with
         * [`Status.OK`][io.grpc.Status]. Any other completion status is surfaced as a
         * [StatusException]. Cancelling the calling coroutine cancels the RPC as well, with the
         * corresponding exception as a cause.
         *
         * @param request The request message to send to the server.
         *
         * @return The single response from the server.
         */
        suspend fun postTransactions(request: TransactionsRequest): TransactionsReply {
            return unaryRpc(
                channel,
                TransactionsServiceGrpc.getPostTransactionsMethod(),
                request,
                callOptions,
                Metadata()
            )
        }
    }

    /**
     * Base class for coroutine-based server implementations of
     * com.mybank.endpoint.TransactionsService.
     */
    abstract class TransactionsServiceCoroutineImplBase(
        coroutineContext: CoroutineContext = EmptyCoroutineContext
    ) : AbstractCoroutineServerImpl(coroutineContext) {
        /**
         * Produces the response for com.mybank.endpoint.TransactionsService.PostTransactions.
         * The default implementation reports the method as unimplemented.
         *
         * A [StatusException] thrown here fails the RPC with the corresponding [io.grpc.Status];
         * a [java.util.concurrent.CancellationException] fails it with `Status.CANCELLED`; any
         * other failure results in `Status.UNKNOWN` with the exception as a cause.
         *
         * @param request The request from the client.
         */
        open suspend fun postTransactions(request: TransactionsRequest): TransactionsReply {
            throw StatusException(
                UNIMPLEMENTED.withDescription("Method com.mybank.endpoint.TransactionsService.PostTransactions is unimplemented")
            )
        }

        /** Builds the server-side definition that routes PostTransactions to [postTransactions]. */
        final override fun bindService(): ServerServiceDefinition {
            val definitionBuilder = builder(getServiceDescriptor())
            definitionBuilder.addMethod(
                unaryServerMethodDefinition(
                    context = this.context,
                    descriptor = TransactionsServiceGrpc.getPostTransactionsMethod(),
                    implementation = ::postTransactions
                )
            )
            return definitionBuilder.build()
        }
    }
}
到目前为止一切顺利。现在我想实现它,却完全卡住了。
下面是我的三次尝试以及各自的报错:
package com.mybank.endpoint
import io.grpc.stub.StreamObserver
import javax.inject.Singleton
// gRPC endpoint for TransactionsService, annotated as a javax.inject singleton.
// NOTE(review): this class extends the StreamObserver-based TransactionsServiceImplBase, whose
// postTransactions(request, responseObserver) returns Unit (see the compiler message quoted in
// attempt 3). The suspend signature that returns TransactionsReply is declared on
// TransactionsServiceGrpcKt.TransactionsServiceCoroutineImplBase instead.
@Singleton
class TransactionsEndpoint : TransactionsServiceGrpc.TransactionsServiceImplBase(){
// Attempt 1
// Fails with "'postTransactions' overrides nothing"; IntelliJ then suggests the signature used in attempt 2.
//override fun postTransactions(request: TransactionsRequest?) : TransactionsReply {
// Attempt 2
// override fun postTransactions(request: TransactionsRequest?, responseObserver: StreamObserver<TransactionsReply>?) {
// // Fails with "Type mismatch... Found: TransactionsReply"
// return TransactionsReply.newBuilder().setMessage("teste").build()
// }
// Attempt 3
// Fails with:
// Return type is 'TransactionsReply', which is not a subtype of overridden public open
// fun postTransactions(request: TransactionsRequest!, responseObserver: StreamObserver<TransactionsReply!>!):
// Unit defined in com.mybank.endpoint.TransactionsServiceGrpc.TransactionsServiceImplBase
//override fun postTransactions(request: TransactionsRequest?, responseObserver: StreamObserver<TransactionsReply>?) : TransactionsReply {
// return TransactionsReply.newBuilder().setMessage("teste").build()
// }
}
build.gradle
// Build script for the account-control service: Kotlin, Micronaut, and protobuf/gRPC code generation.
plugins {
    id "org.jetbrains.kotlin.jvm" version "1.3.72"
    id "org.jetbrains.kotlin.kapt" version "1.3.72"
    id "org.jetbrains.kotlin.plugin.allopen" version "1.3.72"
    id "application"
    id 'com.google.protobuf' version '0.8.13'
}

version "0.2"
group "account-control"

repositories {
    mavenLocal()
    // NOTE(review): JCenter has been sunset by JFrog; consider adding mavenCentral() — verify before changing.
    jcenter()
}

configurations {
    // for dependencies that are needed for development only
    developmentOnly
}

dependencies {
    // Annotation processing (kapt) for Micronaut
    kapt(enforcedPlatform("io.micronaut:micronaut-bom:$micronautVersion"))
    kapt("io.micronaut:micronaut-inject-java")
    kapt("io.micronaut:micronaut-validation")

    implementation(enforcedPlatform("io.micronaut:micronaut-bom:$micronautVersion"))
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8:${kotlinVersion}")
    implementation("org.jetbrains.kotlin:kotlin-reflect:${kotlinVersion}")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinxCoroutinesVersion")
    implementation("io.micronaut:micronaut-runtime")

    // gRPC
    // implementation("io.micronaut.grpc:micronaut-grpc-runtime")
    implementation("io.micronaut.grpc:micronaut-grpc-server-runtime:$micronautGrpcVersion")
    implementation("io.micronaut.grpc:micronaut-grpc-client-runtime:$micronautGrpcVersion")
    implementation("io.grpc:grpc-kotlin-stub:${grpcKotlinVersion}")

    //Kafka
    implementation("io.micronaut.kafka:micronaut-kafka")

    runtimeOnly("ch.qos.logback:logback-classic:1.2.3")
    runtimeOnly("com.fasterxml.jackson.module:jackson-module-kotlin:2.9.8")

    kaptTest("io.micronaut:micronaut-inject-java")
    testImplementation enforcedPlatform("io.micronaut:micronaut-bom:$micronautVersion")
    testImplementation("org.junit.jupiter:junit-jupiter-api:5.3.0")
    testImplementation("io.micronaut.test:micronaut-test-junit5")
    testImplementation("org.mockito:mockito-junit-jupiter:2.22.0")
    // testRuntimeOnly replaces the deprecated testRuntime configuration (removed in Gradle 7)
    // and matches the runtimeOnly/implementation configurations used above.
    testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.3.0")
    testRuntimeOnly("org.jetbrains.spek:spek-junit-platform-engine:1.1.5")
}

// Make development-only dependencies visible to tests as well.
test.classpath += configurations.developmentOnly

// NOTE(review): "account-control" contains a hyphen, which is not legal in a Java package name — confirm the real main class.
mainClassName = "account-control.Application"

test {
    useJUnitPlatform()
}

allOpen {
    annotation("io.micronaut.aop.Around")
}

compileKotlin {
    kotlinOptions {
        jvmTarget = '11'
        //Will retain parameter names for Java reflection
        javaParameters = true
    }
}

//compileKotlin.dependsOn(generateProto)

compileTestKotlin {
    kotlinOptions {
        jvmTarget = '11'
        javaParameters = true
    }
}

tasks.withType(JavaExec) {
    classpath += configurations.developmentOnly
    jvmArgs('-XX:TieredStopAtLevel=1', '-Dcom.sun.management.jmxremote')
}

// Register the directories that protoc generates into as source directories.
sourceSets {
    main {
        java {
            srcDirs 'build/generated/source/proto/main/grpc'
            srcDirs 'build/generated/source/proto/main/grpckt'
            srcDirs 'build/generated/source/proto/main/java'
        }
    }
}

// protoc plus the grpc-java ("grpc") and grpc-kotlin ("grpckt") plugins, applied to every generateProto task.
protobuf {
    protoc { artifact = "com.google.protobuf:protoc:${protocVersion}" }
    plugins {
        grpc { artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}" }
        grpckt { artifact = "io.grpc:protoc-gen-grpc-kotlin:${grpcKotlinVersion}" }
    }
    generateProtoTasks {
        all()*.plugins {
            grpc {}
            grpckt {}
        }
    }
}
我已经成功创建了我的第一个 gRPC 端点,它使用基于字符串的非常简单的请求/响应;现在我想更进一步,让消息中包含一个消息列表。打个比方,就像一个包含实体列表的 DTO/POJO。
老实说,我完全卡住了。所以我的主要问题是:如何实现带有 repeated 消息的 proto 服务?
另外,如果有人能解释以下问题会很有帮助:为什么自动生成的存根中要求实现的方法带有“..., responseObserver: StreamObserver?”参数,而不是像我在 proto 中明确指定的那样直接返回“TransactionsReply”?StreamObserver 和 repeated 消息之间有什么关系?
完整的项目在我的 GitHub 仓库的 develop 分支上(in my GitHub develop branch)。
您会在其中看到两个 proto:一个使用简单的请求/响应,实现非常顺利;另一个就是上面所说的失败的那个。
*** 在 Louis 的第一个回答后编辑
我很困惑。
使用像下面这样简单的 proto:
...
// Simple service with one unary RPC and string-only messages.
service Account {
  // Unary call: one DebitRequest in, one DebitReply out.
  rpc SendDebit(DebitRequest) returns (DebitReply);
}

message DebitRequest {
  string name = 1;
}

message DebitReply {
  string message = 1;
}
我可以这样实现:
override suspend fun sendDebit(request: DebitRequest): DebitReply {
return DebitReply.newBuilder().setMessage("teste").build()
}
尽管如此,
...
// Service exposing the PostTransactions endpoint.
service TransactionsService {
  // Unary call: one TransactionsRequest in, one TransactionsReply out.
  rpc PostTransactions(TransactionsRequest) returns (TransactionsReply);
}

// Request: a description plus zero or more transactions.
message TransactionsRequest {
  string transactionDesc = 1;
  repeated Transaction transactions = 2;
}

// One entry of TransactionsRequest.transactions.
message Transaction {
  string id = 1;
  string name = 2;
  string description = 3;
}

// Reply carrying a single text message.
message TransactionsReply {
  string message = 1;
}
我不能用相同类型的回复覆盖(注意回复是完全一样的)
IntelliJ 没有提出此实现
// Does not compile: no return type is declared, so the function returns Unit,
// and returning a TransactionsReply from it is a type mismatch.
override fun postTransactions(request: TransactionsRequest?, responseObserver: StreamObserver<TransactionsReply>?) {
super.postTransactions(request, responseObserver)
return TransactionsReply.newBuilder().setMessage("testReply").build()
}
也没有类似的响应方法
// Does not compile either: the overridden StreamObserver-based method returns Unit,
// so declaring TransactionsReply as the return type is rejected.
override fun postTransactions(request: TransactionsRequest?, responseObserver: StreamObserver<TransactionsReply>?) :TransactionsReply {
super.postTransactions(request, responseObserver)
return TransactionsReply.newBuilder().setMessage("testReply").build()
}
最重要的是,如果回复完全相同,为什么在我的第一种方法中我没有从 IntelliJ 获得 StreamObserver?
*** 感谢 Louis 的帮助,以下是最终解决方案。我之前继承了错误的抽象类(abstract class):
/** Answers every request with a reply whose message is "testReply". */
override suspend fun postTransactions(request: TransactionsRequest): TransactionsReply =
    TransactionsReply.newBuilder().setMessage("testReply").build()
您正在寻找
// Sketch of the working endpoint: it extends the coroutine base class from TransactionsServiceGrpcKt,
// and the override is a suspend function that takes a non-null request and returns the reply directly.
// The "..." below is a placeholder for the actual implementation.
class TransactionsEndpoint : TransactionsServiceGrpcKt.TransactionsServiceCoroutineImplBase(){
override suspend fun postTransactions(request: TransactionsRequest) : TransactionsReply {
...
}
}
……也就是说,加上 suspend 修饰符,请求参数类型不带 ?,并且继承名称中带有 Coroutine 的 TransactionsServiceCoroutineImplBase。
请注意,这与 repeated 消息无关。你的 RPC 只有一个(single)输入 proto 消息,并且只发送一次——这很可能正是你想要的;除非你想要流式(stream)请求,在这种情况下,你的 proto 文件应该类似于:
service TransactionsService {
  // Streaming-request variant: the `stream` keyword makes the request a stream of
  // TransactionsRequest messages, answered by a single TransactionsReply.
  rpc PostTransactions(stream TransactionsRequest) returns (TransactionsReply);
}
...在这种情况下,Kotlin 生成的代码看起来会有所不同。