gpt4 book ai didi

f# - API速率限制器间歇性挂起

转载 作者:行者123 更新时间:2023-12-02 05:53:27 24 4
gpt4 key购买 nike

我编写了一个简单的(我认为...)速率限制器,以将事件驱动系统保持在我们许可的 API 命中限制之下。由于某种原因,在发送 400-500 个请求后,它有时会占用。

我最好的想法是我搞砸了等待函数,因此在某些情况下它永远不会返回,但我一直无法找到有缺陷的逻辑。另一个想法是我搞砸了异步/任务互操作,导致了问题。它总是首先有效,然后再有效。 ApiRateLimiter 的单个实例在多个组件之间共享,以便遵守系统范围内的命中限制。

type RequestWithReplyChannel = RequestWithKey * AsyncReplyChannel<ResponseWithKey>

type public ApiRateLimiter(httpClient: HttpClient, limitTimePeriod: TimeSpan, limitCount: int) =

let requestLimit = Math.Max(limitCount,1)

let agent = MailboxProcessor<RequestWithReplyChannel>.Start(fun inbox ->

let rec waitUntilUnderLimit (recentRequestsTimeSent: seq<DateTimeOffset>) = async{
let cutoffTime = DateTimeOffset.UtcNow.Subtract limitTimePeriod
let requestsWithinLimit =
recentRequestsTimeSent
|> Seq.filter(fun x -> x >= cutoffTime)
|> Seq.toList

if requestsWithinLimit.Length >= requestLimit then
let! _ = Async.Sleep 100 //sleep for 100 milliseconds and check request limit again
return! waitUntilUnderLimit requestsWithinLimit
else
return requestsWithinLimit
}

let rec messageLoop (mostRecentRequestsTimeSent: seq<DateTimeOffset>) = async{
// read a message
let! keyedRequest,replyChannel = inbox.Receive()
// wait until we are under our rate limit
let! remainingRecentRequests = waitUntilUnderLimit mostRecentRequestsTimeSent

let rightNow = DateTimeOffset.UtcNow

let! response =
keyedRequest.Request
|> httpClient.SendAsync
|> Async.AwaitTask

replyChannel.Reply { Key = keyedRequest.Key; Response = response }

return! messageLoop (seq {
yield rightNow
yield! remainingRecentRequests
})
}

// start the loop
messageLoop (Seq.empty<DateTimeOffset>)
)

member this.QueueApiRequest keyedRequest =
async {
return! agent.PostAndAsyncReply(fun replyChannel -> (keyedRequest,replyChannel))
} |> Async.StartAsTask

有些请求很大并且需要一点时间,但没有什么会导致我在这个东西上看到的请求发送完全死亡。

感谢您花一点时间查看!

最佳答案

我注意到您正在使用 seq 构建最近发送请求的时间列表:

seq {
yield rightNow
yield! remainingRecentRequests
}

由于 F# 序列是惰性的,因此会生成一个枚举器,当询问其下一个值时,该枚举器将首先生成一个值,然后开始迭代其子序列并生成一个值。每次产生新的请求时,都会添加一个新的枚举器——但是旧的枚举器什么时候被处理掉呢?您可能认为它们一旦过期就会被处置,即一旦 Seq.filterwaitUntilUnderLimit返回假。但想一想:F# 编译器如何知道过滤条件一旦为 false 一次就始终为 false?如果没有深入的代码分析(编译器不会这样做),它就不能。因此,“旧”的 seq 永远不会被垃圾收集,因为它们仍然被保留以备不时之需。我不能 100% 确定这一点,因为我还没有测量代码的内存使用情况,但如果您要测量 ApiRateLimiter 的内存使用情况例如,我打赌您会看到它稳步增长而不会下降。

我还注意到您正在将新项目添加到序列的前面。这与 F# 列表使用的语义完全相同,但对于列表,没有要分配的 IEnumerable 对象,并且一旦列表项失败 List.filter条件下,将被处置。因此,我重写了您的代码以使用最近时间列表而不是 seq,并且为了提高效率我还进行了另一项更改:因为您创建列表的方式保证了它将被排序,最近的事件在前,最旧的在最后,我替换了List.filterList.takeWhile 。这样,当第一个日期早于截止日期的那一刻,它将停止检查较旧的日期。

通过此更改,您现在应该拥有旧日期实际上即将到期,以及 ApiRateLimiter 的内存使用情况类(Class)应该波动但保持不变。 (每次调用 waitUntilUnderLimit 时都会创建新列表,因此会产生一些 GC 压力,但这些都应该在第 0 代中)。我不知道这是否能解决您的挂起问题,但这是我在您的代码中看到的唯一问题。

顺便说一句,我也替换了你的线路let! _ = Async.Sleep 100do! Async.Sleep 100 ,这更简单。这里没有效率提升,但没有必要使用 let! _ =等待Async<unit>回来;这正是 do! 的内容关键字是for。

type RequestWithReplyChannel = RequestWithKey * AsyncReplyChannel<ResponseWithKey>

type public ApiRateLimiter(httpClient: HttpClient, limitTimePeriod: TimeSpan, limitCount: int) =

let requestLimit = Math.Max(limitCount,1)

let agent = MailboxProcessor<RequestWithReplyChannel>.Start(fun inbox ->

let rec waitUntilUnderLimit (recentRequestsTimeSent: DateTimeOffset list) = async{
let cutoffTime = DateTimeOffset.UtcNow.Subtract limitTimePeriod
let requestsWithinLimit =
recentRequestsTimeSent
|> List.takeWhile (fun x -> x >= cutoffTime)

if List.length requestsWithinLimit >= requestLimit then
do! Async.Sleep 100 //sleep for 100 milliseconds and check request limit again
return! waitUntilUnderLimit requestsWithinLimit
else
return requestsWithinLimit
}

let rec messageLoop (mostRecentRequestsTimeSent: DateTimeOffset list) = async{
// read a message
let! keyedRequest,replyChannel = inbox.Receive()
// wait until we are under our rate limit
let! remainingRecentRequests = waitUntilUnderLimit mostRecentRequestsTimeSent

let rightNow = DateTimeOffset.UtcNow

let! response =
keyedRequest.Request
|> httpClient.SendAsync
|> Async.AwaitTask

replyChannel.Reply { Key = keyedRequest.Key; Response = response }

return! messageLoop (rightNow :: remainingRecentRequests)
}

// start the loop
messageLoop []
)

member this.QueueApiRequest keyedRequest =
async {
return! agent.PostAndAsyncReply(fun replyChannel -> (keyedRequest,replyChannel))
} |> Async.StartAsTask

关于f# - API速率限制器间歇性挂起,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49619467/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com