无法通过 REST 请求收到的命令在 kafka 主题上 subscribe/unsubscribe

Unable to subscribe/unsubscribe on kafka topic by command received by REST request

我想编写一个完全基于 ZIO 堆栈设计的应用程序。 我是这个框架的新手,所以解决方案可能很简单,而且我误解了一些重要的事情。 并面临以下问题。 我需要使用 REST 收到的命令取消订阅 kafka 主题。 此外,我还需要通过 REST 订阅主题。 我使用 zio-kafka 编写了以下代码来描述订阅主题并将事件打印到控制台的效果:

private val consumerSettings = ConsumerSettings(List("localhost:9092")).withGroupId("MyConsumerGroup")
    .withOffsetRetrieval(OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest))

private val managedConsumer = Consumer.make(consumerSettings)

val consumer: ZLayer[Clock with Blocking, Throwable, Has[Consumer]] = ZLayer.fromManaged(managedConsumer)

def startStream: ZIO[Console with Any with Has[Consumer] with Clock, Throwable, Unit] =
    Consumer.subscribeAnd(Subscription.topics("myTopic"))
      .plainStream(Serde.string, Serde.string)
      .tap(cr => zio.console.putStrLn(cr.value))
      .map(_.offset)
      .aggregateAsync(Consumer.offsetBatches)
      .run(ZSink.foreach(_.commit))

然后我使用 zhttp 描述了 REST 端点:

  private val app = HttpApp.fromEffectFunction{
    case Method.POST -> Root / "stop" => for {
      _ <- ZIO.serviceWith[Consumer](_.unsubscribe)
      _ <- zio.console.putStrLn("stopped")
    } yield Response.ok
    case Method.POST -> Root / "start" => for {
      _ <- startStream.fork
      _ <- zio.console.putStrLn("started")
    } yield Response.ok
  }

private val server = Server.port(8080) ++ Server.app(app)

最后,我运行使用 main 方法编写我的简单程序:

override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = (for {
    _ <- startStream.provideSomeLayer(consumer ++ Console.live).fork
    _ <- server.make.use(_ => console.putStrLn("server started") *> ZIO.never)
      .provideCustomLayer(ServerChannelFactory.auto ++ EventLoopGroup.auto() ++ consumer)
} yield ()).exitCode

它 运行 没问题,但问题是当我 运行 程序响应 /stop 请求但 Consumer 仍然订阅并且消息仍然从主题中读取时。 如果我 运行 我的程序只具有服务器效果,如下所示:

override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = 
  server.make.use(_ => console.putStrLn("server started") *> ZIO.never)
    .provideCustomLayer(ServerChannelFactory.auto ++ EventLoopGroup.auto() ++ consumer)
  .exitCode

在我调用 /start 端点后,在控制台中我可以看到消费者还活着,我可以看到一些关于 kafka 集群的信息,但是没有从主题中读取任何消息。 请告诉我我错在哪里,我的误解在哪里。
谢谢。

你通过了两次有效的 consumer 层。这意味着取消订阅的消费者不一样。

怎么样

override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = (for {
    _ <- startStream.fork
    _ <- server.make.use(_ => console.putStrLn("server started") *> ZIO.never)
} yield ()).provideSomeLayer(ServerChannelFactory.auto ++ EventLoopGroup.auto() ++ consumer)
.exitCode

注意:可能无法编译,未经测试