aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Runxi Yu2025-11-22 08:00:00 +0800
committerGravatar Runxi Yu2025-11-22 08:00:00 +0800
commit10987664c3a92e6d7744f7dcfa1214b8e1063234 (patch)
tree9c3bd029d09730e6f24f816e7b8f090d52da3d43
parentflatex: Reformat code (diff)
signatureNo signature
bufpool: Improve perf by using buckets of different size classes
-rw-r--r--internal/bufpool/buffers.go112
-rw-r--r--internal/bufpool/buffers_test.go37
-rw-r--r--internal/flatex/decompress_bytes.go11
-rw-r--r--internal/flatex/decompress_test.go19
-rw-r--r--internal/zlibx/decompress.go16
-rw-r--r--internal/zlibx/decompress_test.go19
-rw-r--r--pack_pack.go2
7 files changed, 182 insertions, 34 deletions
diff --git a/internal/bufpool/buffers.go b/internal/bufpool/buffers.go
index 11ea1d54..24b29fb5 100644
--- a/internal/bufpool/buffers.go
+++ b/internal/bufpool/buffers.go
@@ -22,19 +22,46 @@ const (
// A Buffer's underlying slice may grow as needed. When finished with a
// pooled buffer, the caller should invoke Release() to return it to the pool.
//
-// A zero-value Buffer is not valid for use.
+// Buffers must not be copied after first use; doing so can cause double-returns
+// to the pool and data races. A zero-value Buffer is not valid for use.
+//
+//go:nocopy
type Buffer struct {
- buf []byte
- pooled bool
+ _ noCopy
+ buf []byte
+ pool poolIndex
}
-var bufPool = sync.Pool{
- New: func() any {
- buf := make([]byte, 0, DefaultBufferCap)
- return &buf
- },
+type poolIndex int8
+
+const (
+ unpooled poolIndex = -1
+)
+
+var sizeClasses = [...]int{
+ DefaultBufferCap,
+ 64 << 10,
+ 128 << 10,
+ 256 << 10,
+ 512 << 10,
+ 1 << 20,
+ 2 << 20,
+ 4 << 20,
+ maxPooledBuffer,
}
+var bufferPools = func() []sync.Pool {
+ pools := make([]sync.Pool, len(sizeClasses))
+ for i, classCap := range sizeClasses {
+ capCopy := classCap
+ pools[i].New = func() any {
+ buf := make([]byte, 0, capCopy)
+ return &buf
+ }
+ }
+ return pools
+}()
+
// Borrow retrieves a Buffer suitable for storing up to capHint bytes.
// The returned Buffer may come from an internal sync.Pool.
//
@@ -47,21 +74,24 @@ func Borrow(capHint int) Buffer {
if capHint < DefaultBufferCap {
capHint = DefaultBufferCap
}
- buf := bufPool.Get().(*[]byte)
- if cap(*buf) < capHint {
- bufPool.Put(buf)
+ classIdx, classCap, pooled := classFor(capHint)
+ if !pooled {
newBuf := make([]byte, 0, capHint)
- return Buffer{buf: newBuf, pooled: false}
+ return Buffer{buf: newBuf, pool: unpooled}
+ }
+ buf := bufferPools[classIdx].Get().(*[]byte)
+ if cap(*buf) < classCap {
+ *buf = make([]byte, 0, classCap)
}
slice := (*buf)[:0]
- return Buffer{buf: slice, pooled: true}
+ return Buffer{buf: slice, pool: poolIndex(classIdx)}
}
// FromOwned constructs a Buffer from a caller-owned byte slice. The resulting
// Buffer does not participate in pooling and will never be returned to the
// internal pool when released.
func FromOwned(buf []byte) Buffer {
- return Buffer{buf: buf, pooled: false}
+ return Buffer{buf: buf, pool: unpooled}
}
// Resize adjusts the length of the buffer to n bytes. If n exceeds the current
@@ -107,12 +137,9 @@ func (buf *Buffer) Release() {
if buf.buf == nil {
return
}
- if buf.pooled && cap(buf.buf) <= maxPooledBuffer {
- tmp := buf.buf[:0]
- bufPool.Put(&tmp)
- }
+ buf.returnToPool()
buf.buf = nil
- buf.pooled = false
+ buf.pool = unpooled
}
// ensureCapacity grows the underlying buffer to accommodate the requested
@@ -123,18 +150,45 @@ func (buf *Buffer) ensureCapacity(needed int) {
if cap(buf.buf) >= needed {
return
}
- old := buf.buf
- wasPooled := buf.pooled
- newCap := cap(buf.buf) * 2
- if newCap < needed {
- newCap = needed
+ classIdx, classCap, pooled := classFor(needed)
+ var newBuf []byte
+ if pooled {
+ raw := bufferPools[classIdx].Get().(*[]byte)
+ if cap(*raw) < classCap {
+ *raw = make([]byte, 0, classCap)
+ }
+ newBuf = (*raw)[:len(buf.buf)]
+ } else {
+ newBuf = make([]byte, len(buf.buf), classCap)
}
- newBuf := make([]byte, len(buf.buf), newCap)
copy(newBuf, buf.buf)
+ buf.returnToPool()
buf.buf = newBuf
- buf.pooled = false
- if wasPooled && cap(old) <= maxPooledBuffer {
- tmp := old[:0]
- bufPool.Put(&tmp)
+ if pooled {
+ buf.pool = poolIndex(classIdx)
+ } else {
+ buf.pool = unpooled
}
}
+
+func classFor(size int) (idx int, classCap int, ok bool) {
+ for i, class := range sizeClasses {
+ if size <= class {
+ return i, class, true
+ }
+ }
+ return -1, size, false
+}
+
+func (buf *Buffer) returnToPool() {
+ if buf.pool == unpooled {
+ return
+ }
+ tmp := buf.buf[:0]
+ bufferPools[int(buf.pool)].Put(&tmp)
+}
+
+type noCopy struct{}
+
+func (*noCopy) Lock() {}
+func (*noCopy) Unlock() {}
diff --git a/internal/bufpool/buffers_test.go b/internal/bufpool/buffers_test.go
index 87dbdd73..f5c006da 100644
--- a/internal/bufpool/buffers_test.go
+++ b/internal/bufpool/buffers_test.go
@@ -38,3 +38,40 @@ func TestBorrowBufferRelease(t *testing.T) {
t.Fatal("expected buffer cleared after release")
}
}
+
+func TestBorrowUsesLargerPools(t *testing.T) {
+ const request = DefaultBufferCap * 4
+
+ classIdx, classCap, pooled := classFor(request)
+ if !pooled {
+ t.Fatalf("expected %d to map to a pooled class", request)
+ }
+
+ b := Borrow(request)
+ if b.pool != poolIndex(classIdx) {
+ t.Fatalf("expected pooled buffer in class %d, got %d", classIdx, b.pool)
+ }
+ if cap(b.buf) != classCap {
+ t.Fatalf("expected capacity %d, got %d", classCap, cap(b.buf))
+ }
+ b.Release()
+
+ b2 := Borrow(request)
+ defer b2.Release()
+ if b2.pool != poolIndex(classIdx) {
+ t.Fatalf("expected pooled buffer in class %d on reuse, got %d", classIdx, b2.pool)
+ }
+ if cap(b2.buf) != classCap {
+ t.Fatalf("expected capacity %d on reuse, got %d", classCap, cap(b2.buf))
+ }
+}
+
+func TestGrowingBufferStaysPooled(t *testing.T) {
+ b := Borrow(DefaultBufferCap)
+ defer b.Release()
+
+ b.Append(make([]byte, DefaultBufferCap*3))
+ if b.pool == unpooled {
+ t.Fatal("buffer should stay pooled after growth within limit")
+ }
+}
diff --git a/internal/flatex/decompress_bytes.go b/internal/flatex/decompress_bytes.go
index 5a660b0a..5e29f82d 100644
--- a/internal/flatex/decompress_bytes.go
+++ b/internal/flatex/decompress_bytes.go
@@ -26,13 +26,20 @@ var bufferDecompressorPool = sync.Pool{
// Decompress inflates the provided DEFLATE stream and returns the full output
// in a pooled bufpool.Buffer along with the number of consumed bytes from src.
func Decompress(src []byte) (bufpool.Buffer, int, error) {
- return DecompressDict(src, nil)
+ return DecompressDictSized(src, nil, 0)
}
// DecompressDict inflates the provided DEFLATE stream using dict as the preset
// dictionary and returns the full output in a pooled bufpool.Buffer. The second
// returned value reports how many bytes of src were consumed.
func DecompressDict(src []byte, dict []byte) (bufpool.Buffer, int, error) {
+ return DecompressDictSized(src, dict, 0)
+}
+
+// DecompressDictSized is like DecompressDict but allows providing an expected
+// output size to pre-size the destination buffer and avoid repeated growth.
+// A non-positive sizeHint falls back to the default buffer capacity.
+func DecompressDictSized(src []byte, dict []byte, sizeHint int) (bufpool.Buffer, int, error) {
d := bufferDecompressorPool.Get().(*bufferDecompressor)
defer bufferDecompressorPool.Put(d)
@@ -40,7 +47,7 @@ func DecompressDict(src []byte, dict []byte) (bufpool.Buffer, int, error) {
return bufpool.Buffer{}, 0, err
}
- out := bufpool.Borrow(bufpool.DefaultBufferCap)
+ out := bufpool.Borrow(sizeHint)
out.Resize(0)
for {
diff --git a/internal/flatex/decompress_test.go b/internal/flatex/decompress_test.go
index afb84292..7c290555 100644
--- a/internal/flatex/decompress_test.go
+++ b/internal/flatex/decompress_test.go
@@ -74,3 +74,22 @@ func TestDecompressDictMissing(t *testing.T) {
t.Fatalf("expected error when dictionary missing")
}
}
+
+func TestDecompressDictSizedUsesHint(t *testing.T) {
+ payload := []byte("short")
+ compressed := compressDeflate(t, payload, nil)
+
+ const hint = 1 << 19
+ out, _, err := DecompressDictSized(compressed, nil, hint)
+ if err != nil {
+ t.Fatalf("DecompressDictSized: %v", err)
+ }
+ defer out.Release()
+
+ if !bytes.Equal(out.Bytes(), payload) {
+ t.Fatalf("unexpected payload: got %q", out.Bytes())
+ }
+ if cap(out.Bytes()) < hint {
+ t.Fatalf("expected capacity >= %d, got %d", hint, cap(out.Bytes()))
+ }
+}
diff --git a/internal/zlibx/decompress.go b/internal/zlibx/decompress.go
index 34e62c6f..c6eb65e5 100644
--- a/internal/zlibx/decompress.go
+++ b/internal/zlibx/decompress.go
@@ -12,13 +12,25 @@ import (
// Decompress inflates the provided zlib wrapped stream and returns the
// uncompressed data inside a pooled bufpool.Buffer.
func Decompress(src []byte) (bufpool.Buffer, error) {
- return DecompressDict(src, nil)
+ return DecompressSized(src, 0)
+}
+
+// DecompressSized inflates the provided zlib stream, using sizeHint to
+// preallocate the output buffer when known (e.g. packfile entries).
+func DecompressSized(src []byte, sizeHint int) (bufpool.Buffer, error) {
+ return DecompressDictSized(src, nil, sizeHint)
}
// DecompressDict is like Decompress but accepts a preset dictionary. The
// dictionary must match the checksum embedded in the stream if the dictionary
// flag is present.
func DecompressDict(src []byte, dict []byte) (bufpool.Buffer, error) {
+ return DecompressDictSized(src, dict, 0)
+}
+
+// DecompressDictSized is like DecompressDict but allows providing an expected
+// uncompressed size to avoid buffer growth copies.
+func DecompressDictSized(src []byte, dict []byte, sizeHint int) (bufpool.Buffer, error) {
if len(src) < 6 {
return bufpool.Buffer{}, io.ErrUnexpectedEOF
}
@@ -50,7 +62,7 @@ func DecompressDict(src []byte, dict []byte) (bufpool.Buffer, error) {
}
deflateData := src[offset:]
- out, consumed, err := flatex.DecompressDict(deflateData, dict)
+ out, consumed, err := flatex.DecompressDictSized(deflateData, dict, sizeHint)
if err != nil {
return bufpool.Buffer{}, err
}
diff --git a/internal/zlibx/decompress_test.go b/internal/zlibx/decompress_test.go
index a4e9c608..3dfc07a5 100644
--- a/internal/zlibx/decompress_test.go
+++ b/internal/zlibx/decompress_test.go
@@ -82,3 +82,22 @@ func TestDecompressChecksumError(t *testing.T) {
t.Fatalf("expected ErrChecksum, got %v", err)
}
}
+
+func TestDecompressSizedUsesHint(t *testing.T) {
+ payload := []byte("tiny payload")
+ compressed := compressZlib(t, payload, nil)
+
+ const hint = 1 << 20
+ out, err := DecompressSized(compressed, hint)
+ if err != nil {
+ t.Fatalf("DecompressSized: %v", err)
+ }
+ defer out.Release()
+
+ if !bytes.Equal(out.Bytes(), payload) {
+ t.Fatalf("unexpected payload %q", out.Bytes())
+ }
+ if cap(out.Bytes()) < hint {
+ t.Fatalf("expected capacity >= %d, got %d", hint, cap(out.Bytes()))
+ }
+}
diff --git a/pack_pack.go b/pack_pack.go
index 1cd6b35b..6dcfb5d9 100644
--- a/pack_pack.go
+++ b/pack_pack.go
@@ -104,7 +104,7 @@ func packSectionInflate(pf *packFile, start uint64, sizeHint int) (bufpool.Buffe
if start > uint64(len(pf.data)) {
return bufpool.Buffer{}, ErrInvalidObject
}
- body, err := zlibx.Decompress(pf.data[start:])
+ body, err := zlibx.DecompressSized(pf.data[start:], sizeHint)
if err != nil {
return bufpool.Buffer{}, err
}