如何使它纯净?
How to it make pure?
我有以下 scala
代码:
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.CommittableOffsetBatch
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import scala.concurrent.Future
object TestConsumer {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem("KafkaConsumer")
implicit val materializer = ActorMaterializer()
val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("group1")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val result = Consumer
.committableSource(consumerSettings, Subscriptions.topics("test"))
.mapAsync(2)(rec => Future.successful(rec.record.value()))
.runWith(Sink.foreach(ele => {
print(ele)
system.terminate()
}))
}
}
如您所知,应用程序使用来自 kafka
的消息并在 shell 上打印出来。
runWith
不是纯粹的,它会产生一些副作用,打印出收到的消息并关闭 actor。
问题是,如何用cats IO effects让它变纯?有可能吗?
你不需要猫 IO
来让它变得纯净。请注意,您的 sink
已经是纯值,因为它只是描述使用它时会发生什么的值(在这种情况下 using 表示 "connecting to the Source
and running the stream")。
val sink: Sink[String, Future[Done]] = Sink.foreach(ele => {
print(ele)
// system.terminate() // PROBLEM: terminating the system before stream completes!
})
您描述的问题与纯度无关。问题是上面的 sink
关闭了 system
的值,然后在处理源的每个元素时尝试 terminate
它。
终止 system
意味着您正在破坏用于 运行 流的整个 运行 时间环境(由 ActorMaterializer
使用)。这应该只在您的流完成时完成。
val result: Future[Done] = Consumer
.committableSource(consumerSettings, Subscriptions.topics("test"))
.mapAsync(2)(rec => Future.successful(rec.record.value()))
.runWith(sink)
result.onComplete(_ => system.terminate())
我有以下 scala
代码:
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.CommittableOffsetBatch
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import scala.concurrent.Future
object TestConsumer {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem("KafkaConsumer")
implicit val materializer = ActorMaterializer()
val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("group1")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val result = Consumer
.committableSource(consumerSettings, Subscriptions.topics("test"))
.mapAsync(2)(rec => Future.successful(rec.record.value()))
.runWith(Sink.foreach(ele => {
print(ele)
system.terminate()
}))
}
}
如您所知,应用程序使用来自 kafka
的消息并在 shell 上打印出来。
runWith
不是纯粹的,它会产生一些副作用,打印出收到的消息并关闭 actor。
问题是,如何用cats IO effects让它变纯?有可能吗?
你不需要猫 IO
来让它变得纯净。请注意,您的 sink
已经是纯值,因为它只是描述使用它时会发生什么的值(在这种情况下 using 表示 "connecting to the Source
and running the stream")。
val sink: Sink[String, Future[Done]] = Sink.foreach(ele => {
print(ele)
// system.terminate() // PROBLEM: terminating the system before stream completes!
})
您描述的问题与纯度无关。问题是上面的 sink
关闭了 system
的值,然后在处理源的每个元素时尝试 terminate
它。
终止 system
意味着您正在破坏用于 运行 流的整个 运行 时间环境(由 ActorMaterializer
使用)。这应该只在您的流完成时完成。
val result: Future[Done] = Consumer
.committableSource(consumerSettings, Subscriptions.topics("test"))
.mapAsync(2)(rec => Future.successful(rec.record.value()))
.runWith(sink)
result.onComplete(_ => system.terminate())