用于服务器流式传输的 Akka Streams(gRPC、Scala)

Akka Streams for server streaming (gRPC, Scala)

我是 Akka Streams 和 gRPC 的新手,我正在尝试构建一个客户端发送单个请求而服务器发送多个响应的端点。

这是我的protobuf

syntax = "proto3";

option java_multiple_files = true;
option java_package = "customer.service.proto";

service CustomerService {

  rpc CreateCustomer(CustomerRequest) returns (stream CustomerResponse) {}

}

message CustomerRequest {
  string customerId = 1;
  string customerName = 2;
}

message CustomerResponse {
  enum Status {
    No_Customer = 0;
    Creating_Customer = 1;
    Customer_Created = 2;
  }

  string customerId = 1;
  Status status = 2;
}

我试图通过发送客户请求来实现这一点,然后服务器将首先检查并响应 No_Customer 然后它会发送 Creating_Customer 最后服务器会说 Customer_Created.

我不知道从哪里开始实施,找了几个小时但仍然一无所知,如果有人能指出正确的方向,我将不胜感激。

开始的地方是Akka gRPC documentation and, in particular, the service WalkThrough。让示例在干净的项目中工作非常简单。

相关服务器示例方法是这样的:

override def itKeepsReplying(in: HelloRequest): Source[HelloReply, NotUsed] = {
  println(s"sayHello to ${in.name} with stream of chars...")
  Source(s"Hello, ${in.name}".toList).map(character => HelloReply(character.toString))
}

现在的问题是创建一个 Source returns 正确的结果,但这取决于您计划如何实施服务器,因此很难回答。检查 Akka Streams documentation 的各种选项。

客户端代码更简单,只需在 CreateCustomer 返回的 Source 上调用 runForeach,如示例中所示:

def runStreamingReplyExample(): Unit = {
  val responseStream = client.itKeepsReplying(HelloRequest("Alice"))
  val done: Future[Done] =
    responseStream.runForeach(reply => println(s"got streaming reply: ${reply.message}"))

  done.onComplete {
    case Success(_) =>
      println("streamingReply done")
    case Failure(e) =>
      println(s"Error streamingReply: $e")
  }
}