diff options
| author | 2025-11-19 08:00:00 +0800 | |
|---|---|---|
| committer | 2025-11-19 08:00:00 +0800 | |
| commit | 811ba33e8401e2becf2c6caa03bf293fc610059a (patch) | |
| tree | ee69450ff7a65cc46691693187780b5fd8e14870 | |
| parent | adler32: Unroll update loop (diff) | |
| signature | No signature | |
Initial attempt to make a compressor with less overhead
io.Reader actually has massive overhead...
| -rw-r--r-- | internal/bench/decompress.go | 2 | ||||
| -rw-r--r-- | internal/flate/decompress_bytes.go | 99 | ||||
| -rw-r--r-- | internal/flate/decompress_test.go | 76 | ||||
| -rw-r--r-- | internal/zlib/decompress.go | 69 | ||||
| -rw-r--r-- | internal/zlib/decompress_test.go | 84 | ||||
| -rw-r--r-- | loose.go | 43 | ||||
| -rw-r--r-- | pack_pack.go | 55 |
7 files changed, 386 insertions, 42 deletions
diff --git a/internal/bench/decompress.go b/internal/bench/decompress.go index c4a4d274..2eab0ad2 100644 --- a/internal/bench/decompress.go +++ b/internal/bench/decompress.go @@ -1,3 +1,5 @@ +//go:build ignore + package main import ( diff --git a/internal/flate/decompress_bytes.go b/internal/flate/decompress_bytes.go new file mode 100644 index 00000000..2cd9fd89 --- /dev/null +++ b/internal/flate/decompress_bytes.go @@ -0,0 +1,99 @@ +package flate + +import ( + "io" + "sync" + + "git.sr.ht/~runxiyu/furgit/internal/bufpool" +) + +// byteSliceReader implements Reader over an in-memory byte slice. +type byteSliceReader struct { + data []byte + off int +} + +func (r *byteSliceReader) Reset(data []byte) { + r.data = data + r.off = 0 +} + +func (r *byteSliceReader) Read(p []byte) (int, error) { + if r.off >= len(r.data) { + return 0, io.EOF + } + n := copy(p, r.data[r.off:]) + r.off += n + return n, nil +} + +func (r *byteSliceReader) ReadByte() (byte, error) { + if r.off >= len(r.data) { + return 0, io.EOF + } + b := r.data[r.off] + r.off++ + return b, nil +} + +// bufferDecompressor wraps the core decompressor with pooling state so that +// byte-slice decompressions avoid repeated allocations. +type bufferDecompressor struct { + dec decompressor + reader byteSliceReader +} + +var bufferDecompressorPool = sync.Pool{ + New: func() any { + fixedHuffmanDecoderInit() + d := &bufferDecompressor{} + d.dec.bits = new([maxNumLit + maxNumDist]int) + d.dec.codebits = new([numCodes]int) + d.dec.step = (*decompressor).nextBlock + return d + }, +} + +// Decompress inflates the provided DEFLATE stream and returns the full output +// in a pooled bufpool.Buffer along with the number of consumed bytes from src. +func Decompress(src []byte) (bufpool.Buffer, int, error) { + return DecompressDict(src, nil) +} + +// DecompressDict inflates the provided DEFLATE stream using dict as the preset +// dictionary and returns the full output in a pooled bufpool.Buffer. The second +// returned value reports how many bytes of src were consumed. +func DecompressDict(src []byte, dict []byte) (bufpool.Buffer, int, error) { + d := bufferDecompressorPool.Get().(*bufferDecompressor) + defer func() { + d.reader.Reset(nil) + bufferDecompressorPool.Put(d) + }() + + d.reader.Reset(src) + if err := d.dec.Reset(&d.reader, dict); err != nil { + return bufpool.Buffer{}, 0, err + } + + out := bufpool.Borrow(bufpool.DefaultBufferCap) + out.Resize(0) + + for { + if len(d.dec.toRead) > 0 { + out.Append(d.dec.toRead) + d.dec.toRead = nil + continue + } + if d.dec.err != nil { + if d.dec.err == io.EOF { + return out, d.reader.off, nil + } + out.Release() + return bufpool.Buffer{}, 0, d.dec.err + } + d.dec.step(&d.dec) + if d.dec.err != nil && len(d.dec.toRead) == 0 { + d.dec.toRead = d.dec.dict.readFlush() + } + } +} diff --git a/internal/flate/decompress_test.go b/internal/flate/decompress_test.go new file mode 100644 index 00000000..089159d6 --- /dev/null +++ b/internal/flate/decompress_test.go @@ -0,0 +1,76 @@ +package flate + +import ( + "bytes" + stdflate "compress/flate" + "testing" +) + +func compressDeflate(t *testing.T, payload, dict []byte) []byte { + t.Helper() + var buf bytes.Buffer + var ( + w *stdflate.Writer + err error + ) + if dict != nil { + w, err = stdflate.NewWriterDict(&buf, stdflate.DefaultCompression, dict) + } else { + w, err = stdflate.NewWriter(&buf, stdflate.DefaultCompression) + } + if err != nil { + t.Fatalf("NewWriter: %v", err) + } + if _, err := w.Write(payload); err != nil { + t.Fatalf("Write: %v", err) + } + if err := w.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + return buf.Bytes() +} + +func TestDecompress(t *testing.T) { + payload := bytes.Repeat([]byte("golang"), 32) + compressed := compressDeflate(t, payload, nil) + + out, _, err := Decompress(compressed) + if err != nil { + t.Fatalf("Decompress: %v", err) + } + defer out.Release() + + if !bytes.Equal(out.Bytes(), payload) { + t.Fatalf("unexpected payload: got %q", out.Bytes()) + } +} + +func TestDecompressDict(t *testing.T) { + dict := []byte("furgit dictionary payload") + payload := append([]byte(nil), dict...) + payload = append(payload, []byte(" -- and some more data repeated -- and some more data repeated")...) + + compressed := compressDeflate(t, payload, dict) + + out, _, err := DecompressDict(compressed, dict) + if err != nil { + t.Fatalf("DecompressDict: %v", err) + } + defer out.Release() + + if !bytes.Equal(out.Bytes(), payload) { + t.Fatalf("unexpected payload: got %q", out.Bytes()) + } +} + +func TestDecompressDictMissing(t *testing.T) { + dict := []byte("shared prefix to enforce dictionary usage") + payload := append([]byte(nil), dict...) + payload = append(payload, []byte(" trailing data to force reference")...) + + compressed := compressDeflate(t, payload, dict) + + if _, _, err := Decompress(compressed); err == nil { + t.Fatalf("expected error when dictionary missing") + } +} diff --git a/internal/zlib/decompress.go b/internal/zlib/decompress.go new file mode 100644 index 00000000..55f6d3e2 --- /dev/null +++ b/internal/zlib/decompress.go @@ -0,0 +1,69 @@ +package zlib + +import ( + "encoding/binary" + "io" + + "git.sr.ht/~runxiyu/furgit/internal/adler32" + "git.sr.ht/~runxiyu/furgit/internal/bufpool" + "git.sr.ht/~runxiyu/furgit/internal/flate" +) + +// Decompress inflates the provided zlib wrapped stream and returns the +// uncompressed data inside a pooled bufpool.Buffer. +func Decompress(src []byte) (bufpool.Buffer, error) { + return DecompressDict(src, nil) +} + +// DecompressDict is like Decompress but accepts a preset dictionary. The +// dictionary must match the checksum embedded in the stream if the dictionary +// flag is present. +func DecompressDict(src []byte, dict []byte) (bufpool.Buffer, error) { + if len(src) < 6 { + return bufpool.Buffer{}, io.ErrUnexpectedEOF + } + + cmf := src[0] + flg := src[1] + if (cmf&0x0f != zlibDeflate) || (cmf>>4 > zlibMaxWindow) || (binary.BigEndian.Uint16(src[:2])%31 != 0) { + return bufpool.Buffer{}, ErrHeader + } + + offset := 2 + haveDict := flg&0x20 != 0 + if haveDict { + if len(src) < offset+4 { + return bufpool.Buffer{}, io.ErrUnexpectedEOF + } + if dict == nil { + return bufpool.Buffer{}, ErrDictionary + } + checksum := binary.BigEndian.Uint32(src[offset : offset+4]) + if checksum != adler32.Checksum(dict) { + return bufpool.Buffer{}, ErrDictionary + } + offset += 4 + } + + if len(src[offset:]) < 4 { + return bufpool.Buffer{}, io.ErrUnexpectedEOF + } + + deflateData := src[offset:] + out, consumed, err := flate.DecompressDict(deflateData, dict) + if err != nil { + return bufpool.Buffer{}, err + } + + checksumPos := offset + consumed + if checksumPos+4 > len(src) { + out.Release() + return bufpool.Buffer{}, io.ErrUnexpectedEOF + } + expected := binary.BigEndian.Uint32(src[checksumPos : checksumPos+4]) + if expected != adler32.Checksum(out.Bytes()) { + out.Release() + return bufpool.Buffer{}, ErrChecksum + } + return out, nil +} diff --git a/internal/zlib/decompress_test.go b/internal/zlib/decompress_test.go new file mode 100644 index 00000000..bb517ae6 --- /dev/null +++ b/internal/zlib/decompress_test.go @@ -0,0 +1,84 @@ +package zlib + +import ( + "bytes" + stdzlib "compress/zlib" + "testing" +) + +func compressZlib(t *testing.T, payload, dict []byte) []byte { + t.Helper() + var buf bytes.Buffer + var ( + w *stdzlib.Writer + err error + ) + if dict != nil { + w, err = stdzlib.NewWriterLevelDict(&buf, stdzlib.DefaultCompression, dict) + } else { + w = stdzlib.NewWriter(&buf) + } + if err != nil { + t.Fatalf("NewWriter: %v", err) + } + if _, err := w.Write(payload); err != nil { + t.Fatalf("Write: %v", err) + } + if err := w.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + return buf.Bytes() +} + +func TestDecompress(t *testing.T) { + payload := []byte("hello, zlib world!") + compressed := compressZlib(t, payload, nil) + + out, err := Decompress(compressed) + if err != nil { + t.Fatalf("Decompress: %v", err) + } + defer out.Release() + + if !bytes.Equal(out.Bytes(), payload) { + t.Fatalf("unexpected payload %q", out.Bytes()) + } +} + +func TestDecompressDict(t *testing.T) { + dict := []byte("git dictionary for zlib") + payload := append([]byte(nil), dict...) + payload = append(payload, []byte(" -- extended body -- extended body")...) + compressed := compressZlib(t, payload, dict) + + out, err := DecompressDict(compressed, dict) + if err != nil { + t.Fatalf("DecompressDict: %v", err) + } + defer out.Release() + + if !bytes.Equal(out.Bytes(), payload) { + t.Fatalf("unexpected payload %q", out.Bytes()) + } +} + +func TestDecompressDictMissing(t *testing.T) { + dict := []byte("preset dictionary") + payload := append([]byte(nil), dict...) + payload = append(payload, []byte(" .. more data ..")...) + compressed := compressZlib(t, payload, dict) + + if _, err := Decompress(compressed); err != ErrDictionary { + t.Fatalf("expected ErrDictionary, got %v", err) + } +} + +func TestDecompressChecksumError(t *testing.T) { + payload := []byte("checksum check") + compressed := compressZlib(t, payload, nil) + compressed[len(compressed)-1] ^= 0xff + + if _, err := Decompress(compressed); err != ErrChecksum { + t.Fatalf("expected ErrChecksum, got %v", err) + } +} @@ -9,6 +9,7 @@ import ( "path/filepath" "strconv" + "git.sr.ht/~runxiyu/furgit/internal/bufpool" "git.sr.ht/~runxiyu/furgit/internal/zlib" ) @@ -28,55 +29,63 @@ func (repo *Repository) looseRead(id Hash) (StoredObject, error) { if err != nil { return nil, err } - return parseObjectBody(ty, id, body, repo) + obj, err := parseObjectBody(ty, id, body.Bytes(), repo) + body.Release() + return obj, err } -func (repo *Repository) looseReadTyped(id Hash) (ObjectType, []byte, error) { +func (repo *Repository) looseReadTyped(id Hash) (ObjectType, bufpool.Buffer, error) { path, err := repo.loosePath(id) if err != nil { - return ObjectTypeInvalid, nil, err + return ObjectTypeInvalid, bufpool.Buffer{}, err } path = repo.repoPath(path) f, err := os.Open(path) if err != nil { if os.IsNotExist(err) { - return ObjectTypeInvalid, nil, ErrNotFound + return ObjectTypeInvalid, bufpool.Buffer{}, ErrNotFound } - return ObjectTypeInvalid, nil, err + return ObjectTypeInvalid, bufpool.Buffer{}, err } defer func() { _ = f.Close() }() - zr, err := zlib.NewReader(f) + compressed, err := io.ReadAll(f) if err != nil { - return ObjectTypeInvalid, nil, err + return ObjectTypeInvalid, bufpool.Buffer{}, err } - defer func() { _ = zr.Close() }() - raw, err := io.ReadAll(zr) + raw, err := zlib.Decompress(compressed) if err != nil { - return ObjectTypeInvalid, nil, err + return ObjectTypeInvalid, bufpool.Buffer{}, err } - nul := bytes.IndexByte(raw, 0) + rawBytes := raw.Bytes() + nul := bytes.IndexByte(rawBytes, 0) if nul < 0 { - return ObjectTypeInvalid, nil, ErrInvalidObject + raw.Release() + return ObjectTypeInvalid, bufpool.Buffer{}, ErrInvalidObject } - header := raw[:nul] - body := raw[nul+1:] + header := rawBytes[:nul] + body := rawBytes[nul+1:] ty, declaredSize, err := parseLooseHeader(header) if err != nil { - return ObjectTypeInvalid, nil, err + raw.Release() + return ObjectTypeInvalid, bufpool.Buffer{}, err } if declaredSize != int64(len(body)) { - return ObjectTypeInvalid, nil, ErrInvalidObject + raw.Release() + return ObjectTypeInvalid, bufpool.Buffer{}, ErrInvalidObject } + + copy(rawBytes, body) + raw.Resize(len(body)) // if !repo.verifyRawObject(raw, id) { // return ObjectTypeInvalid, nil, ErrInvalidObject // } - return ty, body, nil + return ty, raw, nil } func (repo *Repository) looseTypeSize(id Hash) (ObjectType, int64, error) { diff --git a/pack_pack.go b/pack_pack.go index fb2d965e..ced7ca4c 100644 --- a/pack_pack.go +++ b/pack_pack.go @@ -130,33 +130,34 @@ func packHeaderRead(r io.Reader) (ObjectType, int, error) { return ty, size, nil } -func packSectionInflate(r io.Reader, sizeHint int) (bufpool.Buffer, error) { +func packSectionInflate(pf *packFile, objectOfs uint64, r io.Reader, sizeHint int) (bufpool.Buffer, error) { + if pf != nil { + if br, ok := r.(*bytes.Reader); ok { + total := br.Size() + remaining := int64(br.Len()) + consumed := total - remaining + start := objectOfs + uint64(consumed) + if int64(consumed) < 0 || start > uint64(len(pf.data)) { + return bufpool.Buffer{}, ErrInvalidObject + } + body, err := zlib.Decompress(pf.data[start:]) + if err != nil { + return bufpool.Buffer{}, err + } + if sizeHint > 0 && len(body.Bytes()) != sizeHint { + body.Release() + return bufpool.Buffer{}, ErrInvalidObject + } + return body, nil + } + } + zr, err := zlib.NewReader(r) if err != nil { return bufpool.Buffer{}, err } defer func() { _ = zr.Close() }() - if sizeHint > 0 { - body := bufpool.Borrow(sizeHint) - body.Resize(sizeHint) - _, err := io.ReadFull(zr, body.Bytes()) - if err != nil { - body.Release() - return bufpool.Buffer{}, err - } - var extra [1]byte - _, err = zr.Read(extra[:]) - if err != io.EOF { - body.Release() - if err == nil { - return bufpool.Buffer{}, ErrInvalidObject - } - return bufpool.Buffer{}, err - } - return body, nil - } - body := bufpool.Borrow(bufpool.DefaultBufferCap) var scratch [32 * 1024]byte for { @@ -165,6 +166,10 @@ func packSectionInflate(r io.Reader, sizeHint int) (bufpool.Buffer, error) { body.Append(scratch[:n]) } if err == io.EOF { + if sizeHint > 0 && len(body.Bytes()) != sizeHint { + body.Release() + return bufpool.Buffer{}, ErrInvalidObject + } return body, nil } if err != nil { @@ -190,7 +195,7 @@ func (repo *Repository) packDeltaResolveOfs(pf *packFile, deltaOffset uint64, r if err != nil { return ObjectTypeInvalid, bufpool.Buffer{}, err } - delta, err := packSectionInflate(r, 0) + delta, err := packSectionInflate(pf, deltaOffset, r, 0) if err != nil { body.Release() return ObjectTypeInvalid, bufpool.Buffer{}, err @@ -234,7 +239,7 @@ func (repo *Repository) packBodyResolveByID(id Hash) (ObjectType, bufpool.Buffer if err != nil { return ObjectTypeInvalid, bufpool.Buffer{}, err } - return ty, bufpool.FromOwned(body), nil + return ty, body, nil } type packKey struct { @@ -314,7 +319,7 @@ func (repo *Repository) packBodyResolveWithin(pf *packFile, ofs uint64) (ObjectT switch ty { case ObjectTypeCommit, ObjectTypeTree, ObjectTypeBlob, ObjectTypeTag: - body, err := packSectionInflate(r, size) + body, err := packSectionInflate(pf, ofs, r, size) return ty, body, err case ObjectTypeRefDelta: var base Hash @@ -323,7 +328,7 @@ func (repo *Repository) packBodyResolveWithin(pf *packFile, ofs uint64) (ObjectT return ObjectTypeInvalid, bufpool.Buffer{}, err } base.size = repo.hashSize - delta, err := packSectionInflate(r, 0) + delta, err := packSectionInflate(pf, ofs, r, 0) if err != nil { return ObjectTypeInvalid, bufpool.Buffer{}, err } |
