Why Logging is Not Working for Akka Stream

I am using Alpakka, with the toy example below:

// Imports assumed for this snippet (Akka, Alpakka Kafka, Alpakka File connector);
// Logger comes from the project's own logging setup.
import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.alpakka.file.scaladsl.LogRotatorSink
import akka.stream.{ActorMaterializer, Attributes}
import akka.util.ByteString
import org.apache.kafka.common.serialization.StringDeserializer

import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success}

val system = ActorSystem("system")
implicit val materializer: ActorMaterializer = ActorMaterializer.create(system)

implicit val adapter: LoggingAdapter = Logging(system, "customLogger")
implicit val ec: ExecutionContextExecutor = system.dispatcher

val log = Logger(this.getClass, "Foo")

val consumerConfig = system.settings.config.getConfig("akka.kafka.consumer")
val consumerSettings: ConsumerSettings[String, String] =
  ConsumerSettings(consumerConfig, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("my-group")

def start() = {
  Consumer.plainSource(consumerSettings, Subscriptions.topics("test"))
    .log("My Consumer: ")
    .withAttributes(
      Attributes.logLevels(
        onElement = Logging.InfoLevel,
        onFinish = Logging.InfoLevel,
        onFailure = Logging.DebugLevel
      )
    )
    .filter(/* some predicate */)
    .map(/* some process */)
    .map(out => ByteString(out))
    .runWith(LogRotatorSink(timeFunc))
    .onComplete {
      case Success(_) => log.info("DONE")
      case Failure(e) => log.error("ERROR")
    }
}

This code works, but I have a problem with logging. The first part, with the attributes, logs fine: when elements come in, they are logged to stdout. But when LogRotatorSink finishes and the Future completes, I want "DONE" printed to stdout, and that never happens. Files are being generated, so the process is running, but no "DONE" message is ever written to stdout.

How can I get "DONE" printed to stdout?

akka {

  # Loggers to register at boot time (akka.event.Logging$DefaultLogger logs
  # to STDOUT)
  loggers = ["akka.event.slf4j.Slf4jLogger"]

  # Log level used by the configured loggers (see "loggers") as soon
  # as they have been started; before that, see "stdout-loglevel"
  # Options: OFF, ERROR, WARNING, INFO, DEBUG
  loglevel = "INFO"

  # Log level for the very basic logger activated during ActorSystem startup.
  # This logger prints the log messages to stdout (System.out).
  # Options: OFF, ERROR, WARNING, INFO, DEBUG
  stdout-loglevel = "INFO"

  # Filter of log events that is used by the LoggingAdapter before
  # publishing log events to the eventStream.
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"

}


<configuration>

    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%highlight(%date{HH:mm:ss.SSS} %-5level %-50.50([%logger{50}])) - %msg%n</pattern>
        </encoder>
    </appender>

    <logger name="org.apache.kafka" level="INFO"/>

    <root level="INFO">
        <appender-ref ref="STDOUT"/>
    </root>

</configuration>

Logging is working. It is your Future that never completes, because a Kafka consumer is an infinite stream: once it has read everything and reached the latest message in the topic, it simply waits for new messages to arrive. In many cases (e.g. event sourcing) abruptly shutting down such a stream would be a disaster, so running streams indefinitely is a sensible default.
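A minimal illustration of the same point without Kafka (the source and names below are just for this sketch, not from the original code): the Future returned by runWith completes only when the stream terminates, so with an infinite source the onComplete callback never fires.

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("demo")
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val ec = system.dispatcher

// Source.tick emits forever, so the stream never terminates ...
Source.tick(1.second, 1.second, "element")
  .runWith(Sink.ignore)               // Future[Done] that completes only when the stream ends
  .onComplete(_ => println("DONE"))   // ... and this callback is never invoked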

When should this stream end? Define that condition explicitly, and you can close the stream under it with something like .take(n), .takeWhile(cond) or .takeWithin(time). Then the stream will complete, the Future will complete, and your DONE will be printed.
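Applied to the toy example above, a sketch under the assumption that stopping after a fixed time (or record count) is acceptable; the 5-minute and 1000-record bounds are placeholder values, not something from the original question:

import scala.concurrent.duration._

def start() = {
  Consumer.plainSource(consumerSettings, Subscriptions.topics("test"))
    .takeWithin(5.minutes)                // completion condition: stop reading after 5 minutes
    // .take(1000)                        // or: stop after a fixed number of records
    .map(record => ByteString(record.value))
    .runWith(LogRotatorSink(timeFunc))
    .onComplete {
      case Success(_) => log.info("DONE")    // now fires, because the stream completes
      case Failure(e) => log.error("ERROR")
    }
}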