无法通过 REST 请求收到的命令在 kafka 主题上 subscribe/unsubscribe
Unable to subscribe/unsubscribe on kafka topic by command received by REST request
我想编写一个完全基于 ZIO 堆栈设计的应用程序。
我是这个框架的新手,所以解决方案可能很简单,而且我误解了一些重要的事情。
并面临以下问题。
我需要使用 REST 收到的命令取消订阅 kafka 主题。
此外,我还需要通过 REST 订阅主题。
我使用 zio-kafka 编写了以下代码来描述订阅主题并将事件打印到控制台的效果:
private val consumerSettings = ConsumerSettings(List("localhost:9092")).withGroupId("MyConsumerGroup")
.withOffsetRetrieval(OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest))
private val managedConsumer = Consumer.make(consumerSettings)
val consumer: ZLayer[Clock with Blocking, Throwable, Has[Consumer]] = ZLayer.fromManaged(managedConsumer)
def startStream: ZIO[Console with Any with Has[Consumer] with Clock, Throwable, Unit] =
Consumer.subscribeAnd(Subscription.topics("myTopic"))
.plainStream(Serde.string, Serde.string)
.tap(cr => zio.console.putStrLn(cr.value))
.map(_.offset)
.aggregateAsync(Consumer.offsetBatches)
.run(ZSink.foreach(_.commit))
然后我使用 zhttp 描述了 REST 端点:
private val app = HttpApp.fromEffectFunction{
case Method.POST -> Root / "stop" => for {
_ <- ZIO.serviceWith[Consumer](_.unsubscribe)
_ <- zio.console.putStrLn("stopped")
} yield Response.ok
case Method.POST -> Root / "start" => for {
_ <- startStream.fork
_ <- zio.console.putStrLn("started")
} yield Response.ok
}
private val server = Server.port(8080) ++ Server.app(app)
最后,我运行使用 main 方法编写我的简单程序:
override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = (for {
_ <- startStream.provideSomeLayer(consumer ++ Console.live).fork
_ <- server.make.use(_ => console.putStrLn("server started") *> ZIO.never)
.provideCustomLayer(ServerChannelFactory.auto ++ EventLoopGroup.auto() ++ consumer)
} yield ()).exitCode
它 运行 没问题,但问题是当我 运行 程序响应 /stop
请求但 Consumer
仍然订阅并且消息仍然从主题中读取时。
如果我 运行 我的程序只具有服务器效果,如下所示:
override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] =
server.make.use(_ => console.putStrLn("server started") *> ZIO.never)
.provideCustomLayer(ServerChannelFactory.auto ++ EventLoopGroup.auto() ++ consumer)
.exitCode
在我调用 /start
端点后,在控制台中我可以看到消费者还活着,我可以看到一些关于 kafka 集群的信息,但是没有从主题中读取任何消息。
请告诉我我错在哪里,我的误解在哪里。
谢谢。
你通过了两次有效的 consumer
层。这意味着取消订阅的消费者不一样。
怎么样
override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = (for {
_ <- startStream.fork
_ <- server.make.use(_ => console.putStrLn("server started") *> ZIO.never)
} yield ()).provideSomeLayer(ServerChannelFactory.auto ++ EventLoopGroup.auto() ++ consumer)
.exitCode
注意:可能无法编译,未经测试
我想编写一个完全基于 ZIO 堆栈设计的应用程序。 我是这个框架的新手,所以解决方案可能很简单,而且我误解了一些重要的事情。 并面临以下问题。 我需要使用 REST 收到的命令取消订阅 kafka 主题。 此外,我还需要通过 REST 订阅主题。 我使用 zio-kafka 编写了以下代码来描述订阅主题并将事件打印到控制台的效果:
private val consumerSettings = ConsumerSettings(List("localhost:9092")).withGroupId("MyConsumerGroup")
.withOffsetRetrieval(OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest))
private val managedConsumer = Consumer.make(consumerSettings)
val consumer: ZLayer[Clock with Blocking, Throwable, Has[Consumer]] = ZLayer.fromManaged(managedConsumer)
def startStream: ZIO[Console with Any with Has[Consumer] with Clock, Throwable, Unit] =
Consumer.subscribeAnd(Subscription.topics("myTopic"))
.plainStream(Serde.string, Serde.string)
.tap(cr => zio.console.putStrLn(cr.value))
.map(_.offset)
.aggregateAsync(Consumer.offsetBatches)
.run(ZSink.foreach(_.commit))
然后我使用 zhttp 描述了 REST 端点:
private val app = HttpApp.fromEffectFunction{
case Method.POST -> Root / "stop" => for {
_ <- ZIO.serviceWith[Consumer](_.unsubscribe)
_ <- zio.console.putStrLn("stopped")
} yield Response.ok
case Method.POST -> Root / "start" => for {
_ <- startStream.fork
_ <- zio.console.putStrLn("started")
} yield Response.ok
}
private val server = Server.port(8080) ++ Server.app(app)
最后,我运行使用 main 方法编写我的简单程序:
override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = (for {
_ <- startStream.provideSomeLayer(consumer ++ Console.live).fork
_ <- server.make.use(_ => console.putStrLn("server started") *> ZIO.never)
.provideCustomLayer(ServerChannelFactory.auto ++ EventLoopGroup.auto() ++ consumer)
} yield ()).exitCode
它 运行 没问题,但问题是当我 运行 程序响应 /stop
请求但 Consumer
仍然订阅并且消息仍然从主题中读取时。
如果我 运行 我的程序只具有服务器效果,如下所示:
override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] =
server.make.use(_ => console.putStrLn("server started") *> ZIO.never)
.provideCustomLayer(ServerChannelFactory.auto ++ EventLoopGroup.auto() ++ consumer)
.exitCode
在我调用 /start
端点后,在控制台中我可以看到消费者还活着,我可以看到一些关于 kafka 集群的信息,但是没有从主题中读取任何消息。
请告诉我我错在哪里,我的误解在哪里。
谢谢。
你通过了两次有效的 consumer
层。这意味着取消订阅的消费者不一样。
怎么样
override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = (for {
_ <- startStream.fork
_ <- server.make.use(_ => console.putStrLn("server started") *> ZIO.never)
} yield ()).provideSomeLayer(ServerChannelFactory.auto ++ EventLoopGroup.auto() ++ consumer)
.exitCode
注意:可能无法编译,未经测试