具有并发读者的 Golang 缓冲区

Golang buffer with concurrent readers

我想在 Go 中构建一个支持多个并发读取器和一个写入器的缓冲区。所有写入缓冲区的内容都应由所有读者读取。允许新读者随时加入,这意味着已经写入的数据必须能够为迟到的读者回放。

缓冲区应满足以下接口:

type MyBuffer interface {
    Write(p []byte) (n int, err error)
    NextReader() io.Reader
}

对于最好使用内置类型的此类实现,您有什么建议吗?

我链接到 append only commit log,因为它看起来与您的要求非常相似。我对分布式系统和提交日志还很陌生,所以我可能会混淆一些概念,但是 kafka 的介绍用漂亮的图表清楚地解释了一切。

Go 对我来说也是一个新手,所以我相信有更好的方法:

但也许你可以将你的缓冲区建模为一个切片,我认为有几种情况:

  • 缓冲区没有readers,新数据写入缓冲区,缓冲区长度增长
  • 缓冲区有 one/many reader(s):

    • reader 订阅缓冲区
    • 缓冲区创建并 returns 到该客户端的通道
    • buffer 维护一个客户端通道列表
    • 发生写入 -> 遍历所有客户端通道并发布到它(pub sub)

这解决了一个 pubsub 实时消费者流,消息在其中散开,但没有解决回填问题。

Kafka 启用回填及其 intro illustrates 如何完成:)

This offset is controlled by the consumer: normally a consumer will advance its offset linearly as it reads records, but, in fact, since the position is controlled by the consumer it can consume records in any order it likes. For example a consumer can reset to an older offset to reprocess data from the past or skip ahead to the most recent record and start consuming from "now".

This combination of features means that Kafka consumers are very cheap—they can come and go without much impact on the cluster or on other consumers. For example, you can use our command line tools to "tail" the contents of any topic without changing what is consumed by any existing consumers.

根据作者的性质以及您的使用方式,将所有内容保存在内存中(以便能够为 reader 稍后加入重新播放所有内容)是非常冒险的,可能需要很多内存,或导致您的应用程序因内存不足而崩溃。

将它用于 "low-traffic" 记录器将所有内容保存在内存中可能没问题,但例如流式传输某些音频或视频很可能不行。

如果下面的 reader 实现读取了写入缓冲区的所有数据,它们的 Read() 方法将正确报告 io.EOF。必须小心,因为某些构造(例如 bufio.Scanner)一旦遇到 io.EOF 可能不会读取更多数据(但这不是我们实现的缺陷)。

如果您希望缓冲区中的 readers 在缓冲区中没有更多可用数据时等待,则等待新数据写入而不是 returning io.EOF ,您可以将 returned reader 包装在此处显示的 "tail reader" 中:.

"Memory-safe" 文件实现

这是一个极其简单而优雅的解决方案。它使用文件写入,也使用文件读取。同步基本上由操作系统提供。这不会有内存不足错误的风险,因为数据仅存储在磁盘上。根据您的作者的性质,这可能足够也可能不够。

我宁愿使用下面的界面,因为Close()对于文件来说很重要。

type MyBuf interface {
    io.WriteCloser
    NewReader() (io.ReadCloser, error)
}

实现起来非常简单:

type mybuf struct {
    *os.File
}

func (mb *mybuf) NewReader() (io.ReadCloser, error) {
    f, err := os.Open(mb.Name())
    if err != nil {
        return nil, err
    }
    return f, nil
}

func NewMyBuf(name string) (MyBuf, error) {
    f, err := os.Create(name)
    if err != nil {
        return nil, err
    }
    return &mybuf{File: f}, nil
}

我们的 mybuf 类型嵌入 *os.File,所以我们得到了 "free".

Write()Close() 方法

NewReader() 只是打开现有的支持文件进行读取(在只读模式下)并 returns 它,再次利用它实现 io.ReadCloser

创建新的 MyBuf 值正在 NewMyBuf() 函数中实现,如果创建文件失败,该函数也可能 return 和 error

备注:

请注意,由于 mybuf 嵌入了 *os.File,因此可以使用 type assertion 到 "reach" os.File 的其他导出方法,即使它们不是MyBuf 界面的一部分。我不认为这是一个缺陷,但是如果你想禁止这个,你必须改变 mybuf 的实现来不嵌入 os.File 而是将它作为一个命名字段(但是你必须自己添加 Write()Close() 方法,正确转发到 os.File 字段。

内存中实现

如果文件实现不够,这里有内存实现。

由于我们现在只在内存中,我们将使用以下接口:

type MyBuf interface {
    io.Writer
    NewReader() io.Reader
}

我们的想法是存储所有传递给我们缓冲区的字节片。读者将在调用 Read() 时提供存储的切片,每个 reader 将跟踪其 Read() 方法提供了多少存储的切片。必须要处理同步,我们就用简单的sync.RWMutex.

事不宜迟,下面是实现:

type mybuf struct {
    data [][]byte
    sync.RWMutex
}

func (mb *mybuf) Write(p []byte) (n int, err error) {
    if len(p) == 0 {
        return 0, nil
    }
    // Cannot retain p, so we must copy it:
    p2 := make([]byte, len(p))
    copy(p2, p)
    mb.Lock()
    mb.data = append(mb.data, p2)
    mb.Unlock()
    return len(p), nil
}

type mybufReader struct {
    mb   *mybuf // buffer we read from
    i    int    // next slice index
    data []byte // current data slice to serve
}

func (mbr *mybufReader) Read(p []byte) (n int, err error) {
    if len(p) == 0 {
        return 0, nil
    }
    // Do we have data to send?
    if len(mbr.data) == 0 {
        mb := mbr.mb
        mb.RLock()
        if mbr.i < len(mb.data) {
            mbr.data = mb.data[mbr.i]
            mbr.i++
        }
        mb.RUnlock()
    }
    if len(mbr.data) == 0 {
        return 0, io.EOF
    }

    n = copy(p, mbr.data)
    mbr.data = mbr.data[n:]
    return n, nil
}

func (mb *mybuf) NewReader() io.Reader {
    return &mybufReader{mb: mb}
}

func NewMyBuf() MyBuf {
    return &mybuf{}
}

注意 Writer.Write() 的一般契约包括一个实现不能保留传递的切片,所以我们必须在 "storing" 它之前复制它。

另请注意,reader 中的 Read() 尝试锁定最短时间。也就是说,它只在我们需要来自缓冲区的新数据片时锁定,并且只进行读锁定,这意味着如果 reader 有部分数据片,将在 Read() 中发送它而不锁定和接触缓冲区。

作为实验的一部分,我不得不做一些类似的事情,所以分享:

type MultiReaderBuffer struct {
    mu  sync.RWMutex
    buf []byte
}

func (b *MultiReaderBuffer) Write(p []byte) (n int, err error) {
    if len(p) == 0 {
        return 0, nil
    }
    b.mu.Lock()
    b.buf = append(b.buf, p...)
    b.mu.Unlock()
    return len(p), nil
}

func (b *MultiReaderBuffer) NewReader() io.Reader {
    return &mrbReader{mrb: b}
}

type mrbReader struct {
    mrb *MultiReaderBuffer
    off int
}

func (r *mrbReader) Read(p []byte) (n int, err error) {
    if len(p) == 0 {
        return 0, nil
    }
    r.mrb.mu.RLock()
    n = copy(p, r.mrb.buf[r.off:])
    r.mrb.mu.RUnlock()
    if n == 0 {
        return 0, io.EOF
    }
    r.off += n
    return n, nil
}