diff options
Diffstat (limited to 'packfile')
50 files changed, 3249 insertions, 0 deletions
diff --git a/packfile/entry.go b/packfile/entry.go new file mode 100644 index 00000000..c17fe619 --- /dev/null +++ b/packfile/entry.go @@ -0,0 +1,121 @@ +package pack + +import ( + "fmt" + + "codeberg.org/lindenii/furgit/objecttype" +) + +// EntryHeader is one parsed pack entry header prefix. +type EntryHeader struct { + // Type is the entry type tag from the first header byte. + Type objecttype.Type + // Size is the declared resulting object size. + Size int64 + // HeaderSize is the number of bytes consumed by the type/size header. + HeaderSize int +} + +// ParseEntryHeader parses one pack entry type/size header from data. +func ParseEntryHeader(data []byte) (EntryHeader, error) { + var zero EntryHeader + if len(data) == 0 { + return zero, fmt.Errorf("packfile: truncated entry header") + } + + first := data[0] + header := EntryHeader{ + Type: objecttype.Type((first >> 4) & 0x07), + Size: int64(first & 0x0f), + HeaderSize: 1, + } + + shift := uint(4) + + b := first + for b&0x80 != 0 { + if header.HeaderSize >= len(data) { + return zero, fmt.Errorf("packfile: truncated entry header") + } + + b = data[header.HeaderSize] + header.HeaderSize++ + header.Size |= int64(b&0x7f) << shift + shift += 7 + } + + if header.Size < 0 { + return zero, fmt.Errorf("packfile: negative entry size") + } + + return header, nil +} + +// Entry is one parsed pack entry prefix, including any delta base reference +// data that appears before the compressed payload. +type Entry struct { + // Type is the pack entry type. + Type objecttype.Type + // Size is the declared resulting object size. + Size int64 + // DataOffset is the byte offset from the start of the entry to the zlib + // payload bytes. + DataOffset int + // RefBaseID is the referenced base object ID bytes for ref-delta entries. + RefBaseID []byte + // OfsBaseDistance is the backward distance for ofs-delta entries. + OfsBaseDistance uint64 +} + +// ParseEntry parses one full pack entry prefix from data. +// +// hashSize must match the hash algorithm size used by the pack/index. +func ParseEntry(data []byte, hashSize int) (Entry, error) { + var zero Entry + + header, err := ParseEntryHeader(data) + if err != nil { + return zero, err + } + + entry := Entry{ + Type: header.Type, + Size: header.Size, + DataOffset: header.HeaderSize, + } + + switch entry.Type { + case objecttype.TypeCommit, objecttype.TypeTree, objecttype.TypeBlob, objecttype.TypeTag: + // Base object entries have no extra prefix fields. + case objecttype.TypeRefDelta: + if hashSize <= 0 { + return zero, fmt.Errorf("packfile: invalid hash size %d", hashSize) + } + + end := entry.DataOffset + hashSize + if end > len(data) { + return zero, fmt.Errorf("packfile: truncated ref-delta base id") + } + + entry.RefBaseID = data[entry.DataOffset:end] + entry.DataOffset = end + case objecttype.TypeOfsDelta: + dist, consumed, err := ParseOfsDeltaDistance(data[entry.DataOffset:]) + if err != nil { + return zero, err + } + + entry.OfsBaseDistance = dist + entry.DataOffset += consumed + case objecttype.TypeInvalid, objecttype.TypeFuture: + return zero, fmt.Errorf("packfile: unsupported object type %d", entry.Type) + default: + return zero, fmt.Errorf("packfile: unsupported object type %d", entry.Type) + } + + if entry.DataOffset > len(data) { + return zero, fmt.Errorf("packfile: entry data offset out of bounds") + } + + return entry, nil +} diff --git a/packfile/ingest/api.go b/packfile/ingest/api.go new file mode 100644 index 00000000..040e8d77 --- /dev/null +++ b/packfile/ingest/api.go @@ -0,0 +1,192 @@ +package ingest + +import ( + "bufio" + "bytes" + "errors" + "io" + "os" + + "codeberg.org/lindenii/furgit/objectid" + "codeberg.org/lindenii/furgit/objectstore" +) + +// Options controls one pack ingest operation. +type Options struct { + // FixThin appends missing local bases for thin packs. + FixThin bool + // WriteRev writes a .rev alongside the .pack and .idx. + WriteRev bool + // Base supplies existing objects for thin-pack fixup. + Base objectstore.Store + // Progress receives human-readable progress messages. + // + // When nil, no progress output is emitted. + Progress io.Writer + // ProgressFlush flushes transport output after progress writes. + // + // When nil, no explicit flush is attempted. + ProgressFlush func() error + // RequireTrailingEOF requires the source to hit EOF after the pack trailer. + // + // This is suitable for exact pack-file readers, but should be disabled for + // full-duplex transport streams like receive-pack where the peer keeps the + // connection open to read the server response. + RequireTrailingEOF bool +} + +// Result describes one successful ingest transaction. +type Result struct { + // PackName is the destination-relative filename of the written .pack. + PackName string + // IdxName is the destination-relative filename of the written .idx. + IdxName string + // RevName is the destination-relative filename of the written .rev. + // + // RevName is empty when writeRev is false. + RevName string + // PackHash is the final pack hash (same hash embedded in .idx/.rev trailers). + PackHash objectid.ObjectID + // ObjectCount is the final object count in the resulting pack. + // + // If thin fixup appends objects, this includes appended base objects. + ObjectCount uint32 + // ThinFixed reports whether thin fixup appended local bases. + ThinFixed bool +} + +// HeaderInfo describes the parsed PACK header. +type HeaderInfo struct { + Version uint32 + ObjectCount uint32 +} + +// DiscardResult describes one successful Discard call. +type DiscardResult struct { + PackHash objectid.ObjectID + ObjectCount uint32 +} + +// Pending is one started ingest operation awaiting Continue or Discard. +type Pending struct { + reader *bufio.Reader + algo objectid.Algorithm + opts Options + header HeaderInfo + headerRaw [packHeaderSize]byte + + finalized bool +} + +// Ingest reads and validates one PACK header, returning one pending operation. +func Ingest( + src io.Reader, + algo objectid.Algorithm, + opts Options, +) (*Pending, error) { + if algo.Size() == 0 { + return nil, objectid.ErrInvalidAlgorithm + } + + reader := bufio.NewReader(src) + + header, headerRaw, err := readAndValidatePackHeader(reader) + if err != nil { + return nil, err + } + + return &Pending{ + reader: reader, + algo: algo, + opts: opts, + header: header, + headerRaw: headerRaw, + }, nil +} + +// Header returns parsed PACK header info. +func (pending *Pending) Header() HeaderInfo { + return pending.header +} + +// Continue ingests the pack stream into destination and writes pack artifacts. +func (pending *Pending) Continue(destination *os.Root) (Result, error) { + if pending.finalized { + return Result{}, ErrAlreadyFinalized + } + + pending.finalized = true + + if pending.header.ObjectCount == 0 { + return Result{}, ErrZeroObjectContinue + } + + state, err := newIngestState( + pending.reader, + destination, + pending.algo, + pending.opts, + pending.header, + pending.headerRaw, + ) + if err != nil { + return Result{}, err + } + + return ingest(state) +} + +// Discard consumes and verifies one zero-object pack stream without writing files. +func (pending *Pending) Discard() (DiscardResult, error) { + if pending.finalized { + return DiscardResult{}, ErrAlreadyFinalized + } + + pending.finalized = true + + if pending.header.ObjectCount != 0 { + return DiscardResult{}, ErrNonZeroDiscard + } + + hashImpl, err := pending.algo.New() + if err != nil { + return DiscardResult{}, err + } + + _, _ = hashImpl.Write(pending.headerRaw[:]) + + trailer := make([]byte, pending.algo.Size()) + + _, err = io.ReadFull(pending.reader, trailer) + if err != nil { + return DiscardResult{}, &PackTrailerMismatchError{} + } + + computed := hashImpl.Sum(nil) + if !bytes.Equal(computed, trailer) { + return DiscardResult{}, &PackTrailerMismatchError{} + } + + if pending.opts.RequireTrailingEOF { + var probe [1]byte + + n, err := pending.reader.Read(probe[:]) + if n > 0 || err == nil { + return DiscardResult{}, errors.New("packfile/ingest: pack has trailing garbage") + } + + if err != io.EOF { + return DiscardResult{}, err + } + } + + packHash, err := objectid.FromBytes(pending.algo, trailer) + if err != nil { + return DiscardResult{}, err + } + + return DiscardResult{ + PackHash: packHash, + ObjectCount: 0, + }, nil +} diff --git a/packfile/ingest/byteslice_reader.go b/packfile/ingest/byteslice_reader.go new file mode 100644 index 00000000..a1570ef3 --- /dev/null +++ b/packfile/ingest/byteslice_reader.go @@ -0,0 +1,21 @@ +package ingest + +import "io" + +// byteSliceReader implements io.ByteReader on []byte. +type byteSliceReader struct { + data []byte + pos int +} + +// ReadByte reads one byte from receiver. +func (reader *byteSliceReader) ReadByte() (byte, error) { + if reader.pos >= len(reader.data) { + return 0, io.EOF + } + + b := reader.data[reader.pos] + reader.pos++ + + return b, nil +} diff --git a/packfile/ingest/cache.go b/packfile/ingest/cache.go new file mode 100644 index 00000000..3b4bf31a --- /dev/null +++ b/packfile/ingest/cache.go @@ -0,0 +1,53 @@ +package ingest + +import ( + "codeberg.org/lindenii/furgit/internal/lru" + "codeberg.org/lindenii/furgit/objecttype" +) + +// deltaBaseCacheKey identifies one resolved base by record index. +type deltaBaseCacheKey struct { + recordIdx int +} + +// deltaBaseCacheValue stores one resolved base object payload. +type deltaBaseCacheValue struct { + realType objecttype.Type + content []byte +} + +// deltaBaseCache is a bounded LRU for resolved base payloads. +type deltaBaseCache struct { + lru *lru.Cache[deltaBaseCacheKey, deltaBaseCacheValue] +} + +// newDeltaBaseCache creates one bounded base cache. +func newDeltaBaseCache(maxBytes int64) *deltaBaseCache { + return &deltaBaseCache{ + lru: lru.New( + maxBytes, + func(_ deltaBaseCacheKey, value deltaBaseCacheValue) int64 { + return int64(len(value.content)) + }, + nil, + ), + } +} + +// get returns one cache entry for recordIdx. +func (cache *deltaBaseCache) get(recordIdx int) (objecttype.Type, []byte, bool) { + value, ok := cache.lru.Get(deltaBaseCacheKey{recordIdx: recordIdx}) + if !ok { + return objecttype.TypeInvalid, nil, false + } + + return value.realType, value.content, true +} + +// add stores one cache entry for recordIdx. +func (cache *deltaBaseCache) add(recordIdx int, realType objecttype.Type, content []byte) { + cache.lru.Add(deltaBaseCacheKey{recordIdx: recordIdx}, deltaBaseCacheValue{ + realType: realType, + content: content, + }) +} diff --git a/packfile/ingest/counting_writer.go b/packfile/ingest/counting_writer.go new file mode 100644 index 00000000..051ad9d1 --- /dev/null +++ b/packfile/ingest/counting_writer.go @@ -0,0 +1,17 @@ +package ingest + +import "io" + +// countingWriter counts bytes written to dst. +type countingWriter struct { + dst io.Writer + n int +} + +// Write writes src to dst and tracks output byte count. +func (writer *countingWriter) Write(src []byte) (int, error) { + n, err := writer.dst.Write(src) + writer.n += n + + return n, err +} diff --git a/packfile/ingest/crc.go b/packfile/ingest/crc.go new file mode 100644 index 00000000..f55af4ff --- /dev/null +++ b/packfile/ingest/crc.go @@ -0,0 +1,22 @@ +package ingest + +import "fmt" + +// beginEntryCRC starts inline CRC accumulation for one packed entry. +func (scanner *streamScanner) beginEntryCRC() { + scanner.entryCRC = 0 + scanner.inEntryCRC = true +} + +// endEntryCRC finishes inline CRC accumulation for one packed entry. +func (scanner *streamScanner) endEntryCRC() (uint32, error) { + if !scanner.inEntryCRC { + return 0, fmt.Errorf("packfile/ingest: entry CRC not started") + } + + crc := scanner.entryCRC + scanner.entryCRC = 0 + scanner.inEntryCRC = false + + return crc, nil +} diff --git a/packfile/ingest/delta_header.go b/packfile/ingest/delta_header.go new file mode 100644 index 00000000..76e90172 --- /dev/null +++ b/packfile/ingest/delta_header.go @@ -0,0 +1,11 @@ +package ingest + +import deltaapply "codeberg.org/lindenii/furgit/delta/apply" + +// finalizeStreamPackHash consumes trailer bytes and verifies stream integrity. +// readDeltaHeaderSizes reads source and destination sizes from one delta payload. +func readDeltaHeaderSizes(payload []byte) (int, int, error) { + reader := &byteSliceReader{data: payload} + + return deltaapply.ReadHeaderSizes(reader) +} diff --git a/packfile/ingest/distance.go b/packfile/ingest/distance.go new file mode 100644 index 00000000..9bc4d886 --- /dev/null +++ b/packfile/ingest/distance.go @@ -0,0 +1,30 @@ +package ingest + +import ( + "fmt" + "io" +) + +// readOfsDistanceFromStream reads one ofs-delta encoded distance. +func readOfsDistanceFromStream(reader io.ByteReader) (uint64, int, error) { + first, err := reader.ReadByte() + if err != nil { + return 0, 0, fmt.Errorf("read ofs distance first byte: %w", err) + } + + dist := uint64(first & 0x7f) + consumed := 1 + + b := first + for b&0x80 != 0 { + b, err = reader.ReadByte() + if err != nil { + return 0, 0, fmt.Errorf("read ofs distance continuation: %w", err) + } + + consumed++ + dist = ((dist + 1) << 7) + uint64(b&0x7f) + } + + return dist, consumed, nil +} diff --git a/packfile/ingest/doc.go b/packfile/ingest/doc.go new file mode 100644 index 00000000..2095068a --- /dev/null +++ b/packfile/ingest/doc.go @@ -0,0 +1,3 @@ +// Package ingest implements streaming ingestion of one Git pack stream into a +// destination root, producing .pack/.idx and optionally .rev. +package ingest diff --git a/packfile/ingest/drain.go b/packfile/ingest/drain.go new file mode 100644 index 00000000..ce04fcb4 --- /dev/null +++ b/packfile/ingest/drain.go @@ -0,0 +1,68 @@ +package ingest + +import ( + "fmt" + "io" + + "codeberg.org/lindenii/furgit/internal/compress/zlib" + "codeberg.org/lindenii/furgit/objectheader" + "codeberg.org/lindenii/furgit/objectid" + "codeberg.org/lindenii/furgit/objecttype" + packfmt "codeberg.org/lindenii/furgit/packfile" +) + +// drainEntryPayload inflates one entry payload from stream and returns +// (inflatedLength, oidForBaseEntry). +func drainEntryPayload(state *ingestState, record objectRecord) (int64, objectid.ObjectID, error) { + var zero objectid.ObjectID + + reader, err := zlib.NewReader(state.stream) + if err != nil { + return 0, zero, &MalformedPackEntryError{Offset: record.offset, Reason: fmt.Sprintf("open zlib stream: %v", err)} + } + + defer func() { _ = reader.Close() }() + + var total int64 + + if packfmt.IsBaseObjectType(record.packedType) { + header, ok := objectheader.Encode(record.packedType, record.declaredSize) + if !ok { + return 0, zero, &MalformedPackEntryError{Offset: record.offset, Reason: "encode object header"} + } + + hashImpl, err := state.algo.New() + if err != nil { + return 0, zero, err + } + + _, _ = hashImpl.Write(header) + + n, err := io.Copy(hashImpl, reader) + if err != nil { + return 0, zero, &MalformedPackEntryError{Offset: record.offset, Reason: fmt.Sprintf("inflate base object: %v", err)} + } + + total = n + + oid, err := objectid.FromBytes(state.algo, hashImpl.Sum(nil)) + if err != nil { + return 0, zero, err + } + + return total, oid, nil + } + + if record.packedType == objecttype.TypeOfsDelta || record.packedType == objecttype.TypeRefDelta { + n, err := io.Copy(io.Discard, reader) + if err != nil { + return 0, zero, &MalformedPackEntryError{Offset: record.offset, Reason: fmt.Sprintf("inflate delta payload: %v", err)} + } + + total = n + + return total, zero, nil + } + + return 0, zero, &MalformedPackEntryError{Offset: record.offset, Reason: "unsupported payload type"} +} diff --git a/packfile/ingest/entry.go b/packfile/ingest/entry.go new file mode 100644 index 00000000..01be318d --- /dev/null +++ b/packfile/ingest/entry.go @@ -0,0 +1,92 @@ +package ingest + +import ( + "fmt" + + "codeberg.org/lindenii/furgit/objecttype" + packfmt "codeberg.org/lindenii/furgit/packfile" +) + +// scanOneEntry scans one pack entry from stream and appends one record. +func scanOneEntry(state *ingestState, startOffset uint64) (uint64, error) { + state.stream.beginEntryCRC() + + record, err := parseEntryPrefix(state, startOffset) + if err != nil { + return 0, err + } + + payloadStartConsumed := state.stream.consumed + + contentLen, oid, err := drainEntryPayload(state, record) + if err != nil { + return 0, err + } + + consumedInput := state.stream.consumed - payloadStartConsumed + + if contentLen != record.declaredSize { + return 0, &MalformedPackEntryError{ + Offset: startOffset, + Reason: fmt.Sprintf("inflated size mismatch got %d want %d", contentLen, record.declaredSize), + } + } + + endOffset := startOffset + uint64(record.headerLen) + consumedInput + if endOffset > state.stream.consumed { + return 0, &MalformedPackEntryError{ + Offset: startOffset, + Reason: fmt.Sprintf("entry end offset overflow got %d > stream %d", endOffset, state.stream.consumed), + } + } + + record.packedLen = endOffset - startOffset + + record.dataOffset = startOffset + uint64(record.headerLen) + if record.packedLen < uint64(record.headerLen) { + return 0, &MalformedPackEntryError{Offset: startOffset, Reason: "negative payload span"} + } + + crc, err := state.stream.endEntryCRC() + if err != nil { + return 0, err + } + + record.crc32 = crc + + if packfmt.IsBaseObjectType(record.packedType) { + record.objectID = oid + record.realType = record.packedType + record.resolved = true + } + + recordIdx := len(state.records) + state.records = append(state.records, record) + + state.offsetToRecord[record.offset] = recordIdx + if record.resolved { + state.objectToRecord[record.objectID] = recordIdx + } + + switch record.packedType { + case objecttype.TypeOfsDelta: + state.ofsDeltas = append(state.ofsDeltas, ofsDeltaRef{ + baseOffset: record.baseOffset, + recordIdx: recordIdx, + }) + case objecttype.TypeRefDelta: + state.refDeltas = append(state.refDeltas, refDeltaRef{ + baseObject: record.baseObject, + recordIdx: recordIdx, + }) + case objecttype.TypeInvalid, + objecttype.TypeCommit, + objecttype.TypeTree, + objecttype.TypeBlob, + objecttype.TypeTag, + objecttype.TypeFuture: + default: + } + + return endOffset, nil +} diff --git a/packfile/ingest/entry_header.go b/packfile/ingest/entry_header.go new file mode 100644 index 00000000..54f63fac --- /dev/null +++ b/packfile/ingest/entry_header.go @@ -0,0 +1,33 @@ +package ingest + +import ( + "codeberg.org/lindenii/furgit/internal/intconv" + "codeberg.org/lindenii/furgit/objecttype" +) + +// encodePackEntryHeader encodes one non-delta packed entry header. +func encodePackEntryHeader(ty objecttype.Type, size int64) []byte { + var out [16]byte + + n := 0 + + s, err := intconv.Int64ToUint64(size) + if err != nil { + panic(err) + } + + c := (uint8(ty) << 4) | byte(s&0x0f) + + s >>= 4 + for s != 0 { + out[n] = c | 0x80 + n++ + c = byte(s & 0x7f) + s >>= 7 + } + + out[n] = c + n++ + + return append([]byte(nil), out[:n]...) +} diff --git a/packfile/ingest/entry_prefix.go b/packfile/ingest/entry_prefix.go new file mode 100644 index 00000000..85493798 --- /dev/null +++ b/packfile/ingest/entry_prefix.go @@ -0,0 +1,95 @@ +package ingest + +import ( + "fmt" + + "codeberg.org/lindenii/furgit/internal/intconv" + "codeberg.org/lindenii/furgit/objectid" + "codeberg.org/lindenii/furgit/objecttype" +) + +// parseEntryPrefix parses one entry prefix from stream. +func parseEntryPrefix(state *ingestState, startOffset uint64) (objectRecord, error) { + var record objectRecord + + record.offset = startOffset + + first, err := state.stream.ReadByte() + if err != nil { + return record, &MalformedPackEntryError{Offset: startOffset, Reason: fmt.Sprintf("read first header byte: %v", err)} + } + + record.packedType = objecttype.Type((first >> 4) & 0x07) + size := int64(first & 0x0f) + headerLen := uint32(1) + shift := uint(4) + b := first + + for b&0x80 != 0 { + b, err = state.stream.ReadByte() + if err != nil { + return record, &MalformedPackEntryError{Offset: startOffset, Reason: fmt.Sprintf("read size continuation: %v", err)} + } + + headerLen++ + size |= int64(b&0x7f) << shift + shift += 7 + } + + if size < 0 { + return record, &MalformedPackEntryError{Offset: startOffset, Reason: "negative declared size"} + } + + record.declaredSize = size + + switch record.packedType { + case objecttype.TypeCommit, objecttype.TypeTree, objecttype.TypeBlob, objecttype.TypeTag: + case objecttype.TypeRefDelta: + baseRaw := make([]byte, state.algo.Size()) + + err := state.stream.readFull(baseRaw) + if err != nil { + return record, &MalformedPackEntryError{Offset: startOffset, Reason: fmt.Sprintf("read ref base: %v", err)} + } + + baseID, err := objectid.FromBytes(state.algo, baseRaw) + if err != nil { + return record, &MalformedPackEntryError{Offset: startOffset, Reason: fmt.Sprintf("parse ref base: %v", err)} + } + + record.baseObject = baseID + + baseRawLen, err := intconv.IntToUint32(len(baseRaw)) + if err != nil { + return record, err + } + + headerLen += baseRawLen + case objecttype.TypeOfsDelta: + dist, consumed, err := readOfsDistanceFromStream(state.stream) + if err != nil { + return record, &MalformedPackEntryError{Offset: startOffset, Reason: err.Error()} + } + + if startOffset <= dist { + return record, &MalformedPackEntryError{Offset: startOffset, Reason: "ofs base offset out of bounds"} + } + + record.baseOffset = startOffset - dist + + consumedUint32, err := intconv.IntToUint32(consumed) + if err != nil { + return record, err + } + + headerLen += consumedUint32 + case objecttype.TypeInvalid, objecttype.TypeFuture: + return record, &MalformedPackEntryError{Offset: startOffset, Reason: fmt.Sprintf("unsupported object type %d", record.packedType)} + default: + return record, &MalformedPackEntryError{Offset: startOffset, Reason: fmt.Sprintf("unsupported object type %d", record.packedType)} + } + + record.headerLen = headerLen + + return record, nil +} diff --git a/packfile/ingest/errors.go b/packfile/ingest/errors.go new file mode 100644 index 00000000..6292d790 --- /dev/null +++ b/packfile/ingest/errors.go @@ -0,0 +1,77 @@ +package ingest + +import ( + "errors" + "fmt" +) + +// InvalidPackHeaderError reports an invalid or unsupported pack header. +type InvalidPackHeaderError struct { + Reason string +} + +// Error implements error. +func (err *InvalidPackHeaderError) Error() string { + return "packfile/ingest: invalid pack header: " + err.Reason +} + +// PackTrailerMismatchError reports a mismatch between computed and trailer pack hash. +type PackTrailerMismatchError struct{} + +// Error implements error. +func (err *PackTrailerMismatchError) Error() string { + return "packfile/ingest: pack trailer hash mismatch" +} + +// ThinPackUnresolvedError reports unresolved REF deltas when fixThin is disabled +// or when required bases cannot be found in base. +type ThinPackUnresolvedError struct { + Count int +} + +// Error implements error. +func (err *ThinPackUnresolvedError) Error() string { + return fmt.Sprintf("packfile/ingest: unresolved thin deltas: %d", err.Count) +} + +// MalformedPackEntryError reports malformed entry encoding at one pack offset. +type MalformedPackEntryError struct { + Offset uint64 + Reason string +} + +// Error implements error. +func (err *MalformedPackEntryError) Error() string { + return fmt.Sprintf("packfile/ingest: malformed pack entry at offset %d: %s", err.Offset, err.Reason) +} + +// DeltaCycleError reports a detected cycle in delta dependency resolution. +type DeltaCycleError struct { + Offset uint64 +} + +// Error implements error. +func (err *DeltaCycleError) Error() string { + return fmt.Sprintf("packfile/ingest: delta cycle detected at offset %d", err.Offset) +} + +// DestinationWriteError reports destination I/O failures. +type DestinationWriteError struct { + Op string +} + +// Error implements error. +func (err *DestinationWriteError) Error() string { + return "packfile/ingest: destination write failure: " + err.Op +} + +var errExternalThinBase = errors.New("packfile/ingest: external thin base required") + +var ( + // ErrAlreadyFinalized indicates Continue/Discard already called. + ErrAlreadyFinalized = errors.New("packfile/ingest: operation already finalized") + // ErrZeroObjectContinue indicates Continue was called for a zero-object pack. + ErrZeroObjectContinue = errors.New("packfile/ingest: cannot continue zero-object pack") + // ErrNonZeroDiscard indicates Discard was called for a non-zero-object pack. + ErrNonZeroDiscard = errors.New("packfile/ingest: cannot discard non-zero pack") +) diff --git a/packfile/ingest/file_section_writer.go b/packfile/ingest/file_section_writer.go new file mode 100644 index 00000000..fa28c1a9 --- /dev/null +++ b/packfile/ingest/file_section_writer.go @@ -0,0 +1,22 @@ +package ingest + +import "os" + +// fileSectionWriter writes sequentially to file via WriteAt at one base offset. +type fileSectionWriter struct { + file *os.File + off int64 + pos int64 +} + +// Write writes src at current section position. +func (writer *fileSectionWriter) Write(src []byte) (int, error) { + if len(src) == 0 { + return 0, nil + } + + n, err := writer.file.WriteAt(src, writer.off+writer.pos) + writer.pos += int64(n) + + return n, err +} diff --git a/packfile/ingest/fill.go b/packfile/ingest/fill.go new file mode 100644 index 00000000..eca4e4d6 --- /dev/null +++ b/packfile/ingest/fill.go @@ -0,0 +1,44 @@ +package ingest + +import ( + "errors" + "fmt" + "io" +) + +// fill ensures at least min unread bytes are available in receiver's buffer. +func (scanner *streamScanner) fill(minLen int) error { + if minLen <= 0 { + return nil + } + + if minLen > len(scanner.buf) { + return fmt.Errorf("packfile/ingest: fill(%d) exceeds scanner buffer", minLen) + } + + for scanner.n-scanner.off < minLen { + err := scanner.flushConsumedPrefix() + if err != nil { + return err + } + + readN, err := scanner.src.Read(scanner.buf[scanner.n:]) + if readN > 0 { + scanner.n += readN + } + + if err != nil { + if errors.Is(err, io.EOF) && scanner.n-scanner.off >= minLen { + return nil + } + + return err + } + + if readN == 0 { + return io.ErrNoProgress + } + } + + return nil +} diff --git a/packfile/ingest/finalize.go b/packfile/ingest/finalize.go new file mode 100644 index 00000000..6fe4edb2 --- /dev/null +++ b/packfile/ingest/finalize.go @@ -0,0 +1,94 @@ +package ingest + +import ( + "errors" + "fmt" + "io/fs" + "strings" + + "codeberg.org/lindenii/furgit/internal/intconv" +) + +// finalizeArtifacts links temporary files to final names and returns Result. +func finalizeArtifacts(state *ingestState) (Result, error) { + base := "pack-" + state.packHash.String() + packFinal := base + ".pack" + idxFinal := base + ".idx" + + revFinal := "" + if state.opts.WriteRev { + revFinal = base + ".rev" + } + + err := linkTempToFinal(state, state.packTmpName, packFinal) + if err != nil { + return Result{}, err + } + + err = linkTempToFinal(state, state.idxTmpName, idxFinal) + if err != nil { + return Result{}, err + } + + if state.opts.WriteRev { + err := linkTempToFinal(state, state.revTmpName, revFinal) + if err != nil { + return Result{}, err + } + } + + objectCount, err := intconv.IntToUint32(len(state.records)) + if err != nil { + return Result{}, err + } + + return Result{ + PackName: packFinal, + IdxName: idxFinal, + RevName: revFinal, + PackHash: state.packHash, + ObjectCount: objectCount, + ThinFixed: state.thinFixed, + }, nil +} + +// rollbackTemporaryArtifacts removes temporary files after failure. +func rollbackTemporaryArtifacts(state *ingestState) { + if state.packTmpName != "" { + _ = state.destination.Remove(state.packTmpName) + } + + if state.idxTmpName != "" { + _ = state.destination.Remove(state.idxTmpName) + } + + if state.revTmpName != "" { + _ = state.destination.Remove(state.revTmpName) + } +} + +// linkTempToFinal hard-links tmp to final, tolerating existing final paths. +func linkTempToFinal(state *ingestState, tmp, final string) error { + if tmp == "" || final == "" { + return fmt.Errorf("packfile/ingest: invalid finalize names tmp=%q final=%q", tmp, final) + } + + if strings.Contains(final, "/") { + return fmt.Errorf("packfile/ingest: final name must be leaf: %q", final) + } + + err := state.destination.Link(tmp, final) + if err == nil { + _ = state.destination.Remove(tmp) + + return nil + } + + if errors.Is(err, fs.ErrExist) { + _ = state.destination.Remove(tmp) + + return nil + } + + return err +} diff --git a/packfile/ingest/flush.go b/packfile/ingest/flush.go new file mode 100644 index 00000000..96753170 --- /dev/null +++ b/packfile/ingest/flush.go @@ -0,0 +1,37 @@ +package ingest + +import "fmt" + +// flush writes all consumed-but-unflushed bytes to destination pack file. +func (scanner *streamScanner) flush() error { + return scanner.flushConsumedPrefix() +} + +// flushConsumedPrefix writes scanner.buf[:scanner.off] and compacts unread +// bytes to the start of buffer. +func (scanner *streamScanner) flushConsumedPrefix() error { + if scanner.off == 0 { + return nil + } + + written := 0 + for written < scanner.off { + n, err := scanner.dstFile.Write(scanner.buf[written:scanner.off]) + if err != nil { + return &DestinationWriteError{Op: fmt.Sprintf("write pack: %v", err)} + } + + if n == 0 { + return &DestinationWriteError{Op: "write pack: short write"} + } + + written += n + } + + unread := scanner.n - scanner.off + copy(scanner.buf[:unread], scanner.buf[scanner.off:scanner.n]) + scanner.off = 0 + scanner.n = unread + + return nil +} diff --git a/packfile/ingest/hash.go b/packfile/ingest/hash.go new file mode 100644 index 00000000..83df55c5 --- /dev/null +++ b/packfile/ingest/hash.go @@ -0,0 +1,27 @@ +package ingest + +import ( + "fmt" + + "codeberg.org/lindenii/furgit/objectheader" + "codeberg.org/lindenii/furgit/objectid" + "codeberg.org/lindenii/furgit/objecttype" +) + +// hashCanonicalObject hashes canonical object bytes (header+content). +func hashCanonicalObject(algo objectid.Algorithm, ty objecttype.Type, content []byte) (objectid.ObjectID, error) { + header, ok := objectheader.Encode(ty, int64(len(content))) + if !ok { + return objectid.ObjectID{}, fmt.Errorf("packfile/ingest: encode object header for type %d", ty) + } + + hashImpl, err := algo.New() + if err != nil { + return objectid.ObjectID{}, err + } + + _, _ = hashImpl.Write(header) + _, _ = hashImpl.Write(content) + + return objectid.FromBytes(algo, hashImpl.Sum(nil)) +} diff --git a/packfile/ingest/header.go b/packfile/ingest/header.go new file mode 100644 index 00000000..3e64fa57 --- /dev/null +++ b/packfile/ingest/header.go @@ -0,0 +1,49 @@ +package ingest + +import ( + "encoding/binary" + "fmt" + "io" + + "codeberg.org/lindenii/furgit/packfile" +) + +const packHeaderSize = 12 + +// readAndValidatePackHeader reads one PACK header from src and validates it. +func readAndValidatePackHeader(src io.Reader) (HeaderInfo, [packHeaderSize]byte, error) { + var hdr [packHeaderSize]byte + + _, err := io.ReadFull(src, hdr[:]) + if err != nil { + return HeaderInfo{}, [packHeaderSize]byte{}, &InvalidPackHeaderError{ + Reason: fmt.Sprintf("read header: %v", err), + } + } + + header, err := parseAndValidatePackHeader(hdr) + if err != nil { + return HeaderInfo{}, [packHeaderSize]byte{}, err + } + + return header, hdr, nil +} + +// parseAndValidatePackHeader validates one already-read PACK header. +func parseAndValidatePackHeader(hdr [packHeaderSize]byte) (HeaderInfo, error) { + if binary.BigEndian.Uint32(hdr[:4]) != pack.Signature { + return HeaderInfo{}, &InvalidPackHeaderError{Reason: "signature mismatch"} + } + + version := binary.BigEndian.Uint32(hdr[4:8]) + if !pack.VersionSupported(version) { + return HeaderInfo{}, &InvalidPackHeaderError{ + Reason: fmt.Sprintf("unsupported version %d", version), + } + } + + return HeaderInfo{ + Version: version, + ObjectCount: binary.BigEndian.Uint32(hdr[8:12]), + }, nil +} diff --git a/packfile/ingest/idx_write.go b/packfile/ingest/idx_write.go new file mode 100644 index 00000000..506788b9 --- /dev/null +++ b/packfile/ingest/idx_write.go @@ -0,0 +1,266 @@ +package ingest + +import ( + "bytes" + "encoding/binary" + "fmt" + "hash" + "io" + "slices" + + "codeberg.org/lindenii/furgit/internal/intconv" + "codeberg.org/lindenii/furgit/internal/progress" +) + +const ( + idxMagicV2 = 0xff744f63 + idxVersionV2 = 2 +) + +// writeIdx writes idx v2 for resolved records. +func writeIdx(state *ingestState) error { + order := buildIdxOrder(state) + + hashImpl, err := state.algo.New() + if err != nil { + return err + } + + write := func(src []byte) error { + _, writeErr := state.idxFile.Write(src) + if writeErr != nil { + return writeErr + } + + _, writeErr = hashImpl.Write(src) + if writeErr != nil { + return writeErr + } + + return nil + } + + var ( + scratch [8]byte + fanout [256]uint32 + ) + + writeProgressf(state, "writing index fanout...\r") + + for _, recordIdx := range order { + idRaw := state.records[recordIdx].objectID.Bytes() + fanout[idRaw[0]]++ + } + + binary.BigEndian.PutUint32(scratch[:4], idxMagicV2) + binary.BigEndian.PutUint32(scratch[4:8], idxVersionV2) + + err = write(scratch[:8]) + if err != nil { + return err + } + + var cumulative uint32 + for i := range fanout { + cumulative += fanout[i] + binary.BigEndian.PutUint32(scratch[:4], cumulative) + + err := write(scratch[:4]) + if err != nil { + return err + } + } + + writeProgressf(state, "writing index fanout: done.\n") + + largeOffsetCount := 0 + + for idx := range state.records { + if state.records[idx].offset >= 0x80000000 { + largeOffsetCount++ + } + } + + oidMeter := progress.New(progress.Options{ + Writer: state.opts.Progress, + Flush: state.opts.ProgressFlush, + Title: "writing index object ids", + Total: uint64(len(order)), + }) + + var oidDone uint64 + + for _, recordIdx := range order { + idRaw := state.records[recordIdx].objectID.Bytes() + + err := write(idRaw) + if err != nil { + return err + } + + oidDone++ + oidMeter.Set(oidDone, 0) + } + + if oidDone > 0 { + oidMeter.Stop("done") + } + + crcMeter := progress.New(progress.Options{ + Writer: state.opts.Progress, + Flush: state.opts.ProgressFlush, + Title: "writing index crc32", + Total: uint64(len(order)), + }) + + var crcDone uint64 + + for _, recordIdx := range order { + binary.BigEndian.PutUint32(scratch[:4], state.records[recordIdx].crc32) + + err := write(scratch[:4]) + if err != nil { + return err + } + + crcDone++ + crcMeter.Set(crcDone, 0) + } + + if crcDone > 0 { + crcMeter.Stop("done") + } + + largeOffsets := make([]uint64, 0) + offsetMeter := progress.New(progress.Options{ + Writer: state.opts.Progress, + Flush: state.opts.ProgressFlush, + Title: "writing index offsets", + Total: uint64(len(order)), + }) + + var offsetDone uint64 + + for _, recordIdx := range order { + offset := state.records[recordIdx].offset + if offset >= 0x80000000 { + largeOffsetIdx, err := intconv.IntToUint32(len(largeOffsets)) + if err != nil { + return err + } + + word := 0x80000000 | largeOffsetIdx + + largeOffsets = append(largeOffsets, offset) + + binary.BigEndian.PutUint32(scratch[:4], word) + } else { + binary.BigEndian.PutUint32(scratch[:4], uint32(offset)) + } + + err := write(scratch[:4]) + if err != nil { + return err + } + + offsetDone++ + offsetMeter.Set(offsetDone, 0) + } + + if offsetDone > 0 { + offsetMeter.Stop("done") + } + + total, err := intconv.IntToUint64(largeOffsetCount) + if err != nil { + return err + } + + largeOffsetMeter := progress.New(progress.Options{ + Writer: state.opts.Progress, + Flush: state.opts.ProgressFlush, + Title: "writing index large offsets", + Total: total, + }) + + var largeOffsetDone uint64 + + for _, off := range largeOffsets { + binary.BigEndian.PutUint64(scratch[:8], off) + + err := write(scratch[:8]) + if err != nil { + return err + } + + largeOffsetDone++ + largeOffsetMeter.Set(largeOffsetDone, 0) + } + + if largeOffsetDone > 0 { + largeOffsetMeter.Stop("done") + } + + writeProgressf(state, "writing index trailer...\r") + + err = write(state.packHash.Bytes()) + if err != nil { + return err + } + + idxHash := hashImpl.Sum(nil) + + _, err = state.idxFile.Write(idxHash) + if err != nil { + return err + } + + err = state.idxFile.Sync() + if err != nil { + return err + } + + writeProgressf(state, "writing index trailer: done.\n") + + return nil +} + +// buildIdxOrder returns record indexes sorted by ObjectID. +func buildIdxOrder(state *ingestState) []int { + out := make([]int, 0, len(state.records)) + for idx := range state.records { + out = append(out, idx) + } + + slices.SortFunc(out, func(a, b int) int { + return bytes.Compare(state.records[a].objectID.Bytes(), state.records[b].objectID.Bytes()) + }) + + return out +} + +// verifyResolvedRecords checks that all records are fully resolved before index writing. +func verifyResolvedRecords(state *ingestState) error { + for idx, record := range state.records { + if !record.resolved { + return fmt.Errorf("packfile/ingest: unresolved record %d at offset %d", idx, record.offset) + } + } + + return nil +} + +// writeAndHash writes src to dst and updates hash. +func writeAndHash(dst io.Writer, hashImpl hash.Hash, src []byte) error { + _, err := dst.Write(src) + if err != nil { + return err + } + + _, err = hashImpl.Write(src) + if err != nil { + return err + } + + return nil +} diff --git a/packfile/ingest/ingest.go b/packfile/ingest/ingest.go new file mode 100644 index 00000000..be65ff5f --- /dev/null +++ b/packfile/ingest/ingest.go @@ -0,0 +1,68 @@ +package ingest + +import ( + "fmt" +) + +// ingest initializes transaction state and executes the ingest pipeline. +func ingest(state *ingestState) (out Result, err error) { + err = openTemporaryArtifacts(state) + if err != nil { + return Result{}, err + } + + defer func() { + _ = closeTemporaryArtifacts(state) + if err != nil { + rollbackTemporaryArtifacts(state) + } + }() + + err = streamPackAndScan(state) + if err != nil { + return Result{}, err + } + + err = resolveAll(state) + if err != nil { + return Result{}, err + } + + err = maybeFixThin(state) + if err != nil { + return Result{}, err + } + + if state.thinFixed { + err = resolveAll(state) + if err != nil { + return Result{}, err + } + } + + if len(state.unresolvedRefDeltas) > 0 { + return Result{}, &ThinPackUnresolvedError{Count: len(state.unresolvedRefDeltas)} + } + + err = verifyResolvedRecords(state) + if err != nil { + return Result{}, err + } + + err = state.packFile.Sync() + if err != nil { + return Result{}, &DestinationWriteError{Op: fmt.Sprintf("sync pack: %v", err)} + } + + err = writeIdx(state) + if err != nil { + return Result{}, err + } + + err = writeRev(state) + if err != nil { + return Result{}, err + } + + return finalizeArtifacts(state) +} diff --git a/packfile/ingest/ingest_test.go b/packfile/ingest/ingest_test.go new file mode 100644 index 00000000..4e0cd3ec --- /dev/null +++ b/packfile/ingest/ingest_test.go @@ -0,0 +1,434 @@ +package ingest_test + +import ( + "bytes" + "encoding/binary" + "errors" + "io" + "io/fs" + "os" + "path/filepath" + "strings" + "testing" + + "codeberg.org/lindenii/furgit/internal/testgit" + "codeberg.org/lindenii/furgit/objectid" + "codeberg.org/lindenii/furgit/packfile/ingest" +) + +type noExtraReadReader struct { + reader *bytes.Reader +} + +func (r *noExtraReadReader) Read(p []byte) (int, error) { + if r.reader.Len() == 0 { + return 0, errors.New("unexpected extra read after pack trailer") + } + + return r.reader.Read(p) +} + +func beginAndContinue( + src io.Reader, + packRoot *os.Root, + algo objectid.Algorithm, + opts ingest.Options, +) (ingest.Result, error) { + pending, err := ingest.Ingest(src, algo, opts) + if err != nil { + return ingest.Result{}, err + } + + return pending.Continue(packRoot) +} + +// fixturePath returns one fixture file path for the selected algorithm. +func fixturePath(t *testing.T, algo objectid.Algorithm, name string) string { + t.Helper() + + dir := algo.String() + if dir == "" { + t.Fatalf("unsupported fixture algorithm: %v", algo) + } + + return filepath.Join("testdata", "fixtures", dir, name) +} + +// fixtureBytes reads one fixture file fully. +func fixtureBytes(t *testing.T, algo objectid.Algorithm, name string) []byte { + t.Helper() + + path := fixturePath(t, algo, name) + dir := filepath.Dir(path) + base := filepath.Base(path) + + root, err := os.OpenRoot(dir) + if err != nil { + t.Fatalf("open fixture root %q: %v", dir, err) + } + + defer func() { + err := root.Close() + if err != nil { + t.Fatalf("close fixture root %q: %v", dir, err) + } + }() + + data, err := root.ReadFile(base) + if err != nil { + t.Fatalf("read fixture %q: %v", base, err) + } + + return data +} + +// fixtureMetadata parses key=value metadata for one algorithm fixture set. +func fixtureMetadata(t *testing.T, algo objectid.Algorithm) map[string]string { + t.Helper() + + data := fixtureBytes(t, algo, "METADATA.txt") + + out := make(map[string]string) + for line := range strings.SplitSeq(strings.TrimSpace(string(data)), "\n") { + line = strings.TrimSpace(line) + if line == "" { + continue + } + + key, value, ok := strings.Cut(line, "=") + if !ok { + t.Fatalf("invalid fixture metadata line %q", line) + } + + out[strings.TrimSpace(key)] = strings.TrimSpace(value) + } + + return out +} + +// fixtureOID returns one fixture metadata object ID value. +func fixtureOID(t *testing.T, algo objectid.Algorithm, key string) objectid.ObjectID { + t.Helper() + + meta := fixtureMetadata(t, algo) + + hex, ok := meta[key] + if !ok { + t.Fatalf("missing fixture metadata key %q", key) + } + + id, err := objectid.ParseHex(algo, hex) + if err != nil { + t.Fatalf("parse fixture metadata oid %q: %v", hex, err) + } + + return id +} + +// verifyReindexOracle regenerates idx/rev with upstream git index-pack and +// compares bytes with files produced by ingest. +func verifyReindexOracle(t *testing.T, repo *testgit.TestRepo, packName, idxName, revName string) { + t.Helper() + + oracleDir := t.TempDir() + oracleIdxPath := filepath.Join(oracleDir, "oracle.idx") + _ = repo.Run(t, "index-pack", "--rev-index", "-o", oracleIdxPath, filepath.Join("objects", "pack", packName)) + oracleRevPath := strings.TrimSuffix(oracleIdxPath, ".idx") + ".rev" + + packRoot := repo.OpenPackRoot(t) + + gotIdx, err := packRoot.ReadFile(idxName) + if err != nil { + t.Fatalf("read idx: %v", err) + } + + oracleRoot, err := os.OpenRoot(oracleDir) + if err != nil { + t.Fatalf("open oracle root: %v", err) + } + + defer func() { + err := oracleRoot.Close() + if err != nil { + t.Fatalf("close oracle root: %v", err) + } + }() + + wantIdx, err := oracleRoot.ReadFile(filepath.Base(oracleIdxPath)) + if err != nil { + t.Fatalf("read oracle idx: %v", err) + } + + if !bytes.Equal(gotIdx, wantIdx) { + t.Fatal("idx bytes differ from git index-pack output") + } + + gotRev, err := packRoot.ReadFile(revName) + if err != nil { + t.Fatalf("read rev: %v", err) + } + + wantRev, err := oracleRoot.ReadFile(filepath.Base(oracleRevPath)) + if err != nil { + t.Fatalf("read oracle rev: %v", err) + } + + if !bytes.Equal(gotRev, wantRev) { + t.Fatal("rev bytes differ from git index-pack output") + } +} + +func TestIngestNonThinPackWritesPackIdxRev(t *testing.T) { + t.Parallel() + + testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { //nolint:thelper + head := fixtureOID(t, algo, "head") + packBytes := fixtureBytes(t, algo, "nonthin.pack") + + receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true}) + + packRoot := receiver.OpenPackRoot(t) + + result, err := beginAndContinue(bytes.NewReader(packBytes), packRoot, algo, ingest.Options{ + WriteRev: true, + RequireTrailingEOF: true, + }) + if err != nil { + t.Fatalf("Ingest: %v", err) + } + + if result.ThinFixed { + t.Fatalf("ThinFixed = true, want false") + } + + if result.RevName == "" { + t.Fatal("RevName is empty") + } + + _, err = packRoot.Stat(result.PackName) + if err != nil { + t.Fatalf("stat pack: %v", err) + } + + _, err = packRoot.Stat(result.IdxName) + if err != nil { + t.Fatalf("stat idx: %v", err) + } + + _, err = packRoot.Stat(result.RevName) + if err != nil { + t.Fatalf("stat rev: %v", err) + } + + _ = receiver.Run(t, "verify-pack", "-v", filepath.Join("objects", "pack", result.IdxName)) + verifyReindexOracle(t, receiver, result.PackName, result.IdxName, result.RevName) + + receiver.UpdateRef(t, "refs/heads/main", head) + _ = receiver.Run(t, "fsck", "--full", "--strict", "--no-progress", "--no-dangling") + }) +} + +func TestIngestThinPackWithoutFixReturnsUnresolved(t *testing.T) { + t.Parallel() + + testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { //nolint:thelper + thinPack := fixtureBytes(t, algo, "thin.pack") + + receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true}) + packRoot := receiver.OpenPackRoot(t) + + _, err := beginAndContinue(bytes.NewReader(thinPack), packRoot, algo, ingest.Options{ + WriteRev: true, + RequireTrailingEOF: true, + }) + if err == nil { + t.Fatal("Ingest error = nil, want error") + } + + if _, ok := errors.AsType[*ingest.ThinPackUnresolvedError](err); !ok { + t.Fatalf("Ingest error type = %T (%v), want *ThinPackUnresolvedError", err, err) + } + + entries, err := fs.ReadDir(packRoot.FS(), ".") + if err != nil { + t.Fatalf("ReadDir(pack): %v", err) + } + + for _, entry := range entries { + if strings.HasSuffix(entry.Name(), ".pack") { + t.Fatalf("found finalized pack file after failure: %v", entry.Name()) + } + } + }) +} + +func TestIngestThinPackWithFixThin(t *testing.T) { + t.Parallel() + + testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { //nolint:thelper + head := fixtureOID(t, algo, "head") + basePack := fixtureBytes(t, algo, "base.pack") + thinPack := fixtureBytes(t, algo, "thin.pack") + receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true}) + + packRoot := receiver.OpenPackRoot(t) + + _, err := beginAndContinue(bytes.NewReader(basePack), packRoot, algo, ingest.Options{ + RequireTrailingEOF: true, + }) + if err != nil { + t.Fatalf("ingest base pack: %v", err) + } + + receiverRepo := receiver.OpenRepository(t) + + result, err := beginAndContinue(bytes.NewReader(thinPack), packRoot, algo, ingest.Options{ + FixThin: true, + WriteRev: true, + Base: receiverRepo.Objects(), + RequireTrailingEOF: true, + }) + if err != nil { + t.Fatalf("Ingest(thin): %v", err) + } + + if !result.ThinFixed { + t.Fatal("ThinFixed = false, want true") + } + + _ = receiver.Run(t, "verify-pack", "-v", filepath.Join("objects", "pack", result.IdxName)) + verifyReindexOracle(t, receiver, result.PackName, result.IdxName, result.RevName) + receiver.UpdateRef(t, "refs/heads/main", head) + _ = receiver.Run(t, "fsck", "--full", "--strict", "--no-progress", "--no-dangling") + }) +} + +func TestIngestPackTrailerMismatch(t *testing.T) { + t.Parallel() + + testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { //nolint:thelper + packBytes := fixtureBytes(t, algo, "nonthin.pack") + if len(packBytes) == 0 { + t.Fatal("empty pack stream") + } + + packBytes[len(packBytes)-1] ^= 0xff + + receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true}) + packRoot := receiver.OpenPackRoot(t) + + _, err := beginAndContinue(bytes.NewReader(packBytes), packRoot, algo, ingest.Options{ + WriteRev: true, + RequireTrailingEOF: true, + }) + if err == nil { + t.Fatal("Ingest error = nil, want error") + } + + if _, ok := errors.AsType[*ingest.PackTrailerMismatchError](err); !ok { + t.Fatalf("Ingest error type = %T (%v), want *PackTrailerMismatchError", err, err) + } + + entries, err := fs.ReadDir(packRoot.FS(), ".") + if err != nil { + t.Fatalf("ReadDir(pack): %v", err) + } + + for _, entry := range entries { + if strings.HasSuffix(entry.Name(), ".pack") { + t.Fatalf("found finalized pack file after failure: %v", entry.Name()) + } + } + }) +} + +func zeroObjectPackBytes(t *testing.T, algo objectid.Algorithm) []byte { + t.Helper() + + hashImpl, err := algo.New() + if err != nil { + t.Fatalf("algo.New: %v", err) + } + + var header [12]byte + copy(header[:4], []byte{'P', 'A', 'C', 'K'}) + binary.BigEndian.PutUint32(header[4:8], 2) + binary.BigEndian.PutUint32(header[8:12], 0) + + _, _ = hashImpl.Write(header[:]) + + return append(header[:], hashImpl.Sum(nil)...) +} + +func TestIngestDiscardZeroObjectPack(t *testing.T) { + t.Parallel() + + testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { //nolint:thelper + packBytes := zeroObjectPackBytes(t, algo) + + pending, err := ingest.Ingest(bytes.NewReader(packBytes), algo, ingest.Options{ + RequireTrailingEOF: true, + }) + if err != nil { + t.Fatalf("Ingest: %v", err) + } + + if pending.Header().ObjectCount != 0 { + t.Fatalf("ObjectCount = %d, want 0", pending.Header().ObjectCount) + } + + discarded, err := pending.Discard() + if err != nil { + t.Fatalf("Discard: %v", err) + } + + if discarded.ObjectCount != 0 { + t.Fatalf("Discard.ObjectCount = %d, want 0", discarded.ObjectCount) + } + }) +} + +func TestIngestContinueRejectsZeroObjectPack(t *testing.T) { + t.Parallel() + + testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { //nolint:thelper + packBytes := zeroObjectPackBytes(t, algo) + receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true}) + packRoot := receiver.OpenPackRoot(t) + + pending, err := ingest.Ingest(bytes.NewReader(packBytes), algo, ingest.Options{ + RequireTrailingEOF: true, + }) + if err != nil { + t.Fatalf("Ingest: %v", err) + } + + _, err = pending.Continue(packRoot) + if !errors.Is(err, ingest.ErrZeroObjectContinue) { + t.Fatalf("Continue error = %v, want ErrZeroObjectContinue", err) + } + }) +} + +func TestIngestCanFinishWithoutTrailingEOF(t *testing.T) { + t.Parallel() + + testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { //nolint:thelper + head := fixtureOID(t, algo, "head") + packBytes := fixtureBytes(t, algo, "nonthin.pack") + + receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true}) + packRoot := receiver.OpenPackRoot(t) + + result, err := beginAndContinue(&noExtraReadReader{reader: bytes.NewReader(packBytes)}, packRoot, algo, ingest.Options{ + WriteRev: true, + }) + if err != nil { + t.Fatalf("Ingest without trailing EOF: %v", err) + } + + receiver.UpdateRef(t, "refs/heads/main", head) + _ = receiver.Run(t, "verify-pack", "-v", filepath.Join("objects", "pack", result.IdxName)) + _ = receiver.Run(t, "fsck", "--full", "--strict", "--no-progress", "--no-dangling") + }) +} diff --git a/packfile/ingest/progress_write.go b/packfile/ingest/progress_write.go new file mode 100644 index 00000000..5b9f184b --- /dev/null +++ b/packfile/ingest/progress_write.go @@ -0,0 +1,11 @@ +package ingest + +import "codeberg.org/lindenii/furgit/internal/utils" + +func writeProgressf(state *ingestState, format string, args ...any) { + utils.BestEffortFprintf(state.opts.Progress, format, args...) + + if state.opts.ProgressFlush != nil { + _ = state.opts.ProgressFlush() + } +} diff --git a/packfile/ingest/record_content.go b/packfile/ingest/record_content.go new file mode 100644 index 00000000..05972b22 --- /dev/null +++ b/packfile/ingest/record_content.go @@ -0,0 +1,30 @@ +package ingest + +import ( + "fmt" + + "codeberg.org/lindenii/furgit/objecttype" + packfmt "codeberg.org/lindenii/furgit/packfile" +) + +// readBaseRecordContent reads canonical base content for one non-delta record. +func readBaseRecordContent(state *ingestState, idx int) (objecttype.Type, []byte, error) { + record := state.records[idx] + if !packfmt.IsBaseObjectType(record.packedType) { + return objecttype.TypeInvalid, nil, fmt.Errorf("packfile/ingest: record %d is not a base object", idx) + } + + content, err := inflateRecordPayload(state, idx) + if err != nil { + return objecttype.TypeInvalid, nil, err + } + + if int64(len(content)) != record.declaredSize { + return objecttype.TypeInvalid, nil, &MalformedPackEntryError{ + Offset: record.offset, + Reason: fmt.Sprintf("base content size mismatch got %d want %d", len(content), record.declaredSize), + } + } + + return record.packedType, content, nil +} diff --git a/packfile/ingest/record_delta.go b/packfile/ingest/record_delta.go new file mode 100644 index 00000000..a28fcabc --- /dev/null +++ b/packfile/ingest/record_delta.go @@ -0,0 +1,60 @@ +package ingest + +import ( + "fmt" + + deltaapply "codeberg.org/lindenii/furgit/delta/apply" + "codeberg.org/lindenii/furgit/objecttype" +) + +// applyDeltaRecord applies one delta record onto base content. +func applyDeltaRecord(state *ingestState, idx int, baseType objecttype.Type, baseContent []byte) (objecttype.Type, []byte, error) { + record := state.records[idx] + if record.packedType != objecttype.TypeOfsDelta && record.packedType != objecttype.TypeRefDelta { + return objecttype.TypeInvalid, nil, fmt.Errorf("packfile/ingest: record %d is not a delta record", idx) + } + + deltaPayload, err := inflateRecordPayload(state, idx) + if err != nil { + return objecttype.TypeInvalid, nil, err + } + + if int64(len(deltaPayload)) != record.declaredSize { + return objecttype.TypeInvalid, nil, &MalformedPackEntryError{ + Offset: record.offset, + Reason: fmt.Sprintf("delta payload size mismatch got %d want %d", len(deltaPayload), record.declaredSize), + } + } + + srcSize, dstSize, err := readDeltaHeaderSizes(deltaPayload) + if err != nil { + return objecttype.TypeInvalid, nil, &MalformedPackEntryError{ + Offset: record.offset, + Reason: fmt.Sprintf("read delta header: %v", err), + } + } + + if srcSize != len(baseContent) { + return objecttype.TypeInvalid, nil, &MalformedPackEntryError{ + Offset: record.offset, + Reason: fmt.Sprintf("delta source size mismatch got %d want %d", srcSize, len(baseContent)), + } + } + + content, err := deltaapply.Apply(baseContent, deltaPayload) + if err != nil { + return objecttype.TypeInvalid, nil, &MalformedPackEntryError{ + Offset: record.offset, + Reason: fmt.Sprintf("apply delta: %v", err), + } + } + + if len(content) != dstSize { + return objecttype.TypeInvalid, nil, &MalformedPackEntryError{ + Offset: record.offset, + Reason: fmt.Sprintf("delta result size mismatch got %d want %d", len(content), dstSize), + } + } + + return baseType, content, nil +} diff --git a/packfile/ingest/record_inflate.go b/packfile/ingest/record_inflate.go new file mode 100644 index 00000000..b8eca25b --- /dev/null +++ b/packfile/ingest/record_inflate.go @@ -0,0 +1,46 @@ +package ingest + +import ( + "compress/zlib" + "fmt" + "io" + + "codeberg.org/lindenii/furgit/internal/intconv" +) + +// inflateRecordPayload inflates one record's zlib payload from pack file. +func inflateRecordPayload(state *ingestState, idx int) ([]byte, error) { + record := state.records[idx] + if record.packedLen < uint64(record.headerLen) { + return nil, &MalformedPackEntryError{Offset: record.offset, Reason: "entry packed span underflow"} + } + + compressedOffset := record.offset + uint64(record.headerLen) + compressedLen := record.packedLen - uint64(record.headerLen) + + compressedOffsetInt64, err := intconv.Uint64ToInt64(compressedOffset) + if err != nil { + return nil, err + } + + compressedLenInt64, err := intconv.Uint64ToInt64(compressedLen) + if err != nil { + return nil, err + } + + section := io.NewSectionReader(state.packFile, compressedOffsetInt64, compressedLenInt64) + + reader, err := zlib.NewReader(section) + if err != nil { + return nil, &MalformedPackEntryError{Offset: record.offset, Reason: fmt.Sprintf("open payload zlib: %v", err)} + } + + defer func() { _ = reader.Close() }() + + out, err := io.ReadAll(reader) + if err != nil { + return nil, &MalformedPackEntryError{Offset: record.offset, Reason: fmt.Sprintf("inflate payload: %v", err)} + } + + return out, nil +} diff --git a/packfile/ingest/record_resolve.go b/packfile/ingest/record_resolve.go new file mode 100644 index 00000000..1eec1858 --- /dev/null +++ b/packfile/ingest/record_resolve.go @@ -0,0 +1,117 @@ +package ingest + +import ( + "fmt" + + "codeberg.org/lindenii/furgit/objecttype" + packfmt "codeberg.org/lindenii/furgit/packfile" +) + +// resolveRecord resolves one record and returns canonical type/content. +func resolveRecord(state *ingestState, idx int, visiting map[int]struct{}) (objecttype.Type, []byte, error) { + if idx < 0 || idx >= len(state.records) { + return objecttype.TypeInvalid, nil, fmt.Errorf("packfile/ingest: record index out of bounds") + } + + if _, ok := visiting[idx]; ok { + return objecttype.TypeInvalid, nil, &DeltaCycleError{Offset: state.records[idx].offset} + } + + visiting[idx] = struct{}{} + defer delete(visiting, idx) + + record := &state.records[idx] + if ty, content, ok := state.baseCache.get(idx); ok { + return ty, content, nil + } + + if packfmt.IsBaseObjectType(record.packedType) { + ty, content, err := readBaseRecordContent(state, idx) + if err != nil { + return objecttype.TypeInvalid, nil, err + } + + if record.resolved { + state.baseCache.add(idx, record.realType, content) + + return record.realType, content, nil + } + + id, err := hashCanonicalObject(state.algo, ty, content) + if err != nil { + return objecttype.TypeInvalid, nil, err + } + + record.objectID = id + record.realType = ty + record.resolved = true + state.objectToRecord[id] = idx + state.baseCache.add(idx, ty, content) + + return ty, content, nil + } + + var ( + baseType objecttype.Type + baseContent []byte + err error + ) + switch record.packedType { + case objecttype.TypeOfsDelta: + baseIdx, ok := state.offsetToRecord[record.baseOffset] + if !ok { + return objecttype.TypeInvalid, nil, &MalformedPackEntryError{ + Offset: record.offset, + Reason: "missing ofs-delta base entry", + } + } + + baseType, baseContent, err = resolveRecord(state, baseIdx, visiting) + if err != nil { + return objecttype.TypeInvalid, nil, err + } + case objecttype.TypeRefDelta: + baseIdx, ok := state.objectToRecord[record.baseObject] + if ok { + baseType, baseContent, err = resolveRecord(state, baseIdx, visiting) + if err != nil { + return objecttype.TypeInvalid, nil, err + } + } else { + return objecttype.TypeInvalid, nil, errExternalThinBase + } + case objecttype.TypeInvalid, + objecttype.TypeCommit, + objecttype.TypeTree, + objecttype.TypeBlob, + objecttype.TypeTag, + objecttype.TypeFuture: + return objecttype.TypeInvalid, nil, &MalformedPackEntryError{ + Offset: record.offset, + Reason: "unsupported delta type", + } + default: + return objecttype.TypeInvalid, nil, &MalformedPackEntryError{ + Offset: record.offset, + Reason: "unsupported delta type", + } + } + + ty, content, err := applyDeltaRecord(state, idx, baseType, baseContent) + if err != nil { + return objecttype.TypeInvalid, nil, err + } + + id, err := hashCanonicalObject(state.algo, ty, content) + if err != nil { + return objecttype.TypeInvalid, nil, err + } + + record.objectID = id + record.realType = ty + record.resolved = true + state.objectToRecord[id] = idx + state.baseCache.add(idx, ty, content) + + return ty, content, nil +} diff --git a/packfile/ingest/records.go b/packfile/ingest/records.go new file mode 100644 index 00000000..06b7f708 --- /dev/null +++ b/packfile/ingest/records.go @@ -0,0 +1,46 @@ +package ingest + +import ( + "codeberg.org/lindenii/furgit/objectid" + "codeberg.org/lindenii/furgit/objecttype" +) + +// objectRecord stores metadata for one packed object entry. +type objectRecord struct { + // offset is the entry start offset in the pack file. + offset uint64 + // headerLen is packed entry header length in bytes. + headerLen uint32 + // packedLen is total packed entry length in bytes. + packedLen uint64 + // crc32 is the CRC over the full packed entry. + crc32 uint32 + // packedType is the entry type tag from the pack stream. + packedType objecttype.Type + // realType is canonical object type after delta resolution. + realType objecttype.Type + // declaredSize is the declared output object size for this entry. + declaredSize int64 + // dataOffset is compressed payload start offset for this entry. + dataOffset uint64 + // baseOffset is OFS base offset when packedType is OFS delta. + baseOffset uint64 + // baseObject is REF base object ID when packedType is REF delta. + baseObject objectid.ObjectID + // objectID is final resolved object ID. + objectID objectid.ObjectID + // resolved reports whether objectID/realType are finalized. + resolved bool +} + +// ofsDeltaRef maps one OFS delta record to its base offset. +type ofsDeltaRef struct { + baseOffset uint64 + recordIdx int +} + +// refDeltaRef maps one REF delta record to its base object ID. +type refDeltaRef struct { + baseObject objectid.ObjectID + recordIdx int +} diff --git a/packfile/ingest/resolve_all.go b/packfile/ingest/resolve_all.go new file mode 100644 index 00000000..e0ad2281 --- /dev/null +++ b/packfile/ingest/resolve_all.go @@ -0,0 +1,71 @@ +package ingest + +import ( + "errors" + + "codeberg.org/lindenii/furgit/internal/progress" +) + +// resolveAll resolves all delta records and finalizes ObjectID/RealType for every record. +func resolveAll(state *ingestState) error { + state.unresolvedRefDeltas = state.unresolvedRefDeltas[:0] + + var pending uint32 + + for idx := range state.records { + if !state.records[idx].resolved { + pending++ + } + } + + if pending == 0 { + return nil + } + + var done uint32 + + meter := progress.New(progress.Options{ + Writer: state.opts.Progress, + Flush: state.opts.ProgressFlush, + Title: "resolving deltas", + Total: uint64(pending), + }) + + for idx := range state.records { + if state.records[idx].resolved { + continue + } + + done++ + meter.Set(uint64(done), 0) + + visiting := make(map[int]struct{}) + + ty, content, err := resolveRecord(state, idx, visiting) + if err != nil { + if errors.Is(err, errExternalThinBase) { + state.unresolvedRefDeltas = append(state.unresolvedRefDeltas, idx) + + continue + } + + return err + } + + id, err := hashCanonicalObject(state.algo, ty, content) + if err != nil { + return err + } + + record := &state.records[idx] + record.realType = ty + record.objectID = id + record.resolved = true + state.objectToRecord[id] = idx + state.baseCache.add(idx, ty, content) + } + + meter.Stop("done") + + return nil +} diff --git a/packfile/ingest/rev_write.go b/packfile/ingest/rev_write.go new file mode 100644 index 00000000..f8c30c1b --- /dev/null +++ b/packfile/ingest/rev_write.go @@ -0,0 +1,138 @@ +package ingest + +import ( + "encoding/binary" + "slices" + + "codeberg.org/lindenii/furgit/internal/intconv" + "codeberg.org/lindenii/furgit/internal/progress" +) + +const ( + revMagic = 0x52494458 + revVersion = 1 +) + +// writeRev writes rev index for resolved records. +func writeRev(state *ingestState) error { + if !state.opts.WriteRev { + return nil + } + + idxOrder := buildIdxOrder(state) + + recordToIdxPos := make([]int, len(state.records)) + for pos, recordIdx := range idxOrder { + recordToIdxPos[recordIdx] = pos + } + + packOrder := buildPackOrder(state) + + hashImpl, err := state.algo.New() + if err != nil { + return err + } + + var scratch [8]byte + + writeProgressf(state, "writing reverse index header...\r") + binary.BigEndian.PutUint32(scratch[:4], revMagic) + + err = writeAndHash(state.revFile, hashImpl, scratch[:4]) + if err != nil { + return err + } + + binary.BigEndian.PutUint32(scratch[:4], revVersion) + + err = writeAndHash(state.revFile, hashImpl, scratch[:4]) + if err != nil { + return err + } + + binary.BigEndian.PutUint32(scratch[:4], state.algo.PackHashID()) + + err = writeAndHash(state.revFile, hashImpl, scratch[:4]) + if err != nil { + return err + } + + writeProgressf(state, "writing reverse index header: done.\n") + + entriesMeter := progress.New(progress.Options{ + Writer: state.opts.Progress, + Flush: state.opts.ProgressFlush, + Title: "writing reverse index entries", + Total: uint64(len(packOrder)), + }) + + var entriesDone uint64 + + for _, recordIdx := range packOrder { + recordPos, err := intconv.IntToUint32(recordToIdxPos[recordIdx]) + if err != nil { + return err + } + + binary.BigEndian.PutUint32(scratch[:4], recordPos) + + err = writeAndHash(state.revFile, hashImpl, scratch[:4]) + if err != nil { + return err + } + + entriesDone++ + entriesMeter.Set(entriesDone, 0) + } + + if entriesDone > 0 { + entriesMeter.Stop("done") + } + + writeProgressf(state, "writing reverse index trailer...\r") + + err = writeAndHash(state.revFile, hashImpl, state.packHash.Bytes()) + if err != nil { + return err + } + + revHash := hashImpl.Sum(nil) + + _, err = state.revFile.Write(revHash) + if err != nil { + return err + } + + err = state.revFile.Sync() + if err != nil { + return err + } + + writeProgressf(state, "writing reverse index trailer: done.\n") + + return nil +} + +// buildPackOrder returns record indexes sorted by pack offset. +func buildPackOrder(state *ingestState) []int { + out := make([]int, 0, len(state.records)) + for idx := range state.records { + out = append(out, idx) + } + + slices.SortFunc(out, func(a, b int) int { + offA := state.records[a].offset + + offB := state.records[b].offset + switch { + case offA < offB: + return -1 + case offA > offB: + return 1 + default: + return 0 + } + }) + + return out +} diff --git a/packfile/ingest/rewrite_header_trailer.go b/packfile/ingest/rewrite_header_trailer.go new file mode 100644 index 00000000..c5f858de --- /dev/null +++ b/packfile/ingest/rewrite_header_trailer.go @@ -0,0 +1,89 @@ +package ingest + +import ( + "encoding/binary" + "io" + + "codeberg.org/lindenii/furgit/internal/intconv" + "codeberg.org/lindenii/furgit/objectid" +) + +// rewritePackHeaderAndTrailer rewrites object count and trailer hash using ReadAt/WriteAt. +func rewritePackHeaderAndTrailer(state *ingestState) error { + var countRaw [4]byte + + recordCountUint32, err := intconv.IntToUint32(len(state.records)) + if err != nil { + return err + } + + binary.BigEndian.PutUint32(countRaw[:], recordCountUint32) + + _, err = state.packFile.WriteAt(countRaw[:], 8) + if err != nil { + return err + } + + info, err := state.packFile.Stat() + if err != nil { + return err + } + + endWithoutTrailer := info.Size() + + hashImpl, err := state.algo.New() + if err != nil { + return err + } + + var ( + buf [128 << 10]byte + pos int64 + ) + for pos < endWithoutTrailer { + want := int64(len(buf)) + + remaining := endWithoutTrailer - pos + if remaining < want { + want = remaining + } + + n, err := state.packFile.ReadAt(buf[:want], pos) + if err != nil && err != io.EOF { + return err + } + + if n == 0 { + return io.ErrUnexpectedEOF + } + + _, _ = hashImpl.Write(buf[:n]) + pos += int64(n) + } + + sum := hashImpl.Sum(nil) + + _, err = state.packFile.WriteAt(sum, endWithoutTrailer) + if err != nil { + return err + } + + packHash, err := objectid.FromBytes(state.algo, sum) + if err != nil { + return err + } + + state.packHash = packHash + state.objectCountHeader = recordCountUint32 + + sumLenInt64 := int64(len(sum)) + + newConsumed, err := intconv.Int64ToUint64(endWithoutTrailer + sumLenInt64) + if err != nil { + return err + } + + state.stream.consumed = newConsumed + + return nil +} diff --git a/packfile/ingest/scan.go b/packfile/ingest/scan.go new file mode 100644 index 00000000..41916cdf --- /dev/null +++ b/packfile/ingest/scan.go @@ -0,0 +1,106 @@ +package ingest + +import ( + "fmt" + + "codeberg.org/lindenii/furgit/internal/progress" + "codeberg.org/lindenii/furgit/objectid" +) + +// streamPackAndScan copies src into temp .pack while scanning packed entries. +func streamPackAndScan(state *ingestState) error { + hashImpl, err := state.algo.New() + if err != nil { + return err + } + + state.stream = newStreamScanner( + state.src, + state.packFile, + hashImpl, + state.algo.Size(), + ) + + writeProgressf(state, "validating pack header...\r") + + err = seedStreamWithPackHeader(state) + if err != nil { + return err + } + + writeProgressf(state, "validating pack header: done.\n") + + state.records = make([]objectRecord, 0, state.objectCountHeader) + state.ofsDeltas = make([]ofsDeltaRef, 0, state.objectCountHeader) + state.refDeltas = make([]refDeltaRef, 0, state.objectCountHeader) + + total := state.objectCountHeader + meter := progress.New(progress.Options{ + Writer: state.opts.Progress, + Flush: state.opts.ProgressFlush, + Title: "receiving objects", + Total: uint64(total), + Throughput: true, + }) + + for i := range total { + nextOffset, err := scanOneEntry(state, state.stream.consumed) + if err != nil { + return err + } + + if nextOffset != state.stream.consumed { + return fmt.Errorf("packfile/ingest: internal stream offset mismatch") + } + + done := i + 1 + meter.Set(uint64(done), state.stream.consumed) + } + + meter.Stop("done") + + err = state.stream.finishAndFlushTrailer(state.opts.RequireTrailingEOF) + if err != nil { + return err + } + + if len(state.stream.packTrailer) != state.algo.Size() { + return fmt.Errorf("packfile/ingest: invalid trailer size") + } + + packHash, err := objectid.FromBytes(state.algo, state.stream.packTrailer) + if err != nil { + return err + } + + state.packHash = packHash + + return state.stream.flush() +} + +// seedStreamWithPackHeader writes the already-validated PACK header to output, +// seeds the running pack hash, and advances stream offset accounting. +func seedStreamWithPackHeader(state *ingestState) error { + written := 0 + for written < len(state.packHeaderRaw) { + n, err := state.packFile.Write(state.packHeaderRaw[written:]) + if err != nil { + return &DestinationWriteError{Op: fmt.Sprintf("write pack header: %v", err)} + } + + if n == 0 { + return &DestinationWriteError{Op: "write pack header: short write"} + } + + written += n + } + + _, err := state.stream.hash.Write(state.packHeaderRaw[:]) + if err != nil { + return err + } + + state.stream.consumed = packHeaderSize + + return nil +} diff --git a/packfile/ingest/state.go b/packfile/ingest/state.go new file mode 100644 index 00000000..d44b6e09 --- /dev/null +++ b/packfile/ingest/state.go @@ -0,0 +1,70 @@ +package ingest + +import ( + "io" + "os" + + "codeberg.org/lindenii/furgit/objectid" +) + +const ( + defaultDeltaBaseCacheMaxBytes = 32 << 20 +) + +// ingestState holds mutable state for one Ingest call. +type ingestState struct { + src io.Reader + destination *os.Root + algo objectid.Algorithm + opts Options + + packHeaderRaw [packHeaderSize]byte + + packFile *os.File + packTmpName string + idxFile *os.File + idxTmpName string + revFile *os.File + revTmpName string + + stream *streamScanner + + records []objectRecord + ofsDeltas []ofsDeltaRef + refDeltas []refDeltaRef + unresolvedRefDeltas []int + offsetToRecord map[uint64]int + objectToRecord map[objectid.ObjectID]int + + baseCache *deltaBaseCache + packHash objectid.ObjectID + + objectCountHeader uint32 + thinFixed bool +} + +// newIngestState constructs one call-local ingest state. +func newIngestState( + src io.Reader, + destination *os.Root, + algo objectid.Algorithm, + opts Options, + header HeaderInfo, + headerRaw [packHeaderSize]byte, +) (*ingestState, error) { + if algo.Size() == 0 { + return nil, objectid.ErrInvalidAlgorithm + } + + return &ingestState{ + src: src, + destination: destination, + algo: algo, + opts: opts, + packHeaderRaw: headerRaw, + objectCountHeader: header.ObjectCount, + offsetToRecord: make(map[uint64]int), + objectToRecord: make(map[objectid.ObjectID]int), + baseCache: newDeltaBaseCache(defaultDeltaBaseCacheMaxBytes), + }, nil +} diff --git a/packfile/ingest/stream.go b/packfile/ingest/stream.go new file mode 100644 index 00000000..a403087a --- /dev/null +++ b/packfile/ingest/stream.go @@ -0,0 +1,111 @@ +package ingest + +import ( + "errors" + "hash" + "io" + "os" +) + +const streamScannerBufferSize = 64 << 10 + +// streamScanner incrementally reads/consumes one pack stream while mirroring +// consumed bytes into one destination pack file. +type streamScanner struct { + src io.Reader + dstFile *os.File + + // Input buffer window: buf[off:n] is unread. + buf []byte + off int + n int + + // Absolute consumed stream bytes. + consumed uint64 + + // Running pack hash over consumed bytes while hashEnabled is true. + hash hash.Hash + hashSize int + hashEnabled bool + + // Entry CRC state while one entry is being consumed. + entryCRC uint32 + inEntryCRC bool + + packTrailer []byte +} + +// newStreamScanner constructs one scanner with fixed input buffering. +func newStreamScanner(src io.Reader, dstFile *os.File, hash hash.Hash, hashSize int) *streamScanner { + return &streamScanner{ + src: src, + dstFile: dstFile, + buf: make([]byte, streamScannerBufferSize), + hash: hash, + hashSize: hashSize, + hashEnabled: true, + } +} + +// Read implements io.Reader. +func (scanner *streamScanner) Read(dst []byte) (int, error) { + if len(dst) == 0 { + return 0, nil + } + + if scanner.n-scanner.off == 0 { + err := scanner.fill(1) + if err != nil { + if errors.Is(err, io.EOF) { + return 0, io.EOF + } + + return 0, err + } + } + + unread := scanner.n - scanner.off + if unread == 0 { + return 0, io.EOF + } + + n := min(len(dst), unread) + + copy(dst, scanner.buf[scanner.off:scanner.off+n]) + + err := scanner.use(n) + if err != nil { + return 0, err + } + + return n, nil +} + +// ReadByte implements io.ByteReader without allocation. +func (scanner *streamScanner) ReadByte() (byte, error) { + if scanner.n-scanner.off == 0 { + err := scanner.fill(1) + if err != nil { + return 0, err + } + } + + b := scanner.buf[scanner.off] + + err := scanner.use(1) + if err != nil { + return 0, err + } + + return b, nil +} + +// readFull reads exactly len(dst) bytes through receiver. +func (scanner *streamScanner) readFull(dst []byte) error { + _, err := io.ReadFull(scanner, dst) + if err != nil { + return err + } + + return nil +} diff --git a/packfile/ingest/temp.go b/packfile/ingest/temp.go new file mode 100644 index 00000000..d0b7862c --- /dev/null +++ b/packfile/ingest/temp.go @@ -0,0 +1,103 @@ +package ingest + +import ( + "crypto/rand" + "errors" + "fmt" + "io/fs" + "os" +) + +// openTemporaryArtifacts creates/open temp pack/idx/(rev) files under destination. +func openTemporaryArtifacts(state *ingestState) error { + packName, packFile, err := createTempFile(state.destination, "tmp_pack_") + if err != nil { + return err + } + + idxName, idxFile, err := createTempFile(state.destination, "tmp_idx_") + if err != nil { + _ = packFile.Close() + _ = state.destination.Remove(packName) + + return err + } + + revName := "" + + var revFile *os.File + if state.opts.WriteRev { + revName, revFile, err = createTempFile(state.destination, "tmp_rev_") + if err != nil { + _ = idxFile.Close() + _ = state.destination.Remove(idxName) + _ = packFile.Close() + _ = state.destination.Remove(packName) + + return err + } + } + + state.packTmpName = packName + state.packFile = packFile + state.idxTmpName = idxName + state.idxFile = idxFile + state.revTmpName = revName + state.revFile = revFile + + return nil +} + +// closeTemporaryArtifacts closes all temporary artifact file descriptors. +func closeTemporaryArtifacts(state *ingestState) error { + var out error + + if state.packFile != nil { + err := state.packFile.Close() + if err != nil && out == nil { + out = err + } + + state.packFile = nil + } + + if state.idxFile != nil { + err := state.idxFile.Close() + if err != nil && out == nil { + out = err + } + + state.idxFile = nil + } + + if state.revFile != nil { + err := state.revFile.Close() + if err != nil && out == nil { + out = err + } + + state.revFile = nil + } + + return out +} + +// createTempFile creates one temporary file under root using prefix. +func createTempFile(root *os.Root, prefix string) (string, *os.File, error) { + for range 32 { + name := prefix + rand.Text() + + file, err := root.OpenFile(name, os.O_CREATE|os.O_EXCL|os.O_RDWR, 0o644) + if err == nil { + return name, file, nil + } + + if errors.Is(err, fs.ErrExist) { + continue + } + + return "", nil, fmt.Errorf("packfile/ingest: create temp file %q: %w", name, err) + } + + return "", nil, fmt.Errorf("packfile/ingest: unable to create temporary file for prefix %q", prefix) +} diff --git a/packfile/ingest/testdata/fixtures/sha1/METADATA.txt b/packfile/ingest/testdata/fixtures/sha1/METADATA.txt new file mode 100644 index 00000000..5fcbfe26 --- /dev/null +++ b/packfile/ingest/testdata/fixtures/sha1/METADATA.txt @@ -0,0 +1,3 @@ +format=sha1 +head=200c960359dad025b4170284c518919eb4a24305 +base=4bc507fc631ea78474d83c47548743c9f1dda0dc diff --git a/packfile/ingest/testdata/fixtures/sha1/base.pack b/packfile/ingest/testdata/fixtures/sha1/base.pack Binary files differnew file mode 100644 index 00000000..3d7a4903 --- /dev/null +++ b/packfile/ingest/testdata/fixtures/sha1/base.pack diff --git a/packfile/ingest/testdata/fixtures/sha1/nonthin.pack b/packfile/ingest/testdata/fixtures/sha1/nonthin.pack Binary files differnew file mode 100644 index 00000000..ea07c9a0 --- /dev/null +++ b/packfile/ingest/testdata/fixtures/sha1/nonthin.pack diff --git a/packfile/ingest/testdata/fixtures/sha1/thin.pack b/packfile/ingest/testdata/fixtures/sha1/thin.pack Binary files differnew file mode 100644 index 00000000..95084feb --- /dev/null +++ b/packfile/ingest/testdata/fixtures/sha1/thin.pack diff --git a/packfile/ingest/testdata/fixtures/sha256/METADATA.txt b/packfile/ingest/testdata/fixtures/sha256/METADATA.txt new file mode 100644 index 00000000..8a5ea0a2 --- /dev/null +++ b/packfile/ingest/testdata/fixtures/sha256/METADATA.txt @@ -0,0 +1,3 @@ +format=sha256 +head=35cc0f4cd1c73524187540494058d233a2ecbd071c85d496a2250d8e0c805ef8 +base=b4abe46895f0bb5aa22fd42d28d428413f265359734c288752e3c2d270eec276 diff --git a/packfile/ingest/testdata/fixtures/sha256/base.pack b/packfile/ingest/testdata/fixtures/sha256/base.pack Binary files differnew file mode 100644 index 00000000..52ceef74 --- /dev/null +++ b/packfile/ingest/testdata/fixtures/sha256/base.pack diff --git a/packfile/ingest/testdata/fixtures/sha256/nonthin.pack b/packfile/ingest/testdata/fixtures/sha256/nonthin.pack Binary files differnew file mode 100644 index 00000000..50db05d0 --- /dev/null +++ b/packfile/ingest/testdata/fixtures/sha256/nonthin.pack diff --git a/packfile/ingest/testdata/fixtures/sha256/thin.pack b/packfile/ingest/testdata/fixtures/sha256/thin.pack Binary files differnew file mode 100644 index 00000000..b331b915 --- /dev/null +++ b/packfile/ingest/testdata/fixtures/sha256/thin.pack diff --git a/packfile/ingest/thin_append.go b/packfile/ingest/thin_append.go new file mode 100644 index 00000000..161f00a9 --- /dev/null +++ b/packfile/ingest/thin_append.go @@ -0,0 +1,91 @@ +package ingest + +import ( + "compress/zlib" + "hash/crc32" + "io" + + "codeberg.org/lindenii/furgit/internal/intconv" + "codeberg.org/lindenii/furgit/objectid" + "codeberg.org/lindenii/furgit/objecttype" +) + +// appendBaseObject appends one base object as a new packed non-delta entry. +func appendBaseObject(state *ingestState, id objectid.ObjectID, realType objecttype.Type, content []byte) (int, error) { + start := state.stream.consumed + + header := encodePackEntryHeader(realType, int64(len(content))) + + startInt64, err := intconv.Uint64ToInt64(start) + if err != nil { + return 0, err + } + + _, err = state.packFile.WriteAt(header, startInt64) + if err != nil { + return 0, err + } + + headerLenInt64 := int64(len(header)) + section := &fileSectionWriter{file: state.packFile, off: startInt64 + headerLenInt64} + crc := crc32.NewIEEE() + + _, err = crc.Write(header) + if err != nil { + return 0, err + } + + counting := &countingWriter{dst: section} + + zw := zlib.NewWriter(io.MultiWriter(counting, crc)) + + _, err = zw.Write(content) + if err != nil { + return 0, err + } + + err = zw.Close() + if err != nil { + return 0, err + } + + headerLenUint64, err := intconv.IntToUint64(len(header)) + if err != nil { + return 0, err + } + + countingNUint64, err := intconv.IntToUint64(counting.n) + if err != nil { + return 0, err + } + + packedLen := headerLenUint64 + countingNUint64 + end := start + packedLen + state.stream.consumed = end + + headerLenUint32, err := intconv.IntToUint32(len(header)) + if err != nil { + return 0, err + } + + record := objectRecord{ + offset: start, + headerLen: headerLenUint32, + packedLen: packedLen, + crc32: crc.Sum32(), + packedType: realType, + realType: realType, + declaredSize: int64(len(content)), + dataOffset: start + headerLenUint64, + objectID: id, + resolved: true, + } + + recordIdx := len(state.records) + state.records = append(state.records, record) + state.offsetToRecord[start] = recordIdx + state.objectToRecord[id] = recordIdx + state.baseCache.add(recordIdx, realType, content) + + return recordIdx, nil +} diff --git a/packfile/ingest/thin_fix.go b/packfile/ingest/thin_fix.go new file mode 100644 index 00000000..f94d8c0b --- /dev/null +++ b/packfile/ingest/thin_fix.go @@ -0,0 +1,100 @@ +package ingest + +import ( + "errors" + "fmt" + + "codeberg.org/lindenii/furgit/internal/intconv" + "codeberg.org/lindenii/furgit/internal/progress" + "codeberg.org/lindenii/furgit/objectstore" +) + +// maybeFixThin appends missing bases and rewrites pack header/trailer when needed. +func maybeFixThin(state *ingestState) error { + if len(state.unresolvedRefDeltas) == 0 { + return nil + } + + writeProgressf( + state, + "fixing thin pack: %d unresolved bases\r", + len(state.unresolvedRefDeltas), + ) + + if !state.opts.FixThin { + return &ThinPackUnresolvedError{Count: len(state.unresolvedRefDeltas)} + } + + if state.opts.Base == nil { + return &ThinPackUnresolvedError{Count: len(state.unresolvedRefDeltas)} + } + + hashSize := int64(state.algo.Size()) + + info, err := state.packFile.Stat() + if err != nil { + return err + } + + size := info.Size() + if size < hashSize { + return fmt.Errorf("packfile/ingest: pack too short to trim trailer") + } + + newEnd := size - hashSize + + err = state.packFile.Truncate(newEnd) + if err != nil { + return err + } + + consumed, err := intconv.Int64ToUint64(newEnd) + if err != nil { + return err + } + + state.stream.consumed = consumed + + baseIDs := unresolvedThinBaseIDs(state) + + total := len(baseIDs) + meter := progress.New(progress.Options{ + Writer: state.opts.Progress, + Flush: state.opts.ProgressFlush, + Title: "fixing thin pack", + Total: uint64(total), + }) + meter.Set(0, 0) + + var appended uint64 + + for _, id := range baseIDs { + ty, content, err := state.opts.Base.ReadBytesContent(id) + if err != nil { + if errors.Is(err, objectstore.ErrObjectNotFound) { + continue + } + + return fmt.Errorf("packfile/ingest: read thin base %s: %w", id, err) + } + + _, err = appendBaseObject(state, id, ty, content) + if err != nil { + return err + } + + state.thinFixed = true + + appended++ + meter.Set(appended, 0) + } + + err = rewritePackHeaderAndTrailer(state) + if err != nil { + return err + } + + meter.Stop(fmt.Sprintf("appended %d/%d, done", appended, total)) + + return nil +} diff --git a/packfile/ingest/thin_unresolved.go b/packfile/ingest/thin_unresolved.go new file mode 100644 index 00000000..347f3f29 --- /dev/null +++ b/packfile/ingest/thin_unresolved.go @@ -0,0 +1,34 @@ +package ingest + +import ( + "bytes" + "slices" + + "codeberg.org/lindenii/furgit/objectid" + "codeberg.org/lindenii/furgit/objecttype" +) + +// unresolvedThinBaseIDs returns sorted unique unresolved ref base IDs. +func unresolvedThinBaseIDs(state *ingestState) []objectid.ObjectID { + seen := make(map[objectid.ObjectID]struct{}) + + for _, idx := range state.unresolvedRefDeltas { + record := state.records[idx] + if record.packedType != objecttype.TypeRefDelta { + continue + } + + seen[record.baseObject] = struct{}{} + } + + out := make([]objectid.ObjectID, 0, len(seen)) + for id := range seen { + out = append(out, id) + } + + slices.SortFunc(out, func(a, b objectid.ObjectID) int { + return bytes.Compare(a.RawBytes(), b.RawBytes()) + }) + + return out +} diff --git a/packfile/ingest/trailer.go b/packfile/ingest/trailer.go new file mode 100644 index 00000000..7a26a8f2 --- /dev/null +++ b/packfile/ingest/trailer.go @@ -0,0 +1,58 @@ +package ingest + +import ( + "bytes" + "errors" + "fmt" + "io" +) + +// finishAndFlushTrailer reads trailer hash bytes, verifies trailer checksum, +// and optionally requires the source stream to hit EOF afterward. +func (scanner *streamScanner) finishAndFlushTrailer(requireTrailingEOF bool) error { + if scanner.hashSize <= 0 { + return fmt.Errorf("packfile/ingest: invalid hash size") + } + + trailer := make([]byte, scanner.hashSize) + + scanner.hashEnabled = false + + err := scanner.readFull(trailer) + if err != nil { + return &PackTrailerMismatchError{} + } + + scanner.packTrailer = append(scanner.packTrailer[:0], trailer...) + + if scanner.n-scanner.off > 0 { + return fmt.Errorf("packfile/ingest: pack has trailing garbage") + } + + if !requireTrailingEOF { + computed := scanner.hash.Sum(nil) + if !bytes.Equal(computed, trailer) { + return &PackTrailerMismatchError{} + } + + return nil + } + + var probe [1]byte + + n, err := scanner.Read(probe[:]) + if n > 0 || err == nil { + return fmt.Errorf("packfile/ingest: pack has trailing garbage") + } + + if !errors.Is(err, io.EOF) { + return err + } + + computed := scanner.hash.Sum(nil) + if !bytes.Equal(computed, trailer) { + return &PackTrailerMismatchError{} + } + + return nil +} diff --git a/packfile/ingest/use.go b/packfile/ingest/use.go new file mode 100644 index 00000000..97f8757a --- /dev/null +++ b/packfile/ingest/use.go @@ -0,0 +1,34 @@ +package ingest + +import ( + "fmt" + "hash/crc32" +) + +// use consumes n unread bytes and updates accounting/checksum state. +func (scanner *streamScanner) use(n int) error { + if n < 0 || n > scanner.n-scanner.off { + return fmt.Errorf("packfile/ingest: invalid consume length %d", n) + } + + if n == 0 { + return nil + } + + chunk := scanner.buf[scanner.off : scanner.off+n] + if scanner.hashEnabled { + _, err := scanner.hash.Write(chunk) + if err != nil { + return err + } + } + + if scanner.inEntryCRC { + scanner.entryCRC = crc32.Update(scanner.entryCRC, crc32.IEEETable, chunk) + } + + scanner.off += n + scanner.consumed += uint64(n) + + return nil +} diff --git a/packfile/pack.go b/packfile/pack.go new file mode 100644 index 00000000..88d8fd9a --- /dev/null +++ b/packfile/pack.go @@ -0,0 +1,52 @@ +// Package pack provides Git packfile format parsing primitives. +package pack + +import ( + "fmt" + + "codeberg.org/lindenii/furgit/objecttype" +) + +// Signature is the 4-byte "PACK" magic at the start of pack files. +const Signature = 0x5041434b + +// VersionSupported reports whether one pack version is supported. +func VersionSupported(version uint32) bool { + return version == 2 || version == 3 +} + +// IsBaseObjectType reports whether ty is one of the four canonical object +// types encoded directly in pack entries. +func IsBaseObjectType(ty objecttype.Type) bool { + switch ty { + case objecttype.TypeCommit, objecttype.TypeTree, objecttype.TypeBlob, objecttype.TypeTag: + return true + case objecttype.TypeInvalid, objecttype.TypeFuture, objecttype.TypeOfsDelta, objecttype.TypeRefDelta: + return false + default: + return false + } +} + +// ParseOfsDeltaDistance parses one ofs-delta backward distance. +func ParseOfsDeltaDistance(buf []byte) (uint64, int, error) { + if len(buf) == 0 { + return 0, 0, fmt.Errorf("packfile: malformed ofs-delta distance") + } + + b := buf[0] + dist := uint64(b & 0x7f) + + consumed := 1 + for b&0x80 != 0 { + if consumed >= len(buf) { + return 0, 0, fmt.Errorf("packfile: malformed ofs-delta distance") + } + + b = buf[consumed] + consumed++ + dist = ((dist + 1) << 7) + uint64(b&0x7f) + } + + return dist, consumed, nil +} |
