如何使它纯净?

How to it make pure?

我有以下 scala 代码:

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.CommittableOffsetBatch
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer

import scala.concurrent.Future

object TestConsumer {
  def main(args: Array[String]): Unit = {

    implicit val system = ActorSystem("KafkaConsumer")
    implicit val materializer = ActorMaterializer()

    val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("group1")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")


    val result = Consumer
      .committableSource(consumerSettings, Subscriptions.topics("test"))
      .mapAsync(2)(rec => Future.successful(rec.record.value()))
      .runWith(Sink.foreach(ele => {
        print(ele)
        system.terminate()
      }))
  }
}

如您所知,应用程序使用来自 kafka 的消息并在 shell 上打印出来。

runWith 不是纯粹的,它会产生一些副作用,打印出收到的消息并关闭 actor。

问题是,如何用cats IO effects让它变纯?有可能吗?

你不需要猫 IO 来让它变得纯净。请注意,您的 sink 已经是纯值,因为它只是描述使用它时会发生什么的值(在这种情况下 using 表示 "connecting to the Source and running the stream")。

    val sink: Sink[String, Future[Done]] = Sink.foreach(ele => {
      print(ele)
      // system.terminate() // PROBLEM: terminating the system before stream completes!
    })

您描述的问题与纯度无关。问题是上面的 sink 关闭了 system 的值,然后在处理源的每个元素时尝试 terminate 它。

终止 system 意味着您正在破坏用于 运行 流的整个 运行 时间环境(由 ActorMaterializer 使用)。这应该只在您的流完成时完成。

val result: Future[Done] = Consumer
  .committableSource(consumerSettings, Subscriptions.topics("test"))
  .mapAsync(2)(rec => Future.successful(rec.record.value()))
  .runWith(sink)

result.onComplete(_ => system.terminate())