aboutsummaryrefslogtreecommitdiff
path: root/format/pack/ingest/stream_scan.go
diff options
context:
space:
mode:
authorGravatar Runxi Yu2026-03-05 18:24:40 +0800
committerGravatar Runxi Yu2026-03-05 19:05:47 +0800
commit57f1818d547f2f1dca38033b4e29f62d89ef80f9 (patch)
tree88d55ac38e2427860bf380c8cce42fcb3bb1e9ee /format/pack/ingest/stream_scan.go
parentinternal/compress/zlib: Use flate's compression consumed counter (diff)
signatureNo signature
format/pack/ingest: Init
Diffstat (limited to 'format/pack/ingest/stream_scan.go')
-rw-r--r--format/pack/ingest/stream_scan.go335
1 files changed, 335 insertions, 0 deletions
diff --git a/format/pack/ingest/stream_scan.go b/format/pack/ingest/stream_scan.go
new file mode 100644
index 00000000..2c2389d8
--- /dev/null
+++ b/format/pack/ingest/stream_scan.go
@@ -0,0 +1,335 @@
+package ingest
+
+import (
+ "encoding/binary"
+ "fmt"
+ "io"
+
+ deltaapply "codeberg.org/lindenii/furgit/format/delta/apply"
+ packfmt "codeberg.org/lindenii/furgit/format/pack"
+ "codeberg.org/lindenii/furgit/internal/compress/zlib"
+ "codeberg.org/lindenii/furgit/objectheader"
+ "codeberg.org/lindenii/furgit/objectid"
+ "codeberg.org/lindenii/furgit/objecttype"
+)
+
+// streamPackAndScan copies src into temp .pack while scanning packed entries.
+func streamPackAndScan(state *ingestState) error {
+ hashImpl, err := state.algo.New()
+ if err != nil {
+ return err
+ }
+
+ state.stream = newStreamCopier(
+ state.src,
+ state.packFile,
+ newTrailerVerifier(hashImpl, state.algo.Size()),
+ )
+
+ if err := readAndValidatePackHeader(state); err != nil {
+ return err
+ }
+
+ state.records = make([]objectRecord, 0, state.objectCountHeader)
+ state.ofsDeltas = make([]ofsDeltaRef, 0, state.objectCountHeader)
+ state.refDeltas = make([]refDeltaRef, 0, state.objectCountHeader)
+
+ for range state.objectCountHeader {
+ nextOffset, err := scanOneEntry(state, state.stream.offset)
+ if err != nil {
+ return err
+ }
+
+ if nextOffset != state.stream.offset {
+ return fmt.Errorf("format/pack/ingest: internal stream offset mismatch")
+ }
+ }
+
+ return finalizeStreamPackHash(state)
+}
+
+// readAndValidatePackHeader reads and validates PACK header from the stream.
+func readAndValidatePackHeader(state *ingestState) error {
+ var hdr [12]byte
+ if err := state.stream.readFull(hdr[:]); err != nil {
+ return &ErrInvalidPackHeader{Reason: fmt.Sprintf("read header: %v", err)}
+ }
+
+ if binary.BigEndian.Uint32(hdr[:4]) != packfmt.Signature {
+ return &ErrInvalidPackHeader{Reason: "signature mismatch"}
+ }
+
+ version := binary.BigEndian.Uint32(hdr[4:8])
+ if !packfmt.VersionSupported(version) {
+ return &ErrInvalidPackHeader{Reason: fmt.Sprintf("unsupported version %d", version)}
+ }
+
+ state.objectCountHeader = binary.BigEndian.Uint32(hdr[8:12])
+ if state.objectCountHeader == 0 {
+ return &ErrInvalidPackHeader{Reason: "zero objects"}
+ }
+
+ return nil
+}
+
+// scanOneEntry scans one pack entry from stream and appends one record.
+func scanOneEntry(state *ingestState, startOffset uint64) (uint64, error) {
+ state.stream.beginEntryCRC()
+
+ record, err := parseEntryPrefix(state, startOffset)
+ if err != nil {
+ return 0, err
+ }
+
+ contentLen, consumedInput, oid, err := drainEntryPayload(state, record)
+ if err != nil {
+ return 0, err
+ }
+
+ if contentLen != record.declaredSize {
+ return 0, &ErrMalformedPackEntry{
+ Offset: startOffset,
+ Reason: fmt.Sprintf("inflated size mismatch got %d want %d", contentLen, record.declaredSize),
+ }
+ }
+
+ endOffset := startOffset + uint64(record.headerLen) + consumedInput
+ if endOffset > state.stream.offset {
+ return 0, &ErrMalformedPackEntry{
+ Offset: startOffset,
+ Reason: fmt.Sprintf("entry end offset overflow got %d > stream %d", endOffset, state.stream.offset),
+ }
+ }
+
+ record.packedLen = endOffset - startOffset
+ record.dataOffset = startOffset + uint64(record.headerLen)
+ if record.packedLen < uint64(record.headerLen) {
+ return 0, &ErrMalformedPackEntry{Offset: startOffset, Reason: "negative payload span"}
+ }
+
+ crc, err := state.stream.endEntryCRC()
+ if err != nil {
+ return 0, err
+ }
+ record.crc32 = crc
+
+ if packfmt.IsBaseObjectType(record.packedType) {
+ record.objectID = oid
+ record.realType = record.packedType
+ record.resolved = true
+ }
+
+ recordIdx := len(state.records)
+ state.records = append(state.records, record)
+ state.offsetToRecord[record.offset] = recordIdx
+ if record.resolved {
+ state.objectToRecord[record.objectID.String()] = recordIdx
+ }
+
+ switch record.packedType {
+ case objecttype.TypeOfsDelta:
+ state.ofsDeltas = append(state.ofsDeltas, ofsDeltaRef{
+ baseOffset: record.baseOffset,
+ recordIdx: recordIdx,
+ })
+ case objecttype.TypeRefDelta:
+ state.refDeltas = append(state.refDeltas, refDeltaRef{
+ baseObject: record.baseObject,
+ recordIdx: recordIdx,
+ })
+ }
+
+ return endOffset, nil
+}
+
+// parseEntryPrefix parses one entry prefix from stream.
+func parseEntryPrefix(state *ingestState, startOffset uint64) (objectRecord, error) {
+ var record objectRecord
+ record.offset = startOffset
+
+ first, err := state.stream.ReadByte()
+ if err != nil {
+ return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: fmt.Sprintf("read first header byte: %v", err)}
+ }
+
+ record.packedType = objecttype.Type((first >> 4) & 0x07)
+ size := int64(first & 0x0f)
+ headerLen := uint32(1)
+ shift := uint(4)
+ b := first
+
+ for b&0x80 != 0 {
+ b, err = state.stream.ReadByte()
+ if err != nil {
+ return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: fmt.Sprintf("read size continuation: %v", err)}
+ }
+ headerLen++
+ size |= int64(b&0x7f) << shift
+ shift += 7
+ }
+ if size < 0 {
+ return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: "negative declared size"}
+ }
+ record.declaredSize = size
+
+ switch record.packedType {
+ case objecttype.TypeCommit, objecttype.TypeTree, objecttype.TypeBlob, objecttype.TypeTag:
+ case objecttype.TypeRefDelta:
+ baseRaw := make([]byte, state.algo.Size())
+ if err := state.stream.readFull(baseRaw); err != nil {
+ return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: fmt.Sprintf("read ref base: %v", err)}
+ }
+ baseID, err := objectid.FromBytes(state.algo, baseRaw)
+ if err != nil {
+ return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: fmt.Sprintf("parse ref base: %v", err)}
+ }
+ record.baseObject = baseID
+ headerLen += uint32(len(baseRaw))
+ case objecttype.TypeOfsDelta:
+ dist, consumed, err := readOfsDistanceFromStream(state.stream)
+ if err != nil {
+ return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: err.Error()}
+ }
+ if startOffset <= dist {
+ return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: "ofs base offset out of bounds"}
+ }
+ record.baseOffset = startOffset - dist
+ headerLen += uint32(consumed)
+ case objecttype.TypeInvalid, objecttype.TypeFuture:
+ return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: fmt.Sprintf("unsupported object type %d", record.packedType)}
+ default:
+ return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: fmt.Sprintf("unsupported object type %d", record.packedType)}
+ }
+
+ record.headerLen = headerLen
+
+ return record, nil
+}
+
+// drainEntryPayload inflates one entry payload from stream and returns
+// (inflatedLength, consumedInput, oidForBaseEntry).
+func drainEntryPayload(state *ingestState, record objectRecord) (int64, uint64, objectid.ObjectID, error) {
+ var zero objectid.ObjectID
+ reader, err := zlib.NewReader(state.stream)
+ if err != nil {
+ return 0, 0, zero, &ErrMalformedPackEntry{Offset: record.offset, Reason: fmt.Sprintf("open zlib stream: %v", err)}
+ }
+ defer func() { _ = reader.Close() }()
+
+ var total int64
+ if packfmt.IsBaseObjectType(record.packedType) {
+ header, ok := objectheader.Encode(record.packedType, record.declaredSize)
+ if !ok {
+ return 0, 0, zero, &ErrMalformedPackEntry{Offset: record.offset, Reason: "encode object header"}
+ }
+
+ hashImpl, err := state.algo.New()
+ if err != nil {
+ return 0, 0, zero, err
+ }
+ _, _ = hashImpl.Write(header)
+
+ n, err := io.Copy(hashImpl, reader)
+ if err != nil {
+ return 0, 0, zero, &ErrMalformedPackEntry{Offset: record.offset, Reason: fmt.Sprintf("inflate base object: %v", err)}
+ }
+ total = n
+
+ oid, err := objectid.FromBytes(state.algo, hashImpl.Sum(nil))
+ if err != nil {
+ return 0, 0, zero, err
+ }
+
+ return total, reader.InputConsumed(), oid, nil
+ }
+
+ if record.packedType == objecttype.TypeOfsDelta || record.packedType == objecttype.TypeRefDelta {
+ n, err := io.Copy(io.Discard, reader)
+ if err != nil {
+ return 0, 0, zero, &ErrMalformedPackEntry{Offset: record.offset, Reason: fmt.Sprintf("inflate delta payload: %v", err)}
+ }
+ total = n
+
+ return total, reader.InputConsumed(), zero, nil
+ }
+
+ return 0, 0, zero, &ErrMalformedPackEntry{Offset: record.offset, Reason: "unsupported payload type"}
+}
+
+// readOfsDistanceFromStream reads one ofs-delta encoded distance.
+func readOfsDistanceFromStream(reader io.ByteReader) (uint64, int, error) {
+ first, err := reader.ReadByte()
+ if err != nil {
+ return 0, 0, fmt.Errorf("read ofs distance first byte: %w", err)
+ }
+
+ dist := uint64(first & 0x7f)
+ consumed := 1
+ b := first
+ for b&0x80 != 0 {
+ b, err = reader.ReadByte()
+ if err != nil {
+ return 0, 0, fmt.Errorf("read ofs distance continuation: %w", err)
+ }
+ consumed++
+ dist = ((dist + 1) << 7) + uint64(b&0x7f)
+ }
+
+ return dist, consumed, nil
+}
+
+// finalizeStreamPackHash consumes trailer bytes and verifies stream integrity.
+func finalizeStreamPackHash(state *ingestState) error {
+ // We have already consumed object entries. Drain exactly the hash trailer.
+ trailer := make([]byte, state.algo.Size())
+ if err := state.stream.readFull(trailer); err != nil {
+ return &ErrPackTrailerMismatch{}
+ }
+
+ // Ensure no trailing garbage.
+ var probe [1]byte
+ n, err := state.stream.Read(probe[:])
+ if n > 0 || err == nil {
+ return fmt.Errorf("format/pack/ingest: pack has trailing garbage")
+ }
+ if err != io.EOF {
+ return err
+ }
+
+ if err := state.stream.verifier.verify(); err != nil {
+ return err
+ }
+
+ packHash, err := objectid.FromBytes(state.algo, trailer)
+ if err != nil {
+ return err
+ }
+ state.packHash = packHash
+
+ return nil
+}
+
+// readDeltaHeaderSizes reads source and destination sizes from one delta payload.
+func readDeltaHeaderSizes(payload []byte) (int, int, error) {
+ reader := &byteSliceReader{data: payload}
+
+ return deltaapply.ReadHeaderSizes(reader)
+}
+
+// byteSliceReader implements io.ByteReader on []byte.
+type byteSliceReader struct {
+ data []byte
+ pos int
+}
+
+// ReadByte reads one byte from receiver.
+func (reader *byteSliceReader) ReadByte() (byte, error) {
+ if reader.pos >= len(reader.data) {
+ return 0, io.EOF
+ }
+
+ b := reader.data[reader.pos]
+ reader.pos++
+
+ return b, nil
+}