Zio-Kafka:为主题生成消息

Zio-Kafka: Producing messages to topic

我正在尝试使用库 zio-kafka,版本 0.15.0.

向 Kafka 主题生成一些消息

很明显,我对 ZIO 生态系统的理解不够理想,因为我无法提供简单的信息。我的程序如下:

object KafkaProducerExample extends zio.App {

  val producerSettings: ProducerSettings = ProducerSettings(List("localhost:9092"))

  val producer: ZLayer[Blocking, Throwable, Producer[Nothing, String, String]] =
    ZLayer.fromManaged(Producer.make(producerSettings, Serde.string, Serde.string))

  val effect: RIO[Nothing with Producer[Nothing, String, String], RecordMetadata] =
    Producer.produce("topic", "key", "value")

  override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = {
    effect.provideSomeLayer(producer).exitCode
  }
}

编译器给我以下错误:

[error] KafkaProducerExample.scala:19:28: Cannot prove that zio.blocking.Blocking with zio.Has[zio.kafka.producer.Producer.Service[Nothing,String,String]] <:< Nothing with zio.kafka.producer.Producer[Nothing,String,String].
[error]     effect.provideSomeLayer(producer).exitCode
[error]                            ^
[error] one error found

任何人都可以帮助我了解发生了什么吗?

好的,是 ZIO 在创建 producer 层时需要一些关于类型的提示:

val producer: ZLayer[Blocking, Throwable, Producer[Any, String, String]] =
    ZLayer.fromManaged(Producer.make[Any, String, String](producerSettings, Serde.string, Serde.string))

在调用make智能构造函数的时候,我们要给他我们要使用的类型。第一个表示构建键和值序列化程序所需的环境,而最后两个是消息的键和值的类型。

在这种情况下,我们根本不需要环境来构建这两个序列化程序,所以我们通过 Any.

最后,Producer.produce 函数也需要一些类型提示:

val effect: RIO[Producer[Any, String, String], RecordMetadata] =
    Producer.produce[Any, String, String]("topic", "key", "value")

进行上述更改后,类型完美对齐,编译器再次开心。