diff options
| author | 2026-03-06 11:04:15 +0800 | |
|---|---|---|
| committer | 2026-03-06 11:16:16 +0800 | |
| commit | 6945464a0438396de97b9bc28c1bd31c22456092 (patch) | |
| tree | 94de06ad52d6fdff77c6d05dc5c415c65a8311e6 /format | |
| parent | reachability: Split walk files (diff) | |
| signature | No signature | |
format/pack/ingest: Split files
Diffstat (limited to 'format')
30 files changed, 1196 insertions, 1067 deletions
diff --git a/format/pack/ingest/byteslice_reader.go b/format/pack/ingest/byteslice_reader.go new file mode 100644 index 00000000..a1570ef3 --- /dev/null +++ b/format/pack/ingest/byteslice_reader.go @@ -0,0 +1,21 @@ +package ingest + +import "io" + +// byteSliceReader implements io.ByteReader on []byte. +type byteSliceReader struct { + data []byte + pos int +} + +// ReadByte reads one byte from receiver. +func (reader *byteSliceReader) ReadByte() (byte, error) { + if reader.pos >= len(reader.data) { + return 0, io.EOF + } + + b := reader.data[reader.pos] + reader.pos++ + + return b, nil +} diff --git a/format/pack/ingest/counting_writer.go b/format/pack/ingest/counting_writer.go new file mode 100644 index 00000000..051ad9d1 --- /dev/null +++ b/format/pack/ingest/counting_writer.go @@ -0,0 +1,17 @@ +package ingest + +import "io" + +// countingWriter counts bytes written to dst. +type countingWriter struct { + dst io.Writer + n int +} + +// Write writes src to dst and tracks output byte count. +func (writer *countingWriter) Write(src []byte) (int, error) { + n, err := writer.dst.Write(src) + writer.n += n + + return n, err +} diff --git a/format/pack/ingest/crc.go b/format/pack/ingest/crc.go new file mode 100644 index 00000000..35fa6952 --- /dev/null +++ b/format/pack/ingest/crc.go @@ -0,0 +1,22 @@ +package ingest + +import "fmt" + +// beginEntryCRC starts inline CRC accumulation for one packed entry. +func (scanner *streamScanner) beginEntryCRC() { + scanner.entryCRC = 0 + scanner.inEntryCRC = true +} + +// endEntryCRC finishes inline CRC accumulation for one packed entry. +func (scanner *streamScanner) endEntryCRC() (uint32, error) { + if !scanner.inEntryCRC { + return 0, fmt.Errorf("format/pack/ingest: entry CRC not started") + } + + crc := scanner.entryCRC + scanner.entryCRC = 0 + scanner.inEntryCRC = false + + return crc, nil +} diff --git a/format/pack/ingest/delta_header.go b/format/pack/ingest/delta_header.go new file mode 100644 index 00000000..4ecbea1b --- /dev/null +++ b/format/pack/ingest/delta_header.go @@ -0,0 +1,11 @@ +package ingest + +import deltaapply "codeberg.org/lindenii/furgit/format/delta/apply" + +// finalizeStreamPackHash consumes trailer bytes and verifies stream integrity. +// readDeltaHeaderSizes reads source and destination sizes from one delta payload. +func readDeltaHeaderSizes(payload []byte) (int, int, error) { + reader := &byteSliceReader{data: payload} + + return deltaapply.ReadHeaderSizes(reader) +} diff --git a/format/pack/ingest/distance.go b/format/pack/ingest/distance.go new file mode 100644 index 00000000..9bc4d886 --- /dev/null +++ b/format/pack/ingest/distance.go @@ -0,0 +1,30 @@ +package ingest + +import ( + "fmt" + "io" +) + +// readOfsDistanceFromStream reads one ofs-delta encoded distance. +func readOfsDistanceFromStream(reader io.ByteReader) (uint64, int, error) { + first, err := reader.ReadByte() + if err != nil { + return 0, 0, fmt.Errorf("read ofs distance first byte: %w", err) + } + + dist := uint64(first & 0x7f) + consumed := 1 + + b := first + for b&0x80 != 0 { + b, err = reader.ReadByte() + if err != nil { + return 0, 0, fmt.Errorf("read ofs distance continuation: %w", err) + } + + consumed++ + dist = ((dist + 1) << 7) + uint64(b&0x7f) + } + + return dist, consumed, nil +} diff --git a/format/pack/ingest/drain.go b/format/pack/ingest/drain.go new file mode 100644 index 00000000..b92683c3 --- /dev/null +++ b/format/pack/ingest/drain.go @@ -0,0 +1,68 @@ +package ingest + +import ( + "fmt" + "io" + + packfmt "codeberg.org/lindenii/furgit/format/pack" + "codeberg.org/lindenii/furgit/internal/compress/zlib" + "codeberg.org/lindenii/furgit/objectheader" + "codeberg.org/lindenii/furgit/objectid" + "codeberg.org/lindenii/furgit/objecttype" +) + +// drainEntryPayload inflates one entry payload from stream and returns +// (inflatedLength, consumedInput, oidForBaseEntry). +func drainEntryPayload(state *ingestState, record objectRecord) (int64, uint64, objectid.ObjectID, error) { + var zero objectid.ObjectID + + reader, err := zlib.NewReader(state.stream) + if err != nil { + return 0, 0, zero, &ErrMalformedPackEntry{Offset: record.offset, Reason: fmt.Sprintf("open zlib stream: %v", err)} + } + + defer func() { _ = reader.Close() }() + + var total int64 + + if packfmt.IsBaseObjectType(record.packedType) { + header, ok := objectheader.Encode(record.packedType, record.declaredSize) + if !ok { + return 0, 0, zero, &ErrMalformedPackEntry{Offset: record.offset, Reason: "encode object header"} + } + + hashImpl, err := state.algo.New() + if err != nil { + return 0, 0, zero, err + } + + _, _ = hashImpl.Write(header) + + n, err := io.Copy(hashImpl, reader) + if err != nil { + return 0, 0, zero, &ErrMalformedPackEntry{Offset: record.offset, Reason: fmt.Sprintf("inflate base object: %v", err)} + } + + total = n + + oid, err := objectid.FromBytes(state.algo, hashImpl.Sum(nil)) + if err != nil { + return 0, 0, zero, err + } + + return total, reader.InputConsumed(), oid, nil + } + + if record.packedType == objecttype.TypeOfsDelta || record.packedType == objecttype.TypeRefDelta { + n, err := io.Copy(io.Discard, reader) + if err != nil { + return 0, 0, zero, &ErrMalformedPackEntry{Offset: record.offset, Reason: fmt.Sprintf("inflate delta payload: %v", err)} + } + + total = n + + return total, reader.InputConsumed(), zero, nil + } + + return 0, 0, zero, &ErrMalformedPackEntry{Offset: record.offset, Reason: "unsupported payload type"} +} diff --git a/format/pack/ingest/entry.go b/format/pack/ingest/entry.go new file mode 100644 index 00000000..f6c9074e --- /dev/null +++ b/format/pack/ingest/entry.go @@ -0,0 +1,88 @@ +package ingest + +import ( + "fmt" + + packfmt "codeberg.org/lindenii/furgit/format/pack" + "codeberg.org/lindenii/furgit/objecttype" +) + +// scanOneEntry scans one pack entry from stream and appends one record. +func scanOneEntry(state *ingestState, startOffset uint64) (uint64, error) { + state.stream.beginEntryCRC() + + record, err := parseEntryPrefix(state, startOffset) + if err != nil { + return 0, err + } + + contentLen, consumedInput, oid, err := drainEntryPayload(state, record) + if err != nil { + return 0, err + } + + if contentLen != record.declaredSize { + return 0, &ErrMalformedPackEntry{ + Offset: startOffset, + Reason: fmt.Sprintf("inflated size mismatch got %d want %d", contentLen, record.declaredSize), + } + } + + endOffset := startOffset + uint64(record.headerLen) + consumedInput + if endOffset > state.stream.consumed { + return 0, &ErrMalformedPackEntry{ + Offset: startOffset, + Reason: fmt.Sprintf("entry end offset overflow got %d > stream %d", endOffset, state.stream.consumed), + } + } + + record.packedLen = endOffset - startOffset + + record.dataOffset = startOffset + uint64(record.headerLen) + if record.packedLen < uint64(record.headerLen) { + return 0, &ErrMalformedPackEntry{Offset: startOffset, Reason: "negative payload span"} + } + + crc, err := state.stream.endEntryCRC() + if err != nil { + return 0, err + } + + record.crc32 = crc + + if packfmt.IsBaseObjectType(record.packedType) { + record.objectID = oid + record.realType = record.packedType + record.resolved = true + } + + recordIdx := len(state.records) + state.records = append(state.records, record) + + state.offsetToRecord[record.offset] = recordIdx + if record.resolved { + state.objectToRecord[record.objectID] = recordIdx + } + + switch record.packedType { + case objecttype.TypeOfsDelta: + state.ofsDeltas = append(state.ofsDeltas, ofsDeltaRef{ + baseOffset: record.baseOffset, + recordIdx: recordIdx, + }) + case objecttype.TypeRefDelta: + state.refDeltas = append(state.refDeltas, refDeltaRef{ + baseObject: record.baseObject, + recordIdx: recordIdx, + }) + case objecttype.TypeInvalid, + objecttype.TypeCommit, + objecttype.TypeTree, + objecttype.TypeBlob, + objecttype.TypeTag, + objecttype.TypeFuture: + default: + } + + return endOffset, nil +} diff --git a/format/pack/ingest/entry_header.go b/format/pack/ingest/entry_header.go new file mode 100644 index 00000000..54f63fac --- /dev/null +++ b/format/pack/ingest/entry_header.go @@ -0,0 +1,33 @@ +package ingest + +import ( + "codeberg.org/lindenii/furgit/internal/intconv" + "codeberg.org/lindenii/furgit/objecttype" +) + +// encodePackEntryHeader encodes one non-delta packed entry header. +func encodePackEntryHeader(ty objecttype.Type, size int64) []byte { + var out [16]byte + + n := 0 + + s, err := intconv.Int64ToUint64(size) + if err != nil { + panic(err) + } + + c := (uint8(ty) << 4) | byte(s&0x0f) + + s >>= 4 + for s != 0 { + out[n] = c | 0x80 + n++ + c = byte(s & 0x7f) + s >>= 7 + } + + out[n] = c + n++ + + return append([]byte(nil), out[:n]...) +} diff --git a/format/pack/ingest/entry_prefix.go b/format/pack/ingest/entry_prefix.go new file mode 100644 index 00000000..6ffb1c56 --- /dev/null +++ b/format/pack/ingest/entry_prefix.go @@ -0,0 +1,95 @@ +package ingest + +import ( + "fmt" + + "codeberg.org/lindenii/furgit/internal/intconv" + "codeberg.org/lindenii/furgit/objectid" + "codeberg.org/lindenii/furgit/objecttype" +) + +// parseEntryPrefix parses one entry prefix from stream. +func parseEntryPrefix(state *ingestState, startOffset uint64) (objectRecord, error) { + var record objectRecord + + record.offset = startOffset + + first, err := state.stream.ReadByte() + if err != nil { + return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: fmt.Sprintf("read first header byte: %v", err)} + } + + record.packedType = objecttype.Type((first >> 4) & 0x07) + size := int64(first & 0x0f) + headerLen := uint32(1) + shift := uint(4) + b := first + + for b&0x80 != 0 { + b, err = state.stream.ReadByte() + if err != nil { + return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: fmt.Sprintf("read size continuation: %v", err)} + } + + headerLen++ + size |= int64(b&0x7f) << shift + shift += 7 + } + + if size < 0 { + return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: "negative declared size"} + } + + record.declaredSize = size + + switch record.packedType { + case objecttype.TypeCommit, objecttype.TypeTree, objecttype.TypeBlob, objecttype.TypeTag: + case objecttype.TypeRefDelta: + baseRaw := make([]byte, state.algo.Size()) + + err := state.stream.readFull(baseRaw) + if err != nil { + return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: fmt.Sprintf("read ref base: %v", err)} + } + + baseID, err := objectid.FromBytes(state.algo, baseRaw) + if err != nil { + return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: fmt.Sprintf("parse ref base: %v", err)} + } + + record.baseObject = baseID + + baseRawLen, err := intconv.IntToUint32(len(baseRaw)) + if err != nil { + return record, err + } + + headerLen += baseRawLen + case objecttype.TypeOfsDelta: + dist, consumed, err := readOfsDistanceFromStream(state.stream) + if err != nil { + return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: err.Error()} + } + + if startOffset <= dist { + return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: "ofs base offset out of bounds"} + } + + record.baseOffset = startOffset - dist + + consumedUint32, err := intconv.IntToUint32(consumed) + if err != nil { + return record, err + } + + headerLen += consumedUint32 + case objecttype.TypeInvalid, objecttype.TypeFuture: + return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: fmt.Sprintf("unsupported object type %d", record.packedType)} + default: + return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: fmt.Sprintf("unsupported object type %d", record.packedType)} + } + + record.headerLen = headerLen + + return record, nil +} diff --git a/format/pack/ingest/errors.go b/format/pack/ingest/errors.go index 1fc321e7..cd65d3cf 100644 --- a/format/pack/ingest/errors.go +++ b/format/pack/ingest/errors.go @@ -1,6 +1,9 @@ package ingest -import "fmt" +import ( + "errors" + "fmt" +) // ErrInvalidPackHeader reports an invalid or unsupported pack header. type ErrInvalidPackHeader struct { @@ -61,3 +64,5 @@ type ErrDestinationWrite struct { func (err *ErrDestinationWrite) Error() string { return fmt.Sprintf("format/pack/ingest: destination write failure: %s", err.Op) } + +var errExternalThinBase = errors.New("format/pack/ingest: external thin base required") diff --git a/format/pack/ingest/file_section_writer.go b/format/pack/ingest/file_section_writer.go new file mode 100644 index 00000000..fa28c1a9 --- /dev/null +++ b/format/pack/ingest/file_section_writer.go @@ -0,0 +1,22 @@ +package ingest + +import "os" + +// fileSectionWriter writes sequentially to file via WriteAt at one base offset. +type fileSectionWriter struct { + file *os.File + off int64 + pos int64 +} + +// Write writes src at current section position. +func (writer *fileSectionWriter) Write(src []byte) (int, error) { + if len(src) == 0 { + return 0, nil + } + + n, err := writer.file.WriteAt(src, writer.off+writer.pos) + writer.pos += int64(n) + + return n, err +} diff --git a/format/pack/ingest/fill.go b/format/pack/ingest/fill.go new file mode 100644 index 00000000..76fc6e60 --- /dev/null +++ b/format/pack/ingest/fill.go @@ -0,0 +1,44 @@ +package ingest + +import ( + "errors" + "fmt" + "io" +) + +// fill ensures at least min unread bytes are available in receiver's buffer. +func (scanner *streamScanner) fill(minLen int) error { + if minLen <= 0 { + return nil + } + + if minLen > len(scanner.buf) { + return fmt.Errorf("format/pack/ingest: fill(%d) exceeds scanner buffer", minLen) + } + + for scanner.n-scanner.off < minLen { + err := scanner.flushConsumedPrefix() + if err != nil { + return err + } + + readN, err := scanner.src.Read(scanner.buf[scanner.n:]) + if readN > 0 { + scanner.n += readN + } + + if err != nil { + if errors.Is(err, io.EOF) && scanner.n-scanner.off >= minLen { + return nil + } + + return err + } + + if readN == 0 { + return io.ErrNoProgress + } + } + + return nil +} diff --git a/format/pack/ingest/flush.go b/format/pack/ingest/flush.go new file mode 100644 index 00000000..4742ead1 --- /dev/null +++ b/format/pack/ingest/flush.go @@ -0,0 +1,37 @@ +package ingest + +import "fmt" + +// flush writes all consumed-but-unflushed bytes to destination pack file. +func (scanner *streamScanner) flush() error { + return scanner.flushConsumedPrefix() +} + +// flushConsumedPrefix writes scanner.buf[:scanner.off] and compacts unread +// bytes to the start of buffer. +func (scanner *streamScanner) flushConsumedPrefix() error { + if scanner.off == 0 { + return nil + } + + written := 0 + for written < scanner.off { + n, err := scanner.dstFile.Write(scanner.buf[written:scanner.off]) + if err != nil { + return &ErrDestinationWrite{Op: fmt.Sprintf("write pack: %v", err)} + } + + if n == 0 { + return &ErrDestinationWrite{Op: "write pack: short write"} + } + + written += n + } + + unread := scanner.n - scanner.off + copy(scanner.buf[:unread], scanner.buf[scanner.off:scanner.n]) + scanner.off = 0 + scanner.n = unread + + return nil +} diff --git a/format/pack/ingest/hash.go b/format/pack/ingest/hash.go new file mode 100644 index 00000000..8d4dac21 --- /dev/null +++ b/format/pack/ingest/hash.go @@ -0,0 +1,27 @@ +package ingest + +import ( + "fmt" + + "codeberg.org/lindenii/furgit/objectheader" + "codeberg.org/lindenii/furgit/objectid" + "codeberg.org/lindenii/furgit/objecttype" +) + +// hashCanonicalObject hashes canonical object bytes (header+content). +func hashCanonicalObject(algo objectid.Algorithm, ty objecttype.Type, content []byte) (objectid.ObjectID, error) { + header, ok := objectheader.Encode(ty, int64(len(content))) + if !ok { + return objectid.ObjectID{}, fmt.Errorf("format/pack/ingest: encode object header for type %d", ty) + } + + hashImpl, err := algo.New() + if err != nil { + return objectid.ObjectID{}, err + } + + _, _ = hashImpl.Write(header) + _, _ = hashImpl.Write(content) + + return objectid.FromBytes(algo, hashImpl.Sum(nil)) +} diff --git a/format/pack/ingest/header.go b/format/pack/ingest/header.go new file mode 100644 index 00000000..88663760 --- /dev/null +++ b/format/pack/ingest/header.go @@ -0,0 +1,34 @@ +package ingest + +import ( + "encoding/binary" + "fmt" + + "codeberg.org/lindenii/furgit/format/pack" +) + +// readAndValidatePackHeader reads and validates PACK header from the stream. +func readAndValidatePackHeader(state *ingestState) error { + var hdr [12]byte + + err := state.stream.readFull(hdr[:]) + if err != nil { + return &ErrInvalidPackHeader{Reason: fmt.Sprintf("read header: %v", err)} + } + + if binary.BigEndian.Uint32(hdr[:4]) != pack.Signature { + return &ErrInvalidPackHeader{Reason: "signature mismatch"} + } + + version := binary.BigEndian.Uint32(hdr[4:8]) + if !pack.VersionSupported(version) { + return &ErrInvalidPackHeader{Reason: fmt.Sprintf("unsupported version %d", version)} + } + + state.objectCountHeader = binary.BigEndian.Uint32(hdr[8:12]) + if state.objectCountHeader == 0 { + return &ErrInvalidPackHeader{Reason: "zero objects"} + } + + return nil +} diff --git a/format/pack/ingest/record_content.go b/format/pack/ingest/record_content.go new file mode 100644 index 00000000..1d4f27c3 --- /dev/null +++ b/format/pack/ingest/record_content.go @@ -0,0 +1,30 @@ +package ingest + +import ( + "fmt" + + packfmt "codeberg.org/lindenii/furgit/format/pack" + "codeberg.org/lindenii/furgit/objecttype" +) + +// readBaseRecordContent reads canonical base content for one non-delta record. +func readBaseRecordContent(state *ingestState, idx int) (objecttype.Type, []byte, error) { + record := state.records[idx] + if !packfmt.IsBaseObjectType(record.packedType) { + return objecttype.TypeInvalid, nil, fmt.Errorf("format/pack/ingest: record %d is not a base object", idx) + } + + content, err := inflateRecordPayload(state, idx) + if err != nil { + return objecttype.TypeInvalid, nil, err + } + + if int64(len(content)) != record.declaredSize { + return objecttype.TypeInvalid, nil, &ErrMalformedPackEntry{ + Offset: record.offset, + Reason: fmt.Sprintf("base content size mismatch got %d want %d", len(content), record.declaredSize), + } + } + + return record.packedType, content, nil +} diff --git a/format/pack/ingest/record_delta.go b/format/pack/ingest/record_delta.go new file mode 100644 index 00000000..c9febcc3 --- /dev/null +++ b/format/pack/ingest/record_delta.go @@ -0,0 +1,59 @@ +package ingest + +import ( + "fmt" + + "codeberg.org/lindenii/furgit/objecttype" +) + +// applyDeltaRecord applies one delta record onto base content. +func applyDeltaRecord(state *ingestState, idx int, baseType objecttype.Type, baseContent []byte) (objecttype.Type, []byte, error) { + record := state.records[idx] + if record.packedType != objecttype.TypeOfsDelta && record.packedType != objecttype.TypeRefDelta { + return objecttype.TypeInvalid, nil, fmt.Errorf("format/pack/ingest: record %d is not a delta record", idx) + } + + deltaPayload, err := inflateRecordPayload(state, idx) + if err != nil { + return objecttype.TypeInvalid, nil, err + } + + if int64(len(deltaPayload)) != record.declaredSize { + return objecttype.TypeInvalid, nil, &ErrMalformedPackEntry{ + Offset: record.offset, + Reason: fmt.Sprintf("delta payload size mismatch got %d want %d", len(deltaPayload), record.declaredSize), + } + } + + srcSize, dstSize, err := readDeltaHeaderSizes(deltaPayload) + if err != nil { + return objecttype.TypeInvalid, nil, &ErrMalformedPackEntry{ + Offset: record.offset, + Reason: fmt.Sprintf("read delta header: %v", err), + } + } + + if srcSize != len(baseContent) { + return objecttype.TypeInvalid, nil, &ErrMalformedPackEntry{ + Offset: record.offset, + Reason: fmt.Sprintf("delta source size mismatch got %d want %d", srcSize, len(baseContent)), + } + } + + content, err := deltaapply.Apply(baseContent, deltaPayload) + if err != nil { + return objecttype.TypeInvalid, nil, &ErrMalformedPackEntry{ + Offset: record.offset, + Reason: fmt.Sprintf("apply delta: %v", err), + } + } + + if len(content) != dstSize { + return objecttype.TypeInvalid, nil, &ErrMalformedPackEntry{ + Offset: record.offset, + Reason: fmt.Sprintf("delta result size mismatch got %d want %d", len(content), dstSize), + } + } + + return baseType, content, nil +} diff --git a/format/pack/ingest/record_inflate.go b/format/pack/ingest/record_inflate.go new file mode 100644 index 00000000..a8d68c07 --- /dev/null +++ b/format/pack/ingest/record_inflate.go @@ -0,0 +1,46 @@ +package ingest + +import ( + "compress/zlib" + "fmt" + "io" + + "codeberg.org/lindenii/furgit/internal/intconv" +) + +// inflateRecordPayload inflates one record's zlib payload from pack file. +func inflateRecordPayload(state *ingestState, idx int) ([]byte, error) { + record := state.records[idx] + if record.packedLen < uint64(record.headerLen) { + return nil, &ErrMalformedPackEntry{Offset: record.offset, Reason: "entry packed span underflow"} + } + + compressedOffset := record.offset + uint64(record.headerLen) + compressedLen := record.packedLen - uint64(record.headerLen) + + compressedOffsetInt64, err := intconv.Uint64ToInt64(compressedOffset) + if err != nil { + return nil, err + } + + compressedLenInt64, err := intconv.Uint64ToInt64(compressedLen) + if err != nil { + return nil, err + } + + section := io.NewSectionReader(state.packFile, compressedOffsetInt64, compressedLenInt64) + + reader, err := zlib.NewReader(section) + if err != nil { + return nil, &ErrMalformedPackEntry{Offset: record.offset, Reason: fmt.Sprintf("open payload zlib: %v", err)} + } + + defer func() { _ = reader.Close() }() + + out, err := io.ReadAll(reader) + if err != nil { + return nil, &ErrMalformedPackEntry{Offset: record.offset, Reason: fmt.Sprintf("inflate payload: %v", err)} + } + + return out, nil +} diff --git a/format/pack/ingest/record_resolve.go b/format/pack/ingest/record_resolve.go new file mode 100644 index 00000000..6ef5e857 --- /dev/null +++ b/format/pack/ingest/record_resolve.go @@ -0,0 +1,117 @@ +package ingest + +import ( + "fmt" + + packfmt "codeberg.org/lindenii/furgit/format/pack" + "codeberg.org/lindenii/furgit/objecttype" +) + +// resolveRecord resolves one record and returns canonical type/content. +func resolveRecord(state *ingestState, idx int, visiting map[int]struct{}) (objecttype.Type, []byte, error) { + if idx < 0 || idx >= len(state.records) { + return objecttype.TypeInvalid, nil, fmt.Errorf("format/pack/ingest: record index out of bounds") + } + + if _, ok := visiting[idx]; ok { + return objecttype.TypeInvalid, nil, &ErrDeltaCycle{Offset: state.records[idx].offset} + } + + visiting[idx] = struct{}{} + defer delete(visiting, idx) + + record := &state.records[idx] + if ty, content, ok := state.baseCache.get(idx); ok { + return ty, content, nil + } + + if packfmt.IsBaseObjectType(record.packedType) { + ty, content, err := readBaseRecordContent(state, idx) + if err != nil { + return objecttype.TypeInvalid, nil, err + } + + if record.resolved { + state.baseCache.add(idx, record.realType, content) + + return record.realType, content, nil + } + + id, err := hashCanonicalObject(state.algo, ty, content) + if err != nil { + return objecttype.TypeInvalid, nil, err + } + + record.objectID = id + record.realType = ty + record.resolved = true + state.objectToRecord[id] = idx + state.baseCache.add(idx, ty, content) + + return ty, content, nil + } + + var ( + baseType objecttype.Type + baseContent []byte + err error + ) + switch record.packedType { + case objecttype.TypeOfsDelta: + baseIdx, ok := state.offsetToRecord[record.baseOffset] + if !ok { + return objecttype.TypeInvalid, nil, &ErrMalformedPackEntry{ + Offset: record.offset, + Reason: "missing ofs-delta base entry", + } + } + + baseType, baseContent, err = resolveRecord(state, baseIdx, visiting) + if err != nil { + return objecttype.TypeInvalid, nil, err + } + case objecttype.TypeRefDelta: + baseIdx, ok := state.objectToRecord[record.baseObject] + if ok { + baseType, baseContent, err = resolveRecord(state, baseIdx, visiting) + if err != nil { + return objecttype.TypeInvalid, nil, err + } + } else { + return objecttype.TypeInvalid, nil, errExternalThinBase + } + case objecttype.TypeInvalid, + objecttype.TypeCommit, + objecttype.TypeTree, + objecttype.TypeBlob, + objecttype.TypeTag, + objecttype.TypeFuture: + return objecttype.TypeInvalid, nil, &ErrMalformedPackEntry{ + Offset: record.offset, + Reason: "unsupported delta type", + } + default: + return objecttype.TypeInvalid, nil, &ErrMalformedPackEntry{ + Offset: record.offset, + Reason: "unsupported delta type", + } + } + + ty, content, err := applyDeltaRecord(state, idx, baseType, baseContent) + if err != nil { + return objecttype.TypeInvalid, nil, err + } + + id, err := hashCanonicalObject(state.algo, ty, content) + if err != nil { + return objecttype.TypeInvalid, nil, err + } + + record.objectID = id + record.realType = ty + record.resolved = true + state.objectToRecord[id] = idx + state.baseCache.add(idx, ty, content) + + return ty, content, nil +} diff --git a/format/pack/ingest/resolve.go b/format/pack/ingest/resolve.go deleted file mode 100644 index 7426e52d..00000000 --- a/format/pack/ingest/resolve.go +++ /dev/null @@ -1,320 +0,0 @@ -package ingest - -import ( - "bytes" - "errors" - "fmt" - "io" - "slices" - - deltaapply "codeberg.org/lindenii/furgit/format/delta/apply" - packfmt "codeberg.org/lindenii/furgit/format/pack" - "codeberg.org/lindenii/furgit/internal/compress/zlib" - "codeberg.org/lindenii/furgit/internal/intconv" - "codeberg.org/lindenii/furgit/objectheader" - "codeberg.org/lindenii/furgit/objectid" - "codeberg.org/lindenii/furgit/objecttype" -) - -var errExternalThinBase = errors.New("format/pack/ingest: external thin base required") - -// resolveAll resolves all delta records and finalizes ObjectID/RealType for every record. -func resolveAll(state *ingestState) error { - state.unresolvedRefDeltas = state.unresolvedRefDeltas[:0] - - for idx := range state.records { - if state.records[idx].resolved { - continue - } - - visiting := make(map[int]struct{}) - - ty, content, err := resolveRecord(state, idx, visiting) - if err != nil { - if errors.Is(err, errExternalThinBase) { - state.unresolvedRefDeltas = append(state.unresolvedRefDeltas, idx) - - continue - } - - return err - } - - id, err := hashCanonicalObject(state.algo, ty, content) - if err != nil { - return err - } - - record := &state.records[idx] - record.realType = ty - record.objectID = id - record.resolved = true - state.objectToRecord[id] = idx - state.baseCache.add(idx, ty, content) - } - - return nil -} - -// resolveRecord resolves one record and returns canonical type/content. -func resolveRecord(state *ingestState, idx int, visiting map[int]struct{}) (objecttype.Type, []byte, error) { - if idx < 0 || idx >= len(state.records) { - return objecttype.TypeInvalid, nil, fmt.Errorf("format/pack/ingest: record index out of bounds") - } - - if _, ok := visiting[idx]; ok { - return objecttype.TypeInvalid, nil, &ErrDeltaCycle{Offset: state.records[idx].offset} - } - - visiting[idx] = struct{}{} - defer delete(visiting, idx) - - record := &state.records[idx] - if ty, content, ok := state.baseCache.get(idx); ok { - return ty, content, nil - } - - if packfmt.IsBaseObjectType(record.packedType) { - ty, content, err := readBaseRecordContent(state, idx) - if err != nil { - return objecttype.TypeInvalid, nil, err - } - - if record.resolved { - state.baseCache.add(idx, record.realType, content) - - return record.realType, content, nil - } - - id, err := hashCanonicalObject(state.algo, ty, content) - if err != nil { - return objecttype.TypeInvalid, nil, err - } - - record.objectID = id - record.realType = ty - record.resolved = true - state.objectToRecord[id] = idx - state.baseCache.add(idx, ty, content) - - return ty, content, nil - } - - var ( - baseType objecttype.Type - baseContent []byte - err error - ) - switch record.packedType { - case objecttype.TypeOfsDelta: - baseIdx, ok := state.offsetToRecord[record.baseOffset] - if !ok { - return objecttype.TypeInvalid, nil, &ErrMalformedPackEntry{ - Offset: record.offset, - Reason: "missing ofs-delta base entry", - } - } - - baseType, baseContent, err = resolveRecord(state, baseIdx, visiting) - if err != nil { - return objecttype.TypeInvalid, nil, err - } - case objecttype.TypeRefDelta: - baseIdx, ok := state.objectToRecord[record.baseObject] - if ok { - baseType, baseContent, err = resolveRecord(state, baseIdx, visiting) - if err != nil { - return objecttype.TypeInvalid, nil, err - } - } else { - return objecttype.TypeInvalid, nil, errExternalThinBase - } - case objecttype.TypeInvalid, - objecttype.TypeCommit, - objecttype.TypeTree, - objecttype.TypeBlob, - objecttype.TypeTag, - objecttype.TypeFuture: - return objecttype.TypeInvalid, nil, &ErrMalformedPackEntry{ - Offset: record.offset, - Reason: "unsupported delta type", - } - default: - return objecttype.TypeInvalid, nil, &ErrMalformedPackEntry{ - Offset: record.offset, - Reason: "unsupported delta type", - } - } - - ty, content, err := applyDeltaRecord(state, idx, baseType, baseContent) - if err != nil { - return objecttype.TypeInvalid, nil, err - } - - id, err := hashCanonicalObject(state.algo, ty, content) - if err != nil { - return objecttype.TypeInvalid, nil, err - } - - record.objectID = id - record.realType = ty - record.resolved = true - state.objectToRecord[id] = idx - state.baseCache.add(idx, ty, content) - - return ty, content, nil -} - -// readBaseRecordContent reads canonical base content for one non-delta record. -func readBaseRecordContent(state *ingestState, idx int) (objecttype.Type, []byte, error) { - record := state.records[idx] - if !packfmt.IsBaseObjectType(record.packedType) { - return objecttype.TypeInvalid, nil, fmt.Errorf("format/pack/ingest: record %d is not a base object", idx) - } - - content, err := inflateRecordPayload(state, idx) - if err != nil { - return objecttype.TypeInvalid, nil, err - } - - if int64(len(content)) != record.declaredSize { - return objecttype.TypeInvalid, nil, &ErrMalformedPackEntry{ - Offset: record.offset, - Reason: fmt.Sprintf("base content size mismatch got %d want %d", len(content), record.declaredSize), - } - } - - return record.packedType, content, nil -} - -// applyDeltaRecord applies one delta record onto base content. -func applyDeltaRecord(state *ingestState, idx int, baseType objecttype.Type, baseContent []byte) (objecttype.Type, []byte, error) { - record := state.records[idx] - if record.packedType != objecttype.TypeOfsDelta && record.packedType != objecttype.TypeRefDelta { - return objecttype.TypeInvalid, nil, fmt.Errorf("format/pack/ingest: record %d is not a delta record", idx) - } - - deltaPayload, err := inflateRecordPayload(state, idx) - if err != nil { - return objecttype.TypeInvalid, nil, err - } - - if int64(len(deltaPayload)) != record.declaredSize { - return objecttype.TypeInvalid, nil, &ErrMalformedPackEntry{ - Offset: record.offset, - Reason: fmt.Sprintf("delta payload size mismatch got %d want %d", len(deltaPayload), record.declaredSize), - } - } - - srcSize, dstSize, err := readDeltaHeaderSizes(deltaPayload) - if err != nil { - return objecttype.TypeInvalid, nil, &ErrMalformedPackEntry{ - Offset: record.offset, - Reason: fmt.Sprintf("read delta header: %v", err), - } - } - - if srcSize != len(baseContent) { - return objecttype.TypeInvalid, nil, &ErrMalformedPackEntry{ - Offset: record.offset, - Reason: fmt.Sprintf("delta source size mismatch got %d want %d", srcSize, len(baseContent)), - } - } - - content, err := deltaapply.Apply(baseContent, deltaPayload) - if err != nil { - return objecttype.TypeInvalid, nil, &ErrMalformedPackEntry{ - Offset: record.offset, - Reason: fmt.Sprintf("apply delta: %v", err), - } - } - - if len(content) != dstSize { - return objecttype.TypeInvalid, nil, &ErrMalformedPackEntry{ - Offset: record.offset, - Reason: fmt.Sprintf("delta result size mismatch got %d want %d", len(content), dstSize), - } - } - - return baseType, content, nil -} - -// inflateRecordPayload inflates one record's zlib payload from pack file. -func inflateRecordPayload(state *ingestState, idx int) ([]byte, error) { - record := state.records[idx] - if record.packedLen < uint64(record.headerLen) { - return nil, &ErrMalformedPackEntry{Offset: record.offset, Reason: "entry packed span underflow"} - } - - compressedOffset := record.offset + uint64(record.headerLen) - compressedLen := record.packedLen - uint64(record.headerLen) - - compressedOffsetInt64, err := intconv.Uint64ToInt64(compressedOffset) - if err != nil { - return nil, err - } - - compressedLenInt64, err := intconv.Uint64ToInt64(compressedLen) - if err != nil { - return nil, err - } - - section := io.NewSectionReader(state.packFile, compressedOffsetInt64, compressedLenInt64) - - reader, err := zlib.NewReader(section) - if err != nil { - return nil, &ErrMalformedPackEntry{Offset: record.offset, Reason: fmt.Sprintf("open payload zlib: %v", err)} - } - - defer func() { _ = reader.Close() }() - - out, err := io.ReadAll(reader) - if err != nil { - return nil, &ErrMalformedPackEntry{Offset: record.offset, Reason: fmt.Sprintf("inflate payload: %v", err)} - } - - return out, nil -} - -// hashCanonicalObject hashes canonical object bytes (header+content). -func hashCanonicalObject(algo objectid.Algorithm, ty objecttype.Type, content []byte) (objectid.ObjectID, error) { - header, ok := objectheader.Encode(ty, int64(len(content))) - if !ok { - return objectid.ObjectID{}, fmt.Errorf("format/pack/ingest: encode object header for type %d", ty) - } - - hashImpl, err := algo.New() - if err != nil { - return objectid.ObjectID{}, err - } - - _, _ = hashImpl.Write(header) - _, _ = hashImpl.Write(content) - - return objectid.FromBytes(algo, hashImpl.Sum(nil)) -} - -// unresolvedThinBaseIDs returns sorted unique unresolved ref base IDs. -func unresolvedThinBaseIDs(state *ingestState) []objectid.ObjectID { - seen := make(map[objectid.ObjectID]struct{}) - - for _, idx := range state.unresolvedRefDeltas { - record := state.records[idx] - if record.packedType != objecttype.TypeRefDelta { - continue - } - - seen[record.baseObject] = struct{}{} - } - - out := make([]objectid.ObjectID, 0, len(seen)) - for id := range seen { - out = append(out, id) - } - - slices.SortFunc(out, func(a, b objectid.ObjectID) int { - return bytes.Compare(a.RawBytes(), b.RawBytes()) - }) - - return out -} diff --git a/format/pack/ingest/resolve_all.go b/format/pack/ingest/resolve_all.go new file mode 100644 index 00000000..992d87ae --- /dev/null +++ b/format/pack/ingest/resolve_all.go @@ -0,0 +1,41 @@ +package ingest + +import "errors" + +// resolveAll resolves all delta records and finalizes ObjectID/RealType for every record. +func resolveAll(state *ingestState) error { + state.unresolvedRefDeltas = state.unresolvedRefDeltas[:0] + + for idx := range state.records { + if state.records[idx].resolved { + continue + } + + visiting := make(map[int]struct{}) + + ty, content, err := resolveRecord(state, idx, visiting) + if err != nil { + if errors.Is(err, errExternalThinBase) { + state.unresolvedRefDeltas = append(state.unresolvedRefDeltas, idx) + + continue + } + + return err + } + + id, err := hashCanonicalObject(state.algo, ty, content) + if err != nil { + return err + } + + record := &state.records[idx] + record.realType = ty + record.objectID = id + record.resolved = true + state.objectToRecord[id] = idx + state.baseCache.add(idx, ty, content) + } + + return nil +} diff --git a/format/pack/ingest/rewrite_header_trailer.go b/format/pack/ingest/rewrite_header_trailer.go new file mode 100644 index 00000000..c5f858de --- /dev/null +++ b/format/pack/ingest/rewrite_header_trailer.go @@ -0,0 +1,89 @@ +package ingest + +import ( + "encoding/binary" + "io" + + "codeberg.org/lindenii/furgit/internal/intconv" + "codeberg.org/lindenii/furgit/objectid" +) + +// rewritePackHeaderAndTrailer rewrites object count and trailer hash using ReadAt/WriteAt. +func rewritePackHeaderAndTrailer(state *ingestState) error { + var countRaw [4]byte + + recordCountUint32, err := intconv.IntToUint32(len(state.records)) + if err != nil { + return err + } + + binary.BigEndian.PutUint32(countRaw[:], recordCountUint32) + + _, err = state.packFile.WriteAt(countRaw[:], 8) + if err != nil { + return err + } + + info, err := state.packFile.Stat() + if err != nil { + return err + } + + endWithoutTrailer := info.Size() + + hashImpl, err := state.algo.New() + if err != nil { + return err + } + + var ( + buf [128 << 10]byte + pos int64 + ) + for pos < endWithoutTrailer { + want := int64(len(buf)) + + remaining := endWithoutTrailer - pos + if remaining < want { + want = remaining + } + + n, err := state.packFile.ReadAt(buf[:want], pos) + if err != nil && err != io.EOF { + return err + } + + if n == 0 { + return io.ErrUnexpectedEOF + } + + _, _ = hashImpl.Write(buf[:n]) + pos += int64(n) + } + + sum := hashImpl.Sum(nil) + + _, err = state.packFile.WriteAt(sum, endWithoutTrailer) + if err != nil { + return err + } + + packHash, err := objectid.FromBytes(state.algo, sum) + if err != nil { + return err + } + + state.packHash = packHash + state.objectCountHeader = recordCountUint32 + + sumLenInt64 := int64(len(sum)) + + newConsumed, err := intconv.Int64ToUint64(endWithoutTrailer + sumLenInt64) + if err != nil { + return err + } + + state.stream.consumed = newConsumed + + return nil +} diff --git a/format/pack/ingest/scan.go b/format/pack/ingest/scan.go new file mode 100644 index 00000000..36b7f75e --- /dev/null +++ b/format/pack/ingest/scan.go @@ -0,0 +1,60 @@ +package ingest + +import ( + "fmt" + + "codeberg.org/lindenii/furgit/objectid" +) + +// streamPackAndScan copies src into temp .pack while scanning packed entries. +func streamPackAndScan(state *ingestState) error { + hashImpl, err := state.algo.New() + if err != nil { + return err + } + + state.stream = newStreamScanner( + state.src, + state.packFile, + hashImpl, + state.algo.Size(), + ) + + err = readAndValidatePackHeader(state) + if err != nil { + return err + } + + state.records = make([]objectRecord, 0, state.objectCountHeader) + state.ofsDeltas = make([]ofsDeltaRef, 0, state.objectCountHeader) + state.refDeltas = make([]refDeltaRef, 0, state.objectCountHeader) + + for range state.objectCountHeader { + nextOffset, err := scanOneEntry(state, state.stream.consumed) + if err != nil { + return err + } + + if nextOffset != state.stream.consumed { + return fmt.Errorf("format/pack/ingest: internal stream offset mismatch") + } + } + + err = state.stream.finishAndFlushTrailer() + if err != nil { + return err + } + + if len(state.stream.packTrailer) != state.algo.Size() { + return fmt.Errorf("format/pack/ingest: invalid trailer size") + } + + packHash, err := objectid.FromBytes(state.algo, state.stream.packTrailer) + if err != nil { + return err + } + + state.packHash = packHash + + return state.stream.flush() +} diff --git a/format/pack/ingest/stream.go b/format/pack/ingest/stream.go index 72f6c5a4..66a6fc5f 100644 --- a/format/pack/ingest/stream.go +++ b/format/pack/ingest/stream.go @@ -1,11 +1,8 @@ package ingest import ( - "bytes" "errors" - "fmt" "hash" - "hash/crc32" "io" "os" ) @@ -106,71 +103,6 @@ func (scanner *streamScanner) ReadByte() (byte, error) { return b, nil } -// fill ensures at least min unread bytes are available in receiver's buffer. -func (scanner *streamScanner) fill(minLen int) error { - if minLen <= 0 { - return nil - } - - if minLen > len(scanner.buf) { - return fmt.Errorf("format/pack/ingest: fill(%d) exceeds scanner buffer", minLen) - } - - for scanner.n-scanner.off < minLen { - err := scanner.flushConsumedPrefix() - if err != nil { - return err - } - - readN, err := scanner.src.Read(scanner.buf[scanner.n:]) - if readN > 0 { - scanner.n += readN - } - - if err != nil { - if errors.Is(err, io.EOF) && scanner.n-scanner.off >= minLen { - return nil - } - - return err - } - - if readN == 0 { - return io.ErrNoProgress - } - } - - return nil -} - -// use consumes n unread bytes and updates accounting/checksum state. -func (scanner *streamScanner) use(n int) error { - if n < 0 || n > scanner.n-scanner.off { - return fmt.Errorf("format/pack/ingest: invalid consume length %d", n) - } - - if n == 0 { - return nil - } - - chunk := scanner.buf[scanner.off : scanner.off+n] - if scanner.hashEnabled { - _, err := scanner.hash.Write(chunk) - if err != nil { - return err - } - } - - if scanner.inEntryCRC { - scanner.entryCRC = crc32.Update(scanner.entryCRC, crc32.IEEETable, chunk) - } - - scanner.off += n - scanner.consumed += uint64(n) - - return nil -} - // readFull reads exactly len(dst) bytes through receiver. func (scanner *streamScanner) readFull(dst []byte) error { _, err := io.ReadFull(scanner, dst) @@ -180,93 +112,3 @@ func (scanner *streamScanner) readFull(dst []byte) error { return nil } - -// flush writes all consumed-but-unflushed bytes to destination pack file. -func (scanner *streamScanner) flush() error { - return scanner.flushConsumedPrefix() -} - -// finishAndFlushTrailer reads trailer hash bytes, verifies trailer checksum, -// and ensures no trailing garbage remains in stream. -func (scanner *streamScanner) finishAndFlushTrailer() error { - if scanner.hashSize <= 0 { - return fmt.Errorf("format/pack/ingest: invalid hash size") - } - - trailer := make([]byte, scanner.hashSize) - - scanner.hashEnabled = false - - err := scanner.readFull(trailer) - if err != nil { - return &ErrPackTrailerMismatch{} - } - - scanner.packTrailer = append(scanner.packTrailer[:0], trailer...) - - var probe [1]byte - - n, err := scanner.Read(probe[:]) - if n > 0 || err == nil { - return fmt.Errorf("format/pack/ingest: pack has trailing garbage") - } - - if !errors.Is(err, io.EOF) { - return err - } - - computed := scanner.hash.Sum(nil) - if !bytes.Equal(computed, trailer) { - return &ErrPackTrailerMismatch{} - } - - return nil -} - -// beginEntryCRC starts inline CRC accumulation for one packed entry. -func (scanner *streamScanner) beginEntryCRC() { - scanner.entryCRC = 0 - scanner.inEntryCRC = true -} - -// endEntryCRC finishes inline CRC accumulation for one packed entry. -func (scanner *streamScanner) endEntryCRC() (uint32, error) { - if !scanner.inEntryCRC { - return 0, fmt.Errorf("format/pack/ingest: entry CRC not started") - } - - crc := scanner.entryCRC - scanner.entryCRC = 0 - scanner.inEntryCRC = false - - return crc, nil -} - -// flushConsumedPrefix writes scanner.buf[:scanner.off] and compacts unread -// bytes to the start of buffer. -func (scanner *streamScanner) flushConsumedPrefix() error { - if scanner.off == 0 { - return nil - } - - written := 0 - for written < scanner.off { - n, err := scanner.dstFile.Write(scanner.buf[written:scanner.off]) - if err != nil { - return &ErrDestinationWrite{Op: fmt.Sprintf("write pack: %v", err)} - } - - if n == 0 { - return &ErrDestinationWrite{Op: "write pack: short write"} - } - - written += n - } - - unread := scanner.n - scanner.off - copy(scanner.buf[:unread], scanner.buf[scanner.off:scanner.n]) - scanner.off = 0 - scanner.n = unread - - return nil -} diff --git a/format/pack/ingest/stream_scan.go b/format/pack/ingest/stream_scan.go deleted file mode 100644 index 56ee5d55..00000000 --- a/format/pack/ingest/stream_scan.go +++ /dev/null @@ -1,366 +0,0 @@ -package ingest - -import ( - "encoding/binary" - "fmt" - "io" - - deltaapply "codeberg.org/lindenii/furgit/format/delta/apply" - packfmt "codeberg.org/lindenii/furgit/format/pack" - "codeberg.org/lindenii/furgit/internal/compress/zlib" - "codeberg.org/lindenii/furgit/internal/intconv" - "codeberg.org/lindenii/furgit/objectheader" - "codeberg.org/lindenii/furgit/objectid" - "codeberg.org/lindenii/furgit/objecttype" -) - -// streamPackAndScan copies src into temp .pack while scanning packed entries. -func streamPackAndScan(state *ingestState) error { - hashImpl, err := state.algo.New() - if err != nil { - return err - } - - state.stream = newStreamScanner( - state.src, - state.packFile, - hashImpl, - state.algo.Size(), - ) - - err = readAndValidatePackHeader(state) - if err != nil { - return err - } - - state.records = make([]objectRecord, 0, state.objectCountHeader) - state.ofsDeltas = make([]ofsDeltaRef, 0, state.objectCountHeader) - state.refDeltas = make([]refDeltaRef, 0, state.objectCountHeader) - - for range state.objectCountHeader { - nextOffset, err := scanOneEntry(state, state.stream.consumed) - if err != nil { - return err - } - - if nextOffset != state.stream.consumed { - return fmt.Errorf("format/pack/ingest: internal stream offset mismatch") - } - } - - err = state.stream.finishAndFlushTrailer() - if err != nil { - return err - } - - if len(state.stream.packTrailer) != state.algo.Size() { - return fmt.Errorf("format/pack/ingest: invalid trailer size") - } - - packHash, err := objectid.FromBytes(state.algo, state.stream.packTrailer) - if err != nil { - return err - } - - state.packHash = packHash - - return state.stream.flush() -} - -// readAndValidatePackHeader reads and validates PACK header from the stream. -func readAndValidatePackHeader(state *ingestState) error { - var hdr [12]byte - - err := state.stream.readFull(hdr[:]) - if err != nil { - return &ErrInvalidPackHeader{Reason: fmt.Sprintf("read header: %v", err)} - } - - if binary.BigEndian.Uint32(hdr[:4]) != packfmt.Signature { - return &ErrInvalidPackHeader{Reason: "signature mismatch"} - } - - version := binary.BigEndian.Uint32(hdr[4:8]) - if !packfmt.VersionSupported(version) { - return &ErrInvalidPackHeader{Reason: fmt.Sprintf("unsupported version %d", version)} - } - - state.objectCountHeader = binary.BigEndian.Uint32(hdr[8:12]) - if state.objectCountHeader == 0 { - return &ErrInvalidPackHeader{Reason: "zero objects"} - } - - return nil -} - -// scanOneEntry scans one pack entry from stream and appends one record. -func scanOneEntry(state *ingestState, startOffset uint64) (uint64, error) { - state.stream.beginEntryCRC() - - record, err := parseEntryPrefix(state, startOffset) - if err != nil { - return 0, err - } - - contentLen, consumedInput, oid, err := drainEntryPayload(state, record) - if err != nil { - return 0, err - } - - if contentLen != record.declaredSize { - return 0, &ErrMalformedPackEntry{ - Offset: startOffset, - Reason: fmt.Sprintf("inflated size mismatch got %d want %d", contentLen, record.declaredSize), - } - } - - endOffset := startOffset + uint64(record.headerLen) + consumedInput - if endOffset > state.stream.consumed { - return 0, &ErrMalformedPackEntry{ - Offset: startOffset, - Reason: fmt.Sprintf("entry end offset overflow got %d > stream %d", endOffset, state.stream.consumed), - } - } - - record.packedLen = endOffset - startOffset - - record.dataOffset = startOffset + uint64(record.headerLen) - if record.packedLen < uint64(record.headerLen) { - return 0, &ErrMalformedPackEntry{Offset: startOffset, Reason: "negative payload span"} - } - - crc, err := state.stream.endEntryCRC() - if err != nil { - return 0, err - } - - record.crc32 = crc - - if packfmt.IsBaseObjectType(record.packedType) { - record.objectID = oid - record.realType = record.packedType - record.resolved = true - } - - recordIdx := len(state.records) - state.records = append(state.records, record) - - state.offsetToRecord[record.offset] = recordIdx - if record.resolved { - state.objectToRecord[record.objectID] = recordIdx - } - - switch record.packedType { - case objecttype.TypeOfsDelta: - state.ofsDeltas = append(state.ofsDeltas, ofsDeltaRef{ - baseOffset: record.baseOffset, - recordIdx: recordIdx, - }) - case objecttype.TypeRefDelta: - state.refDeltas = append(state.refDeltas, refDeltaRef{ - baseObject: record.baseObject, - recordIdx: recordIdx, - }) - case objecttype.TypeInvalid, - objecttype.TypeCommit, - objecttype.TypeTree, - objecttype.TypeBlob, - objecttype.TypeTag, - objecttype.TypeFuture: - default: - } - - return endOffset, nil -} - -// parseEntryPrefix parses one entry prefix from stream. -func parseEntryPrefix(state *ingestState, startOffset uint64) (objectRecord, error) { - var record objectRecord - - record.offset = startOffset - - first, err := state.stream.ReadByte() - if err != nil { - return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: fmt.Sprintf("read first header byte: %v", err)} - } - - record.packedType = objecttype.Type((first >> 4) & 0x07) - size := int64(first & 0x0f) - headerLen := uint32(1) - shift := uint(4) - b := first - - for b&0x80 != 0 { - b, err = state.stream.ReadByte() - if err != nil { - return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: fmt.Sprintf("read size continuation: %v", err)} - } - - headerLen++ - size |= int64(b&0x7f) << shift - shift += 7 - } - - if size < 0 { - return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: "negative declared size"} - } - - record.declaredSize = size - - switch record.packedType { - case objecttype.TypeCommit, objecttype.TypeTree, objecttype.TypeBlob, objecttype.TypeTag: - case objecttype.TypeRefDelta: - baseRaw := make([]byte, state.algo.Size()) - - err := state.stream.readFull(baseRaw) - if err != nil { - return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: fmt.Sprintf("read ref base: %v", err)} - } - - baseID, err := objectid.FromBytes(state.algo, baseRaw) - if err != nil { - return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: fmt.Sprintf("parse ref base: %v", err)} - } - - record.baseObject = baseID - - baseRawLen, err := intconv.IntToUint32(len(baseRaw)) - if err != nil { - return record, err - } - - headerLen += baseRawLen - case objecttype.TypeOfsDelta: - dist, consumed, err := readOfsDistanceFromStream(state.stream) - if err != nil { - return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: err.Error()} - } - - if startOffset <= dist { - return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: "ofs base offset out of bounds"} - } - - record.baseOffset = startOffset - dist - - consumedUint32, err := intconv.IntToUint32(consumed) - if err != nil { - return record, err - } - - headerLen += consumedUint32 - case objecttype.TypeInvalid, objecttype.TypeFuture: - return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: fmt.Sprintf("unsupported object type %d", record.packedType)} - default: - return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: fmt.Sprintf("unsupported object type %d", record.packedType)} - } - - record.headerLen = headerLen - - return record, nil -} - -// drainEntryPayload inflates one entry payload from stream and returns -// (inflatedLength, consumedInput, oidForBaseEntry). -func drainEntryPayload(state *ingestState, record objectRecord) (int64, uint64, objectid.ObjectID, error) { - var zero objectid.ObjectID - - reader, err := zlib.NewReader(state.stream) - if err != nil { - return 0, 0, zero, &ErrMalformedPackEntry{Offset: record.offset, Reason: fmt.Sprintf("open zlib stream: %v", err)} - } - - defer func() { _ = reader.Close() }() - - var total int64 - - if packfmt.IsBaseObjectType(record.packedType) { - header, ok := objectheader.Encode(record.packedType, record.declaredSize) - if !ok { - return 0, 0, zero, &ErrMalformedPackEntry{Offset: record.offset, Reason: "encode object header"} - } - - hashImpl, err := state.algo.New() - if err != nil { - return 0, 0, zero, err - } - - _, _ = hashImpl.Write(header) - - n, err := io.Copy(hashImpl, reader) - if err != nil { - return 0, 0, zero, &ErrMalformedPackEntry{Offset: record.offset, Reason: fmt.Sprintf("inflate base object: %v", err)} - } - - total = n - - oid, err := objectid.FromBytes(state.algo, hashImpl.Sum(nil)) - if err != nil { - return 0, 0, zero, err - } - - return total, reader.InputConsumed(), oid, nil - } - - if record.packedType == objecttype.TypeOfsDelta || record.packedType == objecttype.TypeRefDelta { - n, err := io.Copy(io.Discard, reader) - if err != nil { - return 0, 0, zero, &ErrMalformedPackEntry{Offset: record.offset, Reason: fmt.Sprintf("inflate delta payload: %v", err)} - } - - total = n - - return total, reader.InputConsumed(), zero, nil - } - - return 0, 0, zero, &ErrMalformedPackEntry{Offset: record.offset, Reason: "unsupported payload type"} -} - -// readOfsDistanceFromStream reads one ofs-delta encoded distance. -func readOfsDistanceFromStream(reader io.ByteReader) (uint64, int, error) { - first, err := reader.ReadByte() - if err != nil { - return 0, 0, fmt.Errorf("read ofs distance first byte: %w", err) - } - - dist := uint64(first & 0x7f) - consumed := 1 - - b := first - for b&0x80 != 0 { - b, err = reader.ReadByte() - if err != nil { - return 0, 0, fmt.Errorf("read ofs distance continuation: %w", err) - } - - consumed++ - dist = ((dist + 1) << 7) + uint64(b&0x7f) - } - - return dist, consumed, nil -} - -// finalizeStreamPackHash consumes trailer bytes and verifies stream integrity. -// readDeltaHeaderSizes reads source and destination sizes from one delta payload. -func readDeltaHeaderSizes(payload []byte) (int, int, error) { - reader := &byteSliceReader{data: payload} - - return deltaapply.ReadHeaderSizes(reader) -} - -// byteSliceReader implements io.ByteReader on []byte. -type byteSliceReader struct { - data []byte - pos int -} - -// ReadByte reads one byte from receiver. -func (reader *byteSliceReader) ReadByte() (byte, error) { - if reader.pos >= len(reader.data) { - return 0, io.EOF - } - - b := reader.data[reader.pos] - reader.pos++ - - return b, nil -} diff --git a/format/pack/ingest/thin_append.go b/format/pack/ingest/thin_append.go new file mode 100644 index 00000000..ad2d33d1 --- /dev/null +++ b/format/pack/ingest/thin_append.go @@ -0,0 +1,86 @@ +package ingest + +import ( + "compress/zlib" + "hash/crc32" + "io" + + "codeberg.org/lindenii/furgit/internal/intconv" + "codeberg.org/lindenii/furgit/objectid" + "codeberg.org/lindenii/furgit/objecttype" +) + +// appendBaseObject appends one base object as a new packed non-delta entry. +func appendBaseObject(state *ingestState, id objectid.ObjectID, realType objecttype.Type, content []byte) (int, error) { + start := state.stream.consumed + + header := encodePackEntryHeader(realType, int64(len(content))) + + startInt64, err := intconv.Uint64ToInt64(start) + if err != nil { + return 0, err + } + + _, err = state.packFile.WriteAt(header, startInt64) + if err != nil { + return 0, err + } + + headerLenInt64 := int64(len(header)) + section := &fileSectionWriter{file: state.packFile, off: startInt64 + headerLenInt64} + crc := crc32.NewIEEE() + _, _ = crc.Write(header) + counting := &countingWriter{dst: section} + + zw := zlib.NewWriter(io.MultiWriter(counting, crc)) + + _, err = zw.Write(content) + if err != nil { + return 0, err + } + + err = zw.Close() + if err != nil { + return 0, err + } + + headerLenUint64, err := intconv.IntToUint64(len(header)) + if err != nil { + return 0, err + } + + countingNUint64, err := intconv.IntToUint64(counting.n) + if err != nil { + return 0, err + } + + packedLen := headerLenUint64 + countingNUint64 + end := start + packedLen + state.stream.consumed = end + + headerLenUint32, err := intconv.IntToUint32(len(header)) + if err != nil { + return 0, err + } + + record := objectRecord{ + offset: start, + headerLen: headerLenUint32, + packedLen: packedLen, + crc32: crc.Sum32(), + packedType: realType, + realType: realType, + declaredSize: int64(len(content)), + dataOffset: start + headerLenUint64, + objectID: id, + resolved: true, + } + + recordIdx := len(state.records) + state.records = append(state.records, record) + state.offsetToRecord[start] = recordIdx + state.objectToRecord[id] = recordIdx + state.baseCache.add(recordIdx, realType, content) + + return recordIdx, nil +} diff --git a/format/pack/ingest/thin_fix.go b/format/pack/ingest/thin_fix.go index 767816ed..65ce131c 100644 --- a/format/pack/ingest/thin_fix.go +++ b/format/pack/ingest/thin_fix.go @@ -1,16 +1,9 @@ package ingest import ( - "encoding/binary" "fmt" - "hash/crc32" - "io" - "os" - "codeberg.org/lindenii/furgit/internal/compress/zlib" "codeberg.org/lindenii/furgit/internal/intconv" - "codeberg.org/lindenii/furgit/objectid" - "codeberg.org/lindenii/furgit/objecttype" ) // maybeFixThin appends missing bases and rewrites pack header/trailer when needed. @@ -75,218 +68,3 @@ func maybeFixThin(state *ingestState) error { return nil } - -// appendBaseObject appends one base object as a new packed non-delta entry. -func appendBaseObject(state *ingestState, id objectid.ObjectID, realType objecttype.Type, content []byte) (int, error) { - start := state.stream.consumed - - header := encodePackEntryHeader(realType, int64(len(content))) - - startInt64, err := intconv.Uint64ToInt64(start) - if err != nil { - return 0, err - } - - _, err = state.packFile.WriteAt(header, startInt64) - if err != nil { - return 0, err - } - - headerLenInt64 := int64(len(header)) - section := &fileSectionWriter{file: state.packFile, off: startInt64 + headerLenInt64} - crc := crc32.NewIEEE() - _, _ = crc.Write(header) - counting := &countingWriter{dst: section} - - zw := zlib.NewWriter(io.MultiWriter(counting, crc)) - - _, err = zw.Write(content) - if err != nil { - return 0, err - } - - err = zw.Close() - if err != nil { - return 0, err - } - - headerLenUint64, err := intconv.IntToUint64(len(header)) - if err != nil { - return 0, err - } - - countingNUint64, err := intconv.IntToUint64(counting.n) - if err != nil { - return 0, err - } - - packedLen := headerLenUint64 + countingNUint64 - end := start + packedLen - state.stream.consumed = end - - headerLenUint32, err := intconv.IntToUint32(len(header)) - if err != nil { - return 0, err - } - - record := objectRecord{ - offset: start, - headerLen: headerLenUint32, - packedLen: packedLen, - crc32: crc.Sum32(), - packedType: realType, - realType: realType, - declaredSize: int64(len(content)), - dataOffset: start + headerLenUint64, - objectID: id, - resolved: true, - } - - recordIdx := len(state.records) - state.records = append(state.records, record) - state.offsetToRecord[start] = recordIdx - state.objectToRecord[id] = recordIdx - state.baseCache.add(recordIdx, realType, content) - - return recordIdx, nil -} - -// fileSectionWriter writes sequentially to file via WriteAt at one base offset. -type fileSectionWriter struct { - file *os.File - off int64 - pos int64 -} - -// Write writes src at current section position. -func (writer *fileSectionWriter) Write(src []byte) (int, error) { - if len(src) == 0 { - return 0, nil - } - - n, err := writer.file.WriteAt(src, writer.off+writer.pos) - writer.pos += int64(n) - - return n, err -} - -// countingWriter counts bytes written to dst. -type countingWriter struct { - dst io.Writer - n int -} - -// Write writes src to dst and tracks output byte count. -func (writer *countingWriter) Write(src []byte) (int, error) { - n, err := writer.dst.Write(src) - writer.n += n - - return n, err -} - -// rewritePackHeaderAndTrailer rewrites object count and trailer hash using ReadAt/WriteAt. -func rewritePackHeaderAndTrailer(state *ingestState) error { - var countRaw [4]byte - - recordCountUint32, err := intconv.IntToUint32(len(state.records)) - if err != nil { - return err - } - - binary.BigEndian.PutUint32(countRaw[:], recordCountUint32) - - _, err = state.packFile.WriteAt(countRaw[:], 8) - if err != nil { - return err - } - - info, err := state.packFile.Stat() - if err != nil { - return err - } - - endWithoutTrailer := info.Size() - - hashImpl, err := state.algo.New() - if err != nil { - return err - } - - var ( - buf [128 << 10]byte - pos int64 - ) - for pos < endWithoutTrailer { - want := int64(len(buf)) - - remaining := endWithoutTrailer - pos - if remaining < want { - want = remaining - } - - n, err := state.packFile.ReadAt(buf[:want], pos) - if err != nil && err != io.EOF { - return err - } - - if n == 0 { - return io.ErrUnexpectedEOF - } - - _, _ = hashImpl.Write(buf[:n]) - pos += int64(n) - } - - sum := hashImpl.Sum(nil) - - _, err = state.packFile.WriteAt(sum, endWithoutTrailer) - if err != nil { - return err - } - - packHash, err := objectid.FromBytes(state.algo, sum) - if err != nil { - return err - } - - state.packHash = packHash - state.objectCountHeader = recordCountUint32 - - sumLenInt64 := int64(len(sum)) - - newConsumed, err := intconv.Int64ToUint64(endWithoutTrailer + sumLenInt64) - if err != nil { - return err - } - - state.stream.consumed = newConsumed - - return nil -} - -// encodePackEntryHeader encodes one non-delta packed entry header. -func encodePackEntryHeader(ty objecttype.Type, size int64) []byte { - var out [16]byte - - n := 0 - - s, err := intconv.Int64ToUint64(size) - if err != nil { - panic(err) - } - - c := (uint8(ty) << 4) | byte(s&0x0f) - - s >>= 4 - for s != 0 { - out[n] = c | 0x80 - n++ - c = byte(s & 0x7f) - s >>= 7 - } - - out[n] = c - n++ - - return append([]byte(nil), out[:n]...) -} diff --git a/format/pack/ingest/thin_unresolved.go b/format/pack/ingest/thin_unresolved.go new file mode 100644 index 00000000..347f3f29 --- /dev/null +++ b/format/pack/ingest/thin_unresolved.go @@ -0,0 +1,34 @@ +package ingest + +import ( + "bytes" + "slices" + + "codeberg.org/lindenii/furgit/objectid" + "codeberg.org/lindenii/furgit/objecttype" +) + +// unresolvedThinBaseIDs returns sorted unique unresolved ref base IDs. +func unresolvedThinBaseIDs(state *ingestState) []objectid.ObjectID { + seen := make(map[objectid.ObjectID]struct{}) + + for _, idx := range state.unresolvedRefDeltas { + record := state.records[idx] + if record.packedType != objecttype.TypeRefDelta { + continue + } + + seen[record.baseObject] = struct{}{} + } + + out := make([]objectid.ObjectID, 0, len(seen)) + for id := range seen { + out = append(out, id) + } + + slices.SortFunc(out, func(a, b objectid.ObjectID) int { + return bytes.Compare(a.RawBytes(), b.RawBytes()) + }) + + return out +} diff --git a/format/pack/ingest/trailer.go b/format/pack/ingest/trailer.go new file mode 100644 index 00000000..4dba3884 --- /dev/null +++ b/format/pack/ingest/trailer.go @@ -0,0 +1,45 @@ +package ingest + +import ( + "bytes" + "errors" + "fmt" + "io" +) + +// finishAndFlushTrailer reads trailer hash bytes, verifies trailer checksum, +// and ensures no trailing garbage remains in stream. +func (scanner *streamScanner) finishAndFlushTrailer() error { + if scanner.hashSize <= 0 { + return fmt.Errorf("format/pack/ingest: invalid hash size") + } + + trailer := make([]byte, scanner.hashSize) + + scanner.hashEnabled = false + + err := scanner.readFull(trailer) + if err != nil { + return &ErrPackTrailerMismatch{} + } + + scanner.packTrailer = append(scanner.packTrailer[:0], trailer...) + + var probe [1]byte + + n, err := scanner.Read(probe[:]) + if n > 0 || err == nil { + return fmt.Errorf("format/pack/ingest: pack has trailing garbage") + } + + if !errors.Is(err, io.EOF) { + return err + } + + computed := scanner.hash.Sum(nil) + if !bytes.Equal(computed, trailer) { + return &ErrPackTrailerMismatch{} + } + + return nil +} diff --git a/format/pack/ingest/use.go b/format/pack/ingest/use.go new file mode 100644 index 00000000..18fc6357 --- /dev/null +++ b/format/pack/ingest/use.go @@ -0,0 +1,34 @@ +package ingest + +import ( + "fmt" + "hash/crc32" +) + +// use consumes n unread bytes and updates accounting/checksum state. +func (scanner *streamScanner) use(n int) error { + if n < 0 || n > scanner.n-scanner.off { + return fmt.Errorf("format/pack/ingest: invalid consume length %d", n) + } + + if n == 0 { + return nil + } + + chunk := scanner.buf[scanner.off : scanner.off+n] + if scanner.hashEnabled { + _, err := scanner.hash.Write(chunk) + if err != nil { + return err + } + } + + if scanner.inEntryCRC { + scanner.entryCRC = crc32.Update(scanner.entryCRC, crc32.IEEETable, chunk) + } + + scanner.off += n + scanner.consumed += uint64(n) + + return nil +} |
