如何使用 Kafka、Alpakka Kafka、Play Framework 和 Websocket 处理 POST 请求?

How handle a POST request with Kafka, Alpakka Kafka, Play Framework and Websocket?

假设我有两个 kafka 主题,request_topic 用于我的 Post 请求,response_topic 用于我的响应。

这是型号:

case class Request(requestId: String, body: String)
case class Response(responseId: String, body: String, requestId: String)

这是我的套接字处理程序

def socket = WebSocket.accept[String, String] { req =>
  val requestId = ??? // Generate a unique requestId

  val in: Sink[String, Future[Done]] = Sink.foreach[String]{ msg =>
    val record = new ProducerRecord[String, Request]("request_topic", "key", Request(requestId, msg))
    val producer: KafkaProducer[String, Request] = ???
    Future { producer.send(record).get }
  }

  // Once produced, some stream processing apps will manage to process request and publish the reponse to response_topic
  // The Request and Response object are linked by the requestId field.

  val consumerSettings = ???
  val out: Source[ConsumerRecord[String, Response], _] = Consumer
    .plainSource(consumerSettings, Subscriptions.topics("response_topic"))
    .filter(cr => cr.value.requestId == requestId)
    .map(cr => someResponseString(cr.value))

  Flow.formSinkAndSource(in, out)
}

def someResponseString(res: Response): String = ???

基本上,对于每条传入消息,我都会向 Kafka 发布一个 Request 对象,然后该请求由一些流处理应用程序(此处未显示)处理,并希望将响应发布回 Kafka。

我有一些顾虑:

1 - Alpakka Kafka Connector 是否会为每个传入消息创建一个新的连接器实例,或者只要 Play 是 运行 它就会使用相同的实例?

2 - 基于单个 requestId 过滤响应是个好主意,还是我应该将整个流发送回每个客户端,让他们根据他们感兴趣的 requestId 过滤响应。

3 - 我是不是什么都错了? (我是Websocket的真正新手)

提前致谢。

1) 看你怎么配置了。例如,在 in: Sink 正文中,您要为每条消息创建一个新的 KafkaProducer。相反,您应该为整个应用程序配备一个生产者。

我不确定 Akka / Play 的线程模型是如何工作的,但大多数网络服务器为每个传入连接启动一个新线程,线程池中的线程数最多为固定数量。

2) 我认为最好尽快进行过滤,并尽可能在服务器端进行过滤。这节省了返回客户端的带宽。

此外,如果您只想将数据从 Web 服务器上的 Kafka 单向推送到客户端,您可能需要 SSE, not Websocket