gpt4 book ai didi

go - 限制来自 channel 的已处理消息的数量

转载 作者:行者123 更新时间:2023-12-01 22:22:35 24 4
gpt4 key购买 nike

我通过 channel 收到大约 200 000 条消息/秒给我的工作人员,我需要将发送给客户端的消息数量限制为每秒 20 条。
这使它每 50 毫秒 1 条消息

在 LOOP 的帮助下,worker 在整个程序生命周期中仍然活着(而不是为每条消息打开一个 channel )。

我的目标:
- 由于消息的顺序很重要,我想跳过在阻塞的 50 毫秒内收到的所有消息,只保存最新的消息
- 如果最新的消息在阻塞的 50 毫秒内出现,我希望在循环内的阻塞时间结束并且没有新消息到来时处理保存的消息! <--这是我的问题

我的策略
- 继续发送尚未处理的最新消息到同一 channel

但是它的问题是,如果该消息是在(来自应用程序的)新消息之后发送的呢?

下面的代码更像是一个作为工作代码的算法,只是想要一个关于如何做到这一点的提示/方法。

func example (new_message_from_channel <-chan *message) {
default = message
time = now_milliseconds
diff_accepted = 50milli
for this_message := range new_message_from_channel {
if now_millisecond - time >= diff_accepted {
send_it_to_the_client
time = now_milliseconds
} else {
//save the latest message
default = this_message

//My problem is how to process this latest message when the blocked 50ms is over and no new message coming ?!

//My strategy - keep sending it to the same channel
theChannel <- default
}

}
}

如果你有一个优雅的方法,欢迎你与我分享:)

最佳答案

使用速率限制器,您可以创建 throttle函数将采用:速率和 channel 作为输入;并返回两个 channel - 一个包含所有原始 channel 项目,另一个仅以固定速率中继项目:

func throttle(r time.Duration, in <-chan event) (C, tC <-chan event) {

// "writeable" channels
var (
wC = make(chan event)
wtC = make(chan event)
)

// read-only channels - returned to caller
C = wC
tC = wtC

go func() {
defer close(wC)
defer close(wtC)

rl := rate.NewLimiter(
rate.Every(r),
1,
)

// relays input channel's items to two channels:
// (1) gets all writes from original channel
// (2) only writes at a fixed frequency
for ev := range in {
wC <- ev
if rl.Allow() {
wtC <- ev
}
}
}()
return
}

工作示例: https://play.golang.org/p/upei0TiyzNr

编辑:

为了避免使用速率限制器,而是使用简单的 time.Ticker :
tick := time.NewTicker(r)

for ev := range in {
select {
case wC <- ev: // write to main
case <-tick.C:
wC <- ev // write to main ...
wtC <- ev // ... plus throttle channel
}
}

工作示例: https://play.golang.org/p/UTRXh72BvRl

关于go - 限制来自 channel 的已处理消息的数量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62239320/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com