diff options
| field | value | |
|---|---|---|
| author | 2025-11-22 08:00:00 +0800 | |
| committer | 2025-11-22 08:00:00 +0800 | |
| commit | 10987664c3a92e6d7744f7dcfa1214b8e1063234 (patch) | |
| tree | 9c3bd029d09730e6f24f816e7b8f090d52da3d43 /internal | |
| parent | flatex: Reformat code (diff) | |
| signature | No signature | |
bufpool: Improve perf by using buckets of different size classes
Diffstat (limited to 'internal')
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | internal/bufpool/buffers.go | 112 |
| -rw-r--r-- | internal/bufpool/buffers_test.go | 37 |
| -rw-r--r-- | internal/flatex/decompress_bytes.go | 11 |
| -rw-r--r-- | internal/flatex/decompress_test.go | 19 |
| -rw-r--r-- | internal/zlibx/decompress.go | 16 |
| -rw-r--r-- | internal/zlibx/decompress_test.go | 19 |
6 files changed, 181 insertions, 33 deletions
diff --git a/internal/bufpool/buffers.go b/internal/bufpool/buffers.go index 11ea1d54..24b29fb5 100644 --- a/internal/bufpool/buffers.go +++ b/internal/bufpool/buffers.go @@ -22,19 +22,46 @@ const ( // A Buffer's underlying slice may grow as needed. When finished with a // pooled buffer, the caller should invoke Release() to return it to the pool. // -// A zero-value Buffer is not valid for use. +// Buffers must not be copied after first use; doing so can cause double-returns +// to the pool and data races. A zero-value Buffer is not valid for use. +// +//go:nocopy type Buffer struct { - buf []byte - pooled bool + _ noCopy + buf []byte + pool poolIndex } -var bufPool = sync.Pool{ - New: func() any { - buf := make([]byte, 0, DefaultBufferCap) - return &buf - }, +type poolIndex int8 + +const ( + unpooled poolIndex = -1 +) + +var sizeClasses = [...]int{ + DefaultBufferCap, + 64 << 10, + 128 << 10, + 256 << 10, + 512 << 10, + 1 << 20, + 2 << 20, + 4 << 20, + maxPooledBuffer, } +var bufferPools = func() []sync.Pool { + pools := make([]sync.Pool, len(sizeClasses)) + for i, classCap := range sizeClasses { + capCopy := classCap + pools[i].New = func() any { + buf := make([]byte, 0, capCopy) + return &buf + } + } + return pools +}() + // Borrow retrieves a Buffer suitable for storing up to capHint bytes. // The returned Buffer may come from an internal sync.Pool. 
// @@ -47,21 +74,24 @@ func Borrow(capHint int) Buffer { if capHint < DefaultBufferCap { capHint = DefaultBufferCap } - buf := bufPool.Get().(*[]byte) - if cap(*buf) < capHint { - bufPool.Put(buf) + classIdx, classCap, pooled := classFor(capHint) + if !pooled { newBuf := make([]byte, 0, capHint) - return Buffer{buf: newBuf, pooled: false} + return Buffer{buf: newBuf, pool: unpooled} + } + buf := bufferPools[classIdx].Get().(*[]byte) + if cap(*buf) < classCap { + *buf = make([]byte, 0, classCap) } slice := (*buf)[:0] - return Buffer{buf: slice, pooled: true} + return Buffer{buf: slice, pool: poolIndex(classIdx)} } // FromOwned constructs a Buffer from a caller-owned byte slice. The resulting // Buffer does not participate in pooling and will never be returned to the // internal pool when released. func FromOwned(buf []byte) Buffer { - return Buffer{buf: buf, pooled: false} + return Buffer{buf: buf, pool: unpooled} } // Resize adjusts the length of the buffer to n bytes. If n exceeds the current @@ -107,12 +137,9 @@ func (buf *Buffer) Release() { if buf.buf == nil { return } - if buf.pooled && cap(buf.buf) <= maxPooledBuffer { - tmp := buf.buf[:0] - bufPool.Put(&tmp) - } + buf.returnToPool() buf.buf = nil - buf.pooled = false + buf.pool = unpooled } // ensureCapacity grows the underlying buffer to accommodate the requested @@ -123,18 +150,45 @@ func (buf *Buffer) ensureCapacity(needed int) { if cap(buf.buf) >= needed { return } - old := buf.buf - wasPooled := buf.pooled - newCap := cap(buf.buf) * 2 - if newCap < needed { - newCap = needed + classIdx, classCap, pooled := classFor(needed) + var newBuf []byte + if pooled { + raw := bufferPools[classIdx].Get().(*[]byte) + if cap(*raw) < classCap { + *raw = make([]byte, 0, classCap) + } + newBuf = (*raw)[:len(buf.buf)] + } else { + newBuf = make([]byte, len(buf.buf), classCap) } - newBuf := make([]byte, len(buf.buf), newCap) copy(newBuf, buf.buf) + buf.returnToPool() buf.buf = newBuf - buf.pooled = false - if wasPooled && 
cap(old) <= maxPooledBuffer { - tmp := old[:0] - bufPool.Put(&tmp) + if pooled { + buf.pool = poolIndex(classIdx) + } else { + buf.pool = unpooled } } + +func classFor(size int) (idx int, classCap int, ok bool) { + for i, class := range sizeClasses { + if size <= class { + return i, class, true + } + } + return -1, size, false +} + +func (buf *Buffer) returnToPool() { + if buf.pool == unpooled { + return + } + tmp := buf.buf[:0] + bufferPools[int(buf.pool)].Put(&tmp) +} + +type noCopy struct{} + +func (*noCopy) Lock() {} +func (*noCopy) Unlock() {} diff --git a/internal/bufpool/buffers_test.go b/internal/bufpool/buffers_test.go index 87dbdd73..f5c006da 100644 --- a/internal/bufpool/buffers_test.go +++ b/internal/bufpool/buffers_test.go @@ -38,3 +38,40 @@ func TestBorrowBufferRelease(t *testing.T) { t.Fatal("expected buffer cleared after release") } } + +func TestBorrowUsesLargerPools(t *testing.T) { + const request = DefaultBufferCap * 4 + + classIdx, classCap, pooled := classFor(request) + if !pooled { + t.Fatalf("expected %d to map to a pooled class", request) + } + + b := Borrow(request) + if b.pool != poolIndex(classIdx) { + t.Fatalf("expected pooled buffer in class %d, got %d", classIdx, b.pool) + } + if cap(b.buf) != classCap { + t.Fatalf("expected capacity %d, got %d", classCap, cap(b.buf)) + } + b.Release() + + b2 := Borrow(request) + defer b2.Release() + if b2.pool != poolIndex(classIdx) { + t.Fatalf("expected pooled buffer in class %d on reuse, got %d", classIdx, b2.pool) + } + if cap(b2.buf) != classCap { + t.Fatalf("expected capacity %d on reuse, got %d", classCap, cap(b2.buf)) + } +} + +func TestGrowingBufferStaysPooled(t *testing.T) { + b := Borrow(DefaultBufferCap) + defer b.Release() + + b.Append(make([]byte, DefaultBufferCap*3)) + if b.pool == unpooled { + t.Fatal("buffer should stay pooled after growth within limit") + } +} diff --git a/internal/flatex/decompress_bytes.go b/internal/flatex/decompress_bytes.go index 5a660b0a..5e29f82d 100644 --- 
a/internal/flatex/decompress_bytes.go +++ b/internal/flatex/decompress_bytes.go @@ -26,13 +26,20 @@ var bufferDecompressorPool = sync.Pool{ // Decompress inflates the provided DEFLATE stream and returns the full output // in a pooled bufpool.Buffer along with the number of consumed bytes from src. func Decompress(src []byte) (bufpool.Buffer, int, error) { - return DecompressDict(src, nil) + return DecompressDictSized(src, nil, 0) } // DecompressDict inflates the provided DEFLATE stream using dict as the preset // dictionary and returns the full output in a pooled bufpool.Buffer. The second // returned value reports how many bytes of src were consumed. func DecompressDict(src []byte, dict []byte) (bufpool.Buffer, int, error) { + return DecompressDictSized(src, dict, 0) +} + +// DecompressDictSized is like DecompressDict but allows providing an expected +// output size to pre-size the destination buffer and avoid repeated growth. +// A non-positive sizeHint falls back to the default buffer capacity. 
+func DecompressDictSized(src []byte, dict []byte, sizeHint int) (bufpool.Buffer, int, error) { d := bufferDecompressorPool.Get().(*bufferDecompressor) defer bufferDecompressorPool.Put(d) @@ -40,7 +47,7 @@ func DecompressDict(src []byte, dict []byte) (bufpool.Buffer, int, error) { return bufpool.Buffer{}, 0, err } - out := bufpool.Borrow(bufpool.DefaultBufferCap) + out := bufpool.Borrow(sizeHint) out.Resize(0) for { diff --git a/internal/flatex/decompress_test.go b/internal/flatex/decompress_test.go index afb84292..7c290555 100644 --- a/internal/flatex/decompress_test.go +++ b/internal/flatex/decompress_test.go @@ -74,3 +74,22 @@ func TestDecompressDictMissing(t *testing.T) { t.Fatalf("expected error when dictionary missing") } } + +func TestDecompressDictSizedUsesHint(t *testing.T) { + payload := []byte("short") + compressed := compressDeflate(t, payload, nil) + + const hint = 1 << 19 + out, _, err := DecompressDictSized(compressed, nil, hint) + if err != nil { + t.Fatalf("DecompressDictSized: %v", err) + } + defer out.Release() + + if !bytes.Equal(out.Bytes(), payload) { + t.Fatalf("unexpected payload: got %q", out.Bytes()) + } + if cap(out.Bytes()) < hint { + t.Fatalf("expected capacity >= %d, got %d", hint, cap(out.Bytes())) + } +} diff --git a/internal/zlibx/decompress.go b/internal/zlibx/decompress.go index 34e62c6f..c6eb65e5 100644 --- a/internal/zlibx/decompress.go +++ b/internal/zlibx/decompress.go @@ -12,13 +12,25 @@ import ( // Decompress inflates the provided zlib wrapped stream and returns the // uncompressed data inside a pooled bufpool.Buffer. func Decompress(src []byte) (bufpool.Buffer, error) { - return DecompressDict(src, nil) + return DecompressSized(src, 0) +} + +// DecompressSized inflates the provided zlib stream, using sizeHint to +// preallocate the output buffer when known (e.g. packfile entries). 
+func DecompressSized(src []byte, sizeHint int) (bufpool.Buffer, error) { + return DecompressDictSized(src, nil, sizeHint) } // DecompressDict is like Decompress but accepts a preset dictionary. The // dictionary must match the checksum embedded in the stream if the dictionary // flag is present. func DecompressDict(src []byte, dict []byte) (bufpool.Buffer, error) { + return DecompressDictSized(src, dict, 0) +} + +// DecompressDictSized is like DecompressDict but allows providing an expected +// uncompressed size to avoid buffer growth copies. +func DecompressDictSized(src []byte, dict []byte, sizeHint int) (bufpool.Buffer, error) { if len(src) < 6 { return bufpool.Buffer{}, io.ErrUnexpectedEOF } @@ -50,7 +62,7 @@ func DecompressDict(src []byte, dict []byte) (bufpool.Buffer, error) { } deflateData := src[offset:] - out, consumed, err := flatex.DecompressDict(deflateData, dict) + out, consumed, err := flatex.DecompressDictSized(deflateData, dict, sizeHint) if err != nil { return bufpool.Buffer{}, err } diff --git a/internal/zlibx/decompress_test.go b/internal/zlibx/decompress_test.go index a4e9c608..3dfc07a5 100644 --- a/internal/zlibx/decompress_test.go +++ b/internal/zlibx/decompress_test.go @@ -82,3 +82,22 @@ func TestDecompressChecksumError(t *testing.T) { t.Fatalf("expected ErrChecksum, got %v", err) } } + +func TestDecompressSizedUsesHint(t *testing.T) { + payload := []byte("tiny payload") + compressed := compressZlib(t, payload, nil) + + const hint = 1 << 20 + out, err := DecompressSized(compressed, hint) + if err != nil { + t.Fatalf("DecompressSized: %v", err) + } + defer out.Release() + + if !bytes.Equal(out.Bytes(), payload) { + t.Fatalf("unexpected payload %q", out.Bytes()) + } + if cap(out.Bytes()) < hint { + t.Fatalf("expected capacity >= %d, got %d", hint, cap(out.Bytes())) + } +} |
