gpt4 book ai didi

scala - Akka 流重试重复结果

转载 作者:行者123 更新时间:2023-12-01 12:25:58 26 4
gpt4 key购买 nike

我正在为 HTTP 资源实现一个迭代器,它可以恢复分页元素的列表,我尝试使用普通的 Iterator 来执行此操作,但它是一个阻塞实现,并且由于我我正在使用 akka,这让我的调度员有点发疯。

我的意愿是使用 akka-stream 实现相同的迭代器。问题是我需要一些不同的重试策略。

该服务返回一个元素列表,由 id 标识,有时当我查询下一页时,该服务返回当前页面上的相同产品。

我目前的算法是这样的。

var seenIds = Set.empty
var position = 0

def isProblematicPage(elements: Seq[Element]) Boolean = {
val currentIds = elements.map(_.id)
val intersection = seenIds & currentIds
val hasOnlyNewIds = intersection.isEmpty
if (hasOnlyNewIds) {
seenIds = seenIds | currentIds
}
!hasOnlyNewIds
}

def incrementPage(): Unit = {
position += 10
}

def doBackOff(attempt: Int): Unit = {
// Backoff logic
}

@tailrec
def fetchPage(attempt: Int = 0): Iterator[Element] = {
if (attempt > MaxRetries) {
incrementPage()
return Iterator.empty
}

val eventualPage = service.retrievePage(position, position + 10)

val page = Await.result(eventualPage, 5 minutes)

if (isProblematicPage(page)) {
doBackOff(attempt)
fetchPage(attempt + 1)
} else {
incrementPage()
page.iterator
}
}

我正在使用 akka-streams 进行实现,但我不知道如何使用流结构累积页面和测试重复。

有什么建议吗?

最佳答案

Flow.scan 方法在这种情况下很有用。

我会以职位来源开始您的直播:

type Position = Int

//0,10,20,...
def positionIterator() : Iterator[Position] = Iterator from (0,10)

val positionSource : Source[Position,_] = Source fromIterator positionIterator

然后可以将此位置源定向到 Flow.scan,它使用类似于您的 fetchPage 的功能(旁注:您应该尽可能避免等待,有一种方法可以在您的代码中不等待,但这超出了您原始问题的范围)。新函数需要接受已经看到的元素的“状态”:

def fetchPageWithState(service : Service)
(seenEls : Set[Element], position : Position) : Set[Elements] = {

val maxRetries = 10

val seenIds = seenEls map (_.id)

@tailrec
def readPosition(attempt : Int) : Seq[Elements] = {
if(attempt > maxRetries)
Iterator.empty
else {
val eventualPage : Seq[Element] =
Await.result(service.retrievePage(position, position + 10), 5 minutes)

if(eventualPage.map(_.id).exists(seenIds.contains)) {
doBackOff(attempt)
readPosition(attempt + 1)
}
else
eventualPage
}
}//end def readPosition

seenEls ++ readPosition(0).toSet
}//end def fetchPageWithState

这现在可以在 Flow 中使用:

def fetchFlow(service : Service) : Flow[Position, Set[Element],_] = 
Flow[Position].scan(Set.empty[Element])(fetchPageWithState(service))

新的 Flow 可以很容易地连接到您的 Position Source 以创建 Set[Element] 的 Source:

def elementsSource(service : Service) : Source[Set[Element], _] = 
positionSource via fetchFlow(service)

elementsSource 中的每个新值都将是来自已获取页面的一组不断增长的独特元素。

关于scala - Akka 流重试重复结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39619872/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com