Akka Streams flow to handle paginated results doesn't complete
I want to implement a Flow to handle paginated results (e.g. the underlying service returns some results, but also indicates that more results can be obtained by issuing another request, passing in a cursor, and so on).
What I have done so far:
I have implemented the following flow and a test for it, but the flow does not complete.
import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, UnzipWith, Zip}

object AdditionalRequestsFlow {

  // Pairs every response with the request that produced it.
  private def keepRequest[Request, Response](flow: Flow[Request, Response, NotUsed]): Flow[Request, (Request, Response), NotUsed] = {
    Flow.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
      import GraphDSL.Implicits._

      val in = builder.add(Flow[Request])
      val bcast = builder.add(Broadcast[Request](2))
      val merge = builder.add(Zip[Request, Response]())

      in ~> bcast ~> merge.in0
      bcast ~> flow ~> merge.in1

      FlowShape(in.in, merge.out)
    })
  }

  def flow[Request, Response, Output](
    inputFlow: Flow[Request, Response, NotUsed],
    anotherRequest: (Request, Response) => Option[Request],
    extractOutput: Response => Output,
    mergeOutput: (Output, Output) => Output
  ): Flow[Request, Output, NotUsed] = {
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      val start = b.add(Flow[Request])
      val merge = b.add(Merge[Request](2))
      val underlying = b.add(keepRequest(inputFlow))
      val unOption = b.add(Flow[Option[Request]].mapConcat(_.toList))
      val unzip = b.add(UnzipWith[(Request, Response), Response, Option[Request]] { case (req, res) =>
        (res, anotherRequest(req, res))
      })
      val finish = b.add(Flow[Response].map(extractOutput)) // this is wrong as we don't keep to 1 Request -> 1 Output, but first let's get the flow to work

      start ~> merge ~> underlying ~> unzip.in
      unzip.out0 ~> finish
      merge <~ unOption <~ unzip.out1

      FlowShape(start.in, finish.out)
    })
  }
}
The test:
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import org.scalatest.FlatSpec
import org.scalatest.Matchers._
import cats.syntax.option._
import org.scalatest.concurrent.ScalaFutures.whenReady

class AdditionalRequestsFlowSpec extends FlatSpec {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()

  case class Request(max: Int, batchSize: Int, offset: Option[Int] = None)
  case class Response(values: List[Int], nextOffset: Option[Int])

  private val flow: Flow[Request, Response, NotUsed] = {
    Flow[Request]
      .map { request =>
        val start = request.offset.getOrElse(0)
        val end = Math.min(request.max, start + request.batchSize)
        val nextOffset = if (end == request.max) None else Some(end)
        val result = Response((start until end).toList, nextOffset)
        result
      }
  }

  "AdditionalRequestsFlow" should "collect additional responses" in {
    def anotherRequest(request: Request, response: Response): Option[Request] = {
      response.nextOffset.map { nextOffset => request.copy(offset = nextOffset.some) }
    }

    def extract(x: Response): List[Int] = x.values
    def merge(a: List[Int], b: List[Int]): List[Int] = a ::: b

    val requests =
      Request(max = 35, batchSize = 10) ::
      Request(max = 5, batchSize = 10) ::
      Request(max = 100, batchSize = 1) ::
      Nil

    val expected = requests.map { x =>
      (0 until x.max).toList
    }

    val future = Source(requests)
      .via(AdditionalRequestsFlow.flow(flow, anotherRequest, extract, merge))
      .runWith(Sink.seq)

    whenReady(future) { x =>
      x shouldEqual expected
    }
  }
}
I have also implemented the same flow in an ugly, blocking way to illustrate what I am trying to achieve:
import scala.concurrent.Await
import scala.concurrent.duration._

def uglyHackFlow[Request, Response, Output](
  inputFlow: Flow[Request, Response, NotUsed],
  anotherRequest: (Request, Response) => Option[Request],
  extractOutput: Response => Output,
  mergeOutput: (Output, Output) => Output
): Flow[Request, Output, NotUsed] = {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()

  Flow[Request]
    .map { x =>
      def grab(request: Request): Output = {
        val response = Await.result(Source.single(request).via(inputFlow).runWith(Sink.head), 10.seconds) // :(
        val another = anotherRequest(request, response)
        val output = extractOutput(response)

        another.map { another =>
          mergeOutput(output, grab(another))
        } getOrElse output
      }

      grab(x)
    }
}
This works (but we should not be materializing anything or Await-ing at this point).
I have reviewed http://doc.akka.io/docs/akka/2.4/scala/stream/stream-graphs.html#Graph_cycles__liveness_and_deadlocks which I believe contains the answer, but I cannot seem to find it there. In my case I would expect the cycle to contain at most one element, so neither buffer overflow nor complete starvation should occur - yet evidently one of them does.
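For reference, the pattern that page shows for keeping a cycle live is roughly the following (a standalone sketch in the spirit of the docs, not a fix for the flow above): an explicit, dropping buffer on the feedback edge so that backpressure cannot deadlock the loop.

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape, OverflowStrategy}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}

// Sketch of a "live" cycle: the buffer on the feedback edge gives the loop slack
// (dropping the oldest elements on overflow), so the graph keeps running instead
// of deadlocking on backpressure.
object LiveCycleSketch extends App {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()

  RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    val merge = b.add(Merge[Int](2))
    val bcast = b.add(Broadcast[Int](2))

    Source(1 to 3) ~> merge ~> Flow[Int].map { s => println(s); s } ~> bcast ~> Sink.ignore
    merge <~ Flow[Int].buffer(10, OverflowStrategy.dropHead) <~ bcast

    ClosedShape
  }).run()
}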
I tried to debug the stream using .withAttributes(Attributes(LogLevels(...))), but it produces no output despite the logger configuration apparently being correct.
I am looking for hints on how to fix the flow method while keeping the same signature and semantics (so that the test passes).
Or perhaps what I am doing here is completely off-base (e.g. there is an existing feature in akka-stream-contrib that solves this)?
I think it is safer to use Source.unfold than to create custom graphs. Here is what I typically do (with minor variations depending on the API).
override def getArticles(lastTokenOpt: Option[String], filterIds: (Seq[Id]) => Seq[Id]): Source[Either[String, ImpArticle], NotUsed] = {
  val maxRows = 1000

  def getUri(cursor: String, count: Int) =
    s"/works?rows=$count&filter=type:journal-article&order=asc&sort=deposited&cursor=${URLEncoder.encode(cursor, "UTF-8")}"

  Source.unfoldAsync(lastTokenOpt.getOrElse("*")) { cursor =>
    println(s"Getting ${getUri(cursor, maxRows)}")
    if (cursor.nonEmpty) {
      sendGetRequest[CrossRefResponse[CrossRefList[JsValue]]](getUri(cursor, maxRows)).map {
        case Some(response) =>
          response.message match {
            case Left(list) if response.status == "ok" =>
              println(s"Got ${list.items.length} items")
              val items = list.items.flatMap { js =>
                try {
                  parseArticle(js)
                } catch {
                  case ex: Throwable =>
                    logger.error(s"Error on parsing: ${js.compactPrint}")
                    throw ex
                }
              }

              list.`next-cursor` match {
                case Some(nextCursor) =>
                  Some(nextCursor -> (items.map(Right.apply).toList ::: List(Left(nextCursor))))
                case None =>
                  logger.error(s"`next-cursor` is missing when fetching from CrossRef [status ${response.status}][${getUri(cursor, maxRows)}]")
                  Some("" -> items.map(Right.apply).toList)
              }

            case Left(jsvalue) if response.status != "ok" =>
              logger.error(s"API error on fetching data from CrossRef [status ${response.status}][${getUri(cursor, maxRows)}]")
              None

            case Right(someError) =>
              val cause = someError.fold(errors => errors.map(_.message).mkString(", "), ex => ex.message)
              logger.error(s"API error on fetching data from CrossRef [status $cause][${getUri(cursor, maxRows)}]")
              None
          }

        case None =>
          logger.error(s"Got error on fetching ${getUri(cursor, maxRows)} from CrossRef")
          None
      }
    } else
      Future.successful(None)
  }.mapConcat(identity)
}
In your case you probably do not even need to push the cursor back into the stream. I do it because I store the last successful cursor in the database, so that I can resume later after a failure.
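Applied to the Request/Response case classes from the test above, a minimal sketch of the same idea could look like this (assuming the underlying service is available as a Request => Future[Response] function rather than a Flow; the names are illustrative, and it folds all pages into one output per request):

import akka.NotUsed
import akka.stream.scaladsl.{Flow, Source}
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical sketch: drive the pagination with Source.unfoldAsync instead of a graph cycle.
// `fetch` stands in for the underlying service call; the state is the next Request, if any.
def paginated(fetch: Request => Future[Response])(implicit ec: ExecutionContext): Flow[Request, List[Int], NotUsed] =
  Flow[Request].flatMapConcat { initial =>
    Source
      .unfoldAsync(Option(initial)) {
        case Some(request) =>
          fetch(request).map { response =>
            // Emit this page's values and continue with the follow-up request, if there is one.
            val next = response.nextOffset.map(offset => request.copy(offset = Some(offset)))
            Some(next -> response.values)
          }
        case None =>
          Future.successful(None) // no more pages: complete this inner source
      }
      .fold(List.empty[Int])(_ ::: _) // merge all pages into a single output per request
  }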
It feels like this video covers the gist of what you are trying to do. They create a custom GraphStage that maintains state and sends it back to the server; the response stream depends on the state sent back, and they also have an event that signals completion (in your case that would be where you do this check: if (end == request.max) None).
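As a rough illustration of that idea (not the video's code), a stateful GraphStage for a synchronous fetch function could look something like this; an asynchronous service would need getAsyncCallback instead of the plain loop:

import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

// Hypothetical sketch: for each incoming Request, keep calling a synchronous
// `fetch` with the follow-up request until there is none, then emit the merged Output.
class PagingStage[Request, Response, Output](
    fetch: Request => Response,
    nextRequest: (Request, Response) => Option[Request],
    extract: Response => Output,
    merge: (Output, Output) => Output
) extends GraphStage[FlowShape[Request, Output]] {

  val in: Inlet[Request] = Inlet("PagingStage.in")
  val out: Outlet[Output] = Outlet("PagingStage.out")
  override val shape: FlowShape[Request, Output] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {

      override def onPush(): Unit = {
        var request = grab(in)
        var response = fetch(request)
        var output = extract(response)
        var next = nextRequest(request, response)
        // Loop over the remaining pages, merging each page into the output.
        while (next.isDefined) {
          request = next.get
          response = fetch(request)
          output = merge(output, extract(response))
          next = nextRequest(request, response)
        }
        push(out, output)
      }

      override def onPull(): Unit = pull(in)

      setHandlers(in, out, this)
    }
}

Wrapped with Flow.fromGraph(new PagingStage(...)), this keeps the one-Request-to-one-Output shape the question asks for, though only for a synchronous service.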