gpt4 book ai didi

go - 如何中断 channel 上的发送

转载 作者:行者123 更新时间:2023-12-03 10:07:34 24 4
gpt4 key购买 nike

我正在尝试实现一个组件,该组件查询数据库中的事件并通知用户是否有新事件通过 channel 可用。它基本上是这样的:

type struct Watcher {
events chan Event
}

func (w *Watcher) Watch() <- chan Event {
w.fetch()
return w.events
}

func (w *Watcher) Stop() {
// ???
}

func (w *Watcher) fetch() {
//...
for {
//...
e := fetchEvent()
w.events <- e
time.sleep(10)
//...
}
}
客户端看起来像这样:
events := NewWatcher().Events()
e := <- events
//...

我的问题是如何正确实现 Stop()方法?
特别是,如果发送方当前正在等待能够向 channel 发送事件,我该如何中断发送方我知道从接收方关闭 channel 是没有选择的。此类问题的最佳做法是什么?

最佳答案

或者,您可以根据上下文停止。这更好,因为如果由于某种原因另一端已经停止或被其他东西阻塞等,用于停止它的 channel 可能会阻塞。

type Watcher struct {
events chan Event
cancel context.CancelFunc
}

func (w *Watcher) Watch() <-chan Event {
ctx, cancel := context.WithCancel(context.Background())
w.cancel = cancel
go w.fetch(ctx)
return w.events
}

func (w *Watcher) Stop() {
w.cancel()
}

func (w *Watcher) fetch(ctx context.Context) {
for {
e := fetchEvent()
select {
case <-ctx.Done():
return
case w.events <- e:
}
}
}
此示例创建一个 ContextWatch函数(在获取开始之前)并注册 cancel Watcher 中的函数。 ctx然后传递给 fetch检查 ctx.Done在每次迭代中。如果上下文是 Done ,它将停止获取。

您可能想要初始化 cancel函数是一个空函数,所以它永远不会 nilStop可以在没有 panic 的情况下调用即使 Watcher从未开始。
func NewWatcher() *Watcher {
return &Watcher{
// ...
cancel: func() {},
}
}

如果你还想从外部传入一个上下文,你可以重写你的 Watch功能:
func (w *Watcher) Watch(ctx context.Context) <-chan Event {
ctx, cancel := context.WithCancel(ctx)
w.cancel = cancel
w.fetch(ctx)
return w.events
}
现在 Watcher会停到 fetch如果 Stop函数被调用或外部 ctx被取消(例如,如果您的应用程序关闭)。

如果 Watcher.events其他任何地方都没有使用 channel ,您可以在 Watch 中创建它函数并将其传递给 fetch以及退货。这样 Watch函数也可以被多次调用,它们将有单独的 channel ,而不是多个读取器从同一 channel 读取。
如果你这样做,你可能还想有办法单独取消它们,这样你就可以返回 cancel来自 Watch 的函数函数并将其从 Watcher 中删除结构。还有 Stop现在已经过时了。
func (w *Watcher) Watch() (<-chan Event, context.CancelFunc) {
ctx, cancel := context.WithCancel(context.Background())

// with small buffer for channel to smooth sending/receiving
ch := make(chan Event, 5)

go w.fetch(ctx, cancel, ch)
return ch, cancel
}

func (w *Watcher) fetch(ctx context.Context, cancel context.CancelFunc, ch chan<- Event) {
// make sure the context is cancelled always if `fetch` stops to avoid goroutine leak.
defer cancel()

for {
select {
case <-ctx.Done():
// now you can close it without problems
close(ch)
return
case ch <- fetchEvent():
}
}
}

关于go - 如何中断 channel 上的发送,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65666597/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com