gpt4 book ai didi

go - 在生产者速度慢,消费者速度快的情况下,如何处理 channel 关闭同步?

转载 作者:数据小太阳 更新时间:2023-10-29 03:36:42 25 4
gpt4 key购买 nike

关闭。这个问题是opinion-based .它目前不接受答案。












想改善这个问题吗?更新问题,以便可以通过 editing this post 用事实和引文回答问题.

3年前关闭。




Improve this question




我是新手,找不到这个问题的答案。我正在做的是在生产者中读取 CSV 文件,做一些可能需要时间的事情,然后通过 channel 将输出发送给消费者。有一连串生产者-消费者 s,并且任何生产者最终都可能比它的消费者慢。

producer(1 goroutine) -> chan0 -> consumer-producer-1(>1 goroutines) -> chan1 -> consumer-producer-2(>1 goroutines) -> chan2 -> consumer(>1 goroutines)



这里最多可以有 15 个消费者。

现在我面临的问题是,如果生产者完成了,我们如何在消费者端决定,我们可以停止处理。

我需要实现的是:
  • 一旦生产者完成,所有消费者最终应该做一些清理并在完成剩余的
  • 后退出。
  • 如果消费者在特定的超时时间内没有获得任何数据,它可以退出(最好是有信号)而不会进一步阻塞。
  • 它发生在整个序列中的所有生产者-消费者对中。

  • 我使用了以下方法。
  • 为每个数据 channel 保留一个信号 channel ,并为其下一个消费者的每个 goroutine 发布一个“完成”。
  • 读完之后,每个消费者应该只读取 channel 中剩余的缓冲数据,然后在下一个信号 channel 上放置,比如 5 “done”。确保每个 goroutine 只有 5 个,而不是 5 个(使用 https://golang.org/pkg/sync/#Once.Do )。
  • 下面是我能想到的。
    processRemaining = false
    for processRemaining == false{
    select {
    case stuff, ok := <-input_messages:
    do_stuff(stuff)
    if ok == false { // if channel has been closed
    processRemaining = true
    }
    if result != nil {
    //send to channel output_messages
    }
    case sig := <-input_signals: // if signaled to stopped.
    fmt.Println("received signal", sig)
    processRemaining = true
    default:
    fmt.Println("no activity")
    }
    }
    if processRemaining {
    for stuff := range input_messages {
    do_stuff(stuff)
    if result != nil {
    //send to channel output_messages
    }
    }
    // send "output_routine" number of "done" to a channel "output_signals".
    }

  • 但即使在这种方法中,我也无法想出任何与关闭的“input_messages” channel 相同的方式,如果没有可用的时间,比如 10 秒。

    我忽略了这种方法有什么问题吗?解决此问题的可能方法(或并发模式)是什么?确保:
  • 一旦第一个“chan0”关闭,所有后续 channel 都会关闭。
  • 所有生产者在关闭它们的输出 channel 之前都会更新,并且只有在它们都完成写入后才会关闭 channel 。
  • 如果消费者在指定的超时时间内未从 channel 获取数据,则应将其视为已关闭,并自行解除阻塞。
  • 最佳答案

    使用 sync.WaitGroup跟踪正在运行的 goroutine 的数量。每个 goroutine 在不再从 channel 获取数据后退出。曾经WaitGroup完成后,清理工作就可以完成了。

    像这样的东西:

    import (
    "sync"
    "time"
    )

    type Data interface{} // just an example

    type Consumer interface {
    Consume(Data) Data
    CleanUp()
    Count() int
    Timeout() time.Duration
    }

    func StartConsumers(consumer Consumer, inCh <-chan Data, outCh chan<- Data) {
    wg := sync.WaitGroup{}
    for i := 0; i < consumer.Count(); i++ {
    wg.Add(1)
    go func() {
    consumeLoop:
    for {
    select {
    case v, ok := <-inCh: // 'ok' says if the channel is still open
    if !ok {
    break consumeLoop
    }
    outCh <- consumer.Consume(v)
    case <-time.After(consumer.Timeout()):
    break consumeLoop
    }
    }

    wg.Done()
    }()
    }
    wg.Wait()

    consumer.CleanUp()
    close(outCh)
    }

    在管道的每个阶段,您都可以使用与上述类似的过程来启动消费者。

    关于go - 在生产者速度慢,消费者速度快的情况下,如何处理 channel 关闭同步?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52127485/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com