diff options
| -rw-r--r-- | buffers.go | 92 | ||||
| -rw-r--r-- | internal/bufpool/buffers.go | 140 | ||||
| -rw-r--r-- | internal/bufpool/buffers_test.go (renamed from buffers_test.go) | 14 | ||||
| -rw-r--r-- | pack_pack.go | 92 | ||||
| -rw-r--r-- | pack_test.go | 10 |
5 files changed, 200 insertions, 148 deletions
diff --git a/buffers.go b/buffers.go deleted file mode 100644 index ac84b837..00000000 --- a/buffers.go +++ /dev/null @@ -1,92 +0,0 @@ -package furgit - -import "sync" - -const ( - defaultBodyCap = 32 * 1024 - maxPooledBody = 8 << 20 -) - -type borrowedBody struct { - buf []byte - pooled bool -} - -var bodyPool = sync.Pool{ - New: func() any { - buf := make([]byte, 0, defaultBodyCap) - return &buf - }, -} - -func borrowBody(capHint int) borrowedBody { - if capHint < defaultBodyCap { - capHint = defaultBodyCap - } - buf := bodyPool.Get().(*[]byte) - if cap(*buf) < capHint { - bodyPool.Put(buf) - newBuf := make([]byte, 0, capHint) - return borrowedBody{buf: newBuf, pooled: false} - } - slice := (*buf)[:0] - return borrowedBody{buf: slice, pooled: true} -} - -func borrowedFromOwned(buf []byte) borrowedBody { - return borrowedBody{buf: buf} -} - -func (body *borrowedBody) Resize(n int) { - if n < 0 { - n = 0 - } - body.ensureCapacity(n) - body.buf = body.buf[:n] -} - -func (body *borrowedBody) Append(src []byte) { - if len(src) == 0 { - return - } - start := len(body.buf) - body.ensureCapacity(start + len(src)) - body.buf = body.buf[:start+len(src)] - copy(body.buf[start:], src) -} - -func (body *borrowedBody) Bytes() []byte { - return body.buf -} - -func (body *borrowedBody) Release() { - if body.buf == nil { - return - } - if body.pooled && cap(body.buf) <= maxPooledBody { - tmp := body.buf[:0] - bodyPool.Put(&tmp) - } - body.buf = nil - body.pooled = false -} - -func (body *borrowedBody) ensureCapacity(needed int) { - if cap(body.buf) >= needed { - return - } - old := body.buf - wasPooled := body.pooled - newCap := cap(body.buf) * 2 - if newCap < needed { - newCap = needed - } - newBuf := make([]byte, len(body.buf), newCap) - copy(newBuf, body.buf) - body.buf = newBuf - body.pooled = false - if wasPooled && cap(old) <= maxPooledBody { - tmp := old[:0] - bodyPool.Put(&tmp) - } -} diff --git a/internal/bufpool/buffers.go b/internal/bufpool/buffers.go new file mode 100644 index 00000000..11ea1d54 --- /dev/null +++ b/internal/bufpool/buffers.go @@ -0,0 +1,140 @@ +// Package bufpool provides a lightweight byte-buffer type with optional +// pooling. +package bufpool + +import "sync" + +const ( + // DefaultBufferCap is the minimum capacity a borrowed buffer will have. + // Borrow() will allocate or retrieve a buffer with at least this capacity. + DefaultBufferCap = 32 * 1024 + + // maxPooledBuffer defines the maximum capacity of a buffer that may be + // returned to the pool. Buffers larger than this will not be pooled to + // avoid unbounded memory usage. + maxPooledBuffer = 8 << 20 +) + +// Buffer is a growable byte container that optionally participates in a +// memory pool. A Buffer may be obtained through Borrow() or constructed +// directly from owned data via FromOwned(). +// +// A Buffer's underlying slice may grow as needed. When finished with a +// pooled buffer, the caller should invoke Release() to return it to the pool. +// +// A zero-value Buffer is not valid for use. +type Buffer struct { + buf []byte + pooled bool +} + +var bufPool = sync.Pool{ + New: func() any { + buf := make([]byte, 0, DefaultBufferCap) + return &buf + }, +} + +// Borrow retrieves a Buffer suitable for storing up to capHint bytes. +// The returned Buffer may come from an internal sync.Pool. +// +// If capHint is smaller than DefaultBufferCap, it is automatically raised +// to DefaultBufferCap. If no pooled buffer has sufficient capacity, a new +// unpooled buffer is allocated. +// +// The caller must call Release() when finished using the returned Buffer. +func Borrow(capHint int) Buffer { + if capHint < DefaultBufferCap { + capHint = DefaultBufferCap + } + buf := bufPool.Get().(*[]byte) + if cap(*buf) < capHint { + bufPool.Put(buf) + newBuf := make([]byte, 0, capHint) + return Buffer{buf: newBuf, pooled: false} + } + slice := (*buf)[:0] + return Buffer{buf: slice, pooled: true} +} + +// FromOwned constructs a Buffer from a caller-owned byte slice. The resulting +// Buffer does not participate in pooling and will never be returned to the +// internal pool when released. +func FromOwned(buf []byte) Buffer { + return Buffer{buf: buf, pooled: false} +} + +// Resize adjusts the length of the buffer to n bytes. If n exceeds the current +// capacity, the underlying storage is grown. If n is negative, it is treated +// as zero. +// +// The buffer's new contents beyond the previous length are undefined. +func (buf *Buffer) Resize(n int) { + if n < 0 { + n = 0 + } + buf.ensureCapacity(n) + buf.buf = buf.buf[:n] +} + +// Append copies the provided bytes onto the end of the buffer, growing its +// capacity if required. If src is empty, the method does nothing. +// +// The receiver retains ownership of the data; the caller may reuse src freely. +func (buf *Buffer) Append(src []byte) { + if len(src) == 0 { + return + } + start := len(buf.buf) + buf.ensureCapacity(start + len(src)) + buf.buf = buf.buf[:start+len(src)] + copy(buf.buf[start:], src) +} + +// Bytes returns the underlying byte slice that represents the current contents +// of the buffer. Modifying the returned slice modifies the Buffer itself. +func (buf *Buffer) Bytes() []byte { + return buf.buf +} + +// Release returns the buffer to the global pool if it originated from the +// pool and its capacity is no larger than maxPooledBuffer. After release, the +// Buffer becomes invalid and should not be used further. +// +// Releasing a non-pooled buffer has no effect beyond clearing its internal +// storage. +func (buf *Buffer) Release() { + if buf.buf == nil { + return + } + if buf.pooled && cap(buf.buf) <= maxPooledBuffer { + tmp := buf.buf[:0] + bufPool.Put(&tmp) + } + buf.buf = nil + buf.pooled = false +} + +// ensureCapacity grows the underlying buffer to accommodate the requested +// number of bytes. Growth doubles the capacity by default unless a larger +// expansion is needed. If the previous storage was pooled and not oversized, +// it is returned to the pool. +func (buf *Buffer) ensureCapacity(needed int) { + if cap(buf.buf) >= needed { + return + } + old := buf.buf + wasPooled := buf.pooled + newCap := cap(buf.buf) * 2 + if newCap < needed { + newCap = needed + } + newBuf := make([]byte, len(buf.buf), newCap) + copy(newBuf, buf.buf) + buf.buf = newBuf + buf.pooled = false + if wasPooled && cap(old) <= maxPooledBuffer { + tmp := old[:0] + bufPool.Put(&tmp) + } +} diff --git a/buffers_test.go b/internal/bufpool/buffers_test.go index aae431e5..87dbdd73 100644 --- a/buffers_test.go +++ b/internal/bufpool/buffers_test.go @@ -1,13 +1,13 @@ -package furgit +package bufpool import "testing" -func TestBorrowBodyResizeAndAppend(t *testing.T) { - b := borrowBody(1) +func TestBorrowBufferResizeAndAppend(t *testing.T) { + b := Borrow(1) defer b.Release() - if cap(b.buf) < defaultBodyCap { - t.Fatalf("expected capacity >= %d, got %d", defaultBodyCap, cap(b.buf)) + if cap(b.buf) < DefaultBufferCap { + t.Fatalf("expected capacity >= %d, got %d", DefaultBufferCap, cap(b.buf)) } b.Append([]byte("alpha")) @@ -30,8 +30,8 @@ func TestBorrowBodyResizeAndAppend(t *testing.T) { } } -func TestBorrowBodyRelease(t *testing.T) { - b := borrowBody(defaultBodyCap / 2) +func TestBorrowBufferRelease(t *testing.T) { + b := Borrow(DefaultBufferCap / 2) b.Append([]byte("data")) b.Release() if b.buf != nil { diff --git a/pack_pack.go b/pack_pack.go index 1569cf57..4930c139 100644 --- a/pack_pack.go +++ b/pack_pack.go @@ -11,6 +11,8 @@ import ( "os" "sync" "syscall" + + "git.sr.ht/~runxiyu/furgit/internal/bufpool" ) const ( @@ -78,10 +80,10 @@ func (repo *Repository) packReadAt(loc packlocation, want Hash) (Object, error) return obj, err } -func (repo *Repository) packBodyResolveAtLocation(loc packlocation) (ObjectType, borrowedBody, error) { +func (repo *Repository) packBodyResolveAtLocation(loc packlocation) (ObjectType, bufpool.Buffer, error) { pf, err := repo.packFile(loc.PackPath) if err != nil { - return ObjectTypeInvalid, borrowedBody{}, err + return ObjectTypeInvalid, bufpool.Buffer{}, err } return repo.packBodyResolveWithin(pf, loc.Offset) } @@ -128,34 +130,34 @@ func packHeaderRead(r io.Reader) (ObjectType, int, error) { return ty, size, nil } -func packSectionInflate(r io.Reader, sizeHint int) (borrowedBody, error) { +func packSectionInflate(r io.Reader, sizeHint int) (bufpool.Buffer, error) { zr, err := zlib.NewReader(r) if err != nil { - return borrowedBody{}, err + return bufpool.Buffer{}, err } defer func() { _ = zr.Close() }() if sizeHint > 0 { - body := borrowBody(sizeHint) + body := bufpool.Borrow(sizeHint) body.Resize(sizeHint) _, err := io.ReadFull(zr, body.Bytes()) if err != nil { body.Release() - return borrowedBody{}, err + return bufpool.Buffer{}, err } var extra [1]byte _, err = zr.Read(extra[:]) if err != io.EOF { body.Release() if err == nil { - return borrowedBody{}, ErrInvalidObject + return bufpool.Buffer{}, ErrInvalidObject } - return borrowedBody{}, err + return bufpool.Buffer{}, err } return body, nil } - body := borrowBody(defaultBodyCap) + body := bufpool.Borrow(bufpool.DefaultBufferCap) var scratch [32 * 1024]byte for { n, err := zr.Read(scratch[:]) @@ -167,38 +169,38 @@ func packSectionInflate(r io.Reader, sizeHint int) (borrowedBody, error) { } if err != nil { body.Release() - return borrowedBody{}, err + return bufpool.Buffer{}, err } } } -func (repo *Repository) packDeltaResolveOfs(pf *packFile, deltaOffset uint64, r io.Reader) (ObjectType, borrowedBody, error) { +func (repo *Repository) packDeltaResolveOfs(pf *packFile, deltaOffset uint64, r io.Reader) (ObjectType, bufpool.Buffer, error) { dist, err := packDeltaReadOfsDistance(r) if err != nil { - return ObjectTypeInvalid, borrowedBody{}, err + return ObjectTypeInvalid, bufpool.Buffer{}, err } var baseOfs uint64 if deltaOffset > dist { baseOfs = deltaOffset - dist } if baseOfs == 0 { - return ObjectTypeInvalid, borrowedBody{}, ErrInvalidObject + return ObjectTypeInvalid, bufpool.Buffer{}, ErrInvalidObject } ty, body, err := repo.packBodyResolveWithin(pf, baseOfs) if err != nil { - return ObjectTypeInvalid, borrowedBody{}, err + return ObjectTypeInvalid, bufpool.Buffer{}, err } delta, err := packSectionInflate(r, 0) if err != nil { body.Release() - return ObjectTypeInvalid, borrowedBody{}, err + return ObjectTypeInvalid, bufpool.Buffer{}, err } out, err := packDeltaApply(body, delta) delta.Release() body.Release() if err != nil { out.Release() - return ObjectTypeInvalid, borrowedBody{}, err + return ObjectTypeInvalid, bufpool.Buffer{}, err } return ty, out, nil } @@ -220,19 +222,19 @@ func packDeltaReadOfsDistance(r io.Reader) (uint64, error) { return dist, nil } -func (repo *Repository) packBodyResolveByID(id Hash) (ObjectType, borrowedBody, error) { +func (repo *Repository) packBodyResolveByID(id Hash) (ObjectType, bufpool.Buffer, error) { loc, err := repo.packIndexFind(id) if err == nil { return repo.packBodyResolveAtLocation(loc) } if !errors.Is(err, ErrNotFound) { - return ObjectTypeInvalid, borrowedBody{}, err + return ObjectTypeInvalid, bufpool.Buffer{}, err } ty, body, err := repo.looseReadTyped(id) if err != nil { - return ObjectTypeInvalid, borrowedBody{}, err + return ObjectTypeInvalid, bufpool.Buffer{}, err } - return ty, borrowedFromOwned(body), nil + return ty, bufpool.FromOwned(body), nil } type packKey struct { @@ -300,14 +302,14 @@ func (repo *Repository) packTypeSizeWithin(pf *packFile, ofs uint64, seen map[pa } } -func (repo *Repository) packBodyResolveWithin(pf *packFile, ofs uint64) (ObjectType, borrowedBody, error) { +func (repo *Repository) packBodyResolveWithin(pf *packFile, ofs uint64) (ObjectType, bufpool.Buffer, error) { r, err := pf.cursor(ofs) if err != nil { - return ObjectTypeInvalid, borrowedBody{}, err + return ObjectTypeInvalid, bufpool.Buffer{}, err } ty, size, err := packHeaderRead(r) if err != nil { - return ObjectTypeInvalid, borrowedBody{}, err + return ObjectTypeInvalid, bufpool.Buffer{}, err } switch ty { @@ -318,51 +320,51 @@ func (repo *Repository) packBodyResolveWithin(pf *packFile, ofs uint64) (ObjectT var base Hash _, err := io.ReadFull(r, base.data[:repo.hashSize]) if err != nil { - return ObjectTypeInvalid, borrowedBody{}, err + return ObjectTypeInvalid, bufpool.Buffer{}, err } base.size = repo.hashSize delta, err := packSectionInflate(r, 0) if err != nil { - return ObjectTypeInvalid, borrowedBody{}, err + return ObjectTypeInvalid, bufpool.Buffer{}, err } bt, body, err := repo.packBodyResolveByID(base) if err != nil { delta.Release() - return ObjectTypeInvalid, borrowedBody{}, err + return ObjectTypeInvalid, bufpool.Buffer{}, err } out, err := packDeltaApply(body, delta) delta.Release() body.Release() if err != nil { out.Release() - return ObjectTypeInvalid, borrowedBody{}, err + return ObjectTypeInvalid, bufpool.Buffer{}, err } return bt, out, nil case ObjectTypeOfsDelta: return repo.packDeltaResolveOfs(pf, ofs, r) case ObjectTypeInvalid, ObjectTypeFuture: - return ObjectTypeInvalid, borrowedBody{}, ErrInvalidObject + return ObjectTypeInvalid, bufpool.Buffer{}, ErrInvalidObject default: - return ObjectTypeInvalid, borrowedBody{}, ErrInvalidObject + return ObjectTypeInvalid, bufpool.Buffer{}, ErrInvalidObject } } -func packDeltaApply(base, delta borrowedBody) (borrowedBody, error) { +func packDeltaApply(base, delta bufpool.Buffer) (bufpool.Buffer, error) { pos := 0 baseBytes := base.Bytes() deltaBytes := delta.Bytes() srcSize, err := packVarintRead(deltaBytes, &pos) if err != nil { - return borrowedBody{}, err + return bufpool.Buffer{}, err } dstSize, err := packVarintRead(deltaBytes, &pos) if err != nil { - return borrowedBody{}, err + return bufpool.Buffer{}, err } if srcSize != len(baseBytes) { - return borrowedBody{}, ErrInvalidObject + return bufpool.Buffer{}, ErrInvalidObject } - out := borrowBody(dstSize) + out := bufpool.Borrow(dstSize) out.Resize(dstSize) outBytes := out.Bytes() outPos := 0 @@ -377,7 +379,7 @@ func packDeltaApply(base, delta borrowedBody) (borrowedBody, error) { if op&0x01 != 0 { if pos >= len(deltaBytes) { out.Release() - return borrowedBody{}, ErrInvalidObject + return bufpool.Buffer{}, ErrInvalidObject } off |= int(deltaBytes[pos]) pos++ @@ -385,7 +387,7 @@ func packDeltaApply(base, delta borrowedBody) (borrowedBody, error) { if op&0x02 != 0 { if pos >= len(deltaBytes) { out.Release() - return borrowedBody{}, ErrInvalidObject + return bufpool.Buffer{}, ErrInvalidObject } off |= int(deltaBytes[pos]) << 8 pos++ @@ -393,7 +395,7 @@ func packDeltaApply(base, delta borrowedBody) (borrowedBody, error) { if op&0x04 != 0 { if pos >= len(deltaBytes) { out.Release() - return borrowedBody{}, ErrInvalidObject + return bufpool.Buffer{}, ErrInvalidObject } off |= int(deltaBytes[pos]) << 16 pos++ @@ -401,7 +403,7 @@ func packDeltaApply(base, delta borrowedBody) (borrowedBody, error) { if op&0x08 != 0 { if pos >= len(deltaBytes) { out.Release() - return borrowedBody{}, ErrInvalidObject + return bufpool.Buffer{}, ErrInvalidObject } off |= int(deltaBytes[pos]) << 24 pos++ @@ -409,7 +411,7 @@ func packDeltaApply(base, delta borrowedBody) (borrowedBody, error) { if op&0x10 != 0 { if pos >= len(deltaBytes) { out.Release() - return borrowedBody{}, ErrInvalidObject + return bufpool.Buffer{}, ErrInvalidObject } n |= int(deltaBytes[pos]) pos++ @@ -417,7 +419,7 @@ func packDeltaApply(base, delta borrowedBody) (borrowedBody, error) { if op&0x20 != 0 { if pos >= len(deltaBytes) { out.Release() - return borrowedBody{}, ErrInvalidObject + return bufpool.Buffer{}, ErrInvalidObject } n |= int(deltaBytes[pos]) << 8 pos++ @@ -425,7 +427,7 @@ func packDeltaApply(base, delta borrowedBody) (borrowedBody, error) { if op&0x40 != 0 { if pos >= len(deltaBytes) { out.Release() - return borrowedBody{}, ErrInvalidObject + return bufpool.Buffer{}, ErrInvalidObject } n |= int(deltaBytes[pos]) << 16 pos++ @@ -435,7 +437,7 @@ func packDeltaApply(base, delta borrowedBody) (borrowedBody, error) { } if off+n > len(baseBytes) || outPos+n > len(outBytes) { out.Release() - return borrowedBody{}, ErrInvalidObject + return bufpool.Buffer{}, ErrInvalidObject } copy(outBytes[outPos:], baseBytes[off:off+n]) outPos += n @@ -443,20 +445,20 @@ func packDeltaApply(base, delta borrowedBody) (borrowedBody, error) { n := int(op) if pos+n > len(deltaBytes) || outPos+n > len(outBytes) { out.Release() - return borrowedBody{}, ErrInvalidObject + return bufpool.Buffer{}, ErrInvalidObject } copy(outBytes[outPos:], deltaBytes[pos:pos+n]) pos += n outPos += n default: out.Release() - return borrowedBody{}, ErrInvalidObject + return bufpool.Buffer{}, ErrInvalidObject } } if outPos != len(outBytes) { out.Release() - return borrowedBody{}, ErrInvalidObject + return bufpool.Buffer{}, ErrInvalidObject } return out, nil } diff --git a/pack_test.go b/pack_test.go index 82176ae1..4d6f651f 100644 --- a/pack_test.go +++ b/pack_test.go @@ -5,6 +5,8 @@ import ( "compress/zlib" "encoding/binary" "testing" + + "git.sr.ht/~runxiyu/furgit/internal/bufpool" ) func compressBytes(t *testing.T, payload []byte) []byte { @@ -112,10 +114,10 @@ func TestPackVarintRead(t *testing.T) { } func TestPackDeltaApply(t *testing.T) { - base := borrowedFromOwned([]byte("abcdefghij")) + base := bufpool.FromOwned([]byte("abcdefghij")) defer base.Release() deltaBytes := []byte{0x0a, 0x0a, 0x91, 0x00, 0x03, 0x03, 'X', 'Y', 'Z', 0x91, 0x06, 0x04} - delta := borrowedFromOwned(deltaBytes) + delta := bufpool.FromOwned(deltaBytes) defer delta.Release() out, err := packDeltaApply(base, delta) if err != nil { @@ -128,9 +130,9 @@ func TestPackDeltaApply(t *testing.T) { } func TestPackDeltaApplyMismatchedBaseSize(t *testing.T) { - base := borrowedFromOwned([]byte("abc")) + base := bufpool.FromOwned([]byte("abc")) defer base.Release() - delta := borrowedFromOwned([]byte{0x04, 0x04}) + delta := bufpool.FromOwned([]byte{0x04, 0x04}) defer delta.Release() if _, err := packDeltaApply(base, delta); err == nil { t.Fatal("expected error for mismatched base size") |
