aboutsummaryrefslogtreecommitdiff
path: root/object/store/packed/internal
diff options
context:
space:
mode:
Diffstat (limited to 'object/store/packed/internal')
-rw-r--r--object/store/packed/internal/ingest/basecache.go25
-rw-r--r--object/store/packed/internal/ingest/doc.go11
-rw-r--r--object/store/packed/internal/ingest/errors.go36
-rw-r--r--object/store/packed/internal/ingest/finalize.go213
-rw-r--r--object/store/packed/internal/ingest/ingest.go265
-rw-r--r--object/store/packed/internal/ingest/record.go51
-rw-r--r--object/store/packed/internal/ingest/resolve.go495
-rw-r--r--object/store/packed/internal/ingest/result.go29
-rw-r--r--object/store/packed/internal/ingest/scan.go477
-rw-r--r--object/store/packed/internal/ingest/thin.go217
-rw-r--r--object/store/packed/internal/ingest/writepack_test.go607
11 files changed, 2426 insertions, 0 deletions
diff --git a/object/store/packed/internal/ingest/basecache.go b/object/store/packed/internal/ingest/basecache.go
new file mode 100644
index 00000000..77419aa7
--- /dev/null
+++ b/object/store/packed/internal/ingest/basecache.go
@@ -0,0 +1,25 @@
+package ingest
+
+import (
+ "lindenii.org/go/furgit/internal/cache/clock"
+ "lindenii.org/go/furgit/object/typ"
+)
+
+// baseCacheMaxWeight is the base cache's weight budget per worker (96 << 20);
+// newBaseCache multiplies it by the worker count.
+// Weights come from baseContentWeight, so the unit is roughly bytes of content.
+const baseCacheMaxWeight = 96 << 20
+
+// baseCacheKey identifies one cached object by its entry offset in the pack.
+type baseCacheKey struct {
+ offset int
+}
+
+// cachedContent is one cached object: its object type and inflated content.
+type cachedContent struct {
+ objectType typ.Type
+ content []byte
+}
+
+// newBaseCache returns the base-content cache for one ingestion,
+// budgeted at baseCacheMaxWeight for each of the workers
+// and weighing its entries with baseContentWeight.
+func newBaseCache(workers int) *clock.Clock[baseCacheKey, cachedContent] {
+	budget := baseCacheMaxWeight * uint64(workers) //#nosec G115
+
+	return clock.New(budget, baseContentWeight)
+}
+
+func baseContentWeight(_ baseCacheKey, base cachedContent) uint64 {
+ return uint64(len(base.content)) + 32
+}
diff --git a/object/store/packed/internal/ingest/doc.go b/object/store/packed/internal/ingest/doc.go
new file mode 100644
index 00000000..67be037b
--- /dev/null
+++ b/object/store/packed/internal/ingest/doc.go
@@ -0,0 +1,11 @@
+// Package ingest writes one incoming pack stream
+// into an objects/pack directory
+// as a finalized pack, index, reverse index, and Bloom filter.
+//
+// WritePack streams the pack to a temporary file
+// while scanning its entries,
+// resolves every delta against in-pack bases,
+// optionally completes thin packs from an external base reader,
+// and publishes the artifacts under content-addressed names
+// derived from the pack trailer hash.
+package ingest
diff --git a/object/store/packed/internal/ingest/errors.go b/object/store/packed/internal/ingest/errors.go
new file mode 100644
index 00000000..e268182e
--- /dev/null
+++ b/object/store/packed/internal/ingest/errors.go
@@ -0,0 +1,36 @@
+package ingest
+
+import (
+ "errors"
+ "fmt"
+
+ "lindenii.org/go/furgit/object/id"
+)
+
+// ErrMalformedPack reports that
+// the incoming pack stream is truncated,
+// inconsistent, or otherwise unparseable.
+var ErrMalformedPack = errors.New("object/store/packed/internal/ingest: malformed pack")
+
+// ErrThinPackNotPermitted reports that
+// the incoming pack is thin,
+// referencing bases not contained within it,
+// but no external base reader was supplied to complete it.
+var ErrThinPackNotPermitted = errors.New("object/store/packed/internal/ingest: thin pack not permitted: no thin base supplied")
+
+// ThinBasesMissingError reports that
+// an incoming thin pack references base objects
+// that the supplied thin base reader does not contain,
+// so the pack cannot be completed.
+type ThinBasesMissingError struct {
+ // OIDs holds the missing base object IDs, sorted.
+ OIDs []id.ObjectID
+}
+
+// Error implements error,
+// reporting how many base objects are missing.
+func (e *ThinBasesMissingError) Error() string {
+	missing := len(e.OIDs)
+
+	return fmt.Sprintf("object/store/packed/internal/ingest: thin pack references %d missing base objects", missing)
+}
diff --git a/object/store/packed/internal/ingest/finalize.go b/object/store/packed/internal/ingest/finalize.go
new file mode 100644
index 00000000..c6b1e2c9
--- /dev/null
+++ b/object/store/packed/internal/ingest/finalize.go
@@ -0,0 +1,213 @@
+package ingest
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "io/fs"
+ "slices"
+
+ "lindenii.org/go/furgit/internal/format/packidx"
+ "lindenii.org/go/furgit/internal/format/packidx/bloom"
+ "lindenii.org/go/furgit/internal/format/packrev"
+ "lindenii.org/go/furgit/object/id"
+ "lindenii.org/go/lgo/intconv"
+)
+
+// finalize writes the index, reverse index, and Bloom filter
+// to temporary files,
+// then links the pack, reverse index, Bloom filter, and index
+// to their content-addressed names.
+//
+// If any link fails, the names this call created are removed again
+// and the error is returned with a zero Result.
+func (ingestion *ingestion) finalize() (Result, error) {
+ // Stop before writing anything if the caller has already cancelled.
+ err := ingestion.ctx.Err()
+ if err != nil {
+ return Result{}, fmt.Errorf("object/store/packed/internal/ingest: %w", err)
+ }
+
+ entries, positions, err := ingestion.indexEntries()
+ if err != nil {
+ return Result{}, err
+ }
+
+ packHash := ingestion.packHash.Bytes()
+
+ idxTmp, err := ingestion.writeTemp("tmp_idx_", func(w io.Writer) error {
+ return packidx.Write(w, ingestion.objectFormat, entries, packHash)
+ })
+ if err != nil {
+ return Result{}, err
+ }
+
+ revTmp, err := ingestion.writeTemp("tmp_rev_", func(w io.Writer) error {
+ return packrev.Write(w, ingestion.objectFormat, positions, packHash)
+ })
+ if err != nil {
+ return Result{}, err
+ }
+
+ bloomBuilder, err := ingestion.buildBloom(entries, packHash)
+ if err != nil {
+ return Result{}, err
+ }
+
+ bloomTmp, err := ingestion.writeTemp("tmp_bloom_", func(w io.Writer) error {
+ _, err := w.Write(bloomBuilder.Bytes())
+
+ return err
+ })
+ if err != nil {
+ return Result{}, err
+ }
+
+ objectCount, err := intconv.IntToUint32(len(ingestion.records))
+ if err != nil {
+ return Result{}, fmt.Errorf("object/store/packed/internal/ingest: %w", err)
+ }
+
+ // Every artifact shares the "pack-<trailer hash>" stem.
+ base := "pack-" + ingestion.packHash.String()
+ packFinal := base + ".pack"
+ idxFinal := base + ".idx"
+ revFinal := base + ".rev"
+ bloomFinal := base + ".bloom"
+
+ // Link the data files before the index,
+ // since the index is what publishes the pack to readers.
+ artifacts := [...]struct{ tmp, final string }{
+ {ingestion.packTmp, packFinal},
+ {revTmp, revFinal},
+ {bloomTmp, bloomFinal},
+ {idxTmp, idxFinal},
+ }
+
+ // created lists only the final names this call linked itself,
+ // so rollback never removes a name that already existed.
+ var created []string
+
+ for _, artifact := range artifacts {
+ linked, err := ingestion.promote(artifact.tmp, artifact.final)
+ if err != nil {
+ // Roll back in reverse link order; removal is best effort.
+ for i := len(created) - 1; i >= 0; i-- {
+ _ = ingestion.root.Remove(created[i])
+ }
+
+ return Result{}, err
+ }
+
+ if linked {
+ created = append(created, artifact.final)
+ }
+ }
+
+ return Result{
+ PackName: packFinal,
+ IdxName: idxFinal,
+ RevName: revFinal,
+ BloomName: bloomFinal,
+ PackHash: ingestion.packHash,
+ ObjectCount: objectCount,
+ ThinFixed: ingestion.thinFixed,
+ }, nil
+}
+
+// buildBloom returns a Bloom filter builder bound to packHash
+// and populated with the object ID of every index entry.
+// The filter parameters come from bloom.RecommendParams for the entry count.
+func (ingestion *ingestion) buildBloom(entries []packidx.Entry, packHash []byte) (*bloom.Builder, error) {
+	format := ingestion.objectFormat
+
+	buckets, hashes, err := bloom.RecommendParams(format, len(entries))
+	if err != nil {
+		return nil, fmt.Errorf("object/store/packed/internal/ingest: %w", err)
+	}
+
+	filter, err := bloom.NewBuilder(format, buckets, hashes, packHash)
+	if err != nil {
+		return nil, fmt.Errorf("object/store/packed/internal/ingest: %w", err)
+	}
+
+	// Entries store object IDs in a fixed-size array;
+	// only the first Size() bytes belong to this object format.
+	oidLen := format.Size()
+	for i := range entries {
+		filter.Add(entries[i].OID[:oidLen])
+	}
+
+	return filter, nil
+}
+
+// indexEntries returns the index entries sorted by object ID,
+// together with a slice that maps each record, in pack order,
+// to its position in that sorted order.
+func (ingestion *ingestion) indexEntries() ([]packidx.Entry, []uint32, error) {
+	records := ingestion.records
+
+	// sorted holds record indices, reordered by object ID.
+	sorted := make([]int, 0, len(records))
+	for recordIndex := range records {
+		sorted = append(sorted, recordIndex)
+	}
+
+	slices.SortFunc(sorted, func(a, b int) int {
+		return records[a].oid.Compare(records[b].oid)
+	})
+
+	entries := make([]packidx.Entry, len(sorted))
+	positions := make([]uint32, len(records))
+
+	for rank, recordIndex := range sorted {
+		rec := &records[recordIndex]
+
+		offset, err := intconv.IntToUint64(rec.offset)
+		if err != nil {
+			return nil, nil, fmt.Errorf("object/store/packed/internal/ingest: %w", err)
+		}
+
+		position, err := intconv.IntToUint32(rank)
+		if err != nil {
+			return nil, nil, fmt.Errorf("object/store/packed/internal/ingest: %w", err)
+		}
+
+		var oidBytes [id.MaxObjectIDSize]byte
+		copy(oidBytes[:], rec.oid.RawBytes())
+
+		entries[rank] = packidx.Entry{
+			OID:    oidBytes,
+			Offset: offset,
+			CRC32:  rec.crc32,
+		}
+		positions[recordIndex] = position
+	}
+
+	return entries, positions, nil
+}
+
+// writeTemp creates a temporary file with the given name prefix,
+// fills it by calling write, syncs it, and returns its name.
+// The file is closed before returning on every path after creation.
+func (ingestion *ingestion) writeTemp(prefix string, write func(io.Writer) error) (string, error) {
+	name, file, err := ingestion.createTemp(prefix)
+	if err != nil {
+		return "", err
+	}
+
+	defer func() { _ = file.Close() }()
+
+	if writeErr := write(file); writeErr != nil {
+		return "", writeErr
+	}
+
+	if syncErr := file.Sync(); syncErr != nil {
+		return "", fmt.Errorf("object/store/packed/internal/ingest: syncing %q: %w", name, syncErr)
+	}
+
+	return name, nil
+}
+
+// promote hard-links tmp to final and removes tmp.
+// It reports whether final was newly created by this call:
+// a final that already exists counts as success but returns false,
+// so that rollback leaves it alone.
+// Any other link failure is returned and tmp is left in place.
+func (ingestion *ingestion) promote(tmp, final string) (bool, error) {
+	linkErr := ingestion.root.Link(tmp, final)
+	if linkErr != nil && !errors.Is(linkErr, fs.ErrExist) {
+		return false, fmt.Errorf("object/store/packed/internal/ingest: linking %q: %w", final, linkErr)
+	}
+
+	// Whether linked now or already present, tmp is no longer needed;
+	// its removal is best effort.
+	_ = ingestion.root.Remove(tmp)
+
+	return linkErr == nil, nil
+}
diff --git a/object/store/packed/internal/ingest/ingest.go b/object/store/packed/internal/ingest/ingest.go
new file mode 100644
index 00000000..9b60af85
--- /dev/null
+++ b/object/store/packed/internal/ingest/ingest.go
@@ -0,0 +1,265 @@
+package ingest
+
+import (
+ "bytes"
+ "context"
+ "crypto/rand"
+ "errors"
+ "fmt"
+ "io"
+ "io/fs"
+ "os"
+ "runtime"
+ "sync/atomic"
+
+ "lindenii.org/go/furgit/internal/cache/clock"
+ "lindenii.org/go/furgit/internal/format/packfile"
+ "lindenii.org/go/furgit/object/id"
+ "lindenii.org/go/furgit/object/store"
+ "lindenii.org/go/lgo/sync"
+)
+
+var errTempNamesExhausted = errors.New("object/store/packed/internal/ingest: exhausted temporary file names")
+
+// ingestion holds the state for one WritePack call.
+type ingestion struct {
+ // ctx is the caller's context,
+ // checked for cancellation while resolving and before finalizing.
+ ctx context.Context //nolint:containedctx
+
+ // root is the destination objects/pack directory.
+ root *os.Root
+
+ // objectFormat is the pack's object format.
+ objectFormat id.ObjectFormat
+
+ // opts carries the thin base reader and progress sink.
+ opts store.PackWriteOptions
+
+ // src is the pack stream, positioned just past the header.
+ src io.Reader
+
+ // packFile is the temporary pack file being written,
+ // and packTmp is its destination-relative name.
+ packFile *os.File
+ packTmp string
+
+ // temps lists every temporary file to remove on failure.
+ temps []string
+
+ // scanner streams src into packFile while scanning entries.
+ scanner *scanner
+
+ // records holds one entry per object, in pack-offset order.
+ records []record
+
+ // byOffset maps an entry offset to its record index,
+ // and byOID maps a resolved object ID to its record index.
+ byOffset map[int]int
+ byOID sync.Map[id.ObjectID, int]
+
+ // baseCache holds inflated object content keyed by entry offset,
+ // so delta resolution can reuse a base without re-deriving it.
+ baseCache *clock.Clock[baseCacheKey, cachedContent]
+
+ // workers is the delta-resolution concurrency.
+ workers int
+
+ // headerCount is the object count declared by the pack header.
+ headerCount int
+
+ // deltaCount counts delta records, accumulated during scanning.
+ deltaCount int
+
+ // deltasResolved counts resolved delta records.
+ deltasResolved atomic.Int64
+
+ // packHash is the final pack trailer hash.
+ packHash id.ObjectID
+
+ // thinFixed reports whether thin completion appended local bases.
+ thinFixed bool
+
+ // committed suppresses temporary file removal once artifacts are published.
+ committed bool
+}
+
+// WritePack ingests one pack stream into root,
+// publishing a pack, index, reverse index, and Bloom filter
+// under content-addressed names derived from the pack trailer hash.
+//
+// A pack whose header declares zero objects is only verified:
+// no artifacts are written and the returned names are empty.
+//
+// WritePack consumes the pack stream through its trailer and stops there.
+// It does not require src to reach EOF afterward,
+// so it is safe on a still-open transport connection,
+// such as receive-pack,
+// whose peer keeps the connection open to read the response.
+//
+// The pack must be the last thing the peer sends before that response:
+// any bytes arriving immediately after the trailer
+// are rejected as a malformed pack.
+func WritePack(ctx context.Context, root *os.Root, objectFormat id.ObjectFormat, src io.Reader, opts store.PackWriteOptions) (Result, error) {
+ if objectFormat.Size() == 0 {
+ return Result{}, id.ErrInvalidObjectFormat
+ }
+
+ headerRaw, count, err := readPackHeader(src)
+ if err != nil {
+ return Result{}, err
+ }
+
+ // One resolver worker per usable CPU; the base cache budget scales with it.
+ workers := runtime.GOMAXPROCS(0)
+
+ ingestion := &ingestion{
+ ctx: ctx,
+ root: root,
+ objectFormat: objectFormat,
+ opts: opts,
+ src: src,
+ packFile: nil,
+ packTmp: "",
+ temps: nil,
+ scanner: nil,
+ records: nil,
+ byOffset: make(map[int]int),
+ baseCache: newBaseCache(workers),
+ workers: workers,
+ headerCount: count,
+ deltaCount: 0,
+ packHash: id.ObjectID{},
+ thinFixed: false,
+ committed: false,
+ }
+
+ // cleanup closes the pack file and, unless committed is set below,
+ // removes every temporary file created along the way.
+ defer ingestion.cleanup()
+
+ if count == 0 {
+ return ingestion.finishEmpty(headerRaw)
+ }
+
+ err = ingestion.openPackTemp(headerRaw)
+ if err != nil {
+ return Result{}, err
+ }
+
+ err = ingestion.streamAndScan()
+ if err != nil {
+ return Result{}, err
+ }
+
+ err = ingestion.resolveDeltas()
+ if err != nil {
+ return Result{}, err
+ }
+
+ // Sync the pack before finalize links it under its final name.
+ err = ingestion.packFile.Sync()
+ if err != nil {
+ return Result{}, fmt.Errorf("object/store/packed/internal/ingest: syncing pack: %w", err)
+ }
+
+ result, err := ingestion.finalize()
+ if err != nil {
+ return Result{}, err
+ }
+
+ ingestion.committed = true
+
+ return result, nil
+}
+
+// finishEmpty verifies the trailer of a zero-object pack
+// and returns success without writing any artifacts.
+//
+// It reads exactly one hash-sized trailer from src
+// and compares it with the hash of headerRaw.
+// A short read or a mismatch is reported as ErrMalformedPack.
+// The returned Result carries the pack hash and empty artifact names.
+func (ingestion *ingestion) finishEmpty(headerRaw [packfile.HeaderLen]byte) (Result, error) {
+	hashImpl, err := ingestion.objectFormat.New()
+	if err != nil {
+		return Result{}, fmt.Errorf("object/store/packed/internal/ingest: %w", err)
+	}
+
+	// An empty pack is just the header followed by the header's hash.
+	_, _ = hashImpl.Write(headerRaw[:])
+
+	trailer := make([]byte, ingestion.objectFormat.Size())
+
+	_, err = io.ReadFull(ingestion.src, trailer)
+	if err != nil {
+		return Result{}, fmt.Errorf("%w: reading trailer: %w", ErrMalformedPack, err)
+	}
+
+	if !bytes.Equal(hashImpl.Sum(nil), trailer) {
+		return Result{}, fmt.Errorf("%w: trailer hash mismatch", ErrMalformedPack)
+	}
+
+	packHash, err := ingestion.objectFormat.FromBytes(trailer)
+	if err != nil {
+		return Result{}, fmt.Errorf("object/store/packed/internal/ingest: %w", err)
+	}
+
+	// Every field is spelled out, matching the Result built by finalize.
+	return Result{
+		PackName:    "",
+		IdxName:     "",
+		RevName:     "",
+		BloomName:   "",
+		PackHash:    packHash,
+		ObjectCount: 0,
+		ThinFixed:   false,
+	}, nil
+}
+
+// openPackTemp creates the temporary pack file,
+// copies the already-validated header into it,
+// and installs a scanner whose running hash is seeded with that header.
+func (ingestion *ingestion) openPackTemp(headerRaw [packfile.HeaderLen]byte) error {
+	tmpName, tmpFile, err := ingestion.createTemp("tmp_pack_")
+	if err != nil {
+		return err
+	}
+
+	// Record the file before anything else can fail,
+	// so cleanup closes it on every later error path.
+	ingestion.packTmp, ingestion.packFile = tmpName, tmpFile
+
+	raw := headerRaw[:]
+
+	if _, err = tmpFile.Write(raw); err != nil {
+		return fmt.Errorf("object/store/packed/internal/ingest: writing pack header: %w", err)
+	}
+
+	packHasher, err := ingestion.objectFormat.New()
+	if err != nil {
+		return fmt.Errorf("object/store/packed/internal/ingest: %w", err)
+	}
+
+	_, _ = packHasher.Write(raw)
+
+	ingestion.scanner = newScanner(ingestion.src, tmpFile, packHasher)
+
+	return nil
+}
+
+// createTemp creates one temporary file under root,
+// recording its name for cleanup on failure.
+func (ingestion *ingestion) createTemp(prefix string) (string, *os.File, error) {
+ for range 32 {
+ name := prefix + rand.Text()
+
+ file, err := ingestion.root.OpenFile(name, os.O_CREATE|os.O_EXCL|os.O_RDWR, 0o644)
+ if err == nil {
+ ingestion.temps = append(ingestion.temps, name)
+
+ return name, file, nil
+ }
+
+ if !errors.Is(err, fs.ErrExist) {
+ return "", nil, fmt.Errorf("object/store/packed/internal/ingest: creating temp file: %w", err)
+ }
+ }
+
+ return "", nil, errTempNamesExhausted
+}
+
+// cleanup closes the pack file if one was opened
+// and, unless the write committed, removes every recorded temporary file.
+// Close and removal errors are ignored.
+func (ingestion *ingestion) cleanup() {
+	if file := ingestion.packFile; file != nil {
+		_ = file.Close()
+	}
+
+	if !ingestion.committed {
+		for i := range ingestion.temps {
+			_ = ingestion.root.Remove(ingestion.temps[i])
+		}
+	}
+}
diff --git a/object/store/packed/internal/ingest/record.go b/object/store/packed/internal/ingest/record.go
new file mode 100644
index 00000000..4031a246
--- /dev/null
+++ b/object/store/packed/internal/ingest/record.go
@@ -0,0 +1,51 @@
+package ingest
+
+import (
+ "lindenii.org/go/furgit/internal/format/packfile"
+ "lindenii.org/go/furgit/object/id"
+)
+
+// record is the scanned metadata for one packed entry,
+// completed in place as deltas are resolved.
+//
+// Records are appended in pack-offset order,
+// so a record's index in the slice is also its pack order.
+type record struct {
+ // offset is the entry's start offset in the pack.
+ offset int
+
+ // headerLen is the entry header length in bytes,
+ // so the zlib payload begins at offset+headerLen.
+ headerLen int
+
+ // packedLen is the total on-disk entry length in bytes,
+ // covering the header and the compressed payload.
+ packedLen int
+
+ // crc32 is the CRC32 of the entry's packed bytes.
+ crc32 uint32
+
+ // packedType is the entry type as encoded in the pack.
+ packedType packfile.EntryType
+
+ // declaredSize is the declared inflated payload size.
+ declaredSize int
+
+ // baseOffset is the base entry offset for an ofs-delta.
+ baseOffset int
+
+ // baseOID is the base object ID for a ref-delta.
+ baseOID id.ObjectID
+
+ // oid is the resolved object ID,
+ // meaningful once resolved is true.
+ oid id.ObjectID
+
+ // resolved reports whether oid is final.
+ resolved bool
+}
+
+// dataOffset returns the pack offset at which the entry's compressed payload starts,
+// which is the entry offset advanced past the entry header.
+func (rec *record) dataOffset() int {
+	start := rec.offset
+	start += rec.headerLen
+
+	return start
+}
diff --git a/object/store/packed/internal/ingest/resolve.go b/object/store/packed/internal/ingest/resolve.go
new file mode 100644
index 00000000..dd26cd47
--- /dev/null
+++ b/object/store/packed/internal/ingest/resolve.go
@@ -0,0 +1,495 @@
+package ingest
+
+import (
+ "fmt"
+ "io"
+ "sync"
+
+ "lindenii.org/go/furgit/internal/compress/zlib"
+ "lindenii.org/go/furgit/internal/format/packfile"
+ "lindenii.org/go/furgit/internal/format/packfile/delta"
+ "lindenii.org/go/furgit/internal/progress"
+ "lindenii.org/go/furgit/object/header"
+ "lindenii.org/go/furgit/object/id"
+ "lindenii.org/go/furgit/object/store"
+ "lindenii.org/go/furgit/object/typ"
+)
+
+// adjacency maps each resolvable base to its delta children:
+// ofs-deltas keyed by base offset, ref-deltas keyed by base object ID.
+type adjacency struct {
+ byOffset map[int][]int
+ byOID map[id.ObjectID][]int
+}
+
+// resolveDeltas gives every delta record its final object ID,
+// starting from the records that are already resolved
+// and handing any bases missing from the pack to fixThin.
+// Deltas that stay unresolved with no external base to fetch
+// are reported as ErrMalformedPack.
+func (ingestion *ingestion) resolveDeltas() error {
+	options := progress.Options{
+		Writer:     ingestion.opts.Progress,
+		Title:      "resolving deltas",
+		Total:      ingestion.countDeltas(),
+		Delay:      0,
+		Sparse:     false,
+		Throughput: false,
+	}
+	meter := progress.New(options)
+
+	graph := ingestion.buildAdjacency()
+
+	if err := ingestion.resolveFrom(ingestion.resolvedRoots(), graph, meter); err != nil {
+		return err
+	}
+
+	missing := ingestion.unresolvedExternalBases()
+	if len(missing) > 0 {
+		if err := ingestion.fixThin(missing, graph, meter); err != nil {
+			return err
+		}
+	} else if ingestion.countUnresolved() > 0 {
+		return fmt.Errorf("%w: unresolvable delta entries", ErrMalformedPack)
+	}
+
+	meter.Stop("done")
+
+	return nil
+}
+
+// buildAdjacency groups the delta records by the base they apply to:
+// ofs-deltas under their base offset, ref-deltas under their base object ID.
+// Non-delta records contribute nothing.
+func (ingestion *ingestion) buildAdjacency() adjacency {
+	ofsChildren := make(map[int][]int)
+	refChildren := make(map[id.ObjectID][]int)
+
+	for index := range ingestion.records {
+		rec := &ingestion.records[index]
+
+		if rec.packedType == packfile.EntryTypeOfsDelta {
+			ofsChildren[rec.baseOffset] = append(ofsChildren[rec.baseOffset], index)
+		} else if rec.packedType == packfile.EntryTypeRefDelta {
+			refChildren[rec.baseOID] = append(refChildren[rec.baseOID], index)
+		}
+	}
+
+	return adjacency{
+		byOffset: ofsChildren,
+		byOID:    refChildren,
+	}
+}
+
+// item is a delta record awaiting resolution, with its delta-chain depth.
+type item struct {
+ index int
+ depth int
+}
+
+// resolver resolves deltas concurrently over a shared LIFO work stack.
+//
+// Each item is one delta child;
+// a worker materializes its base from the cache, re-deriving on a miss,
+// resolves the child,
+// and pushes the child's own delta children.
+// Workers park while the stack is empty but others are still working,
+// and exit once it is empty and none are.
+type resolver struct {
+ ingestion *ingestion
+ adjacency adjacency
+ meter *progress.Meter
+
+ mu sync.Mutex
+ cond *sync.Cond
+ stack []item
+ active int
+ firstErr error
+}
+
+// resolveFrom resolves every delta reachable from roots.
+//
+// The delta children of each root are seeded at depth 1
+// and handed to a resolver running ingestion.workers workers.
+// It returns nil without starting a resolver when the roots have no children,
+// and otherwise the first error any worker recorded.
+func (ingestion *ingestion) resolveFrom(roots []int, adjacency adjacency, meter *progress.Meter) error {
+	var seed []item
+
+	// Ref-delta children are looked up by the root's object ID,
+	// so roots that share an object ID would each contribute the same child.
+	// Seed every child once:
+	// resolving one record from two workers would race on it
+	// and count it twice in deltasResolved.
+	seeded := make(map[int]struct{})
+
+	for _, root := range roots {
+		rec := &ingestion.records[root]
+		for _, group := range [2][]int{adjacency.byOffset[rec.offset], adjacency.byOID[rec.oid]} {
+			for _, child := range group {
+				if _, dup := seeded[child]; dup {
+					continue
+				}
+
+				seeded[child] = struct{}{}
+				seed = append(seed, item{index: child, depth: 1})
+			}
+		}
+	}
+
+	if len(seed) == 0 {
+		return nil
+	}
+
+	res := &resolver{
+		ingestion: ingestion,
+		adjacency: adjacency,
+		meter:     meter,
+		stack:     seed,
+	}
+	res.cond = sync.NewCond(&res.mu)
+
+	return res.run(ingestion.workers)
+}
+
+// run drains the work stack with the given number of workers
+// and returns the first error recorded, if any.
+// With one worker or fewer it runs the worker on the calling goroutine.
+func (res *resolver) run(workers int) error {
+	if workers > 1 {
+		var group sync.WaitGroup
+
+		for range workers {
+			group.Go(res.worker)
+		}
+
+		group.Wait()
+	} else {
+		res.worker()
+	}
+
+	return res.firstErr
+}
+
+// worker pops items off the shared stack and processes them
+// until an error has been recorded,
+// or the stack is empty and no worker is still processing an item.
+func (res *resolver) worker() {
+ for {
+ res.mu.Lock()
+
+ // Park while there is nothing to pop
+ // but an active worker may still push children.
+ for len(res.stack) == 0 && res.active > 0 && res.firstErr == nil {
+ res.cond.Wait()
+ }
+
+ // Exit on the first error, or once the stack is empty with no worker active.
+ if res.firstErr != nil || len(res.stack) == 0 {
+ res.mu.Unlock()
+
+ return
+ }
+
+ it := res.stack[len(res.stack)-1]
+ res.stack = res.stack[:len(res.stack)-1]
+ res.active++
+ res.mu.Unlock()
+
+ // The item is processed outside the lock.
+ kids, err := res.process(it)
+
+ res.mu.Lock()
+ res.active--
+
+ // Only the first error is kept.
+ if err != nil && res.firstErr == nil {
+ res.firstErr = err
+ }
+
+ if res.firstErr == nil {
+ res.stack = append(res.stack, kids...)
+ }
+
+ // Wake parked workers when there is an error to observe,
+ // new work to take, or nothing left so they can exit.
+ if res.firstErr != nil || len(kids) > 0 || (res.active == 0 && len(res.stack) == 0) {
+ res.cond.Broadcast()
+ }
+
+ res.mu.Unlock()
+ }
+}
+
+// process resolves the delta record named by it
+// and returns that record's own delta children, one level deeper.
+// It fails if the context is done or the record's base cannot be found.
+func (res *resolver) process(it item) ([]item, error) {
+	ing := res.ingestion
+
+	if ctxErr := ing.ctx.Err(); ctxErr != nil {
+		return nil, fmt.Errorf("object/store/packed/internal/ingest: %w", ctxErr)
+	}
+
+	child := &ing.records[it.index]
+
+	baseIndex, found := ing.baseRecordIndex(child)
+	if !found {
+		return nil, fmt.Errorf("%w: entry at %d: base unavailable while resolving", ErrMalformedPack, child.offset)
+	}
+
+	baseType, baseContent, err := ing.materialize(baseIndex)
+	if err != nil {
+		return nil, err
+	}
+
+	if resolveErr := ing.resolveOneChild(it.index, baseType, baseContent, res.meter); resolveErr != nil {
+		return nil, resolveErr
+	}
+
+	return res.childItems(it.index, it.depth+1)
+}
+
+// childItems returns the delta children of the just-resolved record at index,
+// each tagged with depth.
+// If the record has any child and depth exceeds delta.MaxChainDepth,
+// it fails with ErrMalformedPack naming the first such child.
+func (res *resolver) childItems(index, depth int) ([]item, error) {
+	parent := &res.ingestion.records[index]
+	tooDeep := depth > delta.MaxChainDepth
+
+	groups := [2][]int{
+		res.adjacency.byOffset[parent.offset],
+		res.adjacency.byOID[parent.oid],
+	}
+
+	var out []item
+
+	for _, group := range groups {
+		for _, child := range group {
+			if tooDeep {
+				return nil, fmt.Errorf("%w: entry at %d: delta chain too deep", ErrMalformedPack, res.ingestion.records[child].offset)
+			}
+
+			out = append(out, item{index: child, depth: depth})
+		}
+	}
+
+	return out, nil
+}
+
+// resolveOneChild applies the delta record at index to baseContent,
+// hashes the result as an object of baseType,
+// and marks the record resolved with that object ID.
+// The result is also registered in byOID, added to the base cache,
+// and counted on both deltasResolved and meter.
+func (ingestion *ingestion) resolveOneChild(index int, baseType typ.Type, baseContent []byte, meter *progress.Meter) error {
+ rec := &ingestion.records[index]
+
+ content, err := ingestion.applyDelta(index, baseContent)
+ if err != nil {
+ return err
+ }
+
+ // A delta result has the same object type as its base.
+ oid, err := ingestion.hashObject(baseType, content)
+ if err != nil {
+ return err
+ }
+
+ rec.oid = oid
+ rec.resolved = true
+
+ // Publish the record for ref-delta lookups
+ // and keep its content for children that delta against it.
+ ingestion.byOID.Store(oid, index)
+ ingestion.baseCache.Add(baseCacheKey{offset: rec.offset}, cachedContent{objectType: baseType, content: content})
+
+ ingestion.deltasResolved.Add(1)
+ meter.Add(1, 0)
+
+ return nil
+}
+
+// materialize returns the inflated content of an already-resolved record,
+// from the base cache,
+// or re-derived from the nearest cached or base ancestor on a miss.
+//
+// It returns the object type along with the content.
+// It fails with ErrMalformedPack if the walk meets delta.MaxChainDepth deltas
+// or a delta whose base record cannot be found.
+func (ingestion *ingestion) materialize(index int) (typ.Type, []byte, error) {
+ var (
+ zero typ.Type
+ chain []int
+ base []byte
+ baseType typ.Type
+ )
+
+ cur := index
+
+ // Walk toward the base, collecting the deltas that must be re-applied,
+ // until a cached record or a non-delta record supplies starting content.
+ for {
+ rec := &ingestion.records[cur]
+
+ if cached, ok := ingestion.baseCache.Get(baseCacheKey{offset: rec.offset}); ok {
+ base = cached.content
+ baseType = cached.objectType
+
+ break
+ }
+
+ if rec.packedType.IsBase() {
+ objectType, err := rec.packedType.ObjectType()
+ if err != nil {
+ return zero, nil, fmt.Errorf("object/store/packed/internal/ingest: %w", err)
+ }
+
+ content, err := ingestion.inflateRecord(cur)
+ if err != nil {
+ return zero, nil, err
+ }
+
+ base = content
+ baseType = objectType
+
+ break
+ }
+
+ if len(chain) >= delta.MaxChainDepth {
+ return zero, nil, fmt.Errorf("%w: entry at %d: delta chain too deep", ErrMalformedPack, rec.offset)
+ }
+
+ chain = append(chain, cur)
+
+ next, ok := ingestion.baseRecordIndex(rec)
+ if !ok {
+ return zero, nil, fmt.Errorf("%w: entry at %d: base unavailable while reconstructing", ErrMalformedPack, rec.offset)
+ }
+
+ cur = next
+ }
+
+ // Re-apply the collected deltas, last collected first,
+ // caching each intermediate result under its own offset.
+ for i := len(chain) - 1; i >= 0; i-- {
+ content, err := ingestion.applyDelta(chain[i], base)
+ if err != nil {
+ return zero, nil, err
+ }
+
+ ingestion.baseCache.Add(baseCacheKey{offset: ingestion.records[chain[i]].offset}, cachedContent{objectType: baseType, content: content})
+
+ base = content
+ }
+
+ return baseType, base, nil
+}
+
+// applyDelta inflates the delta record at index,
+// applies it to baseContent, and returns the resulting content.
+//
+// The sizes declared in the delta header are checked
+// against the base length and the produced length;
+// a mismatch or an unparseable delta is reported as ErrMalformedPack.
+// A declared result size above a positive MaxObjectSize
+// is rejected with store.ErrObjectTooLarge before the delta is applied.
+func (ingestion *ingestion) applyDelta(index int, baseContent []byte) ([]byte, error) {
+	entryOffset := ingestion.records[index].offset
+
+	payload, err := ingestion.inflateRecord(index)
+	if err != nil {
+		return nil, err
+	}
+
+	wantBase, wantResult, _, err := delta.ParseHeaderSizes(payload)
+	if err != nil {
+		return nil, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, entryOffset, err)
+	}
+
+	if maxSize := ingestion.opts.MaxObjectSize; maxSize > 0 && wantResult > uint64(maxSize) {
+		return nil, fmt.Errorf("%w: entry at %d: result size %d exceeds limit %d", store.ErrObjectTooLarge, entryOffset, wantResult, maxSize)
+	}
+
+	if uint64(len(baseContent)) != wantBase {
+		return nil, fmt.Errorf("%w: entry at %d: delta base size mismatch", ErrMalformedPack, entryOffset)
+	}
+
+	result, err := delta.Apply(baseContent, payload)
+	if err != nil {
+		return nil, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, entryOffset, err)
+	}
+
+	if wantResult != uint64(len(result)) {
+		return nil, fmt.Errorf("%w: entry at %d: delta result size mismatch", ErrMalformedPack, entryOffset)
+	}
+
+	return result, nil
+}
+
+// baseRecordIndex returns the record index of rec's delta base
+// and whether it was found:
+// by base offset for an ofs-delta, by base object ID for a ref-delta.
+// A record that is not a delta has no base.
+func (ingestion *ingestion) baseRecordIndex(rec *record) (int, bool) {
+	if rec.packedType == packfile.EntryTypeOfsDelta {
+		index, ok := ingestion.byOffset[rec.baseOffset]
+
+		return index, ok
+	}
+
+	if rec.packedType == packfile.EntryTypeRefDelta {
+		index, ok := ingestion.byOID.Load(rec.baseOID)
+
+		return index, ok
+	}
+
+	return 0, false
+}
+
+// inflateRecord reads one record's compressed payload
+// back from the temporary pack file
+// and inflates exactly declaredSize bytes of it.
+// A zlib error or a short stream is reported as ErrMalformedPack.
+func (ingestion *ingestion) inflateRecord(index int) ([]byte, error) {
+	rec := &ingestion.records[index]
+
+	payloadStart := int64(rec.dataOffset())
+	payloadLen := int64(rec.packedLen - rec.headerLen)
+	section := io.NewSectionReader(ingestion.packFile, payloadStart, payloadLen)
+
+	inflater, err := zlib.NewReader(section)
+	if err != nil {
+		return nil, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err)
+	}
+
+	defer func() { _ = inflater.Close() }()
+
+	content := make([]byte, rec.declaredSize)
+
+	if _, err = io.ReadFull(inflater, content); err != nil {
+		return nil, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err)
+	}
+
+	return content, nil
+}
+
+// hashObject computes the object ID of one resolved object.
+func (ingestion *ingestion) hashObject(objectType typ.Type, content []byte) (id.ObjectID, error) {
+ var zero id.ObjectID
+
+ hashImpl, err := ingestion.objectFormat.New()
+ if err != nil {
+ return zero, fmt.Errorf("object/store/packed/internal/ingest: %w", err)
+ }
+
+ _, _ = hashImpl.Write(header.Append(nil, objectType, len(content)))
+ _, _ = hashImpl.Write(content)
+
+ oid, err := ingestion.objectFormat.FromBytes(hashImpl.Sum(nil))
+ if err != nil {
+ return zero, fmt.Errorf("object/store/packed/internal/ingest: %w", err)
+ }
+
+ return oid, nil
+}
+
+// resolvedRoots lists, in pack order,
+// the index of every record whose object ID is already final.
+func (ingestion *ingestion) resolvedRoots() []int {
+	var roots []int
+
+	for index := range ingestion.records {
+		if !ingestion.records[index].resolved {
+			continue
+		}
+
+		roots = append(roots, index)
+	}
+
+	return roots
+}
+
+// countDeltas returns the number of delta records,
+// as accumulated in deltaCount;
+// resolveDeltas uses it as the progress total.
+func (ingestion *ingestion) countDeltas() int {
+ return ingestion.deltaCount
+}
+
+// countUnresolved returns how many records are still unresolved.
+//
+// Bases are resolved during scanning or thin completion,
+// so only deltas can remain unresolved:
+// the count is the delta records less the deltas resolved so far.
+func (ingestion *ingestion) countUnresolved() int {
+	resolved := int(ingestion.deltasResolved.Load())
+
+	return ingestion.deltaCount - resolved
+}
+
+// unresolvedExternalBases returns the unique base object IDs
+// of unresolved ref-deltas whose base is not present in the pack,
+// in first-reference order.
+func (ingestion *ingestion) unresolvedExternalBases() []id.ObjectID {
+	seen := make(map[id.ObjectID]struct{})
+
+	// The unresolved count is only a capacity hint.
+	// Clamp it at zero: make panics on a negative capacity,
+	// and the hint would go negative
+	// if deltasResolved ever exceeded deltaCount.
+	out := make([]id.ObjectID, 0, max(ingestion.countUnresolved(), 0))
+
+	for index := range ingestion.records {
+		rec := &ingestion.records[index]
+		if rec.resolved || rec.packedType != packfile.EntryTypeRefDelta {
+			continue
+		}
+
+		// A base already known by object ID lives in the pack, not outside it.
+		if _, ok := ingestion.byOID.Load(rec.baseOID); ok {
+			continue
+		}
+
+		if _, ok := seen[rec.baseOID]; ok {
+			continue
+		}
+
+		seen[rec.baseOID] = struct{}{}
+		out = append(out, rec.baseOID)
+	}
+
+	return out
+}
diff --git a/object/store/packed/internal/ingest/result.go b/object/store/packed/internal/ingest/result.go
new file mode 100644
index 00000000..9cd6ef1d
--- /dev/null
+++ b/object/store/packed/internal/ingest/result.go
@@ -0,0 +1,29 @@
+package ingest
+
+import "lindenii.org/go/furgit/object/id"
+
+// Result describes one finalized pack write.
+//
+// For a pack that declared zero objects no artifacts are written,
+// so the four names are empty and only PackHash is meaningful.
+type Result struct {
+ // PackName is the destination-relative name of the written pack.
+ PackName string
+
+ // IdxName is the destination-relative name of the written index.
+ IdxName string
+
+ // RevName is the destination-relative name of the written reverse index.
+ RevName string
+
+ // BloomName is the destination-relative name of the written Bloom filter.
+ BloomName string
+
+ // PackHash is the pack trailer hash
+ // shared by the pack, index, reverse index, and Bloom filter.
+ PackHash id.ObjectID
+
+ // ObjectCount is the number of objects in the finalized pack,
+ // including any bases appended during thin completion.
+ ObjectCount uint32
+
+ // ThinFixed reports whether thin completion appended local bases.
+ ThinFixed bool
+}
diff --git a/object/store/packed/internal/ingest/scan.go b/object/store/packed/internal/ingest/scan.go
new file mode 100644
index 00000000..2cb5c135
--- /dev/null
+++ b/object/store/packed/internal/ingest/scan.go
@@ -0,0 +1,477 @@
+package ingest
+
+import (
+ "bytes"
+ "errors"
+ "fmt"
+ "hash"
+ "hash/crc32"
+ "io"
+
+ "lindenii.org/go/furgit/internal/compress/zlib"
+ "lindenii.org/go/furgit/internal/format/packfile"
+ "lindenii.org/go/furgit/internal/progress"
+ "lindenii.org/go/furgit/object/header"
+ "lindenii.org/go/furgit/object/id"
+ "lindenii.org/go/furgit/object/store"
+ "lindenii.org/go/lgo/intconv"
+)
+
+// scanBufferSize is the stream scanner's fixed input window size (64 KiB).
+//
+// It bounds how many bytes peekHeader can expose at once,
+// since a peek never grows past the buffer length.
+const scanBufferSize = 64 << 10
+
+// scanner reads one pack stream,
+// mirroring consumed bytes into the destination pack file
+// while maintaining the running pack hash and a per-entry CRC.
+//
+// It implements [io.Reader] and [io.ByteReader]
+// so a zlib reader can consume an entry payload through it
+// without reading past the end of that compressed stream.
+//
+// Bytes reach dst lazily:
+// the consumed prefix of buf is written out only when the buffer is refilled
+// or when the trailer has been verified.
+type scanner struct {
+ // src is the incoming pack stream, positioned after the pack header.
+ src io.Reader
+ // dst receives every consumed byte, in stream order.
+ dst io.Writer
+
+ // buf[off:n] is the unread window.
+ // buf[:off] has been consumed but not yet written to dst.
+ buf []byte
+ off int
+ n int
+
+ // consumed counts stream bytes consumed so far.
+ // It starts at the pack header length, so it doubles as a pack offset.
+ consumed int
+
+ // hash accumulates the pack hash over consumed bytes
+ // while hashing is true.
+ hash hash.Hash
+ hashing bool
+
+ // crc accumulates the CRC of the current entry
+ // while crcing is true.
+ crc uint32
+ crcing bool
+}
+
+// newScanner builds a scanner that reads src and mirrors it into dst.
+//
+// packHash must already have absorbed the pack header;
+// the consumed counter starts at the header length to match,
+// with hashing switched on and CRC accumulation switched off.
+func newScanner(src io.Reader, dst io.Writer, packHash hash.Hash) *scanner {
+	s := new(scanner)
+
+	s.src = src
+	s.dst = dst
+	s.buf = make([]byte, scanBufferSize)
+	s.consumed = packfile.HeaderLen
+
+	s.hash = packHash
+	s.hashing = true
+
+	s.crc = 0
+	s.crcing = false
+
+	return s
+}
+
+// readPackHeader consumes the fixed-size pack header from src and validates it.
+//
+// It returns the raw header bytes together with the declared object count.
+// Any failure is reported as wrapping [ErrMalformedPack].
+func readPackHeader(src io.Reader) ([packfile.HeaderLen]byte, int, error) {
+	var rawHeader [packfile.HeaderLen]byte
+
+	if _, err := io.ReadFull(src, rawHeader[:]); err != nil {
+		return rawHeader, 0, fmt.Errorf("%w: reading header: %w", ErrMalformedPack, err)
+	}
+
+	parsed, err := packfile.ParseHeader(rawHeader[:])
+	if err != nil {
+		return rawHeader, 0, fmt.Errorf("%w: %w", ErrMalformedPack, err)
+	}
+
+	objects, err := intconv.Uint32ToInt(parsed.ObjectCount)
+	if err != nil {
+		return rawHeader, 0, fmt.Errorf("%w: object count: %w", ErrMalformedPack, err)
+	}
+
+	return rawHeader, objects, nil
+}
+
+// Read implements [io.Reader].
+//
+// It hands out at most the currently buffered window,
+// so one call may return fewer bytes than len(dst).
+func (scanner *scanner) Read(dst []byte) (int, error) {
+	if len(dst) == 0 {
+		return 0, nil
+	}
+
+	if err := scanner.ensureAvailable(); err != nil {
+		return 0, err
+	}
+
+	// copy caps the transfer at the shorter of dst and the unread window.
+	count := copy(dst, scanner.buf[scanner.off:scanner.n])
+
+	if err := scanner.use(count); err != nil {
+		return 0, err
+	}
+
+	return count, nil
+}
+
+// ReadByte implements [io.ByteReader] without allocation.
+//
+// The byte is accounted for exactly like a one-byte Read.
+func (scanner *scanner) ReadByte() (byte, error) {
+	if err := scanner.ensureAvailable(); err != nil {
+		return 0, err
+	}
+
+	next := scanner.buf[scanner.off]
+
+	if err := scanner.use(1); err != nil {
+		return 0, err
+	}
+
+	return next, nil
+}
+
+// ensureAvailable makes at least one unread byte available,
+// returning [io.EOF] once the source is exhausted.
+//
+// It returns immediately when the unread window is non-empty.
+// A source read that yields no bytes and no error
+// is reported as [io.ErrNoProgress].
+func (scanner *scanner) ensureAvailable() error {
+ for scanner.n-scanner.off == 0 {
+ // Mirror the consumed prefix to dst and compact the buffer,
+ // so that buf[n:] has room for the next read.
+ err := scanner.flushPrefix()
+ if err != nil {
+ return err
+ }
+
+ read, err := scanner.src.Read(scanner.buf[scanner.n:])
+ scanner.n += read
+
+ if err != nil {
+ if errors.Is(err, io.EOF) {
+ // EOF only counts when it arrived with no data;
+ // otherwise hand out the bytes and report EOF on a later call.
+ if scanner.n-scanner.off == 0 {
+ return io.EOF
+ }
+
+ return nil
+ }
+
+ return fmt.Errorf("object/store/packed/internal/ingest: reading pack: %w", err)
+ }
+
+ if read == 0 && scanner.n-scanner.off == 0 {
+ return io.ErrNoProgress
+ }
+ }
+
+ return nil
+}
+
+// peekHeader tries to grow the unread window to at least maxLen bytes
+// (clamped to the buffer size) without consuming,
+// tolerating an early end of stream.
+//
+// It returns the whole unread window,
+// which may be shorter than maxLen when the stream ends
+// or a read delivers nothing,
+// and longer when more bytes were already buffered.
+// The slice aliases the scanner's buffer.
+// An empty window is reported as [ErrMalformedPack].
+func (scanner *scanner) peekHeader(maxLen int) ([]byte, error) {
+ maxLen = min(maxLen, len(scanner.buf))
+
+ for scanner.n-scanner.off < maxLen {
+ // Compacting first guarantees free space after n,
+ // because maxLen never exceeds the buffer length.
+ err := scanner.flushPrefix()
+ if err != nil {
+ return nil, err
+ }
+
+ read, err := scanner.src.Read(scanner.buf[scanner.n:])
+ scanner.n += read
+
+ if err != nil {
+ if errors.Is(err, io.EOF) {
+ // The final entries may be shorter than a maximal header.
+ break
+ }
+
+ return nil, fmt.Errorf("object/store/packed/internal/ingest: reading pack: %w", err)
+ }
+
+ if read == 0 {
+ break
+ }
+ }
+
+ if scanner.n-scanner.off == 0 {
+ return nil, fmt.Errorf("%w: unexpected end of stream", ErrMalformedPack)
+ }
+
+ return scanner.buf[scanner.off:scanner.n], nil
+}
+
+// use marks the next n unread bytes as consumed.
+//
+// The bytes are fed to the pack hash while hashing is enabled
+// and to the entry CRC while CRC accumulation is enabled.
+// The caller must ensure that n bytes are actually buffered.
+func (scanner *scanner) use(n int) error {
+	end := scanner.off + n
+	taken := scanner.buf[scanner.off:end]
+
+	if scanner.hashing {
+		if _, err := scanner.hash.Write(taken); err != nil {
+			return fmt.Errorf("object/store/packed/internal/ingest: hashing pack: %w", err)
+		}
+	}
+
+	if scanner.crcing {
+		scanner.crc = crc32.Update(scanner.crc, crc32.IEEETable, taken)
+	}
+
+	scanner.consumed += n
+	scanner.off = end
+
+	return nil
+}
+
+// flushPrefix mirrors the already-consumed head of the buffer into the destination
+// and then slides the unread window down to index zero.
+//
+// It is a no-op when nothing has been consumed since the last flush.
+func (scanner *scanner) flushPrefix() error {
+	consumedLen := scanner.off
+	if consumedLen == 0 {
+		return nil
+	}
+
+	if _, err := scanner.dst.Write(scanner.buf[:consumedLen]); err != nil {
+		return fmt.Errorf("object/store/packed/internal/ingest: writing pack: %w", err)
+	}
+
+	// copy reports how many unread bytes it moved, which is the new window length.
+	scanner.n = copy(scanner.buf, scanner.buf[consumedLen:scanner.n])
+	scanner.off = 0
+
+	return nil
+}
+
+// beginCRC resets the entry CRC and turns accumulation on,
+// so bytes consumed from here on count towards the next entry.
+func (scanner *scanner) beginCRC() {
+	scanner.crcing = true
+	scanner.crc = 0
+}
+
+// endCRC turns CRC accumulation off and yields the finished entry CRC,
+// leaving the accumulator cleared.
+func (scanner *scanner) endCRC() uint32 {
+	final := scanner.crc
+
+	scanner.crcing = false
+	scanner.crc = 0
+
+	return final
+}
+
+// finishTrailer reads and verifies the pack trailer hash,
+// flushing the remaining buffered pack bytes to the destination.
+//
+// The trailer is mirrored to the destination but excluded from the pack hash.
+//
+// hashSize is the trailer length in bytes.
+// On success the raw trailer is returned.
+// A short trailer, buffered bytes beyond it, or a hash mismatch
+// is reported as [ErrMalformedPack];
+// in those cases the final buffered bytes are not flushed.
+func (scanner *scanner) finishTrailer(hashSize int) ([]byte, error) {
+ trailer := make([]byte, hashSize)
+
+ // Stop hashing before reading: the trailer is the hash of everything before it.
+ scanner.hashing = false
+
+ // Reading through the scanner keeps the consumed count and the mirror in step.
+ _, err := io.ReadFull(scanner, trailer)
+ if err != nil {
+ return nil, fmt.Errorf("%w: reading trailer: %w", ErrMalformedPack, err)
+ }
+
+ // Only already-buffered bytes are inspected here;
+ // the source is not read again to look for more.
+ if scanner.n-scanner.off > 0 {
+ return nil, fmt.Errorf("%w: trailing data after pack", ErrMalformedPack)
+ }
+
+ if !bytes.Equal(scanner.hash.Sum(nil), trailer) {
+ return nil, fmt.Errorf("%w: trailer hash mismatch", ErrMalformedPack)
+ }
+
+ // Write out whatever is still held in the buffer, trailer included.
+ err = scanner.flushPrefix()
+ if err != nil {
+ return nil, err
+ }
+
+ return trailer, nil
+}
+
+// streamAndScan copies the pack body into the temporary pack file
+// while recording one entry per object declared in the header,
+// then verifies the trailer and stores it as the pack hash.
+//
+// The context is checked before every entry,
+// and progress is reported per object together with the bytes it occupied.
+func (ingestion *ingestion) streamAndScan() error {
+	meter := progress.New(progress.Options{
+		Writer:     ingestion.opts.Progress,
+		Title:      "receiving objects",
+		Total:      ingestion.headerCount,
+		Delay:      0,
+		Sparse:     false,
+		Throughput: true,
+	})
+
+	lastConsumed := ingestion.scanner.consumed
+
+	for range ingestion.headerCount {
+		if err := ingestion.ctx.Err(); err != nil {
+			return fmt.Errorf("object/store/packed/internal/ingest: %w", err)
+		}
+
+		entryStart := ingestion.scanner.consumed
+
+		if err := ingestion.scanEntry(entryStart); err != nil {
+			return err
+		}
+
+		nowConsumed := ingestion.scanner.consumed
+		meter.Add(1, int64(nowConsumed-lastConsumed))
+		lastConsumed = nowConsumed
+	}
+
+	meter.Stop("done")
+
+	hashSize := ingestion.objectFormat.Size()
+
+	trailer, err := ingestion.scanner.finishTrailer(hashSize)
+	if err != nil {
+		return err
+	}
+
+	trailerHash, err := ingestion.objectFormat.FromBytes(trailer)
+	if err != nil {
+		return fmt.Errorf("object/store/packed/internal/ingest: %w", err)
+	}
+
+	ingestion.packHash = trailerHash
+
+	return nil
+}
+
+// scanEntry scans the entry beginning at start into one record.
+//
+// It parses the entry header, drains the compressed payload,
+// checks the inflated length against the declared size,
+// and appends the record to the ingestion's tables.
+// Base entries are marked resolved with their computed object ID;
+// delta entries are counted for later resolution.
+func (ingestion *ingestion) scanEntry(start int) error {
+ // Start the CRC before the header so it covers header and payload.
+ ingestion.scanner.beginCRC()
+
+ rec, err := ingestion.scanHeader(start)
+ if err != nil {
+ return err
+ }
+
+ inflated, oid, err := ingestion.drainPayload(&rec)
+ if err != nil {
+ return err
+ }
+
+ if inflated != int64(rec.declaredSize) {
+ return fmt.Errorf(
+ "%w: entry at %d: inflated size %d differs from declared %d",
+ ErrMalformedPack, start, inflated, rec.declaredSize,
+ )
+ }
+
+ // The scanner's consumed count is a pack offset,
+ // so the difference is the entry's on-disk length.
+ rec.packedLen = ingestion.scanner.consumed - start
+ rec.crc32 = ingestion.scanner.endCRC()
+
+ if rec.packedType.IsBase() {
+ rec.oid = oid
+ rec.resolved = true
+ } else {
+ ingestion.deltaCount++
+ }
+
+ index := len(ingestion.records)
+ ingestion.records = append(ingestion.records, rec)
+ ingestion.byOffset[rec.offset] = index
+
+ // Only entries with a known object ID can be looked up by ID.
+ if rec.resolved {
+ ingestion.byOID.Store(rec.oid, index)
+ }
+
+ return nil
+}
+
+// scanHeader parses and consumes the entry header at start.
+//
+// The returned record carries the entry's offset, packed type,
+// declared size, header length, and, for deltas, its base reference.
+// The header bytes are consumed only after every check has passed.
+//
+// Errors wrap [ErrMalformedPack] for structural problems
+// and [store.ErrObjectTooLarge] when the declared size exceeds a positive
+// MaxObjectSize option.
+func (ingestion *ingestion) scanHeader(start int) (record, error) {
+ var rec record
+
+ rec.offset = start
+
+ // Peek the longest header this object format allows; the stream may hold fewer bytes.
+ window, err := ingestion.scanner.peekHeader(packfile.MaxEntryHeaderLen(ingestion.objectFormat.Size()))
+ if err != nil {
+ return rec, err
+ }
+
+ entryHeader, err := packfile.ParseEntryHeader(window, ingestion.objectFormat.Size())
+ if err != nil {
+ return rec, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, start, err)
+ }
+
+ declaredSize, err := intconv.Uint64ToInt(entryHeader.Size)
+ if err != nil {
+ return rec, fmt.Errorf("%w: entry at %d: declared size overflows int: %w", ErrMalformedPack, start, err)
+ }
+
+ // A non-positive limit disables the size check.
+ limit := ingestion.opts.MaxObjectSize
+ if limit > 0 && declaredSize > limit {
+ return rec, fmt.Errorf("%w: entry at %d: declared size %d exceeds limit %d", store.ErrObjectTooLarge, start, declaredSize, limit)
+ }
+
+ rec.packedType = entryHeader.Type
+ rec.declaredSize = declaredSize
+ rec.headerLen = entryHeader.HeaderLen
+
+ switch entryHeader.Type {
+ case packfile.EntryTypeCommit, packfile.EntryTypeTree, packfile.EntryTypeBlob, packfile.EntryTypeTag:
+ case packfile.EntryTypeOfsDelta:
+ // The base must lie strictly before this entry and not before offset zero.
+ dist, err := intconv.Uint64ToInt(entryHeader.OfsDistance)
+ if err != nil || dist == 0 || dist > start {
+ return rec, fmt.Errorf("%w: entry at %d: ofs-delta base out of bounds", ErrMalformedPack, start)
+ }
+
+ rec.baseOffset = start - dist
+ case packfile.EntryTypeRefDelta:
+ baseID, err := ingestion.objectFormat.FromBytes(entryHeader.RefBase[:ingestion.objectFormat.Size()])
+ if err != nil {
+ return rec, fmt.Errorf("object/store/packed/internal/ingest: %w", err)
+ }
+
+ rec.baseOID = baseID
+ case packfile.EntryTypeInvalid, packfile.EntryTypeFuture:
+ return rec, fmt.Errorf("%w: entry at %d: unsupported entry type", ErrMalformedPack, start)
+ }
+
+ // Consume exactly the parsed header, leaving the payload as the next unread byte.
+ err = ingestion.scanner.use(entryHeader.HeaderLen)
+ if err != nil {
+ return rec, err
+ }
+
+ return rec, nil
+}
+
+// drainPayload consumes one entry's compressed payload from the stream,
+// returning its inflated length and, for base entries, its object ID.
+//
+// Delta payloads are discarded.
+// Base payloads are hashed together with the object header
+// built from the record's type and declared size.
+//
+// Inflation is capped at one byte past the declared size.
+// A stream that inflates beyond its declaration is rejected with
+// [ErrMalformedPack] as soon as the excess is seen,
+// so a small entry cannot make the receiver inflate an unbounded amount of data.
+// The one extra byte is required:
+// it lets a well-formed stream be read up to its own end marker,
+// leaving the scanner positioned exactly at the next entry.
+func (ingestion *ingestion) drainPayload(rec *record) (int64, id.ObjectID, error) {
+	var zero id.ObjectID
+
+	zr, err := zlib.NewReader(ingestion.scanner)
+	if err != nil {
+		return 0, zero, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err)
+	}
+
+	defer func() { _ = zr.Close() }()
+
+	declared := int64(rec.declaredSize)
+
+	// Guard the increment so a maximal declared size cannot overflow.
+	limit := declared
+	if limit < 1<<63-1 {
+		limit++
+	}
+
+	payload := io.LimitReader(zr, limit)
+
+	if !rec.packedType.IsBase() {
+		read, err := io.Copy(io.Discard, payload)
+		if err != nil {
+			return 0, zero, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err)
+		}
+
+		if read > declared {
+			return 0, zero, fmt.Errorf(
+				"%w: entry at %d: inflated size exceeds declared %d",
+				ErrMalformedPack, rec.offset, rec.declaredSize,
+			)
+		}
+
+		return read, zero, nil
+	}
+
+	objectType, err := rec.packedType.ObjectType()
+	if err != nil {
+		return 0, zero, fmt.Errorf("object/store/packed/internal/ingest: %w", err)
+	}
+
+	hashImpl, err := ingestion.objectFormat.New()
+	if err != nil {
+		return 0, zero, fmt.Errorf("object/store/packed/internal/ingest: %w", err)
+	}
+
+	// The object ID covers the canonical object header followed by the content.
+	_, _ = hashImpl.Write(header.Append(nil, objectType, rec.declaredSize))
+
+	read, err := io.Copy(hashImpl, payload)
+	if err != nil {
+		return 0, zero, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err)
+	}
+
+	if read > declared {
+		return 0, zero, fmt.Errorf(
+			"%w: entry at %d: inflated size exceeds declared %d",
+			ErrMalformedPack, rec.offset, rec.declaredSize,
+		)
+	}
+
+	oid, err := ingestion.objectFormat.FromBytes(hashImpl.Sum(nil))
+	if err != nil {
+		return 0, zero, fmt.Errorf("object/store/packed/internal/ingest: %w", err)
+	}
+
+	return read, oid, nil
+}
diff --git a/object/store/packed/internal/ingest/thin.go b/object/store/packed/internal/ingest/thin.go
new file mode 100644
index 00000000..15773a56
--- /dev/null
+++ b/object/store/packed/internal/ingest/thin.go
@@ -0,0 +1,217 @@
+package ingest
+
+import (
+ "errors"
+ "fmt"
+ "hash/crc32"
+ "io"
+ "os"
+
+ "lindenii.org/go/furgit/internal/compress/zlib"
+ "lindenii.org/go/furgit/internal/format/packfile"
+ "lindenii.org/go/furgit/internal/progress"
+ "lindenii.org/go/furgit/object/id"
+ "lindenii.org/go/furgit/object/store"
+ "lindenii.org/go/furgit/object/typ"
+ "lindenii.org/go/lgo/intconv"
+)
+
+// fixThin completes a thin pack
+// by appending the external bases it references,
+// rewriting the pack header and trailer,
+// and resolving the deltas reached from the appended bases.
+//
+// external lists the base object IDs to fetch from the ThinBase option.
+// Bases the thin base does not hold are skipped here
+// and reported afterwards through [ThinBasesMissingError]
+// if deltas still depend on them.
+//
+// It returns [ErrThinPackNotPermitted] when no thin base is configured,
+// and an error wrapping [ErrMalformedPack] when deltas remain unresolved
+// for any other reason.
+func (ingestion *ingestion) fixThin(external []id.ObjectID, adjacency adjacency, meter *progress.Meter) error {
+ if ingestion.opts.ThinBase == nil {
+ return ErrThinPackNotPermitted
+ }
+
+ hashSize := ingestion.objectFormat.Size()
+ if ingestion.scanner.consumed < packfile.HeaderLen+hashSize {
+ return fmt.Errorf("%w: pack shorter than trailer", ErrMalformedPack)
+ }
+
+ // Drop the trailer from the write cursor.
+ // Appended bases overwrite it; a fresh trailer is written afterwards.
+ ingestion.scanner.consumed -= hashSize
+
+ // appended holds the record indexes of the bases added to the pack.
+ appended := make([]int, 0, len(external))
+
+ for _, baseOID := range external {
+ err := ingestion.ctx.Err()
+ if err != nil {
+ return fmt.Errorf("object/store/packed/internal/ingest: %w", err)
+ }
+
+ ty, content, err := ingestion.opts.ThinBase.ReadBytesContent(baseOID)
+ if errors.Is(err, store.ErrObjectNotFound) {
+ // Leave the gap for the missing-bases check below.
+ continue
+ }
+
+ if err != nil {
+ return fmt.Errorf("object/store/packed/internal/ingest: reading thin base %s: %w", baseOID, err)
+ }
+
+ index, err := ingestion.appendBaseObject(baseOID, ty, content)
+ if err != nil {
+ return err
+ }
+
+ appended = append(appended, index)
+ }
+
+ // The object count and trailer must reflect the appended entries.
+ err := ingestion.rewriteHeaderTrailer()
+ if err != nil {
+ return err
+ }
+
+ err = ingestion.resolveFrom(appended, adjacency, meter)
+ if err != nil {
+ return err
+ }
+
+ missing := ingestion.unresolvedExternalBases()
+ if len(missing) > 0 {
+ return &ThinBasesMissingError{OIDs: missing}
+ }
+
+ if ingestion.countUnresolved() > 0 {
+ return fmt.Errorf("%w: unresolvable delta entries after thin completion", ErrMalformedPack)
+ }
+
+ ingestion.thinFixed = len(appended) > 0
+
+ return nil
+}
+
+// appendBaseObject appends one external thin base
+// as a non-delta pack entry at the current write cursor,
+// verifying that its content hashes to the requested object ID.
+//
+// The entry is written directly into the pack file with WriteAt,
+// the write cursor is advanced past it,
+// and a resolved record is registered by offset and by object ID.
+// It returns the index of the new record.
+func (ingestion *ingestion) appendBaseObject(objectID id.ObjectID, objectType typ.Type, content []byte) (int, error) {
+ entryType, err := packfile.EntryTypeFromObjectType(objectType)
+ if err != nil {
+ return 0, fmt.Errorf("object/store/packed/internal/ingest: %w", err)
+ }
+
+ // Check the content before writing anything to the pack.
+ computed, err := ingestion.hashObject(objectType, content)
+ if err != nil {
+ return 0, err
+ }
+
+ if computed != objectID {
+ return 0, fmt.Errorf("%w: thin base %s content hashes to %s", ErrMalformedPack, objectID, computed)
+ }
+
+ start := ingestion.scanner.consumed
+ startOffset := int64(start)
+
+ headerBytes := packfile.AppendTypeSize(nil, entryType, uint64(len(content)))
+
+ _, err = ingestion.packFile.WriteAt(headerBytes, startOffset)
+ if err != nil {
+ return 0, fmt.Errorf("object/store/packed/internal/ingest: writing thin base header: %w", err)
+ }
+
+ // The entry CRC covers the header followed by the compressed payload.
+ crc := crc32.NewIEEE()
+ _, _ = crc.Write(headerBytes)
+
+ dataOffset := startOffset + int64(len(headerBytes))
+ writer := &offsetWriter{file: ingestion.packFile, offset: dataOffset}
+
+ // Compressed output goes to the file and the CRC at the same time.
+ zw := zlib.NewWriter(io.MultiWriter(writer, crc))
+
+ _, err = zw.Write(content)
+ if err != nil {
+ _ = zw.Close()
+
+ return 0, fmt.Errorf("object/store/packed/internal/ingest: compressing thin base: %w", err)
+ }
+
+ err = zw.Close()
+ if err != nil {
+ return 0, fmt.Errorf("object/store/packed/internal/ingest: compressing thin base: %w", err)
+ }
+
+ headerLen := len(headerBytes)
+ packedLen := headerLen + writer.written
+ ingestion.scanner.consumed = start + packedLen
+
+ rec := record{
+ offset: start,
+ headerLen: headerLen,
+ packedLen: packedLen,
+ crc32: crc.Sum32(),
+ packedType: entryType,
+ declaredSize: len(content),
+ baseOffset: 0,
+ baseOID: id.ObjectID{},
+ oid: objectID,
+ resolved: true,
+ }
+
+ index := len(ingestion.records)
+ ingestion.records = append(ingestion.records, rec)
+ ingestion.byOffset[start] = index
+ ingestion.byOID.Store(objectID, index)
+
+ return index, nil
+}
+
+// rewriteHeaderTrailer brings the pack file's header and trailer
+// in line with the records held after thin completion.
+//
+// It writes a header carrying the current record count,
+// rehashes the file from its start up to the write cursor,
+// writes that hash as the trailer at the cursor,
+// and stores it as the ingestion's pack hash.
+func (ingestion *ingestion) rewriteHeaderTrailer() error {
+	objectCount, err := intconv.IntToUint32(len(ingestion.records))
+	if err != nil {
+		return fmt.Errorf("object/store/packed/internal/ingest: %w", err)
+	}
+
+	headerBytes := packfile.AppendHeader(nil, objectCount)
+
+	if _, err := ingestion.packFile.WriteAt(headerBytes, 0); err != nil {
+		return fmt.Errorf("object/store/packed/internal/ingest: rewriting header: %w", err)
+	}
+
+	end := int64(ingestion.scanner.consumed)
+
+	hasher, err := ingestion.objectFormat.New()
+	if err != nil {
+		return fmt.Errorf("object/store/packed/internal/ingest: %w", err)
+	}
+
+	body := io.NewSectionReader(ingestion.packFile, 0, end)
+
+	if _, err := io.Copy(hasher, body); err != nil {
+		return fmt.Errorf("object/store/packed/internal/ingest: rehashing pack: %w", err)
+	}
+
+	digest := hasher.Sum(nil)
+
+	if _, err := ingestion.packFile.WriteAt(digest, end); err != nil {
+		return fmt.Errorf("object/store/packed/internal/ingest: writing trailer: %w", err)
+	}
+
+	trailerHash, err := ingestion.objectFormat.FromBytes(digest)
+	if err != nil {
+		return fmt.Errorf("object/store/packed/internal/ingest: %w", err)
+	}
+
+	ingestion.packHash = trailerHash
+
+	return nil
+}
+
+// offsetWriter writes to a file via WriteAt,
+// advancing sequentially from a base offset
+// and counting the bytes written.
+//
+// It adapts positional file writes to [io.Writer].
+type offsetWriter struct {
+ // file is the destination of every write.
+ file *os.File
+ // offset is the file position of the next write.
+ offset int64
+ // written is the total number of bytes written so far.
+ written int
+}
+
+// Write implements [io.Writer].
+//
+// It writes p at the current offset
+// and moves the offset and byte counter forward by the number of bytes written,
+// even when the write fails part-way.
+func (writer *offsetWriter) Write(p []byte) (int, error) {
+	count, err := writer.file.WriteAt(p, writer.offset)
+
+	writer.written += count
+	writer.offset += int64(count)
+
+	return count, err //nolint:wrapcheck
+}
diff --git a/object/store/packed/internal/ingest/writepack_test.go b/object/store/packed/internal/ingest/writepack_test.go
new file mode 100644
index 00000000..b2f4d2b8
--- /dev/null
+++ b/object/store/packed/internal/ingest/writepack_test.go
@@ -0,0 +1,607 @@
+package ingest_test
+
+import (
+ "bytes"
+ "context"
+ "errors"
+ "io"
+ "os"
+ "path/filepath"
+ "testing"
+
+ "lindenii.org/go/furgit/internal/format/packidx"
+ "lindenii.org/go/furgit/internal/format/packidx/bloom"
+ "lindenii.org/go/furgit/internal/testgit"
+ "lindenii.org/go/furgit/object/id"
+ "lindenii.org/go/furgit/object/store"
+ "lindenii.org/go/furgit/object/store/packed"
+ "lindenii.org/go/furgit/object/store/packed/internal/ingest"
+)
+
+// TestWritePackMatchesGit verifies that ingesting a normal pack
+// matches git's own pack, index, and reverse index.
+//
+// The pack is streamed through verbatim,
+// and the index and reverse index are regenerated deterministically,
+// so a successful match also confirms that scanning and delta resolution
+// recovered every object ID, offset, and CRC that git recorded.
+func TestWritePackMatchesGit(t *testing.T) {
+ t.Parallel()
+
+ for _, objectFormat := range id.SupportedObjectFormats() {
+ t.Run(objectFormat.String(), func(t *testing.T) {
+ t.Parallel()
+
+ repo, err := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: objectFormat})
+ if err != nil {
+ t.Fatalf("NewRepo: %v", err)
+ }
+
+ seeded, err := repo.SeedHistory(t)
+ if err != nil {
+ t.Fatalf("SeedHistory: %v", err)
+ }
+
+ gitPrefix, err := repo.PackObjects(t, seeded.All(), testgit.PackObjectsOptions{
+ RevIndex: true,
+ Revs: false,
+ Exclude: nil,
+ })
+ if err != nil {
+ t.Fatalf("PackObjects: %v", err)
+ }
+
+ stream, err := os.ReadFile(gitPrefix + ".pack") //nolint:gosec
+ if err != nil {
+ t.Fatalf("ReadFile pack: %v", err)
+ }
+
+ dir, result := writePack(t, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{
+ ThinBase: nil,
+ Progress: nil,
+ })
+
+ if want := filepath.Base(gitPrefix) + ".pack"; result.PackName != want {
+ t.Fatalf("PackName = %q, want %q", result.PackName, want)
+ }
+
+ for _, artifact := range []struct {
+ kind string
+ ours string
+ want string
+ }{
+ {"pack", result.PackName, gitPrefix + ".pack"},
+ {"idx", result.IdxName, gitPrefix + ".idx"},
+ {"rev", result.RevName, gitPrefix + ".rev"},
+ } {
+ ours, err := os.ReadFile(filepath.Join(dir, artifact.ours)) //nolint:gosec
+ if err != nil {
+ t.Fatalf("ReadFile %s: %v", artifact.kind, err)
+ }
+
+ want, err := os.ReadFile(artifact.want)
+ if err != nil {
+ t.Fatalf("ReadFile git %s: %v", artifact.kind, err)
+ }
+
+ if !bytes.Equal(ours, want) {
+ t.Errorf("%s differs from git: %d bytes vs %d", artifact.kind, len(ours), len(want))
+ }
+ }
+ })
+ }
+}
+
+// TestWritePackBloom ingests a pack and checks the Bloom filter written beside it:
+// the filter must carry the same pack hash as the index
+// and must report every indexed object as possibly present.
+func TestWritePackBloom(t *testing.T) {
+	t.Parallel()
+
+	for _, format := range id.SupportedObjectFormats() {
+		t.Run(format.String(), func(t *testing.T) {
+			t.Parallel()
+
+			repo, seeded := seedHistory(t, format)
+
+			packOpts := testgit.PackObjectsOptions{
+				RevIndex: true,
+				Revs:     false,
+				Exclude:  nil,
+			}
+
+			gitPrefix, err := repo.PackObjects(t, seeded.All(), packOpts)
+			if err != nil {
+				t.Fatalf("PackObjects: %v", err)
+			}
+
+			stream, err := os.ReadFile(gitPrefix + ".pack") //nolint:gosec
+			if err != nil {
+				t.Fatalf("ReadFile pack: %v", err)
+			}
+
+			dir, result := writePack(t, format, bytes.NewReader(stream), store.PackWriteOptions{
+				ThinBase: nil,
+				Progress: nil,
+			})
+
+			if result.BloomName == "" {
+				t.Fatal("BloomName is empty")
+			}
+
+			bloomPath := filepath.Join(dir, result.BloomName)
+
+			rawFilter, err := os.ReadFile(bloomPath) //nolint:gosec
+			if err != nil {
+				t.Fatalf("ReadFile bloom: %v", err)
+			}
+
+			filter, err := bloom.Parse(rawFilter, format)
+			if err != nil {
+				t.Fatalf("bloom.Parse: %v", err)
+			}
+
+			idxPath := filepath.Join(dir, result.IdxName)
+
+			rawIndex, err := os.ReadFile(idxPath) //nolint:gosec
+			if err != nil {
+				t.Fatalf("ReadFile idx: %v", err)
+			}
+
+			index, err := packidx.Parse(rawIndex, format.Size())
+			if err != nil {
+				t.Fatalf("packidx.Parse: %v", err)
+			}
+
+			if !bytes.Equal(filter.PackHash(), index.PackHash()) {
+				t.Fatalf("filter pack hash %x, want %x", filter.PackHash(), index.PackHash())
+			}
+
+			for pos := range index.NumObjects() {
+				if filter.MayContain(index.OIDAt(pos)) {
+					continue
+				}
+
+				t.Fatalf("filter rejects object at index position %d", pos)
+			}
+		})
+	}
+}
+
+// TestWritePackEmpty ingests a pack that declares no objects
+// and requires success with a zero object count, an empty pack name,
+// and an untouched destination directory.
+func TestWritePackEmpty(t *testing.T) {
+	t.Parallel()
+
+	for _, format := range id.SupportedObjectFormats() {
+		t.Run(format.String(), func(t *testing.T) {
+			t.Parallel()
+
+			repo, err := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: format})
+			if err != nil {
+				t.Fatalf("NewRepo: %v", err)
+			}
+
+			stdoutOpts := testgit.PackObjectsStdoutOptions{
+				Revs:    false,
+				Thin:    false,
+				Exclude: nil,
+			}
+
+			stream, err := repo.PackObjectsStdout(t, nil, stdoutOpts)
+			if err != nil {
+				t.Fatalf("PackObjectsStdout: %v", err)
+			}
+
+			dir, result := writePack(t, format, bytes.NewReader(stream), store.PackWriteOptions{
+				ThinBase: nil,
+				Progress: nil,
+			})
+
+			if got := result.ObjectCount; got != 0 {
+				t.Fatalf("ObjectCount = %d, want 0", got)
+			}
+
+			if got := result.PackName; got != "" {
+				t.Fatalf("PackName = %q, want empty", got)
+			}
+
+			written, err := os.ReadDir(dir)
+			if err != nil {
+				t.Fatalf("ReadDir: %v", err)
+			}
+
+			if count := len(written); count != 0 {
+				t.Fatalf("empty pack wrote %d files, want 0", count)
+			}
+		})
+	}
+}
+
+// TestWritePackIdempotent ingests one pack twice into the same destination
+// and requires both writes to succeed under the same pack name,
+// with the pack, index, and reverse index still present afterwards.
+func TestWritePackIdempotent(t *testing.T) {
+	t.Parallel()
+
+	for _, format := range id.SupportedObjectFormats() {
+		t.Run(format.String(), func(t *testing.T) {
+			t.Parallel()
+
+			repo, seeded := seedHistory(t, format)
+
+			gitPrefix, err := repo.PackObjects(t, seeded.All(), testgit.PackObjectsOptions{
+				RevIndex: true,
+				Revs:     false,
+				Exclude:  nil,
+			})
+			if err != nil {
+				t.Fatalf("PackObjects: %v", err)
+			}
+
+			stream, err := os.ReadFile(gitPrefix + ".pack") //nolint:gosec
+			if err != nil {
+				t.Fatalf("ReadFile pack: %v", err)
+			}
+
+			dir := t.TempDir()
+
+			root, err := os.OpenRoot(dir)
+			if err != nil {
+				t.Fatalf("OpenRoot: %v", err)
+			}
+
+			t.Cleanup(func() { _ = root.Close() })
+
+			// Both writes share the destination and read the same bytes from the start.
+			ingestOnce := func() (ingest.Result, error) {
+				return ingest.WritePack(t.Context(), root, format, bytes.NewReader(stream), store.PackWriteOptions{
+					ThinBase: nil,
+					Progress: nil,
+				})
+			}
+
+			first, err := ingestOnce()
+			if err != nil {
+				t.Fatalf("first WritePack: %v", err)
+			}
+
+			second, err := ingestOnce()
+			if err != nil {
+				t.Fatalf("second WritePack: %v", err)
+			}
+
+			if second.PackName != first.PackName {
+				t.Fatalf("second PackName = %q, want %q", second.PackName, first.PackName)
+			}
+
+			artifacts := []string{first.PackName, first.IdxName, first.RevName}
+
+			for _, name := range artifacts {
+				if _, err := os.Stat(filepath.Join(dir, name)); err != nil {
+					t.Fatalf("missing %q after re-write: %v", name, err)
+				}
+			}
+		})
+	}
+}
+
+// writePack runs one ingestion of src into a newly created temporary directory
+// and fails the test if it does not succeed.
+//
+// It returns the directory path together with the ingest result.
+// The directory root is closed when the test finishes.
+func writePack(
+	t *testing.T,
+	objectFormat id.ObjectFormat,
+	src io.Reader,
+	opts store.PackWriteOptions,
+) (string, ingest.Result) {
+	t.Helper()
+
+	storeDir := t.TempDir()
+
+	storeRoot, err := os.OpenRoot(storeDir)
+	if err != nil {
+		t.Fatalf("OpenRoot: %v", err)
+	}
+
+	t.Cleanup(func() { _ = storeRoot.Close() })
+
+	written, err := ingest.WritePack(t.Context(), storeRoot, objectFormat, src, opts)
+	if err != nil {
+		t.Fatalf("WritePack: %v", err)
+	}
+
+	return storeDir, written
+}
+
+// TestWritePackThin ingests a thin pack with a thin base that holds every object
+// and requires the result to be flagged as thin-fixed
+// and to pass git's own pack verification.
+func TestWritePackThin(t *testing.T) {
+	t.Parallel()
+
+	for _, format := range id.SupportedObjectFormats() {
+		t.Run(format.String(), func(t *testing.T) {
+			t.Parallel()
+
+			repo, seeded := seedHistory(t, format)
+			thinBase := fullStore(t, repo, format, seeded)
+			stream := thinStream(t, repo, seeded)
+
+			writeOpts := store.PackWriteOptions{
+				ThinBase: thinBase,
+				Progress: nil,
+			}
+
+			dir, result := writePack(t, format, bytes.NewReader(stream), writeOpts)
+
+			if !result.ThinFixed {
+				t.Fatalf("ThinFixed = false, want true (pack was not thin)")
+			}
+
+			idxPath := filepath.Join(dir, result.IdxName)
+
+			if _, err := repo.VerifyPack(t, idxPath); err != nil {
+				t.Fatalf("VerifyPack on completed pack: %v", err)
+			}
+		})
+	}
+}
+
+// TestWritePackThinWithoutBase ingests a thin pack with no thin base configured
+// and requires the write to fail with ErrThinPackNotPermitted.
+func TestWritePackThinWithoutBase(t *testing.T) {
+	t.Parallel()
+
+	for _, format := range id.SupportedObjectFormats() {
+		t.Run(format.String(), func(t *testing.T) {
+			t.Parallel()
+
+			repo, seeded := seedHistory(t, format)
+			stream := thinStream(t, repo, seeded)
+
+			writeOpts := store.PackWriteOptions{
+				ThinBase: nil,
+				Progress: nil,
+			}
+
+			_, err := ingest.WritePack(t.Context(), freshRoot(t), format, bytes.NewReader(stream), writeOpts)
+			if errors.Is(err, ingest.ErrThinPackNotPermitted) {
+				return
+			}
+
+			t.Fatalf("err = %v, want ErrThinPackNotPermitted", err)
+		})
+	}
+}
+
+// TestWritePackThinMissingBase ingests a thin pack against an empty thin base
+// and requires a ThinBasesMissingError that names at least one object ID.
+func TestWritePackThinMissingBase(t *testing.T) {
+	t.Parallel()
+
+	for _, format := range id.SupportedObjectFormats() {
+		t.Run(format.String(), func(t *testing.T) {
+			t.Parallel()
+
+			repo, seeded := seedHistory(t, format)
+			emptyBase := emptyStore(t, format)
+			stream := thinStream(t, repo, seeded)
+
+			writeOpts := store.PackWriteOptions{
+				ThinBase: emptyBase,
+				Progress: nil,
+			}
+
+			_, err := ingest.WritePack(t.Context(), freshRoot(t), format, bytes.NewReader(stream), writeOpts)
+
+			missingErr, matched := errors.AsType[*ingest.ThinBasesMissingError](err)
+			if !matched {
+				t.Fatalf("err = %v, want *ThinBasesMissingError", err)
+			}
+
+			if len(missingErr.OIDs) == 0 {
+				t.Fatalf("ThinBasesMissingError reported no object IDs")
+			}
+		})
+	}
+}
+
+// TestWritePackContextCancelled starts an ingestion with an already-cancelled context
+// and requires it to fail with context.Canceled
+// while leaving the destination directory empty.
+func TestWritePackContextCancelled(t *testing.T) {
+	t.Parallel()
+
+	for _, format := range id.SupportedObjectFormats() {
+		t.Run(format.String(), func(t *testing.T) {
+			t.Parallel()
+
+			repo, seeded := seedHistory(t, format)
+
+			packOpts := testgit.PackObjectsOptions{
+				RevIndex: false,
+				Revs:     false,
+				Exclude:  nil,
+			}
+
+			gitPrefix, err := repo.PackObjects(t, seeded.All(), packOpts)
+			if err != nil {
+				t.Fatalf("PackObjects: %v", err)
+			}
+
+			stream, err := os.ReadFile(gitPrefix + ".pack") //nolint:gosec
+			if err != nil {
+				t.Fatalf("ReadFile pack: %v", err)
+			}
+
+			// Cancel before the call so ingestion sees a dead context at once.
+			cancelled, cancel := context.WithCancel(t.Context())
+			cancel()
+
+			dir := t.TempDir()
+
+			root, err := os.OpenRoot(dir)
+			if err != nil {
+				t.Fatalf("OpenRoot: %v", err)
+			}
+
+			t.Cleanup(func() { _ = root.Close() })
+
+			_, writeErr := ingest.WritePack(cancelled, root, format, bytes.NewReader(stream), store.PackWriteOptions{
+				ThinBase: nil,
+				Progress: nil,
+			})
+			if !errors.Is(writeErr, context.Canceled) {
+				t.Fatalf("err = %v, want context.Canceled", writeErr)
+			}
+
+			leftovers, err := os.ReadDir(dir)
+			if err != nil {
+				t.Fatalf("ReadDir: %v", err)
+			}
+
+			if count := len(leftovers); count != 0 {
+				t.Fatalf("cancelled ingestion left %d files behind", count)
+			}
+		})
+	}
+}
+
+// TestWritePackObjectTooLarge ingests a pack with MaxObjectSize set to one byte
+// and requires the write to fail with ErrObjectTooLarge
+// while leaving the destination directory empty.
+func TestWritePackObjectTooLarge(t *testing.T) {
+	t.Parallel()
+
+	for _, format := range id.SupportedObjectFormats() {
+		t.Run(format.String(), func(t *testing.T) {
+			t.Parallel()
+
+			repo, seeded := seedHistory(t, format)
+
+			packOpts := testgit.PackObjectsOptions{
+				RevIndex: false,
+				Revs:     false,
+				Exclude:  nil,
+			}
+
+			gitPrefix, err := repo.PackObjects(t, seeded.All(), packOpts)
+			if err != nil {
+				t.Fatalf("PackObjects: %v", err)
+			}
+
+			stream, err := os.ReadFile(gitPrefix + ".pack") //nolint:gosec
+			if err != nil {
+				t.Fatalf("ReadFile pack: %v", err)
+			}
+
+			dir := t.TempDir()
+
+			root, err := os.OpenRoot(dir)
+			if err != nil {
+				t.Fatalf("OpenRoot: %v", err)
+			}
+
+			t.Cleanup(func() { _ = root.Close() })
+
+			_, writeErr := ingest.WritePack(t.Context(), root, format, bytes.NewReader(stream), store.PackWriteOptions{
+				ThinBase:      nil,
+				Progress:      nil,
+				MaxObjectSize: 1,
+			})
+			if !errors.Is(writeErr, store.ErrObjectTooLarge) {
+				t.Fatalf("err = %v, want ErrObjectTooLarge", writeErr)
+			}
+
+			leftovers, err := os.ReadDir(dir)
+			if err != nil {
+				t.Fatalf("ReadDir: %v", err)
+			}
+
+			if count := len(leftovers); count != 0 {
+				t.Fatalf("rejected ingestion left %d files behind", count)
+			}
+		})
+	}
+}
+
+// seedHistory builds a test repository in the given object format
+// and fills it with the seeded history,
+// failing the test if either step fails.
+func seedHistory(t *testing.T, objectFormat id.ObjectFormat) (*testgit.Repo, testgit.Seeded) {
+	t.Helper()
+
+	repoOpts := testgit.RepoOptions{ObjectFormat: objectFormat}
+
+	repo, err := testgit.NewRepo(t, repoOpts)
+	if err != nil {
+		t.Fatalf("NewRepo: %v", err)
+	}
+
+	history, err := repo.SeedHistory(t)
+	if err != nil {
+		t.Fatalf("SeedHistory: %v", err)
+	}
+
+	return repo, history
+}
+
+// thinStream produces a thin pack stream for the last entry of seeded.Commits
+// (the tip) while excluding the entry just before it (treated as its parent),
+// so deltas in the stream may reference the omitted parent objects.
+//
+// It fails the test if fewer than two commits were seeded or if
+// PackObjectsStdout returns an error.
+func thinStream(t *testing.T, repo *testgit.Repo, seeded testgit.Seeded) []byte {
+	t.Helper()
+
+	// Both a tip and a parent are indexed below; fail with a clear message
+	// rather than panicking on an out-of-range index.
+	if len(seeded.Commits) < 2 {
+		t.Fatalf("thinStream: need at least 2 seeded commits, have %d", len(seeded.Commits))
+	}
+
+	tip := seeded.Commits[len(seeded.Commits)-1]
+	parent := seeded.Commits[len(seeded.Commits)-2]
+
+	stream, err := repo.PackObjectsStdout(t, []id.ObjectID{tip}, testgit.PackObjectsStdoutOptions{
+		Revs:    true,
+		Thin:    true,
+		Exclude: []id.ObjectID{parent},
+	})
+	if err != nil {
+		t.Fatalf("PackObjectsStdout: %v", err)
+	}
+
+	return stream
+}
+
+// fullStore packs everything returned by seeded.All() and opens a packed
+// store over the directory that holds the resulting pack files.
+func fullStore(t *testing.T, repo *testgit.Repo, objectFormat id.ObjectFormat, seeded testgit.Seeded) *packed.Packed {
+	t.Helper()
+
+	packOpts := testgit.PackObjectsOptions{
+		RevIndex: false,
+		Revs:     false,
+		Exclude:  nil,
+	}
+
+	packPrefix, err := repo.PackObjects(t, seeded.All(), packOpts)
+	if err != nil {
+		t.Fatalf("PackObjects: %v", err)
+	}
+
+	// The store is opened on the prefix's parent directory.
+	packDir := filepath.Dir(packPrefix)
+
+	return openStore(t, packDir, objectFormat)
+}
+
+// emptyStore opens a packed store over a fresh temporary directory
+// that contains no files.
+func emptyStore(t *testing.T, objectFormat id.ObjectFormat) *packed.Packed {
+	t.Helper()
+
+	dir := t.TempDir()
+
+	return openStore(t, dir, objectFormat)
+}
+
+// openStore opens dir as an os.Root and builds a packed store on top of it.
+// Both the root and the store are closed through test cleanups; an error from
+// either open step fails the test.
+func openStore(t *testing.T, dir string, objectFormat id.ObjectFormat) *packed.Packed {
+	t.Helper()
+
+	dirRoot, err := os.OpenRoot(dir)
+	if err != nil {
+		t.Fatalf("OpenRoot: %v", err)
+	}
+
+	t.Cleanup(func() {
+		_ = dirRoot.Close()
+	})
+
+	st, err := packed.New(dirRoot, objectFormat)
+	if err != nil {
+		t.Fatalf("New: %v", err)
+	}
+
+	t.Cleanup(func() {
+		_ = st.Close()
+	})
+
+	return st
+}
+
+// freshRoot creates a new temporary directory for the test and returns an
+// os.Root opened over it. The root is closed by a test cleanup.
+func freshRoot(t *testing.T) *os.Root {
+	t.Helper()
+
+	tmpDir := t.TempDir()
+
+	handle, err := os.OpenRoot(tmpDir)
+	if err != nil {
+		t.Fatalf("OpenRoot: %v", err)
+	}
+
+	t.Cleanup(func() {
+		_ = handle.Close()
+	})
+
+	return handle
+}