Akka 流入门

Getting Started With Akka Streams

最近几天我一直在阅读有关 Akka Streams 的内容,并且在过去几个月里我一直在使用 Scala 中的 Rx 库。对我来说,这两个库所提供的功能似乎有些重叠。 RxScala 更容易上手、理解和使用。例如,这是一个简单的用例,我使用 Scala 的 Rx 库连接到 Kafka 主题,将其包装到一个 Observable 中,这样我就可以让订阅者收到这些消息。

val consumerStream = consumer.createMessageStreamsByFilter(topicFilter(topics), 1, keyDecoder, valueDecoder).head
val observableConsumer = Observable.fromIterator(consumerStream).map(_.message())

这非常简单明了。关于我应该如何开始使用 akka 流的任何线索?我想在上面使用相同的示例,我想从 Source 发出事件。我稍后会有一个 Flow 和一个 Sink。最后,在我的主要 class 中,我将把这 3 个组合到 运行 应用程序数据流中。

有什么建议吗?

所以这是我想出的:

val kafkaStreamItr = consumer.createMessageStreamsByFilter(topicFilter(topics), 1, keyDecoder, valueDecoder).head
Source.fromIterator(() => kafkaStreamItr).map(_.message)