Kafka 使用 LogRotator 提交

Kafka commit with Akka and LogRotator

我正在尝试使用 Consumer.committableSource 通过 Akka 从 Kafka 读取数据。然后我想将数据写入共享文件夹中的文件中。 提交时,我们通常使用 via(Committer.flow(committerSettings).

之类的东西

但是,此方法不会 return Kafka 流的值,因此之后我无法调用 .runWith(LogRotatorSink.withSinkFactory(rotator, sink)) 之类的方法来写入数据。 这是没有提交的代码:

Consumer.committableSource(settings, Subscriptions.topics(kafkaTopics.toSet))
  .via(processor)
  .prepend(headerCSVSource)
  .via(CsvFormatting.format(delimiter =
    CsvFormatting.SemiColon))
  .runWith(LogRotatorSink.withSinkFactory(rotator, sink))

这是我认为我需要的:

 Consumer
        .committableSource(settings, Subscriptions.topics(kafkaTopics.toSet))
          .via(processor)
          .prepend(headerCSVSource)
          .via(CsvFormatting.format(delimiter =
            CsvFormatting.SemiColon))
          .via(Committer.flow(committerSettings))
          .runWith(LogRotatorSink.withSinkFactory(rotator, sink))

但这行不通,因为 via(Committer.flow) 没有 return 流值(但是 Flow[Committable, Done, NotUsed])。

我需要的是只有在数据写入文件后才提交偏移量。 如果您觉得其他选项(例如使用 plainSource / auto-commit)更合适,我愿意考虑它们。

看来您需要将流元素传递到一个接收器,并在成功后传递到另一个接收器。

您可以 运行 在您的信息流中添加子信息流。沿着这条线:

.via(CsvFormatting.format(delimiter = CsvFormatting.SemiColon))
.mapAsync(1) { c =>
  Source.single(c).runWith(LogRotatorSink.withSinkFactory(rotator, sink)).map(_ => c)
}
.runWith(Committer.sink(committerSettings))

它应该可以工作,但是,经过一番思考,我认为最好不要使用 sink 来写入日志,而是使用其他不会终止流的方式。