Zio-Kafka:为主题生成消息
Zio-Kafka: Producing messages to topic
我正在尝试使用库 zio-kafka
,版本 0.15.0.
向 Kafka 主题生成一些消息
很明显,我对 ZIO 生态系统的理解不够理想,因为我无法提供简单的信息。我的程序如下:
object KafkaProducerExample extends zio.App {
val producerSettings: ProducerSettings = ProducerSettings(List("localhost:9092"))
val producer: ZLayer[Blocking, Throwable, Producer[Nothing, String, String]] =
ZLayer.fromManaged(Producer.make(producerSettings, Serde.string, Serde.string))
val effect: RIO[Nothing with Producer[Nothing, String, String], RecordMetadata] =
Producer.produce("topic", "key", "value")
override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = {
effect.provideSomeLayer(producer).exitCode
}
}
编译器给我以下错误:
[error] KafkaProducerExample.scala:19:28: Cannot prove that zio.blocking.Blocking with zio.Has[zio.kafka.producer.Producer.Service[Nothing,String,String]] <:< Nothing with zio.kafka.producer.Producer[Nothing,String,String].
[error] effect.provideSomeLayer(producer).exitCode
[error] ^
[error] one error found
任何人都可以帮助我了解发生了什么吗?
好的,是 ZIO 在创建 producer
层时需要一些关于类型的提示:
val producer: ZLayer[Blocking, Throwable, Producer[Any, String, String]] =
ZLayer.fromManaged(Producer.make[Any, String, String](producerSettings, Serde.string, Serde.string))
在调用make
智能构造函数的时候,我们要给他我们要使用的类型。第一个表示构建键和值序列化程序所需的环境,而最后两个是消息的键和值的类型。
在这种情况下,我们根本不需要环境来构建这两个序列化程序,所以我们通过 Any
.
最后,Producer.produce
函数也需要一些类型提示:
val effect: RIO[Producer[Any, String, String], RecordMetadata] =
Producer.produce[Any, String, String]("topic", "key", "value")
进行上述更改后,类型完美对齐,编译器再次开心。
我正在尝试使用库 zio-kafka
,版本 0.15.0.
很明显,我对 ZIO 生态系统的理解不够理想,因为我无法提供简单的信息。我的程序如下:
object KafkaProducerExample extends zio.App {
val producerSettings: ProducerSettings = ProducerSettings(List("localhost:9092"))
val producer: ZLayer[Blocking, Throwable, Producer[Nothing, String, String]] =
ZLayer.fromManaged(Producer.make(producerSettings, Serde.string, Serde.string))
val effect: RIO[Nothing with Producer[Nothing, String, String], RecordMetadata] =
Producer.produce("topic", "key", "value")
override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = {
effect.provideSomeLayer(producer).exitCode
}
}
编译器给我以下错误:
[error] KafkaProducerExample.scala:19:28: Cannot prove that zio.blocking.Blocking with zio.Has[zio.kafka.producer.Producer.Service[Nothing,String,String]] <:< Nothing with zio.kafka.producer.Producer[Nothing,String,String].
[error] effect.provideSomeLayer(producer).exitCode
[error] ^
[error] one error found
任何人都可以帮助我了解发生了什么吗?
好的,是 ZIO 在创建 producer
层时需要一些关于类型的提示:
val producer: ZLayer[Blocking, Throwable, Producer[Any, String, String]] =
ZLayer.fromManaged(Producer.make[Any, String, String](producerSettings, Serde.string, Serde.string))
在调用make
智能构造函数的时候,我们要给他我们要使用的类型。第一个表示构建键和值序列化程序所需的环境,而最后两个是消息的键和值的类型。
在这种情况下,我们根本不需要环境来构建这两个序列化程序,所以我们通过 Any
.
最后,Producer.produce
函数也需要一些类型提示:
val effect: RIO[Producer[Any, String, String], RecordMetadata] =
Producer.produce[Any, String, String]("topic", "key", "value")
进行上述更改后,类型完美对齐,编译器再次开心。