diff options
Diffstat (limited to 'object/store/packed/internal')
| -rw-r--r-- | object/store/packed/internal/ingest/basecache.go | 25 | ||||
| -rw-r--r-- | object/store/packed/internal/ingest/doc.go | 11 | ||||
| -rw-r--r-- | object/store/packed/internal/ingest/errors.go | 36 | ||||
| -rw-r--r-- | object/store/packed/internal/ingest/finalize.go | 213 | ||||
| -rw-r--r-- | object/store/packed/internal/ingest/ingest.go | 265 | ||||
| -rw-r--r-- | object/store/packed/internal/ingest/record.go | 51 | ||||
| -rw-r--r-- | object/store/packed/internal/ingest/resolve.go | 495 | ||||
| -rw-r--r-- | object/store/packed/internal/ingest/result.go | 29 | ||||
| -rw-r--r-- | object/store/packed/internal/ingest/scan.go | 477 | ||||
| -rw-r--r-- | object/store/packed/internal/ingest/thin.go | 217 | ||||
| -rw-r--r-- | object/store/packed/internal/ingest/writepack_test.go | 607 |
11 files changed, 2426 insertions, 0 deletions
diff --git a/object/store/packed/internal/ingest/basecache.go b/object/store/packed/internal/ingest/basecache.go new file mode 100644 index 00000000..77419aa7 --- /dev/null +++ b/object/store/packed/internal/ingest/basecache.go @@ -0,0 +1,25 @@ +package ingest + +import ( + "lindenii.org/go/furgit/internal/cache/clock" + "lindenii.org/go/furgit/object/typ" +) + +const baseCacheMaxWeight = 96 << 20 + +type baseCacheKey struct { + offset int +} + +type cachedContent struct { + objectType typ.Type + content []byte +} + +func newBaseCache(workers int) *clock.Clock[baseCacheKey, cachedContent] { + return clock.New(baseCacheMaxWeight*uint64(workers), baseContentWeight) //#nosec G115 +} + +func baseContentWeight(_ baseCacheKey, base cachedContent) uint64 { + return uint64(len(base.content)) + 32 +} diff --git a/object/store/packed/internal/ingest/doc.go b/object/store/packed/internal/ingest/doc.go new file mode 100644 index 00000000..67be037b --- /dev/null +++ b/object/store/packed/internal/ingest/doc.go @@ -0,0 +1,11 @@ +// Package ingest writes one incoming pack stream +// into an objects/pack directory +// as a finalized pack, index, and reverse index. +// +// WritePack streams the pack to a temporary file +// while scanning its entries, +// resolves every delta against in-pack bases, +// optionally completes thin packs from an external base reader, +// and publishes the artifacts under content-addressed names +// derived from the pack trailer hash. +package ingest diff --git a/object/store/packed/internal/ingest/errors.go b/object/store/packed/internal/ingest/errors.go new file mode 100644 index 00000000..e268182e --- /dev/null +++ b/object/store/packed/internal/ingest/errors.go @@ -0,0 +1,36 @@ +package ingest + +import ( + "errors" + "fmt" + + "lindenii.org/go/furgit/object/id" +) + +// ErrMalformedPack reports that +// the incoming pack stream is truncated, +// inconsistent, or otherwise unparseable. +var ErrMalformedPack = errors.New("object/store/packed/internal/ingest: malformed pack") + +// ErrThinPackNotPermitted reports that +// the incoming pack is thin, +// referencing bases not contained within it, +// but no external base reader was supplied to complete it. +var ErrThinPackNotPermitted = errors.New("object/store/packed/internal/ingest: thin pack not permitted: no thin base supplied") + +// ThinBasesMissingError reports that +// an incoming thin pack references base objects +// that the supplied thin base reader does not contain, +// so the pack cannot be completed. +type ThinBasesMissingError struct { + // OIDs holds the missing base object IDs, sorted. + OIDs []id.ObjectID +} + +// Error implements error. +func (e *ThinBasesMissingError) Error() string { + return fmt.Sprintf( + "object/store/packed/internal/ingest: thin pack references %d missing base objects", + len(e.OIDs), + ) +} diff --git a/object/store/packed/internal/ingest/finalize.go b/object/store/packed/internal/ingest/finalize.go new file mode 100644 index 00000000..c6b1e2c9 --- /dev/null +++ b/object/store/packed/internal/ingest/finalize.go @@ -0,0 +1,213 @@ +package ingest + +import ( + "errors" + "fmt" + "io" + "io/fs" + "slices" + + "lindenii.org/go/furgit/internal/format/packidx" + "lindenii.org/go/furgit/internal/format/packidx/bloom" + "lindenii.org/go/furgit/internal/format/packrev" + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/lgo/intconv" +) + +// finalize writes the index and reverse index, +// then links the pack, reverse index, and index +// to their content-addressed names. +func (ingestion *ingestion) finalize() (Result, error) { + err := ingestion.ctx.Err() + if err != nil { + return Result{}, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + entries, positions, err := ingestion.indexEntries() + if err != nil { + return Result{}, err + } + + packHash := ingestion.packHash.Bytes() + + idxTmp, err := ingestion.writeTemp("tmp_idx_", func(w io.Writer) error { + return packidx.Write(w, ingestion.objectFormat, entries, packHash) + }) + if err != nil { + return Result{}, err + } + + revTmp, err := ingestion.writeTemp("tmp_rev_", func(w io.Writer) error { + return packrev.Write(w, ingestion.objectFormat, positions, packHash) + }) + if err != nil { + return Result{}, err + } + + bloomBuilder, err := ingestion.buildBloom(entries, packHash) + if err != nil { + return Result{}, err + } + + bloomTmp, err := ingestion.writeTemp("tmp_bloom_", func(w io.Writer) error { + _, err := w.Write(bloomBuilder.Bytes()) + + return err + }) + if err != nil { + return Result{}, err + } + + objectCount, err := intconv.IntToUint32(len(ingestion.records)) + if err != nil { + return Result{}, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + base := "pack-" + ingestion.packHash.String() + packFinal := base + ".pack" + idxFinal := base + ".idx" + revFinal := base + ".rev" + bloomFinal := base + ".bloom" + + // Link the data files before the index, + // since the index is what publishes the pack to readers. + artifacts := [...]struct{ tmp, final string }{ + {ingestion.packTmp, packFinal}, + {revTmp, revFinal}, + {bloomTmp, bloomFinal}, + {idxTmp, idxFinal}, + } + + var created []string + + for _, artifact := range artifacts { + linked, err := ingestion.promote(artifact.tmp, artifact.final) + if err != nil { + for i := len(created) - 1; i >= 0; i-- { + _ = ingestion.root.Remove(created[i]) + } + + return Result{}, err + } + + if linked { + created = append(created, artifact.final) + } + } + + return Result{ + PackName: packFinal, + IdxName: idxFinal, + RevName: revFinal, + BloomName: bloomFinal, + PackHash: ingestion.packHash, + ObjectCount: objectCount, + ThinFixed: ingestion.thinFixed, + }, nil +} + +// buildBloom builds a Bloom filter over the index entries' object IDs, +// bound to packHash. +func (ingestion *ingestion) buildBloom(entries []packidx.Entry, packHash []byte) (*bloom.Builder, error) { + bucketCount, k, err := bloom.RecommendParams(ingestion.objectFormat, len(entries)) + if err != nil { + return nil, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + builder, err := bloom.NewBuilder(ingestion.objectFormat, bucketCount, k, packHash) + if err != nil { + return nil, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + size := ingestion.objectFormat.Size() + for i := range entries { + builder.Add(entries[i].OID[:size]) + } + + return builder, nil +} + +// indexEntries returns the index entries in object-ID order +// and, for each record in pack order, its position in that index order. +func (ingestion *ingestion) indexEntries() ([]packidx.Entry, []uint32, error) { + order := make([]int, len(ingestion.records)) + for i := range order { + order[i] = i + } + + slices.SortFunc(order, func(left, right int) int { + return ingestion.records[left].oid.Compare(ingestion.records[right].oid) + }) + + entries := make([]packidx.Entry, len(order)) + positions := make([]uint32, len(ingestion.records)) + + for indexPosition, recordIndex := range order { + rec := &ingestion.records[recordIndex] + + var oidBytes [id.MaxObjectIDSize]byte + copy(oidBytes[:], rec.oid.RawBytes()) + + offset, err := intconv.IntToUint64(rec.offset) + if err != nil { + return nil, nil, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + entries[indexPosition] = packidx.Entry{ + OID: oidBytes, + Offset: offset, + CRC32: rec.crc32, + } + + position, err := intconv.IntToUint32(indexPosition) + if err != nil { + return nil, nil, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + positions[recordIndex] = position + } + + return entries, positions, nil +} + +// writeTemp creates a temporary file, +// writes it via write, syncs it, and returns its name. +func (ingestion *ingestion) writeTemp(prefix string, write func(io.Writer) error) (string, error) { + name, file, err := ingestion.createTemp(prefix) + if err != nil { + return "", err + } + + defer func() { _ = file.Close() }() + + err = write(file) + if err != nil { + return "", err + } + + err = file.Sync() + if err != nil { + return "", fmt.Errorf("object/store/packed/internal/ingest: syncing %q: %w", name, err) + } + + return name, nil +} + +// promote hard-links tmp to final and reports whether final was newly created. +// A pre-existing final is treated as success; rollback must not remove it. +func (ingestion *ingestion) promote(tmp, final string) (bool, error) { + err := ingestion.root.Link(tmp, final) + + switch { + case err == nil: + _ = ingestion.root.Remove(tmp) + + return true, nil + case errors.Is(err, fs.ErrExist): + _ = ingestion.root.Remove(tmp) + + return false, nil + default: + return false, fmt.Errorf("object/store/packed/internal/ingest: linking %q: %w", final, err) + } +} diff --git a/object/store/packed/internal/ingest/ingest.go b/object/store/packed/internal/ingest/ingest.go new file mode 100644 index 00000000..9b60af85 --- /dev/null +++ b/object/store/packed/internal/ingest/ingest.go @@ -0,0 +1,265 @@ +package ingest + +import ( + "bytes" + "context" + "crypto/rand" + "errors" + "fmt" + "io" + "io/fs" + "os" + "runtime" + "sync/atomic" + + "lindenii.org/go/furgit/internal/cache/clock" + "lindenii.org/go/furgit/internal/format/packfile" + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/furgit/object/store" + "lindenii.org/go/lgo/sync" +) + +var errTempNamesExhausted = errors.New("object/store/packed/internal/ingest: exhausted temporary file names") + +// ingestion holds the state for one WritePack call. +type ingestion struct { + ctx context.Context //nolint:containedctx + + // root is the destination objects/pack directory. + root *os.Root + + // objectFormat is the pack's object format. + objectFormat id.ObjectFormat + + // opts carries the thin base reader and progress sink. + opts store.PackWriteOptions + + // src is the pack stream, positioned just past the header. + src io.Reader + + // packFile is the temporary pack file being written, + // and packTmp is its destination-relative name. + packFile *os.File + packTmp string + + // temps lists every temporary file to remove on failure. + temps []string + + // scanner streams src into packFile while scanning entries. + scanner *scanner + + // records holds one entry per object, in pack-offset order. + records []record + + // byOffset maps an entry offset to its record index, + // and byOID maps a resolved object ID to its record index. + byOffset map[int]int + byOID sync.Map[id.ObjectID, int] + + baseCache *clock.Clock[baseCacheKey, cachedContent] + + // workers is the delta-resolution concurrency. + workers int + + // headerCount is the object count declared by the pack header. + headerCount int + + // deltaCount counts delta records, accumulated during scanning. + deltaCount int + + // deltasResolved counts resolved delta records. + deltasResolved atomic.Int64 + + // packHash is the final pack trailer hash. + packHash id.ObjectID + + // thinFixed reports whether thin completion appended local bases. + thinFixed bool + + // committed suppresses temporary file removal once artifacts are published. + committed bool +} + +// WritePack ingests one pack stream into root, +// publishing a pack, index, and reverse index +// under content-addressed names derived from the pack trailer hash. +// +// WritePack consumes the pack stream through its trailer and stops there. +// It does not require src to reach EOF afterward, +// so it is safe on a still-open transport connection, +// such as receive-pack, +// whose peer keeps the connection open to read the response. +// +// The pack must be the last thing the peer sends before that response: +// any bytes arriving immediately after the trailer +// are rejected as a malformed pack. +func WritePack(ctx context.Context, root *os.Root, objectFormat id.ObjectFormat, src io.Reader, opts store.PackWriteOptions) (Result, error) { + if objectFormat.Size() == 0 { + return Result{}, id.ErrInvalidObjectFormat + } + + headerRaw, count, err := readPackHeader(src) + if err != nil { + return Result{}, err + } + + workers := runtime.GOMAXPROCS(0) + + ingestion := &ingestion{ + ctx: ctx, + root: root, + objectFormat: objectFormat, + opts: opts, + src: src, + packFile: nil, + packTmp: "", + temps: nil, + scanner: nil, + records: nil, + byOffset: make(map[int]int), + baseCache: newBaseCache(workers), + workers: workers, + headerCount: count, + deltaCount: 0, + packHash: id.ObjectID{}, + thinFixed: false, + committed: false, + } + + defer ingestion.cleanup() + + if count == 0 { + return ingestion.finishEmpty(headerRaw) + } + + err = ingestion.openPackTemp(headerRaw) + if err != nil { + return Result{}, err + } + + err = ingestion.streamAndScan() + if err != nil { + return Result{}, err + } + + err = ingestion.resolveDeltas() + if err != nil { + return Result{}, err + } + + err = ingestion.packFile.Sync() + if err != nil { + return Result{}, fmt.Errorf("object/store/packed/internal/ingest: syncing pack: %w", err) + } + + result, err := ingestion.finalize() + if err != nil { + return Result{}, err + } + + ingestion.committed = true + + return result, nil +} + +// finishEmpty verifies the trailer of a zero-object pack +// and returns success without writing any artifacts. +func (ingestion *ingestion) finishEmpty(headerRaw [packfile.HeaderLen]byte) (Result, error) { + hashImpl, err := ingestion.objectFormat.New() + if err != nil { + return Result{}, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + _, _ = hashImpl.Write(headerRaw[:]) + + trailer := make([]byte, ingestion.objectFormat.Size()) + + _, err = io.ReadFull(ingestion.src, trailer) + if err != nil { + return Result{}, fmt.Errorf("%w: reading trailer: %w", ErrMalformedPack, err) + } + + if !bytes.Equal(hashImpl.Sum(nil), trailer) { + return Result{}, fmt.Errorf("%w: trailer hash mismatch", ErrMalformedPack) + } + + packHash, err := ingestion.objectFormat.FromBytes(trailer) + if err != nil { + return Result{}, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + return Result{ + PackName: "", + IdxName: "", + RevName: "", + PackHash: packHash, + ObjectCount: 0, + ThinFixed: false, + }, nil +} + +// openPackTemp creates the temporary pack file, +// writes the validated header to it, +// and builds the stream scanner seeded with that header. +func (ingestion *ingestion) openPackTemp(headerRaw [packfile.HeaderLen]byte) error { + name, file, err := ingestion.createTemp("tmp_pack_") + if err != nil { + return err + } + + ingestion.packTmp = name + ingestion.packFile = file + + _, err = file.Write(headerRaw[:]) + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: writing pack header: %w", err) + } + + hashImpl, err := ingestion.objectFormat.New() + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + _, _ = hashImpl.Write(headerRaw[:]) + + ingestion.scanner = newScanner(ingestion.src, ingestion.packFile, hashImpl) + + return nil +} + +// createTemp creates one temporary file under root, +// recording its name for cleanup on failure. +func (ingestion *ingestion) createTemp(prefix string) (string, *os.File, error) { + for range 32 { + name := prefix + rand.Text() + + file, err := ingestion.root.OpenFile(name, os.O_CREATE|os.O_EXCL|os.O_RDWR, 0o644) + if err == nil { + ingestion.temps = append(ingestion.temps, name) + + return name, file, nil + } + + if !errors.Is(err, fs.ErrExist) { + return "", nil, fmt.Errorf("object/store/packed/internal/ingest: creating temp file: %w", err) + } + } + + return "", nil, errTempNamesExhausted +} + +// cleanup closes the pack file +// and removes temporary files unless the write committed. +func (ingestion *ingestion) cleanup() { + if ingestion.packFile != nil { + _ = ingestion.packFile.Close() + } + + if ingestion.committed { + return + } + + for _, name := range ingestion.temps { + _ = ingestion.root.Remove(name) + } +} diff --git a/object/store/packed/internal/ingest/record.go b/object/store/packed/internal/ingest/record.go new file mode 100644 index 00000000..4031a246 --- /dev/null +++ b/object/store/packed/internal/ingest/record.go @@ -0,0 +1,51 @@ +package ingest + +import ( + "lindenii.org/go/furgit/internal/format/packfile" + "lindenii.org/go/furgit/object/id" +) + +// record is the scanned metadata for one packed entry, +// completed in place as deltas are resolved. +// +// Records are appended in pack-offset order, +// so a record's index in the slice is also its pack order. +type record struct { + // offset is the entry's start offset in the pack. + offset int + + // headerLen is the entry header length in bytes, + // so the zlib payload begins at offset+headerLen. + headerLen int + + // packedLen is the total on-disk entry length in bytes, + // covering the header and the compressed payload. + packedLen int + + // crc32 is the CRC32 of the entry's packed bytes. + crc32 uint32 + + // packedType is the entry type as encoded in the pack. + packedType packfile.EntryType + + // declaredSize is the declared inflated payload size. + declaredSize int + + // baseOffset is the base entry offset for an ofs-delta. + baseOffset int + + // baseOID is the base object ID for a ref-delta. + baseOID id.ObjectID + + // oid is the resolved object ID, + // meaningful once resolved is true. + oid id.ObjectID + + // resolved reports whether oid is final. + resolved bool +} + +// dataOffset returns the entry's compressed payload start offset. +func (record *record) dataOffset() int { + return record.offset + record.headerLen +} diff --git a/object/store/packed/internal/ingest/resolve.go b/object/store/packed/internal/ingest/resolve.go new file mode 100644 index 00000000..dd26cd47 --- /dev/null +++ b/object/store/packed/internal/ingest/resolve.go @@ -0,0 +1,495 @@ +package ingest + +import ( + "fmt" + "io" + "sync" + + "lindenii.org/go/furgit/internal/compress/zlib" + "lindenii.org/go/furgit/internal/format/packfile" + "lindenii.org/go/furgit/internal/format/packfile/delta" + "lindenii.org/go/furgit/internal/progress" + "lindenii.org/go/furgit/object/header" + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/furgit/object/store" + "lindenii.org/go/furgit/object/typ" +) + +// adjacency maps each resolvable base to its delta children: +// ofs-deltas keyed by base offset, ref-deltas keyed by base object ID. +type adjacency struct { + byOffset map[int][]int + byOID map[id.ObjectID][]int +} + +// resolveDeltas resolves every delta record into a final object ID and type, +// completing thin packs from the external base reader when required. +func (ingestion *ingestion) resolveDeltas() error { + meter := progress.New(progress.Options{ + Writer: ingestion.opts.Progress, + Title: "resolving deltas", + Total: ingestion.countDeltas(), + Delay: 0, + Sparse: false, + Throughput: false, + }) + + adjacency := ingestion.buildAdjacency() + + err := ingestion.resolveFrom(ingestion.resolvedRoots(), adjacency, meter) + if err != nil { + return err + } + + external := ingestion.unresolvedExternalBases() + + switch { + case len(external) == 0 && ingestion.countUnresolved() > 0: + return fmt.Errorf("%w: unresolvable delta entries", ErrMalformedPack) + case len(external) > 0: + err = ingestion.fixThin(external, adjacency, meter) + if err != nil { + return err + } + } + + meter.Stop("done") + + return nil +} + +// buildAdjacency indexes every delta record by its base, +// so a resolved base can find the children that delta against it. +func (ingestion *ingestion) buildAdjacency() adjacency { + out := adjacency{ + byOffset: make(map[int][]int), + byOID: make(map[id.ObjectID][]int), + } + + for index := range ingestion.records { + rec := &ingestion.records[index] + + switch rec.packedType { + case packfile.EntryTypeOfsDelta: + out.byOffset[rec.baseOffset] = append(out.byOffset[rec.baseOffset], index) + case packfile.EntryTypeRefDelta: + out.byOID[rec.baseOID] = append(out.byOID[rec.baseOID], index) + case packfile.EntryTypeInvalid, + packfile.EntryTypeCommit, + packfile.EntryTypeTree, + packfile.EntryTypeBlob, + packfile.EntryTypeTag, + packfile.EntryTypeFuture: + } + } + + return out +} + +// item is a delta record awaiting resolution, with its delta-chain depth. +type item struct { + index int + depth int +} + +// resolver resolves deltas concurrently over a shared LIFO work stack. +// +// Each item is one delta child; +// a worker materializes its base from the cache, re-deriving on a miss, +// resolves the child, +// and pushes the child's own delta children. +// Workers park while the stack is empty but others are still working, +// and exit once it is empty and none are. +type resolver struct { + ingestion *ingestion + adjacency adjacency + meter *progress.Meter + + mu sync.Mutex + cond *sync.Cond + stack []item + active int + firstErr error +} + +func (ingestion *ingestion) resolveFrom(roots []int, adjacency adjacency, meter *progress.Meter) error { + var seed []item + + for _, root := range roots { + rec := &ingestion.records[root] + for _, group := range [2][]int{adjacency.byOffset[rec.offset], adjacency.byOID[rec.oid]} { + for _, child := range group { + seed = append(seed, item{index: child, depth: 1}) + } + } + } + + if len(seed) == 0 { + return nil + } + + res := &resolver{ + ingestion: ingestion, + adjacency: adjacency, + meter: meter, + stack: seed, + } + res.cond = sync.NewCond(&res.mu) + + return res.run(ingestion.workers) +} + +func (res *resolver) run(workers int) error { + if workers <= 1 { + res.worker() + + return res.firstErr + } + + var wg sync.WaitGroup + + for range workers { + wg.Go(func() { + res.worker() + }) + } + + wg.Wait() + + return res.firstErr +} + +func (res *resolver) worker() { + for { + res.mu.Lock() + + for len(res.stack) == 0 && res.active > 0 && res.firstErr == nil { + res.cond.Wait() + } + + if res.firstErr != nil || len(res.stack) == 0 { + res.mu.Unlock() + + return + } + + it := res.stack[len(res.stack)-1] + res.stack = res.stack[:len(res.stack)-1] + res.active++ + res.mu.Unlock() + + kids, err := res.process(it) + + res.mu.Lock() + res.active-- + + if err != nil && res.firstErr == nil { + res.firstErr = err + } + + if res.firstErr == nil { + res.stack = append(res.stack, kids...) + } + + if res.firstErr != nil || len(kids) > 0 || (res.active == 0 && len(res.stack) == 0) { + res.cond.Broadcast() + } + + res.mu.Unlock() + } +} + +// process resolves one delta child and returns its own delta children. +func (res *resolver) process(it item) ([]item, error) { + err := res.ingestion.ctx.Err() + if err != nil { + return nil, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + rec := &res.ingestion.records[it.index] + + parent, ok := res.ingestion.baseRecordIndex(rec) + if !ok { + return nil, fmt.Errorf("%w: entry at %d: base unavailable while resolving", ErrMalformedPack, rec.offset) + } + + baseType, baseContent, err := res.ingestion.materialize(parent) + if err != nil { + return nil, err + } + + err = res.ingestion.resolveOneChild(it.index, baseType, baseContent, res.meter) + if err != nil { + return nil, err + } + + return res.childItems(it.index, it.depth+1) +} + +// childItems returns the delta children of a just-resolved record at depth. +func (res *resolver) childItems(index, depth int) ([]item, error) { + rec := &res.ingestion.records[index] + + var kids []item + + for _, group := range [2][]int{res.adjacency.byOffset[rec.offset], res.adjacency.byOID[rec.oid]} { + for _, child := range group { + if depth > delta.MaxChainDepth { + return nil, fmt.Errorf("%w: entry at %d: delta chain too deep", ErrMalformedPack, res.ingestion.records[child].offset) + } + + kids = append(kids, item{index: child, depth: depth}) + } + } + + return kids, nil +} + +func (ingestion *ingestion) resolveOneChild(index int, baseType typ.Type, baseContent []byte, meter *progress.Meter) error { + rec := &ingestion.records[index] + + content, err := ingestion.applyDelta(index, baseContent) + if err != nil { + return err + } + + oid, err := ingestion.hashObject(baseType, content) + if err != nil { + return err + } + + rec.oid = oid + rec.resolved = true + + ingestion.byOID.Store(oid, index) + ingestion.baseCache.Add(baseCacheKey{offset: rec.offset}, cachedContent{objectType: baseType, content: content}) + + ingestion.deltasResolved.Add(1) + meter.Add(1, 0) + + return nil +} + +// materialize returns the inflated content of an already-resolved record, +// from the base cache, +// or re-derived from the nearest cached or base ancestor on a miss. +func (ingestion *ingestion) materialize(index int) (typ.Type, []byte, error) { + var ( + zero typ.Type + chain []int + base []byte + baseType typ.Type + ) + + cur := index + + for { + rec := &ingestion.records[cur] + + if cached, ok := ingestion.baseCache.Get(baseCacheKey{offset: rec.offset}); ok { + base = cached.content + baseType = cached.objectType + + break + } + + if rec.packedType.IsBase() { + objectType, err := rec.packedType.ObjectType() + if err != nil { + return zero, nil, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + content, err := ingestion.inflateRecord(cur) + if err != nil { + return zero, nil, err + } + + base = content + baseType = objectType + + break + } + + if len(chain) >= delta.MaxChainDepth { + return zero, nil, fmt.Errorf("%w: entry at %d: delta chain too deep", ErrMalformedPack, rec.offset) + } + + chain = append(chain, cur) + + next, ok := ingestion.baseRecordIndex(rec) + if !ok { + return zero, nil, fmt.Errorf("%w: entry at %d: base unavailable while reconstructing", ErrMalformedPack, rec.offset) + } + + cur = next + } + + for i := len(chain) - 1; i >= 0; i-- { + content, err := ingestion.applyDelta(chain[i], base) + if err != nil { + return zero, nil, err + } + + ingestion.baseCache.Add(baseCacheKey{offset: ingestion.records[chain[i]].offset}, cachedContent{objectType: baseType, content: content}) + + base = content + } + + return baseType, base, nil +} + +func (ingestion *ingestion) applyDelta(index int, baseContent []byte) ([]byte, error) { + rec := &ingestion.records[index] + + deltaPayload, err := ingestion.inflateRecord(index) + if err != nil { + return nil, err + } + + baseSize, resultSize, _, err := delta.ParseHeaderSizes(deltaPayload) + if err != nil { + return nil, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err) + } + + limit := ingestion.opts.MaxObjectSize + if limit > 0 && resultSize > uint64(limit) { + return nil, fmt.Errorf("%w: entry at %d: result size %d exceeds limit %d", store.ErrObjectTooLarge, rec.offset, resultSize, limit) + } + + if baseSize != uint64(len(baseContent)) { + return nil, fmt.Errorf("%w: entry at %d: delta base size mismatch", ErrMalformedPack, rec.offset) + } + + content, err := delta.Apply(baseContent, deltaPayload) + if err != nil { + return nil, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err) + } + + if uint64(len(content)) != resultSize { + return nil, fmt.Errorf("%w: entry at %d: delta result size mismatch", ErrMalformedPack, rec.offset) + } + + return content, nil +} + +func (ingestion *ingestion) baseRecordIndex(rec *record) (int, bool) { + switch rec.packedType { + case packfile.EntryTypeOfsDelta: + index, ok := ingestion.byOffset[rec.baseOffset] + + return index, ok + case packfile.EntryTypeRefDelta: + index, ok := ingestion.byOID.Load(rec.baseOID) + + return index, ok + case packfile.EntryTypeInvalid, + packfile.EntryTypeCommit, + packfile.EntryTypeTree, + packfile.EntryTypeBlob, + packfile.EntryTypeTag, + packfile.EntryTypeFuture: + } + + return 0, false +} + +// inflateRecord inflates one record's payload from the temporary pack file. +func (ingestion *ingestion) inflateRecord(index int) ([]byte, error) { + rec := &ingestion.records[index] + + offset := int64(rec.dataOffset()) + compressedLen := int64(rec.packedLen - rec.headerLen) + size := rec.declaredSize + + zr, err := zlib.NewReader(io.NewSectionReader(ingestion.packFile, offset, compressedLen)) + if err != nil { + return nil, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err) + } + + defer func() { _ = zr.Close() }() + + out := make([]byte, size) + + _, err = io.ReadFull(zr, out) + if err != nil { + return nil, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err) + } + + return out, nil +} + +// hashObject computes the object ID of one resolved object. +func (ingestion *ingestion) hashObject(objectType typ.Type, content []byte) (id.ObjectID, error) { + var zero id.ObjectID + + hashImpl, err := ingestion.objectFormat.New() + if err != nil { + return zero, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + _, _ = hashImpl.Write(header.Append(nil, objectType, len(content))) + _, _ = hashImpl.Write(content) + + oid, err := ingestion.objectFormat.FromBytes(hashImpl.Sum(nil)) + if err != nil { + return zero, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + return oid, nil +} + +// resolvedRoots returns the indices of every currently resolved record. +func (ingestion *ingestion) resolvedRoots() []int { + var roots []int + + for index := range ingestion.records { + if ingestion.records[index].resolved { + roots = append(roots, index) + } + } + + return roots +} + +// countDeltas returns the number of delta records. +func (ingestion *ingestion) countDeltas() int { + return ingestion.deltaCount +} + +// countUnresolved returns the number of records that remain unresolved. +// +// Every base is resolved during scanning or thin completion, +// so the unresolved records are exactly the unresolved deltas: +// the delta records minus those already resolved. +func (ingestion *ingestion) countUnresolved() int { + return ingestion.deltaCount - int(ingestion.deltasResolved.Load()) +} + +// unresolvedExternalBases returns the unique base object IDs +// of unresolved ref-deltas whose base is not present in the pack, +// in first-reference order. +func (ingestion *ingestion) unresolvedExternalBases() []id.ObjectID { + seen := make(map[id.ObjectID]struct{}) + + out := make([]id.ObjectID, 0, ingestion.deltaCount-int(ingestion.deltasResolved.Load())) + + for index := range ingestion.records { + rec := &ingestion.records[index] + if rec.resolved || rec.packedType != packfile.EntryTypeRefDelta { + continue + } + + if _, ok := ingestion.byOID.Load(rec.baseOID); ok { + continue + } + + if _, ok := seen[rec.baseOID]; ok { + continue + } + + seen[rec.baseOID] = struct{}{} + out = append(out, rec.baseOID) + } + + return out +} diff --git a/object/store/packed/internal/ingest/result.go b/object/store/packed/internal/ingest/result.go new file mode 100644 index 00000000..9cd6ef1d --- /dev/null +++ b/object/store/packed/internal/ingest/result.go @@ -0,0 +1,29 @@ +package ingest + +import "lindenii.org/go/furgit/object/id" + +// Result describes one finalized pack write. +type Result struct { + // PackName is the destination-relative name of the written pack. + PackName string + + // IdxName is the destination-relative name of the written index. + IdxName string + + // RevName is the destination-relative name of the written reverse index. + RevName string + + // BloomName is the destination-relative name of the written Bloom filter. + BloomName string + + // PackHash is the pack trailer hash + // shared by the pack, index, and reverse index. + PackHash id.ObjectID + + // ObjectCount is the number of objects in the finalized pack, + // including any bases appended during thin completion. + ObjectCount uint32 + + // ThinFixed reports whether thin completion appended local bases. + ThinFixed bool +} diff --git a/object/store/packed/internal/ingest/scan.go b/object/store/packed/internal/ingest/scan.go new file mode 100644 index 00000000..2cb5c135 --- /dev/null +++ b/object/store/packed/internal/ingest/scan.go @@ -0,0 +1,477 @@ +package ingest + +import ( + "bytes" + "errors" + "fmt" + "hash" + "hash/crc32" + "io" + + "lindenii.org/go/furgit/internal/compress/zlib" + "lindenii.org/go/furgit/internal/format/packfile" + "lindenii.org/go/furgit/internal/progress" + "lindenii.org/go/furgit/object/header" + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/furgit/object/store" + "lindenii.org/go/lgo/intconv" +) + +// scanBufferSize is the stream scanner's fixed input window size. +const scanBufferSize = 64 << 10 + +// scanner reads one pack stream, +// mirroring consumed bytes into the destination pack file +// while maintaining the running pack hash and a per-entry CRC. +// +// It implements [io.Reader] and [io.ByteReader] +// so a zlib reader can consume an entry payload through it +// without reading past the end of that compressed stream. +type scanner struct { + src io.Reader + dst io.Writer + + // buf[off:n] is the unread window. + buf []byte + off int + n int + + // consumed counts stream bytes consumed so far. + consumed int + + // hash accumulates the pack hash over consumed bytes + // while hashing is true. + hash hash.Hash + hashing bool + + // crc accumulates the CRC of the current entry + // while crcing is true. + crc uint32 + crcing bool +} + +// newScanner constructs one scanner mirroring src into dst, +// seeding the running hash from the already-consumed pack header. +func newScanner(src io.Reader, dst io.Writer, packHash hash.Hash) *scanner { + return &scanner{ + src: src, + dst: dst, + buf: make([]byte, scanBufferSize), + consumed: packfile.HeaderLen, + hash: packHash, + hashing: true, + crc: 0, + crcing: false, + } +} + +// readPackHeader reads and validates the pack header from src, +// returning the raw header and its declared object count. +func readPackHeader(src io.Reader) ([packfile.HeaderLen]byte, int, error) { + var raw [packfile.HeaderLen]byte + + _, err := io.ReadFull(src, raw[:]) + if err != nil { + return raw, 0, fmt.Errorf("%w: reading header: %w", ErrMalformedPack, err) + } + + packHeader, err := packfile.ParseHeader(raw[:]) + if err != nil { + return raw, 0, fmt.Errorf("%w: %w", ErrMalformedPack, err) + } + + count, err := intconv.Uint32ToInt(packHeader.ObjectCount) + if err != nil { + return raw, 0, fmt.Errorf("%w: object count: %w", ErrMalformedPack, err) + } + + return raw, count, nil +} + +// Read implements [io.Reader]. +func (scanner *scanner) Read(dst []byte) (int, error) { + if len(dst) == 0 { + return 0, nil + } + + err := scanner.ensureAvailable() + if err != nil { + return 0, err + } + + read := min(len(dst), scanner.n-scanner.off) + + copy(dst, scanner.buf[scanner.off:scanner.off+read]) + + err = scanner.use(read) + if err != nil { + return 0, err + } + + return read, nil +} + +// ReadByte implements [io.ByteReader] without allocation. +func (scanner *scanner) ReadByte() (byte, error) { + err := scanner.ensureAvailable() + if err != nil { + return 0, err + } + + b := scanner.buf[scanner.off] + + err = scanner.use(1) + if err != nil { + return 0, err + } + + return b, nil +} + +// ensureAvailable makes at least one unread byte available, +// returning [io.EOF] once the source is exhausted. +func (scanner *scanner) ensureAvailable() error { + for scanner.n-scanner.off == 0 { + err := scanner.flushPrefix() + if err != nil { + return err + } + + read, err := scanner.src.Read(scanner.buf[scanner.n:]) + scanner.n += read + + if err != nil { + if errors.Is(err, io.EOF) { + if scanner.n-scanner.off == 0 { + return io.EOF + } + + return nil + } + + return fmt.Errorf("object/store/packed/internal/ingest: reading pack: %w", err) + } + + if read == 0 && scanner.n-scanner.off == 0 { + return io.ErrNoProgress + } + } + + return nil +} + +// peekHeader returns the unread window grown to at most maxLen bytes +// without consuming, tolerating an early end of stream. +func (scanner *scanner) peekHeader(maxLen int) ([]byte, error) { + maxLen = min(maxLen, len(scanner.buf)) + + for scanner.n-scanner.off < maxLen { + err := scanner.flushPrefix() + if err != nil { + return nil, err + } + + read, err := scanner.src.Read(scanner.buf[scanner.n:]) + scanner.n += read + + if err != nil { + if errors.Is(err, io.EOF) { + break + } + + return nil, fmt.Errorf("object/store/packed/internal/ingest: reading pack: %w", err) + } + + if read == 0 { + break + } + } + + if scanner.n-scanner.off == 0 { + return nil, fmt.Errorf("%w: unexpected end of stream", ErrMalformedPack) + } + + return scanner.buf[scanner.off:scanner.n], nil +} + +// use consumes n unread bytes, +// folding them into the running hash and entry CRC as enabled. +func (scanner *scanner) use(n int) error { + chunk := scanner.buf[scanner.off : scanner.off+n] + + if scanner.hashing { + _, err := scanner.hash.Write(chunk) + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: hashing pack: %w", err) + } + } + + if scanner.crcing { + scanner.crc = crc32.Update(scanner.crc, crc32.IEEETable, chunk) + } + + scanner.off += n + scanner.consumed += n + + return nil +} + +// flushPrefix writes the consumed buffer prefix to the destination +// and compacts the unread window to the start of the buffer. +func (scanner *scanner) flushPrefix() error { + if scanner.off == 0 { + return nil + } + + _, err := scanner.dst.Write(scanner.buf[:scanner.off]) + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: writing pack: %w", err) + } + + unread := scanner.n - scanner.off + + copy(scanner.buf, scanner.buf[scanner.off:scanner.n]) + + scanner.off = 0 + scanner.n = unread + + return nil +} + +// beginCRC starts CRC accumulation for one entry. +func (scanner *scanner) beginCRC() { + scanner.crc = 0 + scanner.crcing = true +} + +// endCRC ends CRC accumulation and returns the entry CRC. +func (scanner *scanner) endCRC() uint32 { + crc := scanner.crc + scanner.crc = 0 + scanner.crcing = false + + return crc +} + +// finishTrailer reads and verifies the pack trailer hash, +// flushing the remaining buffered pack bytes to the destination. +// +// The trailer is mirrored to the destination but excluded from the pack hash. +func (scanner *scanner) finishTrailer(hashSize int) ([]byte, error) { + trailer := make([]byte, hashSize) + + scanner.hashing = false + + _, err := io.ReadFull(scanner, trailer) + if err != nil { + return nil, fmt.Errorf("%w: reading trailer: %w", ErrMalformedPack, err) + } + + if scanner.n-scanner.off > 0 { + return nil, fmt.Errorf("%w: trailing data after pack", ErrMalformedPack) + } + + if !bytes.Equal(scanner.hash.Sum(nil), trailer) { + return nil, fmt.Errorf("%w: trailer hash mismatch", ErrMalformedPack) + } + + err = scanner.flushPrefix() + if err != nil { + return nil, err + } + + return trailer, nil +} + +// streamAndScan streams the pack body to the temporary pack file, +// scanning one record per declared object and verifying the trailer. +func (ingestion *ingestion) streamAndScan() error { + meter := progress.New(progress.Options{ + Writer: ingestion.opts.Progress, + Title: "receiving objects", + Total: ingestion.headerCount, + Delay: 0, + Sparse: false, + Throughput: true, + }) + + prevConsumed := ingestion.scanner.consumed + + for range ingestion.headerCount { + err := ingestion.ctx.Err() + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + err = ingestion.scanEntry(ingestion.scanner.consumed) + if err != nil { + return err + } + + consumed := ingestion.scanner.consumed + meter.Add(1, int64(consumed-prevConsumed)) + prevConsumed = consumed + } + + meter.Stop("done") + + trailer, err := ingestion.scanner.finishTrailer(ingestion.objectFormat.Size()) + if err != nil { + return err + } + + packHash, err := ingestion.objectFormat.FromBytes(trailer) + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + ingestion.packHash = packHash + + return nil +} + +// scanEntry scans the entry beginning at start into one record. +func (ingestion *ingestion) scanEntry(start int) error { + ingestion.scanner.beginCRC() + + rec, err := ingestion.scanHeader(start) + if err != nil { + return err + } + + inflated, oid, err := ingestion.drainPayload(&rec) + if err != nil { + return err + } + + if inflated != int64(rec.declaredSize) { + return fmt.Errorf( + "%w: entry at %d: inflated size %d differs from declared %d", + ErrMalformedPack, start, inflated, rec.declaredSize, + ) + } + + rec.packedLen = ingestion.scanner.consumed - start + rec.crc32 = ingestion.scanner.endCRC() + + if rec.packedType.IsBase() { + rec.oid = oid + rec.resolved = true + } else { + ingestion.deltaCount++ + } + + index := len(ingestion.records) + ingestion.records = append(ingestion.records, rec) + ingestion.byOffset[rec.offset] = index + + if rec.resolved { + ingestion.byOID.Store(rec.oid, index) + } + + return nil +} + +// scanHeader parses and consumes the entry header at start. +func (ingestion *ingestion) scanHeader(start int) (record, error) { + var rec record + + rec.offset = start + + window, err := ingestion.scanner.peekHeader(packfile.MaxEntryHeaderLen(ingestion.objectFormat.Size())) + if err != nil { + return rec, err + } + + entryHeader, err := packfile.ParseEntryHeader(window, ingestion.objectFormat.Size()) + if err != nil { + return rec, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, start, err) + } + + declaredSize, err := intconv.Uint64ToInt(entryHeader.Size) + if err != nil { + return rec, fmt.Errorf("%w: entry at %d: declared size overflows int: %w", ErrMalformedPack, start, err) + } + + limit := ingestion.opts.MaxObjectSize + if limit > 0 && declaredSize > limit { + return rec, fmt.Errorf("%w: entry at %d: declared size %d exceeds limit %d", store.ErrObjectTooLarge, start, declaredSize, limit) + } + + rec.packedType = entryHeader.Type + rec.declaredSize = declaredSize + rec.headerLen = entryHeader.HeaderLen + + switch entryHeader.Type { + case packfile.EntryTypeCommit, packfile.EntryTypeTree, packfile.EntryTypeBlob, packfile.EntryTypeTag: + case packfile.EntryTypeOfsDelta: + dist, err := intconv.Uint64ToInt(entryHeader.OfsDistance) + if err != nil || dist == 0 || dist > start { + return rec, fmt.Errorf("%w: entry at %d: ofs-delta base out of bounds", ErrMalformedPack, start) + } + + rec.baseOffset = start - dist + case packfile.EntryTypeRefDelta: + baseID, err := ingestion.objectFormat.FromBytes(entryHeader.RefBase[:ingestion.objectFormat.Size()]) + if err != nil { + return rec, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + rec.baseOID = baseID + case packfile.EntryTypeInvalid, packfile.EntryTypeFuture: + return rec, fmt.Errorf("%w: entry at %d: unsupported entry type", ErrMalformedPack, start) + } + + err = ingestion.scanner.use(entryHeader.HeaderLen) + if err != nil { + return rec, err + } + + return rec, nil +} + +// drainPayload consumes one entry's compressed payload from the stream, +// returning its inflated length and, for base entries, its object ID. +func (ingestion *ingestion) drainPayload(rec *record) (int64, id.ObjectID, error) { + var zero id.ObjectID + + zr, err := zlib.NewReader(ingestion.scanner) + if err != nil { + return 0, zero, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err) + } + + defer func() { _ = zr.Close() }() + + if !rec.packedType.IsBase() { + read, err := io.Copy(io.Discard, zr) + if err != nil { + return 0, zero, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err) + } + + return read, zero, nil + } + + objectType, err := rec.packedType.ObjectType() + if err != nil { + return 0, zero, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + hashImpl, err := ingestion.objectFormat.New() + if err != nil { + return 0, zero, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + _, _ = hashImpl.Write(header.Append(nil, objectType, rec.declaredSize)) + + read, err := io.Copy(hashImpl, zr) + if err != nil { + return 0, zero, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err) + } + + oid, err := ingestion.objectFormat.FromBytes(hashImpl.Sum(nil)) + if err != nil { + return 0, zero, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + return read, oid, nil +} diff --git a/object/store/packed/internal/ingest/thin.go b/object/store/packed/internal/ingest/thin.go new file mode 100644 index 00000000..15773a56 --- /dev/null +++ b/object/store/packed/internal/ingest/thin.go @@ -0,0 +1,217 @@ +package ingest + +import ( + "errors" + "fmt" + "hash/crc32" + "io" + "os" + + "lindenii.org/go/furgit/internal/compress/zlib" + "lindenii.org/go/furgit/internal/format/packfile" + "lindenii.org/go/furgit/internal/progress" + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/furgit/object/store" + "lindenii.org/go/furgit/object/typ" + "lindenii.org/go/lgo/intconv" +) + +// fixThin completes a thin pack +// by appending the external bases it references, +// rewriting the pack header and trailer, +// and resolving the deltas reached from the appended bases. +func (ingestion *ingestion) fixThin(external []id.ObjectID, adjacency adjacency, meter *progress.Meter) error { + if ingestion.opts.ThinBase == nil { + return ErrThinPackNotPermitted + } + + hashSize := ingestion.objectFormat.Size() + if ingestion.scanner.consumed < packfile.HeaderLen+hashSize { + return fmt.Errorf("%w: pack shorter than trailer", ErrMalformedPack) + } + + // Drop the trailer from the write cursor. + ingestion.scanner.consumed -= hashSize + + appended := make([]int, 0, len(external)) + + for _, baseOID := range external { + err := ingestion.ctx.Err() + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + ty, content, err := ingestion.opts.ThinBase.ReadBytesContent(baseOID) + if errors.Is(err, store.ErrObjectNotFound) { + continue + } + + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: reading thin base %s: %w", baseOID, err) + } + + index, err := ingestion.appendBaseObject(baseOID, ty, content) + if err != nil { + return err + } + + appended = append(appended, index) + } + + err := ingestion.rewriteHeaderTrailer() + if err != nil { + return err + } + + err = ingestion.resolveFrom(appended, adjacency, meter) + if err != nil { + return err + } + + missing := ingestion.unresolvedExternalBases() + if len(missing) > 0 { + return &ThinBasesMissingError{OIDs: missing} + } + + if ingestion.countUnresolved() > 0 { + return fmt.Errorf("%w: unresolvable delta entries after thin completion", ErrMalformedPack) + } + + ingestion.thinFixed = len(appended) > 0 + + return nil +} + +// appendBaseObject appends one external thin base +// as a non-delta pack entry at the current write cursor, +// verifying that its content hashes to the requested object ID. +func (ingestion *ingestion) appendBaseObject(objectID id.ObjectID, objectType typ.Type, content []byte) (int, error) { + entryType, err := packfile.EntryTypeFromObjectType(objectType) + if err != nil { + return 0, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + computed, err := ingestion.hashObject(objectType, content) + if err != nil { + return 0, err + } + + if computed != objectID { + return 0, fmt.Errorf("%w: thin base %s content hashes to %s", ErrMalformedPack, objectID, computed) + } + + start := ingestion.scanner.consumed + startOffset := int64(start) + + headerBytes := packfile.AppendTypeSize(nil, entryType, uint64(len(content))) + + _, err = ingestion.packFile.WriteAt(headerBytes, startOffset) + if err != nil { + return 0, fmt.Errorf("object/store/packed/internal/ingest: writing thin base header: %w", err) + } + + crc := crc32.NewIEEE() + _, _ = crc.Write(headerBytes) + + dataOffset := startOffset + int64(len(headerBytes)) + writer := &offsetWriter{file: ingestion.packFile, offset: dataOffset} + + zw := zlib.NewWriter(io.MultiWriter(writer, crc)) + + _, err = zw.Write(content) + if err != nil { + _ = zw.Close() + + return 0, fmt.Errorf("object/store/packed/internal/ingest: compressing thin base: %w", err) + } + + err = zw.Close() + if err != nil { + return 0, fmt.Errorf("object/store/packed/internal/ingest: compressing thin base: %w", err) + } + + headerLen := len(headerBytes) + packedLen := headerLen + writer.written + ingestion.scanner.consumed = start + packedLen + + rec := record{ + offset: start, + headerLen: headerLen, + packedLen: packedLen, + crc32: crc.Sum32(), + packedType: entryType, + declaredSize: len(content), + baseOffset: 0, + baseOID: id.ObjectID{}, + oid: objectID, + resolved: true, + } + + index := len(ingestion.records) + ingestion.records = append(ingestion.records, rec) + ingestion.byOffset[start] = index + ingestion.byOID.Store(objectID, index) + + return index, nil +} + +// rewriteHeaderTrailer updates the pack object count +// and recomputes the pack trailer hash +// over the entries left after thin completion. +func (ingestion *ingestion) rewriteHeaderTrailer() error { + count, err := intconv.IntToUint32(len(ingestion.records)) + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + _, err = ingestion.packFile.WriteAt(packfile.AppendHeader(nil, count), 0) + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: rewriting header: %w", err) + } + + bodyEnd := int64(ingestion.scanner.consumed) + + hashImpl, err := ingestion.objectFormat.New() + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + _, err = io.Copy(hashImpl, io.NewSectionReader(ingestion.packFile, 0, bodyEnd)) + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: rehashing pack: %w", err) + } + + sum := hashImpl.Sum(nil) + + _, err = ingestion.packFile.WriteAt(sum, bodyEnd) + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: writing trailer: %w", err) + } + + packHash, err := ingestion.objectFormat.FromBytes(sum) + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + ingestion.packHash = packHash + + return nil +} + +// offsetWriter writes to a file via WriteAt, +// advancing sequentially from a base offset +// and counting the bytes written. +type offsetWriter struct { + file *os.File + offset int64 + written int +} + +// Write implements [io.Writer]. +func (writer *offsetWriter) Write(p []byte) (int, error) { + n, err := writer.file.WriteAt(p, writer.offset) + writer.offset += int64(n) + writer.written += n + + return n, err //nolint:wrapcheck +} diff --git a/object/store/packed/internal/ingest/writepack_test.go b/object/store/packed/internal/ingest/writepack_test.go new file mode 100644 index 00000000..b2f4d2b8 --- /dev/null +++ b/object/store/packed/internal/ingest/writepack_test.go @@ -0,0 +1,607 @@ +package ingest_test + +import ( + "bytes" + "context" + "errors" + "io" + "os" + "path/filepath" + "testing" + + "lindenii.org/go/furgit/internal/format/packidx" + "lindenii.org/go/furgit/internal/format/packidx/bloom" + "lindenii.org/go/furgit/internal/testgit" + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/furgit/object/store" + "lindenii.org/go/furgit/object/store/packed" + "lindenii.org/go/furgit/object/store/packed/internal/ingest" +) + +// TestWritePackMatchesGit verifies that ingesting a normal pack +// matches git's own pack, index, and reverse index. +// +// The pack is streamed through verbatim, +// and the index and reverse index are regenerated deterministically, +// so a successful match also confirms that scanning and delta resolution +// recovered every object ID, offset, and CRC that git recorded. +func TestWritePackMatchesGit(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + repo, err := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: objectFormat}) + if err != nil { + t.Fatalf("NewRepo: %v", err) + } + + seeded, err := repo.SeedHistory(t) + if err != nil { + t.Fatalf("SeedHistory: %v", err) + } + + gitPrefix, err := repo.PackObjects(t, seeded.All(), testgit.PackObjectsOptions{ + RevIndex: true, + Revs: false, + Exclude: nil, + }) + if err != nil { + t.Fatalf("PackObjects: %v", err) + } + + stream, err := os.ReadFile(gitPrefix + ".pack") //nolint:gosec + if err != nil { + t.Fatalf("ReadFile pack: %v", err) + } + + dir, result := writePack(t, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{ + ThinBase: nil, + Progress: nil, + }) + + if want := filepath.Base(gitPrefix) + ".pack"; result.PackName != want { + t.Fatalf("PackName = %q, want %q", result.PackName, want) + } + + for _, artifact := range []struct { + kind string + ours string + want string + }{ + {"pack", result.PackName, gitPrefix + ".pack"}, + {"idx", result.IdxName, gitPrefix + ".idx"}, + {"rev", result.RevName, gitPrefix + ".rev"}, + } { + ours, err := os.ReadFile(filepath.Join(dir, artifact.ours)) //nolint:gosec + if err != nil { + t.Fatalf("ReadFile %s: %v", artifact.kind, err) + } + + want, err := os.ReadFile(artifact.want) + if err != nil { + t.Fatalf("ReadFile git %s: %v", artifact.kind, err) + } + + if !bytes.Equal(ours, want) { + t.Errorf("%s differs from git: %d bytes vs %d", artifact.kind, len(ours), len(want)) + } + } + }) + } +} + +// TestWritePackBloom verifies that ingesting a pack writes a Bloom filter +// that reports every object in the pack as present. +func TestWritePackBloom(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + repo, err := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: objectFormat}) + if err != nil { + t.Fatalf("NewRepo: %v", err) + } + + seeded, err := repo.SeedHistory(t) + if err != nil { + t.Fatalf("SeedHistory: %v", err) + } + + gitPrefix, err := repo.PackObjects(t, seeded.All(), testgit.PackObjectsOptions{ + RevIndex: true, + Revs: false, + Exclude: nil, + }) + if err != nil { + t.Fatalf("PackObjects: %v", err) + } + + stream, err := os.ReadFile(gitPrefix + ".pack") //nolint:gosec + if err != nil { + t.Fatalf("ReadFile pack: %v", err) + } + + dir, result := writePack(t, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{ + ThinBase: nil, + Progress: nil, + }) + + if result.BloomName == "" { + t.Fatal("BloomName is empty") + } + + bloomBytes, err := os.ReadFile(filepath.Join(dir, result.BloomName)) //nolint:gosec + if err != nil { + t.Fatalf("ReadFile bloom: %v", err) + } + + filter, err := bloom.Parse(bloomBytes, objectFormat) + if err != nil { + t.Fatalf("bloom.Parse: %v", err) + } + + idxBytes, err := os.ReadFile(filepath.Join(dir, result.IdxName)) //nolint:gosec + if err != nil { + t.Fatalf("ReadFile idx: %v", err) + } + + index, err := packidx.Parse(idxBytes, objectFormat.Size()) + if err != nil { + t.Fatalf("packidx.Parse: %v", err) + } + + if !bytes.Equal(filter.PackHash(), index.PackHash()) { + t.Fatalf("filter pack hash %x, want %x", filter.PackHash(), index.PackHash()) + } + + for pos := range index.NumObjects() { + if !filter.MayContain(index.OIDAt(pos)) { + t.Fatalf("filter rejects object at index position %d", pos) + } + } + }) + } +} + +// TestWritePackEmpty verifies that a zero-object pack +// succeeds without writing any artifacts. +func TestWritePackEmpty(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + repo, err := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: objectFormat}) + if err != nil { + t.Fatalf("NewRepo: %v", err) + } + + stream, err := repo.PackObjectsStdout(t, nil, testgit.PackObjectsStdoutOptions{ + Revs: false, + Thin: false, + Exclude: nil, + }) + if err != nil { + t.Fatalf("PackObjectsStdout: %v", err) + } + + dir, result := writePack(t, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{ + ThinBase: nil, + Progress: nil, + }) + + if result.ObjectCount != 0 { + t.Fatalf("ObjectCount = %d, want 0", result.ObjectCount) + } + + if result.PackName != "" { + t.Fatalf("PackName = %q, want empty", result.PackName) + } + + entries, err := os.ReadDir(dir) + if err != nil { + t.Fatalf("ReadDir: %v", err) + } + + if len(entries) != 0 { + t.Fatalf("empty pack wrote %d files, want 0", len(entries)) + } + }) + } +} + +// TestWritePackIdempotent verifies that ingesting the same pack twice +// into one store succeeds and leaves the artifacts in place. +func TestWritePackIdempotent(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + repo, err := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: objectFormat}) + if err != nil { + t.Fatalf("NewRepo: %v", err) + } + + seeded, err := repo.SeedHistory(t) + if err != nil { + t.Fatalf("SeedHistory: %v", err) + } + + gitPrefix, err := repo.PackObjects(t, seeded.All(), testgit.PackObjectsOptions{ + RevIndex: true, + Revs: false, + Exclude: nil, + }) + if err != nil { + t.Fatalf("PackObjects: %v", err) + } + + stream, err := os.ReadFile(gitPrefix + ".pack") //nolint:gosec + if err != nil { + t.Fatalf("ReadFile pack: %v", err) + } + + dir := t.TempDir() + + root, err := os.OpenRoot(dir) + if err != nil { + t.Fatalf("OpenRoot: %v", err) + } + + t.Cleanup(func() { _ = root.Close() }) + + first, err := ingest.WritePack(t.Context(), root, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{ + ThinBase: nil, + Progress: nil, + }) + if err != nil { + t.Fatalf("first WritePack: %v", err) + } + + second, err := ingest.WritePack(t.Context(), root, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{ + ThinBase: nil, + Progress: nil, + }) + if err != nil { + t.Fatalf("second WritePack: %v", err) + } + + if second.PackName != first.PackName { + t.Fatalf("second PackName = %q, want %q", second.PackName, first.PackName) + } + + for _, name := range []string{first.PackName, first.IdxName, first.RevName} { + _, err := os.Stat(filepath.Join(dir, name)) + if err != nil { + t.Fatalf("missing %q after re-write: %v", name, err) + } + } + }) + } +} + +// writePack ingests src into a fresh store directory, +// returning the directory and the ingest result. +func writePack( + t *testing.T, + objectFormat id.ObjectFormat, + src io.Reader, + opts store.PackWriteOptions, +) (string, ingest.Result) { + t.Helper() + + dir := t.TempDir() + + root, err := os.OpenRoot(dir) + if err != nil { + t.Fatalf("OpenRoot: %v", err) + } + + t.Cleanup(func() { _ = root.Close() }) + + result, err := ingest.WritePack(t.Context(), root, objectFormat, src, opts) + if err != nil { + t.Fatalf("WritePack: %v", err) + } + + return dir, result +} + +// TestWritePackThin verifies that a thin pack is completed from the thin base +// and that git accepts the resulting self-contained pack. +func TestWritePackThin(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + repo, seeded := seedHistory(t, objectFormat) + thinBase := fullStore(t, repo, objectFormat, seeded) + stream := thinStream(t, repo, seeded) + + dir, result := writePack(t, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{ + ThinBase: thinBase, + Progress: nil, + }) + + if !result.ThinFixed { + t.Fatalf("ThinFixed = false, want true (pack was not thin)") + } + + _, err := repo.VerifyPack(t, filepath.Join(dir, result.IdxName)) + if err != nil { + t.Fatalf("VerifyPack on completed pack: %v", err) + } + }) + } +} + +// TestWritePackThinWithoutBase verifies that a thin pack is rejected +// when no thin base is supplied. +func TestWritePackThinWithoutBase(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + repo, seeded := seedHistory(t, objectFormat) + stream := thinStream(t, repo, seeded) + + _, err := ingest.WritePack(t.Context(), freshRoot(t), objectFormat, bytes.NewReader(stream), store.PackWriteOptions{ + ThinBase: nil, + Progress: nil, + }) + + if !errors.Is(err, ingest.ErrThinPackNotPermitted) { + t.Fatalf("err = %v, want ErrThinPackNotPermitted", err) + } + }) + } +} + +// TestWritePackThinMissingBase verifies that a thin pack +// whose bases are absent from the thin base +// reports the missing object IDs. +func TestWritePackThinMissingBase(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + repo, seeded := seedHistory(t, objectFormat) + emptyBase := emptyStore(t, objectFormat) + stream := thinStream(t, repo, seeded) + + _, err := ingest.WritePack(t.Context(), freshRoot(t), objectFormat, bytes.NewReader(stream), store.PackWriteOptions{ + ThinBase: emptyBase, + Progress: nil, + }) + + missing, ok := errors.AsType[*ingest.ThinBasesMissingError](err) + if !ok { + t.Fatalf("err = %v, want *ThinBasesMissingError", err) + } + + if len(missing.OIDs) == 0 { + t.Fatalf("ThinBasesMissingError reported no object IDs") + } + }) + } +} + +// TestWritePackContextCancelled verifies that a cancelled context +// aborts ingestion and publishes no artifacts. +func TestWritePackContextCancelled(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + repo, seeded := seedHistory(t, objectFormat) + + gitPrefix, err := repo.PackObjects(t, seeded.All(), testgit.PackObjectsOptions{ + RevIndex: false, + Revs: false, + Exclude: nil, + }) + if err != nil { + t.Fatalf("PackObjects: %v", err) + } + + stream, err := os.ReadFile(gitPrefix + ".pack") //nolint:gosec + if err != nil { + t.Fatalf("ReadFile pack: %v", err) + } + + ctx, cancel := context.WithCancel(t.Context()) + cancel() + + dir := t.TempDir() + + root, err := os.OpenRoot(dir) + if err != nil { + t.Fatalf("OpenRoot: %v", err) + } + + t.Cleanup(func() { _ = root.Close() }) + + _, err = ingest.WritePack(ctx, root, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{ + ThinBase: nil, + Progress: nil, + }) + if !errors.Is(err, context.Canceled) { + t.Fatalf("err = %v, want context.Canceled", err) + } + + entries, err := os.ReadDir(dir) + if err != nil { + t.Fatalf("ReadDir: %v", err) + } + + if len(entries) != 0 { + t.Fatalf("cancelled ingestion left %d files behind", len(entries)) + } + }) + } +} + +// TestWritePackObjectTooLarge verifies that an object exceeding MaxObjectSize +// is rejected and no artifacts are published. +func TestWritePackObjectTooLarge(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + repo, seeded := seedHistory(t, objectFormat) + + gitPrefix, err := repo.PackObjects(t, seeded.All(), testgit.PackObjectsOptions{ + RevIndex: false, + Revs: false, + Exclude: nil, + }) + if err != nil { + t.Fatalf("PackObjects: %v", err) + } + + stream, err := os.ReadFile(gitPrefix + ".pack") //nolint:gosec + if err != nil { + t.Fatalf("ReadFile pack: %v", err) + } + + dir := t.TempDir() + + root, err := os.OpenRoot(dir) + if err != nil { + t.Fatalf("OpenRoot: %v", err) + } + + t.Cleanup(func() { _ = root.Close() }) + + _, err = ingest.WritePack(t.Context(), root, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{ + ThinBase: nil, + Progress: nil, + MaxObjectSize: 1, + }) + if !errors.Is(err, store.ErrObjectTooLarge) { + t.Fatalf("err = %v, want ErrObjectTooLarge", err) + } + + entries, err := os.ReadDir(dir) + if err != nil { + t.Fatalf("ReadDir: %v", err) + } + + if len(entries) != 0 { + t.Fatalf("rejected ingestion left %d files behind", len(entries)) + } + }) + } +} + +// seedHistory creates one repository with a seeded history. +func seedHistory(t *testing.T, objectFormat id.ObjectFormat) (*testgit.Repo, testgit.Seeded) { + t.Helper() + + repo, err := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: objectFormat}) + if err != nil { + t.Fatalf("NewRepo: %v", err) + } + + seeded, err := repo.SeedHistory(t) + if err != nil { + t.Fatalf("SeedHistory: %v", err) + } + + return repo, seeded +} + +// thinStream produces a thin pack of the tip commit excluding its parent, +// so its deltas reference the omitted parent objects. +func thinStream(t *testing.T, repo *testgit.Repo, seeded testgit.Seeded) []byte { + t.Helper() + + tip := seeded.Commits[len(seeded.Commits)-1] + parent := seeded.Commits[len(seeded.Commits)-2] + + stream, err := repo.PackObjectsStdout(t, []id.ObjectID{tip}, testgit.PackObjectsStdoutOptions{ + Revs: true, + Thin: true, + Exclude: []id.ObjectID{parent}, + }) + if err != nil { + t.Fatalf("PackObjectsStdout: %v", err) + } + + return stream +} + +// fullStore opens a packed store over a pack of every seeded object. +func fullStore(t *testing.T, repo *testgit.Repo, objectFormat id.ObjectFormat, seeded testgit.Seeded) *packed.Packed { + t.Helper() + + prefix, err := repo.PackObjects(t, seeded.All(), testgit.PackObjectsOptions{ + RevIndex: false, + Revs: false, + Exclude: nil, + }) + if err != nil { + t.Fatalf("PackObjects: %v", err) + } + + return openStore(t, filepath.Dir(prefix), objectFormat) +} + +// emptyStore opens a packed store over an empty directory. +func emptyStore(t *testing.T, objectFormat id.ObjectFormat) *packed.Packed { + t.Helper() + + return openStore(t, t.TempDir(), objectFormat) +} + +// openStore opens a packed store over dir. +func openStore(t *testing.T, dir string, objectFormat id.ObjectFormat) *packed.Packed { + t.Helper() + + root, err := os.OpenRoot(dir) + if err != nil { + t.Fatalf("OpenRoot: %v", err) + } + + t.Cleanup(func() { _ = root.Close() }) + + packedStore, err := packed.New(root, objectFormat) + if err != nil { + t.Fatalf("New: %v", err) + } + + t.Cleanup(func() { _ = packedStore.Close() }) + + return packedStore +} + +// freshRoot opens a writable root over a fresh temporary directory. +func freshRoot(t *testing.T) *os.Root { + t.Helper() + + root, err := os.OpenRoot(t.TempDir()) + if err != nil { + t.Fatalf("OpenRoot: %v", err) + } + + t.Cleanup(func() { _ = root.Close() }) + + return root +} |
