Kafka 使用 LogRotator 提交
Kafka commit with Akka and LogRotator
我正在尝试使用 Consumer.committableSource 通过 Akka 从 Kafka 读取数据。然后我想将数据写入共享文件夹中的文件中。
提交时,我们通常使用 via(Committer.flow(committerSettings)
.
之类的东西
但是,此方法不会 return Kafka 流的值,因此之后我无法调用 .runWith(LogRotatorSink.withSinkFactory(rotator, sink))
之类的方法来写入数据。
这是没有提交的代码:
Consumer.committableSource(settings, Subscriptions.topics(kafkaTopics.toSet))
.via(processor)
.prepend(headerCSVSource)
.via(CsvFormatting.format(delimiter =
CsvFormatting.SemiColon))
.runWith(LogRotatorSink.withSinkFactory(rotator, sink))
这是我认为我需要的:
Consumer
.committableSource(settings, Subscriptions.topics(kafkaTopics.toSet))
.via(processor)
.prepend(headerCSVSource)
.via(CsvFormatting.format(delimiter =
CsvFormatting.SemiColon))
.via(Committer.flow(committerSettings))
.runWith(LogRotatorSink.withSinkFactory(rotator, sink))
但这行不通,因为 via(Committer.flow)
没有 return 流值(但是 Flow[Committable, Done, NotUsed])。
我需要的是只有在数据写入文件后才提交偏移量。
如果您觉得其他选项(例如使用 plainSource / auto-commit)更合适,我愿意考虑它们。
看来您需要将流元素传递到一个接收器,并在成功后传递到另一个接收器。
您可以 运行 在您的信息流中添加子信息流。沿着这条线:
.via(CsvFormatting.format(delimiter = CsvFormatting.SemiColon))
.mapAsync(1) { c =>
Source.single(c).runWith(LogRotatorSink.withSinkFactory(rotator, sink)).map(_ => c)
}
.runWith(Committer.sink(committerSettings))
它应该可以工作,但是,经过一番思考,我认为最好不要使用 sink 来写入日志,而是使用其他不会终止流的方式。
我正在尝试使用 Consumer.committableSource 通过 Akka 从 Kafka 读取数据。然后我想将数据写入共享文件夹中的文件中。
提交时,我们通常使用 via(Committer.flow(committerSettings)
.
但是,此方法不会 return Kafka 流的值,因此之后我无法调用 .runWith(LogRotatorSink.withSinkFactory(rotator, sink))
之类的方法来写入数据。
这是没有提交的代码:
Consumer.committableSource(settings, Subscriptions.topics(kafkaTopics.toSet))
.via(processor)
.prepend(headerCSVSource)
.via(CsvFormatting.format(delimiter =
CsvFormatting.SemiColon))
.runWith(LogRotatorSink.withSinkFactory(rotator, sink))
这是我认为我需要的:
Consumer
.committableSource(settings, Subscriptions.topics(kafkaTopics.toSet))
.via(processor)
.prepend(headerCSVSource)
.via(CsvFormatting.format(delimiter =
CsvFormatting.SemiColon))
.via(Committer.flow(committerSettings))
.runWith(LogRotatorSink.withSinkFactory(rotator, sink))
但这行不通,因为 via(Committer.flow)
没有 return 流值(但是 Flow[Committable, Done, NotUsed])。
我需要的是只有在数据写入文件后才提交偏移量。 如果您觉得其他选项(例如使用 plainSource / auto-commit)更合适,我愿意考虑它们。
看来您需要将流元素传递到一个接收器,并在成功后传递到另一个接收器。
您可以 运行 在您的信息流中添加子信息流。沿着这条线:
.via(CsvFormatting.format(delimiter = CsvFormatting.SemiColon))
.mapAsync(1) { c =>
Source.single(c).runWith(LogRotatorSink.withSinkFactory(rotator, sink)).map(_ => c)
}
.runWith(Committer.sink(committerSettings))
它应该可以工作,但是,经过一番思考,我认为最好不要使用 sink 来写入日志,而是使用其他不会终止流的方式。