Akka 流重试重复结果
Akka stream retry repeated result
我正在为 HTTP 资源实现一个迭代器,我可以用它恢复分页元素列表,我试图用普通的 Iterator
来做到这一点,但它是一个阻塞实现,因为我'我使用 akka
它让我的调度员有点疯狂。
我的意愿是使用 akka-stream
实现相同的迭代器。问题是我需要一些不同的重试策略。
服务 returns 元素列表,由 id
标识,有时当我查询下一页时,服务 returns 当前页面上的相同产品.
我现在的算法是这样的。
var seenIds = Set.empty
var position = 0
def isProblematicPage(elements: Seq[Element]) Boolean = {
val currentIds = elements.map(_.id)
val intersection = seenIds & currentIds
val hasOnlyNewIds = intersection.isEmpty
if (hasOnlyNewIds) {
seenIds = seenIds | currentIds
}
!hasOnlyNewIds
}
def incrementPage(): Unit = {
position += 10
}
def doBackOff(attempt: Int): Unit = {
// Backoff logic
}
@tailrec
def fetchPage(attempt: Int = 0): Iterator[Element] = {
if (attempt > MaxRetries) {
incrementPage()
return Iterator.empty
}
val eventualPage = service.retrievePage(position, position + 10)
val page = Await.result(eventualPage, 5 minutes)
if (isProblematicPage(page)) {
doBackOff(attempt)
fetchPage(attempt + 1)
} else {
incrementPage()
page.iterator
}
}
我正在使用 akka-streams
进行实现,但我不知道如何使用流结构累积页面和测试重复。
有什么建议吗?
Flow.scan
方法在这种情况下很有用。
我会以职位来源开始您的直播:
type Position = Int
//0,10,20,...
def positionIterator() : Iterator[Position] = Iterator from (0,10)
val positionSource : Source[Position,_] = Source fromIterator positionIterator
然后可以将此位置源定向到 Flow.scan
,它使用类似于您的 fetchPage
的功能(旁注:您应该尽可能避免等待,有一种方法可以避免在您的代码中等待,但这超出了您原始问题的范围)。新函数需要接受已经看到的元素的"state":
def fetchPageWithState(service : Service)
(seenEls : Set[Element], position : Position) : Set[Elements] = {
val maxRetries = 10
val seenIds = seenEls map (_.id)
@tailrec
def readPosition(attempt : Int) : Seq[Elements] = {
if(attempt > maxRetries)
Iterator.empty
else {
val eventualPage : Seq[Element] =
Await.result(service.retrievePage(position, position + 10), 5 minutes)
if(eventualPage.map(_.id).exists(seenIds.contains)) {
doBackOff(attempt)
readPosition(attempt + 1)
}
else
eventualPage
}
}//end def readPosition
seenEls ++ readPosition(0).toSet
}//end def fetchPageWithState
现在可以在 Flow
:
中使用
def fetchFlow(service : Service) : Flow[Position, Set[Element],_] =
Flow[Position].scan(Set.empty[Element])(fetchPageWithState(service))
新的 Flow 可以很容易地连接到您的 Position Source 以创建 Set[Element]
:
的 Source
def elementsSource(service : Service) : Source[Set[Element], _] =
positionSource via fetchFlow(service)
elementsSource
中的每个新值都将是来自已获取页面的一组不断增长的唯一元素。
Flow.scan
阶段是一个很好的建议,但是它缺少处理期货的功能,所以我实现了它的异步版本Flow.scanAsync
它现在可以在 akka 2.4.12
上使用。
当前的实现是:
val service: WebService
val maxTries: Int
val backOff: FiniteDuration
def retry[T](zero: T, attempt: Int = 0)(f: => Future[T]): Future[T] = {
f.recoverWith {
case ex if attempt >= maxAttempts =>
Future(zero)
case ex =>
akka.pattern.after(backOff, system.scheduler)(retry(zero, attempt + 1)(f))
}
}
def isProblematicPage(lastPage: Seq[Element], currPage: Seq[Element]): Boolean = {
val lastPageIds = lastPage.map(_.id).toSet
val currPageIds = currPage.map(_.id).toSet
val intersection = lastPageIds & currPageIds
intersection.nonEmpty
}
def retrievePage(lastPage: Seq[Element], startIndex: Int): Future[Seq[Element]] = {
retry(Seq.empty) {
service.fetchPage(startIndex).map { currPage: Seq[Element] =>
if (isProblematicPage(lastPage, currPage)) throw new ProblematicPageException(startIndex)
else currPage
}
}
}
val pagesRange: Range = Range(0, maxItems, pageSize)
val scanAsyncFlow = Flow[Int].via(ScanAsync(Seq.empty)(retrievePage))
Source(pagesRange)
.via(scanAsyncFlow)
.mapConcat(identity)
.runWith(Sink.seq)
感谢拉蒙的建议:)
我正在为 HTTP 资源实现一个迭代器,我可以用它恢复分页元素列表,我试图用普通的 Iterator
来做到这一点,但它是一个阻塞实现,因为我'我使用 akka
它让我的调度员有点疯狂。
我的意愿是使用 akka-stream
实现相同的迭代器。问题是我需要一些不同的重试策略。
服务 returns 元素列表,由 id
标识,有时当我查询下一页时,服务 returns 当前页面上的相同产品.
我现在的算法是这样的。
var seenIds = Set.empty
var position = 0
def isProblematicPage(elements: Seq[Element]) Boolean = {
val currentIds = elements.map(_.id)
val intersection = seenIds & currentIds
val hasOnlyNewIds = intersection.isEmpty
if (hasOnlyNewIds) {
seenIds = seenIds | currentIds
}
!hasOnlyNewIds
}
def incrementPage(): Unit = {
position += 10
}
def doBackOff(attempt: Int): Unit = {
// Backoff logic
}
@tailrec
def fetchPage(attempt: Int = 0): Iterator[Element] = {
if (attempt > MaxRetries) {
incrementPage()
return Iterator.empty
}
val eventualPage = service.retrievePage(position, position + 10)
val page = Await.result(eventualPage, 5 minutes)
if (isProblematicPage(page)) {
doBackOff(attempt)
fetchPage(attempt + 1)
} else {
incrementPage()
page.iterator
}
}
我正在使用 akka-streams
进行实现,但我不知道如何使用流结构累积页面和测试重复。
有什么建议吗?
Flow.scan
方法在这种情况下很有用。
我会以职位来源开始您的直播:
type Position = Int
//0,10,20,...
def positionIterator() : Iterator[Position] = Iterator from (0,10)
val positionSource : Source[Position,_] = Source fromIterator positionIterator
然后可以将此位置源定向到 Flow.scan
,它使用类似于您的 fetchPage
的功能(旁注:您应该尽可能避免等待,有一种方法可以避免在您的代码中等待,但这超出了您原始问题的范围)。新函数需要接受已经看到的元素的"state":
def fetchPageWithState(service : Service)
(seenEls : Set[Element], position : Position) : Set[Elements] = {
val maxRetries = 10
val seenIds = seenEls map (_.id)
@tailrec
def readPosition(attempt : Int) : Seq[Elements] = {
if(attempt > maxRetries)
Iterator.empty
else {
val eventualPage : Seq[Element] =
Await.result(service.retrievePage(position, position + 10), 5 minutes)
if(eventualPage.map(_.id).exists(seenIds.contains)) {
doBackOff(attempt)
readPosition(attempt + 1)
}
else
eventualPage
}
}//end def readPosition
seenEls ++ readPosition(0).toSet
}//end def fetchPageWithState
现在可以在 Flow
:
def fetchFlow(service : Service) : Flow[Position, Set[Element],_] =
Flow[Position].scan(Set.empty[Element])(fetchPageWithState(service))
新的 Flow 可以很容易地连接到您的 Position Source 以创建 Set[Element]
:
def elementsSource(service : Service) : Source[Set[Element], _] =
positionSource via fetchFlow(service)
elementsSource
中的每个新值都将是来自已获取页面的一组不断增长的唯一元素。
Flow.scan
阶段是一个很好的建议,但是它缺少处理期货的功能,所以我实现了它的异步版本Flow.scanAsync
它现在可以在 akka 2.4.12
上使用。
当前的实现是:
val service: WebService
val maxTries: Int
val backOff: FiniteDuration
def retry[T](zero: T, attempt: Int = 0)(f: => Future[T]): Future[T] = {
f.recoverWith {
case ex if attempt >= maxAttempts =>
Future(zero)
case ex =>
akka.pattern.after(backOff, system.scheduler)(retry(zero, attempt + 1)(f))
}
}
def isProblematicPage(lastPage: Seq[Element], currPage: Seq[Element]): Boolean = {
val lastPageIds = lastPage.map(_.id).toSet
val currPageIds = currPage.map(_.id).toSet
val intersection = lastPageIds & currPageIds
intersection.nonEmpty
}
def retrievePage(lastPage: Seq[Element], startIndex: Int): Future[Seq[Element]] = {
retry(Seq.empty) {
service.fetchPage(startIndex).map { currPage: Seq[Element] =>
if (isProblematicPage(lastPage, currPage)) throw new ProblematicPageException(startIndex)
else currPage
}
}
}
val pagesRange: Range = Range(0, maxItems, pageSize)
val scanAsyncFlow = Flow[Int].via(ScanAsync(Seq.empty)(retrievePage))
Source(pagesRange)
.via(scanAsyncFlow)
.mapConcat(identity)
.runWith(Sink.seq)
感谢拉蒙的建议:)