gpt4 book ai didi

具有并发读者的 Golang 缓冲区

转载 作者:IT王子 更新时间:2023-10-29 01:26:30 26 4
gpt4 key购买 nike

我想在 Go 中构建一个支持多个并发读取器和一个写入器的缓冲区。所有写入缓冲区的内容都应由所有读者读取。允许新读者随时加入,这意味着已经写入的数据必须能够为迟到的读者回放。

缓冲区应满足以下接口(interface):

type MyBuffer interface {
Write(p []byte) (n int, err error)
NextReader() io.Reader
}

对于最好使用内置类型的此类实现,您有什么建议吗?

最佳答案

根据作者的性质以及您的使用方式,将所有内容保存在内存中(以便能够为以后加入的读者重新播放所有内容)风险很大,可能需要大量内存,或者导致您的应用由于内存不足而崩溃。

将它用于“低流量”记录器,将所有内容保存在内存中可能没问题,但例如流式传输一些音频或视频很可能不行。

如果下面的读取器实现读取了所有写入缓冲区的数据,它们的 Read() 方法将正确地报告 io.EOF。必须小心,因为某些构造(例如 bufio.Scanner)在遇到 io.EOF 时可能不会读取更多数据(但这不是我们实现的缺陷)。

如果您希望我们的缓冲区的读者在缓冲区中没有更多可用数据时等待,等待新数据写入而不是返回 io.EOF,您可以包装返回的读者在此处显示的“尾部阅读器”中:Go: "tail -f"-like generator .

“内存安全”文件实现

这是一个极其简单而优雅的解决方案。它使用文件写入,也使用文件读取。同步基本上由操作系统提供。这不会有内存不足错误的风险,因为数据仅存储在磁盘上。根据您的作者的性质,这可能足够也可能不够。

我宁愿使用以下接口(interface),因为 Close() 对于文件来说很重要。

type MyBuf interface {
io.WriteCloser
NewReader() (io.ReadCloser, error)
}

实现起来非常简单:

type mybuf struct {
*os.File
}

func (mb *mybuf) NewReader() (io.ReadCloser, error) {
f, err := os.Open(mb.Name())
if err != nil {
return nil, err
}
return f, nil
}

func NewMyBuf(name string) (MyBuf, error) {
f, err := os.Create(name)
if err != nil {
return nil, err
}
return &mybuf{File: f}, nil
}

我们的 mybuf 类型嵌入 *os.File , 所以我们得到了“免费”的 Write()Close() 方法。

NewReader() 只是打开现有的支持文件进行读取(以只读模式)并返回它,再次利用它实现 io.ReadCloser.

创建一个新的 MyBuf 值是在 NewMyBuf() 函数中实现的,如果创建文件失败,该函数也可能返回一个错误

注意事项:

请注意,由于 mybuf 嵌入了 *os.File,因此可以使用 type assertion “访问”os.File 的其他导出方法,即使它们不是 MyBuf 接口(interface)的一部分。我不认为这是一个缺陷,但如果你想禁止它,你必须改变 mybuf 的实现,使其不嵌入 os.File,而是将它作为一个命名字段(但是你必须自己添加 Write()Close() 方法,正确转发到 os.File 字段) .

内存中实现

如果文件实现不够,这里有内存实现。

由于我们现在只在内存中,我们将使用以下接口(interface):

type MyBuf interface {
io.Writer
NewReader() io.Reader
}

我们的想法是存储所有传递给我们缓冲区的 byte slice 。读取器将在调用 Read() 时提供存储的 slice ,每个读取器将跟踪其 Read() 方法提供了多少存储的 slice 。必须处理同步,我们将使用一个简单的 sync.RWMutex .

事不宜迟,下面是实现:

type mybuf struct {
data [][]byte
sync.RWMutex
}

func (mb *mybuf) Write(p []byte) (n int, err error) {
if len(p) == 0 {
return 0, nil
}
// Cannot retain p, so we must copy it:
p2 := make([]byte, len(p))
copy(p2, p)
mb.Lock()
mb.data = append(mb.data, p2)
mb.Unlock()
return len(p), nil
}

type mybufReader struct {
mb *mybuf // buffer we read from
i int // next slice index
data []byte // current data slice to serve
}

func (mbr *mybufReader) Read(p []byte) (n int, err error) {
if len(p) == 0 {
return 0, nil
}
// Do we have data to send?
if len(mbr.data) == 0 {
mb := mbr.mb
mb.RLock()
if mbr.i < len(mb.data) {
mbr.data = mb.data[mbr.i]
mbr.i++
}
mb.RUnlock()
}
if len(mbr.data) == 0 {
return 0, io.EOF
}

n = copy(p, mbr.data)
mbr.data = mbr.data[n:]
return n, nil
}

func (mb *mybuf) NewReader() io.Reader {
return &mybufReader{mb: mb}
}

func NewMyBuf() MyBuf {
return &mybuf{}
}

请注意,Writer.Write() 的一般契约包括实现不得保留传递的 slice ,因此我们必须在“存储”之前复制它。

另请注意,读取器的 Read() 会尝试锁定最短的时间。也就是说,它只在我们需要来自缓冲区的新数据片时锁定,并且只进行读锁定,这意味着如果读取器有部分数据片,将在 Read() 中发送它而不锁定和触摸缓冲区。

关于具有并发读者的 Golang 缓冲区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44310982/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com