使用流的内容更改源中的物化值
Change a materialized value in a source using the contents of the stream
Alpakka 提供了一种访问数十种不同数据源的好方法。 HDFS 和 FTP 等面向文件的源作为 Source[ByteString, Future[IOResult]
交付。但是,通过 Akka HTTP 的 HTTP 请求作为 Source[ByteString, NotUsed]
的实体流传送。在我的用例中,我想从 HTTP 源检索内容为 Source[ByteString, Future[IOResult]
,这样我就可以构建一个统一的资源获取器,该资源获取器适用于多种方案(本例中为 hdfs、文件、ftp 和 S3)。
特别是,我想将 Source[ByteString, NotUsed]
源转换为
Source[ByteString, Future[IOResult]
我可以从传入的字节流中计算出 IOResult。有很多方法,如 flatMapConcat
和 viaMat
但 none 似乎能够从输入流中提取详细信息(例如读取的字节数)或初始化 IOResult
结构合理。理想情况下,我正在寻找一种具有以下签名的方法,它将在流进入时更新 IOResult。
def matCalc(src: Source[ByteString, Any]) = Source[ByteString, Future[IOResult]] = {
src.someMatFoldMagic[ByteString, IOResult](IOResult.createSuccessful(0))(m, b) => m.withCount(m.count + b.length))
}
这可以通过使用 Promise
进行物化值传播来完成。
val completion = Promise[IoResult]
val httpWithIoResult = http.mapMaterializedValue(_ => completion.future)
现在剩下的就是在相关数据可用时完成completion
承诺。
另一种方法是下降到 GraphStage
API,在那里您可以对物化值传播进行较低级别的控制。但即使在那里使用 Promises
通常也是物化值传播的选择实现。看一下内置的运算符实现,例如 Ignore
.
我不记得有任何现成的功能可以立即执行此操作,但是您可以使用 alsoToMat (surprisingly didn't find it in akka streams docs, although you can look it in source code documentation & java api) flow function together with Sink.fold 来积累一些价值并在最后给予它。例如:
def magic(source: Source[Int, Any]): Source[Int, Future[Int]] =
source.alsoToMat(Sink.fold(0)((acc, _) => acc + 1))((_, f) => f)
问题在于 alsoToMat
将输入垫值与 alsoToMat
中提供的值相结合。同时,source 产生的值不受 alsoToMat
:
中 sink 的影响
def alsoToMat[Mat2, Mat3](that: Graph[SinkShape[Out], Mat2])(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[Out, Mat3] =
viaMat(alsoToGraph(that))(matF)
这个函数适配returnIOResult
并不难,根据源码:
final case class IOResult(count: Long, status: Try[Done]) { ... }
还有最后一件事你需要注意——你希望你的来源是这样的:
Source[ByteString, Future[IOResult]]
但是如果你不想将这些 mat 值携带到流定义的最后,然后根据这个未来的完成做 smth,那可能是容易出错的方法。例如,在这个例子中我完成了基于那个未来的工作,所以最后一个值将不会被处理:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
object App extends App {
private implicit val sys: ActorSystem = ActorSystem()
private implicit val mat: ActorMaterializer = ActorMaterializer()
private implicit val ec: ExecutionContext = sys.dispatcher
val source: Source[Int, Any] = Source((1 to 5).toList)
def magic(source: Source[Int, Any]): Source[Int, Future[Int]] =
source.alsoToMat(Sink.fold(0)((acc, _) => acc + 1))((_, f) => f)
val f = magic(source).throttle(1, 1.second).toMat(Sink.foreach(println))(Keep.left).run()
f.onComplete(t => println(s"f1 completed - $t"))
Await.ready(f, 5.minutes)
mat.shutdown()
sys.terminate()
}
Alpakka 提供了一种访问数十种不同数据源的好方法。 HDFS 和 FTP 等面向文件的源作为 Source[ByteString, Future[IOResult]
交付。但是,通过 Akka HTTP 的 HTTP 请求作为 Source[ByteString, NotUsed]
的实体流传送。在我的用例中,我想从 HTTP 源检索内容为 Source[ByteString, Future[IOResult]
,这样我就可以构建一个统一的资源获取器,该资源获取器适用于多种方案(本例中为 hdfs、文件、ftp 和 S3)。
特别是,我想将 Source[ByteString, NotUsed]
源转换为
Source[ByteString, Future[IOResult]
我可以从传入的字节流中计算出 IOResult。有很多方法,如 flatMapConcat
和 viaMat
但 none 似乎能够从输入流中提取详细信息(例如读取的字节数)或初始化 IOResult
结构合理。理想情况下,我正在寻找一种具有以下签名的方法,它将在流进入时更新 IOResult。
def matCalc(src: Source[ByteString, Any]) = Source[ByteString, Future[IOResult]] = {
src.someMatFoldMagic[ByteString, IOResult](IOResult.createSuccessful(0))(m, b) => m.withCount(m.count + b.length))
}
这可以通过使用 Promise
进行物化值传播来完成。
val completion = Promise[IoResult]
val httpWithIoResult = http.mapMaterializedValue(_ => completion.future)
现在剩下的就是在相关数据可用时完成completion
承诺。
另一种方法是下降到 GraphStage
API,在那里您可以对物化值传播进行较低级别的控制。但即使在那里使用 Promises
通常也是物化值传播的选择实现。看一下内置的运算符实现,例如 Ignore
.
我不记得有任何现成的功能可以立即执行此操作,但是您可以使用 alsoToMat (surprisingly didn't find it in akka streams docs, although you can look it in source code documentation & java api) flow function together with Sink.fold 来积累一些价值并在最后给予它。例如:
def magic(source: Source[Int, Any]): Source[Int, Future[Int]] =
source.alsoToMat(Sink.fold(0)((acc, _) => acc + 1))((_, f) => f)
问题在于 alsoToMat
将输入垫值与 alsoToMat
中提供的值相结合。同时,source 产生的值不受 alsoToMat
:
def alsoToMat[Mat2, Mat3](that: Graph[SinkShape[Out], Mat2])(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[Out, Mat3] =
viaMat(alsoToGraph(that))(matF)
这个函数适配returnIOResult
并不难,根据源码:
final case class IOResult(count: Long, status: Try[Done]) { ... }
还有最后一件事你需要注意——你希望你的来源是这样的:
Source[ByteString, Future[IOResult]]
但是如果你不想将这些 mat 值携带到流定义的最后,然后根据这个未来的完成做 smth,那可能是容易出错的方法。例如,在这个例子中我完成了基于那个未来的工作,所以最后一个值将不会被处理:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
object App extends App {
private implicit val sys: ActorSystem = ActorSystem()
private implicit val mat: ActorMaterializer = ActorMaterializer()
private implicit val ec: ExecutionContext = sys.dispatcher
val source: Source[Int, Any] = Source((1 to 5).toList)
def magic(source: Source[Int, Any]): Source[Int, Future[Int]] =
source.alsoToMat(Sink.fold(0)((acc, _) => acc + 1))((_, f) => f)
val f = magic(source).throttle(1, 1.second).toMat(Sink.foreach(println))(Keep.left).run()
f.onComplete(t => println(s"f1 completed - $t"))
Await.ready(f, 5.minutes)
mat.shutdown()
sys.terminate()
}