Alpaca Cassandra 流量使用
Alpakka Cassandra Flow Usage
我正在阅读 Alpakka Cassandra 的文档here
这使得将 Cassandra 用作源和接收器变得非常容易。但是流量使用情况呢。
我所说的流用法是指我没有将 Cassandra 用作源或接收器。但要查找数据。
可以使用 Alpakka 吗?还是我必须自己在流程中编写 Cassandra jdbc 代码?
1) Sink.如果你查看Alpakka的source code,你会发现Sink的构造如下
Flow[T]
.mapAsyncUnordered(parallelism)(t ⇒ session.executeAsync(statementBinder(t, statement)).asScala())
.toMat(Sink.ignore)(Keep.right)
如果你只需要一个路过的流量,你总是可以 trim 出 Sink.ignore
部分,你就会有
Flow[T]
.mapAsyncUnordered(parallelism)(t ⇒ session.executeAsync(statementBinder(t, statement)).asScala())
您只需要公开 Guava 期货转换器,它目前在 Alpakka 中是私有的。
2) 来源。 你总是可以通过 .flatMapConcat(x => CassandraSource(...)
)
从 Source
获得 Flow
我正在阅读 Alpakka Cassandra 的文档here
这使得将 Cassandra 用作源和接收器变得非常容易。但是流量使用情况呢。
我所说的流用法是指我没有将 Cassandra 用作源或接收器。但要查找数据。
可以使用 Alpakka 吗?还是我必须自己在流程中编写 Cassandra jdbc 代码?
1) Sink.如果你查看Alpakka的source code,你会发现Sink的构造如下
Flow[T]
.mapAsyncUnordered(parallelism)(t ⇒ session.executeAsync(statementBinder(t, statement)).asScala())
.toMat(Sink.ignore)(Keep.right)
如果你只需要一个路过的流量,你总是可以 trim 出 Sink.ignore
部分,你就会有
Flow[T]
.mapAsyncUnordered(parallelism)(t ⇒ session.executeAsync(statementBinder(t, statement)).asScala())
您只需要公开 Guava 期货转换器,它目前在 Alpakka 中是私有的。
2) 来源。 你总是可以通过 .flatMapConcat(x => CassandraSource(...)
)
Source
获得 Flow