diff options
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/bufpool/buffers.go | 189 | ||||
| -rw-r--r-- | internal/bufpool/buffers_test.go | 77 |
2 files changed, 266 insertions, 0 deletions
diff --git a/internal/bufpool/buffers.go b/internal/bufpool/buffers.go new file mode 100644 index 00000000..439e7e04 --- /dev/null +++ b/internal/bufpool/buffers.go @@ -0,0 +1,189 @@ +// Package bufpool provides a lightweight byte-buffer type with optional +// pooling. +package bufpool + +import "sync" + +const ( + // DefaultBufferCap is the minimum capacity a borrowed buffer will have. + // Borrow() will allocate or retrieve a buffer with at least this capacity. + DefaultBufferCap = 32 * 1024 + + // maxPooledBuffer defines the maximum capacity of a buffer that may be + // returned to the pool. Buffers larger than this will not be pooled to + // avoid unbounded memory usage. + maxPooledBuffer = 8 << 20 +) + +// Buffer is a growable byte container that optionally participates in a +// memory pool. A Buffer may be obtained through Borrow() or constructed +// directly from owned data via FromOwned(). +// +// A Buffer's underlying slice may grow as needed. When finished with a +// pooled buffer, the caller should invoke Release() to return it to the pool. +// +// Buffers must not be copied after first use; doing so can cause double-returns +// to the pool and data races. +// +//go:nocopy +type Buffer struct { + _ struct{} // for nocopy + buf []byte + pool poolIndex +} + +type poolIndex int8 + +const ( + unpooled poolIndex = -1 +) + +var sizeClasses = [...]int{ + DefaultBufferCap, + 64 << 10, + 128 << 10, + 256 << 10, + 512 << 10, + 1 << 20, + 2 << 20, + 4 << 20, + maxPooledBuffer, +} + +var bufferPools = func() []sync.Pool { + pools := make([]sync.Pool, len(sizeClasses)) + for i, classCap := range sizeClasses { + capCopy := classCap + pools[i].New = func() any { + buf := make([]byte, 0, capCopy) + return &buf + } + } + return pools +}() + +// Borrow retrieves a Buffer suitable for storing up to capHint bytes. +// The returned Buffer may come from an internal sync.Pool. +// +// If capHint is smaller than DefaultBufferCap, it is automatically raised +// to DefaultBufferCap. If no pooled buffer has sufficient capacity, a new +// unpooled buffer is allocated. +// +// The caller must call Release() when finished using the returned Buffer. +func Borrow(capHint int) Buffer { + if capHint < DefaultBufferCap { + capHint = DefaultBufferCap + } + classIdx, classCap, pooled := classFor(capHint) + if !pooled { + newBuf := make([]byte, 0, capHint) + return Buffer{buf: newBuf, pool: unpooled} + } + buf := bufferPools[classIdx].Get().(*[]byte) + if cap(*buf) < classCap { + *buf = make([]byte, 0, classCap) + } + slice := (*buf)[:0] + return Buffer{buf: slice, pool: poolIndex(classIdx)} +} + +// FromOwned constructs a Buffer from a caller-owned byte slice. The resulting +// Buffer does not participate in pooling and will never be returned to the +// internal pool when released. +func FromOwned(buf []byte) Buffer { + return Buffer{buf: buf, pool: unpooled} +} + +// Resize adjusts the length of the buffer to n bytes. If n exceeds the current +// capacity, the underlying storage is grown. If n is negative, it is treated +// as zero. +// +// The buffer's new contents beyond the previous length are undefined. +func (buf *Buffer) Resize(n int) { + if n < 0 { + n = 0 + } + buf.ensureCapacity(n) + buf.buf = buf.buf[:n] +} + +// Append copies the provided bytes onto the end of the buffer, growing its +// capacity if required. If src is empty, the method does nothing. +// +// The receiver retains ownership of the data; the caller may reuse src freely. +func (buf *Buffer) Append(src []byte) { + if len(src) == 0 { + return + } + start := len(buf.buf) + buf.ensureCapacity(start + len(src)) + buf.buf = buf.buf[:start+len(src)] + copy(buf.buf[start:], src) +} + +// Bytes returns the underlying byte slice that represents the current contents +// of the buffer. Modifying the returned slice modifies the Buffer itself. +func (buf *Buffer) Bytes() []byte { + return buf.buf +} + +// Release returns the buffer to the global pool if it originated from the +// pool and its capacity is no larger than maxPooledBuffer. After release, the +// Buffer becomes invalid and should not be used further. +// +// Releasing a non-pooled buffer has no effect beyond clearing its internal +// storage. +func (buf *Buffer) Release() { + if buf.buf == nil { + return + } + buf.returnToPool() + buf.buf = nil + buf.pool = unpooled +} + +// ensureCapacity grows the underlying buffer to accommodate the requested +// number of bytes. Growth doubles the capacity by default unless a larger +// expansion is needed. If the previous storage was pooled and not oversized, +// it is returned to the pool. +func (buf *Buffer) ensureCapacity(needed int) { + if cap(buf.buf) >= needed { + return + } + classIdx, classCap, pooled := classFor(needed) + var newBuf []byte + if pooled { + raw := bufferPools[classIdx].Get().(*[]byte) + if cap(*raw) < classCap { + *raw = make([]byte, 0, classCap) + } + newBuf = (*raw)[:len(buf.buf)] + } else { + newBuf = make([]byte, len(buf.buf), classCap) + } + copy(newBuf, buf.buf) + buf.returnToPool() + buf.buf = newBuf + if pooled { + buf.pool = poolIndex(classIdx) + } else { + buf.pool = unpooled + } +} + +func classFor(size int) (idx int, classCap int, ok bool) { + for i, class := range sizeClasses { + if size <= class { + return i, class, true + } + } + return -1, size, false +} + +func (buf *Buffer) returnToPool() { + if buf.pool == unpooled { + return + } + tmp := buf.buf[:0] + bufferPools[int(buf.pool)].Put(&tmp) +} diff --git a/internal/bufpool/buffers_test.go b/internal/bufpool/buffers_test.go new file mode 100644 index 00000000..f5c006da --- /dev/null +++ b/internal/bufpool/buffers_test.go @@ -0,0 +1,77 @@ +package bufpool + +import "testing" + +func TestBorrowBufferResizeAndAppend(t *testing.T) { + b := Borrow(1) + defer b.Release() + + if cap(b.buf) < DefaultBufferCap { + t.Fatalf("expected capacity >= %d, got %d", DefaultBufferCap, cap(b.buf)) + } + + b.Append([]byte("alpha")) + b.Append([]byte("beta")) + if got := string(b.Bytes()); got != "alphabeta" { + t.Fatalf("unexpected contents: %q", got) + } + + b.Resize(3) + if got := string(b.Bytes()); got != "alp" { + t.Fatalf("resize shrink mismatch: %q", got) + } + + b.Resize(8) + if len(b.Bytes()) != 8 { + t.Fatalf("expected len 8 after grow, got %d", len(b.Bytes())) + } + if prefix := string(b.Bytes()[:3]); prefix != "alp" { + t.Fatalf("prefix lost after grow: %q", prefix) + } +} + +func TestBorrowBufferRelease(t *testing.T) { + b := Borrow(DefaultBufferCap / 2) + b.Append([]byte("data")) + b.Release() + if b.buf != nil { + t.Fatal("expected buffer cleared after release") + } +} + +func TestBorrowUsesLargerPools(t *testing.T) { + const request = DefaultBufferCap * 4 + + classIdx, classCap, pooled := classFor(request) + if !pooled { + t.Fatalf("expected %d to map to a pooled class", request) + } + + b := Borrow(request) + if b.pool != poolIndex(classIdx) { + t.Fatalf("expected pooled buffer in class %d, got %d", classIdx, b.pool) + } + if cap(b.buf) != classCap { + t.Fatalf("expected capacity %d, got %d", classCap, cap(b.buf)) + } + b.Release() + + b2 := Borrow(request) + defer b2.Release() + if b2.pool != poolIndex(classIdx) { + t.Fatalf("expected pooled buffer in class %d on reuse, got %d", classIdx, b2.pool) + } + if cap(b2.buf) != classCap { + t.Fatalf("expected capacity %d on reuse, got %d", classCap, cap(b2.buf)) + } +} + +func TestGrowingBufferStaysPooled(t *testing.T) { + b := Borrow(DefaultBufferCap) + defer b.Release() + + b.Append(make([]byte, DefaultBufferCap*3)) + if b.pool == unpooled { + t.Fatal("buffer should stay pooled after growth within limit") + } +} |
