aboutsummaryrefslogtreecommitdiff
path: root/object/store
diff options
context:
space:
mode:
Diffstat (limited to 'object/store')
-rw-r--r--object/store/dual/quarantine.go11
-rw-r--r--object/store/dual/writer.go5
-rw-r--r--object/store/loose/quarantine.go2
-rw-r--r--object/store/packed/internal/ingest/basecache.go25
-rw-r--r--object/store/packed/internal/ingest/finalize.go100
-rw-r--r--object/store/packed/internal/ingest/ingest.go57
-rw-r--r--object/store/packed/internal/ingest/record.go6
-rw-r--r--object/store/packed/internal/ingest/resolve.go339
-rw-r--r--object/store/packed/internal/ingest/result.go3
-rw-r--r--object/store/packed/internal/ingest/scan.go24
-rw-r--r--object/store/packed/internal/ingest/thin.go10
-rw-r--r--object/store/packed/internal/ingest/writepack_test.go200
-rw-r--r--object/store/packed/lookup.go4
-rw-r--r--object/store/packed/pack.go52
-rw-r--r--object/store/packed/quarantine.go37
-rw-r--r--object/store/packed/writer.go5
-rw-r--r--object/store/packed/writer_test.go2
-rw-r--r--object/store/writer.go14
18 files changed, 735 insertions, 161 deletions
diff --git a/object/store/dual/quarantine.go b/object/store/dual/quarantine.go
index b73e48fe..6052c134 100644
--- a/object/store/dual/quarantine.go
+++ b/object/store/dual/quarantine.go
@@ -1,6 +1,7 @@
package dual
import (
+ "context"
"errors"
"fmt"
"io"
@@ -12,15 +13,11 @@ import (
)
// BeginObjectQuarantine begins an object-wise quarantine on the object side.
-//
-//nolint:ireturn
func (dual *Dual) BeginObjectQuarantine(opts store.ObjectQuarantineOptions) (store.ObjectQuarantine, error) {
return dual.object.BeginObjectQuarantine(opts) //nolint:wrapcheck
}
// BeginPackQuarantine begins a pack-wise quarantine on the pack side.
-//
-//nolint:ireturn
func (dual *Dual) BeginPackQuarantine(opts store.PackQuarantineOptions) (store.PackQuarantine, error) {
return dual.pack.BeginPackQuarantine(opts) //nolint:wrapcheck
}
@@ -29,8 +26,6 @@ func (dual *Dual) BeginPackQuarantine(opts store.PackQuarantineOptions) (store.P
//
// If the pack side fails to begin,
// the already-begun object side is discarded before returning.
-//
-//nolint:ireturn
func (dual *Dual) BeginCoordinatedQuarantine(opts store.CoordinatedQuarantineOptions) (store.CoordinatedQuarantine, error) {
objectQ, err := dual.object.BeginObjectQuarantine(opts.Object)
if err != nil {
@@ -111,8 +106,8 @@ func (quarantine *coordinatedQuarantine) WriteReaderContent(ty typ.Type, size in
return quarantine.objectQ.WriteReaderContent(ty, size, src) //nolint:wrapcheck
}
-func (quarantine *coordinatedQuarantine) WritePack(src io.Reader, opts store.PackWriteOptions) error {
- return quarantine.packQ.WritePack(src, opts) //nolint:wrapcheck
+func (quarantine *coordinatedQuarantine) WritePack(ctx context.Context, src io.Reader, opts store.PackWriteOptions) error {
+ return quarantine.packQ.WritePack(ctx, src, opts) //nolint:wrapcheck
}
// Promote publishes both halves and joins their errors.
diff --git a/object/store/dual/writer.go b/object/store/dual/writer.go
index f75f49e1..fb59adbe 100644
--- a/object/store/dual/writer.go
+++ b/object/store/dual/writer.go
@@ -1,6 +1,7 @@
package dual
import (
+ "context"
"io"
"lindenii.org/go/furgit/object/id"
@@ -29,6 +30,6 @@ func (dual *Dual) WriteReaderContent(ty typ.Type, size int, src io.Reader) (id.O
}
// WritePack ingests one pack stream into the pack side.
-func (dual *Dual) WritePack(src io.Reader, opts store.PackWriteOptions) error {
- return dual.pack.WritePack(src, opts) //nolint:wrapcheck
+func (dual *Dual) WritePack(ctx context.Context, src io.Reader, opts store.PackWriteOptions) error {
+ return dual.pack.WritePack(ctx, src, opts) //nolint:wrapcheck
}
diff --git a/object/store/loose/quarantine.go b/object/store/loose/quarantine.go
index 214f7219..cd337670 100644
--- a/object/store/loose/quarantine.go
+++ b/object/store/loose/quarantine.go
@@ -30,7 +30,7 @@ type objectQuarantine struct {
// beneath the destination loose root.
//
// Labels: Deps-Borrowed, Life-Parent, Close-No.
-func (loose *Loose) BeginObjectQuarantine(_ store.ObjectQuarantineOptions) (store.ObjectQuarantine, error) { //nolint:ireturn
+func (loose *Loose) BeginObjectQuarantine(_ store.ObjectQuarantineOptions) (store.ObjectQuarantine, error) {
tempName, tempRoot, err := createLooseQuarantineRoot(loose.root)
if err != nil {
return nil, err
diff --git a/object/store/packed/internal/ingest/basecache.go b/object/store/packed/internal/ingest/basecache.go
new file mode 100644
index 00000000..77419aa7
--- /dev/null
+++ b/object/store/packed/internal/ingest/basecache.go
@@ -0,0 +1,25 @@
+package ingest
+
+import (
+ "lindenii.org/go/furgit/internal/cache/clock"
+ "lindenii.org/go/furgit/object/typ"
+)
+
+const baseCacheMaxWeight = 96 << 20
+
+type baseCacheKey struct {
+ offset int
+}
+
+type cachedContent struct {
+ objectType typ.Type
+ content []byte
+}
+
+func newBaseCache(workers int) *clock.Clock[baseCacheKey, cachedContent] {
+ return clock.New(baseCacheMaxWeight*uint64(workers), baseContentWeight) //#nosec G115
+}
+
+func baseContentWeight(_ baseCacheKey, base cachedContent) uint64 {
+ return uint64(len(base.content)) + 32
+}
diff --git a/object/store/packed/internal/ingest/finalize.go b/object/store/packed/internal/ingest/finalize.go
index f0ab6622..c6b1e2c9 100644
--- a/object/store/packed/internal/ingest/finalize.go
+++ b/object/store/packed/internal/ingest/finalize.go
@@ -8,6 +8,7 @@ import (
"slices"
"lindenii.org/go/furgit/internal/format/packidx"
+ "lindenii.org/go/furgit/internal/format/packidx/bloom"
"lindenii.org/go/furgit/internal/format/packrev"
"lindenii.org/go/furgit/object/id"
"lindenii.org/go/lgo/intconv"
@@ -17,6 +18,11 @@ import (
// then links the pack, reverse index, and index
// to their content-addressed names.
func (ingestion *ingestion) finalize() (Result, error) {
+ err := ingestion.ctx.Err()
+ if err != nil {
+ return Result{}, fmt.Errorf("object/store/packed/internal/ingest: %w", err)
+ }
+
entries, positions, err := ingestion.indexEntries()
if err != nil {
return Result{}, err
@@ -38,24 +44,16 @@ func (ingestion *ingestion) finalize() (Result, error) {
return Result{}, err
}
- base := "pack-" + ingestion.packHash.String()
- packFinal := base + ".pack"
- idxFinal := base + ".idx"
- revFinal := base + ".rev"
-
- // Link the pack and reverse index before the index,
- // since the index is what publishes the pack to readers.
- err = ingestion.link(ingestion.packTmp, packFinal)
+ bloomBuilder, err := ingestion.buildBloom(entries, packHash)
if err != nil {
return Result{}, err
}
- err = ingestion.link(revTmp, revFinal)
- if err != nil {
- return Result{}, err
- }
+ bloomTmp, err := ingestion.writeTemp("tmp_bloom_", func(w io.Writer) error {
+ _, err := w.Write(bloomBuilder.Bytes())
- err = ingestion.link(idxTmp, idxFinal)
+ return err
+ })
if err != nil {
return Result{}, err
}
@@ -65,16 +63,70 @@ func (ingestion *ingestion) finalize() (Result, error) {
return Result{}, fmt.Errorf("object/store/packed/internal/ingest: %w", err)
}
+ base := "pack-" + ingestion.packHash.String()
+ packFinal := base + ".pack"
+ idxFinal := base + ".idx"
+ revFinal := base + ".rev"
+ bloomFinal := base + ".bloom"
+
+ // Link the data files before the index,
+ // since the index is what publishes the pack to readers.
+ artifacts := [...]struct{ tmp, final string }{
+ {ingestion.packTmp, packFinal},
+ {revTmp, revFinal},
+ {bloomTmp, bloomFinal},
+ {idxTmp, idxFinal},
+ }
+
+ var created []string
+
+ for _, artifact := range artifacts {
+ linked, err := ingestion.promote(artifact.tmp, artifact.final)
+ if err != nil {
+ for i := len(created) - 1; i >= 0; i-- {
+ _ = ingestion.root.Remove(created[i])
+ }
+
+ return Result{}, err
+ }
+
+ if linked {
+ created = append(created, artifact.final)
+ }
+ }
+
return Result{
PackName: packFinal,
IdxName: idxFinal,
RevName: revFinal,
+ BloomName: bloomFinal,
PackHash: ingestion.packHash,
ObjectCount: objectCount,
ThinFixed: ingestion.thinFixed,
}, nil
}
+// buildBloom builds a Bloom filter over the index entries' object IDs,
+// bound to packHash.
+func (ingestion *ingestion) buildBloom(entries []packidx.Entry, packHash []byte) (*bloom.Builder, error) {
+ bucketCount, k, err := bloom.RecommendParams(ingestion.objectFormat, len(entries))
+ if err != nil {
+ return nil, fmt.Errorf("object/store/packed/internal/ingest: %w", err)
+ }
+
+ builder, err := bloom.NewBuilder(ingestion.objectFormat, bucketCount, k, packHash)
+ if err != nil {
+ return nil, fmt.Errorf("object/store/packed/internal/ingest: %w", err)
+ }
+
+ size := ingestion.objectFormat.Size()
+ for i := range entries {
+ builder.Add(entries[i].OID[:size])
+ }
+
+ return builder, nil
+}
+
// indexEntries returns the index entries in object-ID order
// and, for each record in pack order, its position in that index order.
func (ingestion *ingestion) indexEntries() ([]packidx.Entry, []uint32, error) {
@@ -141,15 +193,21 @@ func (ingestion *ingestion) writeTemp(prefix string, write func(io.Writer) error
return name, nil
}
-// link hard-links tmp to final,
-// treating an already-present destination as success.
-func (ingestion *ingestion) link(tmp, final string) error {
+// promote hard-links tmp to final and reports whether final was newly created.
+// A pre-existing final is treated as success; rollback must not remove it.
+func (ingestion *ingestion) promote(tmp, final string) (bool, error) {
err := ingestion.root.Link(tmp, final)
- if err != nil && !errors.Is(err, fs.ErrExist) {
- return fmt.Errorf("object/store/packed/internal/ingest: linking %q: %w", final, err)
- }
- _ = ingestion.root.Remove(tmp)
+ switch {
+ case err == nil:
+ _ = ingestion.root.Remove(tmp)
- return nil
+ return true, nil
+ case errors.Is(err, fs.ErrExist):
+ _ = ingestion.root.Remove(tmp)
+
+ return false, nil
+ default:
+ return false, fmt.Errorf("object/store/packed/internal/ingest: linking %q: %w", final, err)
+ }
}
diff --git a/object/store/packed/internal/ingest/ingest.go b/object/store/packed/internal/ingest/ingest.go
index 5422b4af..9b60af85 100644
--- a/object/store/packed/internal/ingest/ingest.go
+++ b/object/store/packed/internal/ingest/ingest.go
@@ -2,22 +2,29 @@ package ingest
import (
"bytes"
+ "context"
"crypto/rand"
"errors"
"fmt"
"io"
"io/fs"
"os"
+ "runtime"
+ "sync/atomic"
+ "lindenii.org/go/furgit/internal/cache/clock"
"lindenii.org/go/furgit/internal/format/packfile"
"lindenii.org/go/furgit/object/id"
"lindenii.org/go/furgit/object/store"
+ "lindenii.org/go/lgo/sync"
)
var errTempNamesExhausted = errors.New("object/store/packed/internal/ingest: exhausted temporary file names")
// ingestion holds the state for one WritePack call.
type ingestion struct {
+ ctx context.Context //nolint:containedctx
+
// root is the destination objects/pack directory.
root *os.Root
@@ -47,7 +54,12 @@ type ingestion struct {
// byOffset maps an entry offset to its record index,
// and byOID maps a resolved object ID to its record index.
byOffset map[int]int
- byOID map[id.ObjectID]int
+ byOID sync.Map[id.ObjectID, int]
+
+ baseCache *clock.Clock[baseCacheKey, cachedContent]
+
+ // workers is the delta-resolution concurrency.
+ workers int
// headerCount is the object count declared by the pack header.
headerCount int
@@ -55,8 +67,8 @@ type ingestion struct {
// deltaCount counts delta records, accumulated during scanning.
deltaCount int
- // deltasResolved counts resolved delta records, for progress.
- deltasResolved int
+ // deltasResolved counts resolved delta records.
+ deltasResolved atomic.Int64
// packHash is the final pack trailer hash.
packHash id.ObjectID
@@ -81,7 +93,7 @@ type ingestion struct {
// The pack must be the last thing the peer sends before that response:
// any bytes arriving immediately after the trailer
// are rejected as a malformed pack.
-func WritePack(root *os.Root, objectFormat id.ObjectFormat, src io.Reader, opts store.PackWriteOptions) (Result, error) {
+func WritePack(ctx context.Context, root *os.Root, objectFormat id.ObjectFormat, src io.Reader, opts store.PackWriteOptions) (Result, error) {
if objectFormat.Size() == 0 {
return Result{}, id.ErrInvalidObjectFormat
}
@@ -91,24 +103,27 @@ func WritePack(root *os.Root, objectFormat id.ObjectFormat, src io.Reader, opts
return Result{}, err
}
+ workers := runtime.GOMAXPROCS(0)
+
ingestion := &ingestion{
- root: root,
- objectFormat: objectFormat,
- opts: opts,
- src: src,
- packFile: nil,
- packTmp: "",
- temps: nil,
- scanner: nil,
- records: nil,
- byOffset: make(map[int]int),
- byOID: make(map[id.ObjectID]int),
- headerCount: count,
- deltaCount: 0,
- deltasResolved: 0,
- packHash: id.ObjectID{},
- thinFixed: false,
- committed: false,
+ ctx: ctx,
+ root: root,
+ objectFormat: objectFormat,
+ opts: opts,
+ src: src,
+ packFile: nil,
+ packTmp: "",
+ temps: nil,
+ scanner: nil,
+ records: nil,
+ byOffset: make(map[int]int),
+ baseCache: newBaseCache(workers),
+ workers: workers,
+ headerCount: count,
+ deltaCount: 0,
+ packHash: id.ObjectID{},
+ thinFixed: false,
+ committed: false,
}
defer ingestion.cleanup()
diff --git a/object/store/packed/internal/ingest/record.go b/object/store/packed/internal/ingest/record.go
index 69101293..4031a246 100644
--- a/object/store/packed/internal/ingest/record.go
+++ b/object/store/packed/internal/ingest/record.go
@@ -37,15 +37,11 @@ type record struct {
// baseOID is the base object ID for a ref-delta.
baseOID id.ObjectID
- // objectType is the resolved object type,
- // meaningful once resolved is true.
- objectType packfile.EntryType
-
// oid is the resolved object ID,
// meaningful once resolved is true.
oid id.ObjectID
- // resolved reports whether oid and objectType are final.
+ // resolved reports whether oid is final.
resolved bool
}
diff --git a/object/store/packed/internal/ingest/resolve.go b/object/store/packed/internal/ingest/resolve.go
index 77b0fa0f..dd26cd47 100644
--- a/object/store/packed/internal/ingest/resolve.go
+++ b/object/store/packed/internal/ingest/resolve.go
@@ -3,6 +3,7 @@ package ingest
import (
"fmt"
"io"
+ "sync"
"lindenii.org/go/furgit/internal/compress/zlib"
"lindenii.org/go/furgit/internal/format/packfile"
@@ -10,6 +11,8 @@ import (
"lindenii.org/go/furgit/internal/progress"
"lindenii.org/go/furgit/object/header"
"lindenii.org/go/furgit/object/id"
+ "lindenii.org/go/furgit/object/store"
+ "lindenii.org/go/furgit/object/typ"
)
// adjacency maps each resolvable base to its delta children:
@@ -83,108 +86,311 @@ func (ingestion *ingestion) buildAdjacency() adjacency {
return out
}
-// resolveFrom resolves the delta subtree rooted at each resolved record.
+// item is a delta record awaiting resolution, with its delta-chain depth.
+type item struct {
+ index int
+ depth int
+}
+
+// resolver resolves deltas concurrently over a shared LIFO work stack.
+//
+// Each item is one delta child;
+// a worker materializes its base from the cache, re-deriving on a miss,
+// resolves the child,
+// and pushes the child's own delta children.
+// Workers park while the stack is empty but others are still working,
+// and exit once it is empty and none are.
+type resolver struct {
+ ingestion *ingestion
+ adjacency adjacency
+ meter *progress.Meter
+
+ mu sync.Mutex
+ cond *sync.Cond
+ stack []item
+ active int
+ firstErr error
+}
+
func (ingestion *ingestion) resolveFrom(roots []int, adjacency adjacency, meter *progress.Meter) error {
+ var seed []item
+
for _, root := range roots {
- content, err := ingestion.inflateRecord(root)
- if err != nil {
- return err
+ rec := &ingestion.records[root]
+ for _, group := range [2][]int{adjacency.byOffset[rec.offset], adjacency.byOID[rec.oid]} {
+ for _, child := range group {
+ seed = append(seed, item{index: child, depth: 1})
+ }
}
+ }
- err = ingestion.resolveSubtree(root, content, ingestion.records[root].objectType, 0, adjacency, meter)
- if err != nil {
- return err
- }
+ if len(seed) == 0 {
+ return nil
}
- return nil
+ res := &resolver{
+ ingestion: ingestion,
+ adjacency: adjacency,
+ meter: meter,
+ stack: seed,
+ }
+ res.cond = sync.NewCond(&res.mu)
+
+ return res.run(ingestion.workers)
}
-// resolveSubtree resolves every delta child of one resolved record at depth,
-// holding the record's content as the base for its children.
-func (ingestion *ingestion) resolveSubtree(
- index int,
- content []byte,
- objectType packfile.EntryType,
- depth int,
- adjacency adjacency,
- meter *progress.Meter,
-) error {
- rec := &ingestion.records[index]
+func (res *resolver) run(workers int) error {
+ if workers <= 1 {
+ res.worker()
- for _, child := range adjacency.byOffset[rec.offset] {
- err := ingestion.resolveChild(child, content, objectType, depth+1, adjacency, meter)
- if err != nil {
- return err
+ return res.firstErr
+ }
+
+ var wg sync.WaitGroup
+
+ for range workers {
+ wg.Go(func() {
+ res.worker()
+ })
+ }
+
+ wg.Wait()
+
+ return res.firstErr
+}
+
+func (res *resolver) worker() {
+ for {
+ res.mu.Lock()
+
+ for len(res.stack) == 0 && res.active > 0 && res.firstErr == nil {
+ res.cond.Wait()
+ }
+
+ if res.firstErr != nil || len(res.stack) == 0 {
+ res.mu.Unlock()
+
+ return
+ }
+
+ it := res.stack[len(res.stack)-1]
+ res.stack = res.stack[:len(res.stack)-1]
+ res.active++
+ res.mu.Unlock()
+
+ kids, err := res.process(it)
+
+ res.mu.Lock()
+ res.active--
+
+ if err != nil && res.firstErr == nil {
+ res.firstErr = err
+ }
+
+ if res.firstErr == nil {
+ res.stack = append(res.stack, kids...)
}
+
+ if res.firstErr != nil || len(kids) > 0 || (res.active == 0 && len(res.stack) == 0) {
+ res.cond.Broadcast()
+ }
+
+ res.mu.Unlock()
}
+}
- for _, child := range adjacency.byOID[rec.oid] {
- err := ingestion.resolveChild(child, content, objectType, depth+1, adjacency, meter)
- if err != nil {
- return err
+// process resolves one delta child and returns its own delta children.
+func (res *resolver) process(it item) ([]item, error) {
+ err := res.ingestion.ctx.Err()
+ if err != nil {
+ return nil, fmt.Errorf("object/store/packed/internal/ingest: %w", err)
+ }
+
+ rec := &res.ingestion.records[it.index]
+
+ parent, ok := res.ingestion.baseRecordIndex(rec)
+ if !ok {
+ return nil, fmt.Errorf("%w: entry at %d: base unavailable while resolving", ErrMalformedPack, rec.offset)
+ }
+
+ baseType, baseContent, err := res.ingestion.materialize(parent)
+ if err != nil {
+ return nil, err
+ }
+
+ err = res.ingestion.resolveOneChild(it.index, baseType, baseContent, res.meter)
+ if err != nil {
+ return nil, err
+ }
+
+ return res.childItems(it.index, it.depth+1)
+}
+
+// childItems returns the delta children of a just-resolved record at depth.
+func (res *resolver) childItems(index, depth int) ([]item, error) {
+ rec := &res.ingestion.records[index]
+
+ var kids []item
+
+ for _, group := range [2][]int{res.adjacency.byOffset[rec.offset], res.adjacency.byOID[rec.oid]} {
+ for _, child := range group {
+ if depth > delta.MaxChainDepth {
+ return nil, fmt.Errorf("%w: entry at %d: delta chain too deep", ErrMalformedPack, res.ingestion.records[child].offset)
+ }
+
+ kids = append(kids, item{index: child, depth: depth})
}
}
- return nil
+ return kids, nil
}
-// resolveChild applies one delta record at depth against its base content,
-// finalizes the record, and recurses into its own children.
-func (ingestion *ingestion) resolveChild(
- index int,
- baseContent []byte,
- baseType packfile.EntryType,
- depth int,
- adjacency adjacency,
- meter *progress.Meter,
-) error {
+func (ingestion *ingestion) resolveOneChild(index int, baseType typ.Type, baseContent []byte, meter *progress.Meter) error {
rec := &ingestion.records[index]
- if rec.resolved {
- return nil
+
+ content, err := ingestion.applyDelta(index, baseContent)
+ if err != nil {
+ return err
+ }
+
+ oid, err := ingestion.hashObject(baseType, content)
+ if err != nil {
+ return err
+ }
+
+ rec.oid = oid
+ rec.resolved = true
+
+ ingestion.byOID.Store(oid, index)
+ ingestion.baseCache.Add(baseCacheKey{offset: rec.offset}, cachedContent{objectType: baseType, content: content})
+
+ ingestion.deltasResolved.Add(1)
+ meter.Add(1, 0)
+
+ return nil
+}
+
+// materialize returns the inflated content of an already-resolved record,
+// from the base cache,
+// or re-derived from the nearest cached or base ancestor on a miss.
+func (ingestion *ingestion) materialize(index int) (typ.Type, []byte, error) {
+ var (
+ zero typ.Type
+ chain []int
+ base []byte
+ baseType typ.Type
+ )
+
+ cur := index
+
+ for {
+ rec := &ingestion.records[cur]
+
+ if cached, ok := ingestion.baseCache.Get(baseCacheKey{offset: rec.offset}); ok {
+ base = cached.content
+ baseType = cached.objectType
+
+ break
+ }
+
+ if rec.packedType.IsBase() {
+ objectType, err := rec.packedType.ObjectType()
+ if err != nil {
+ return zero, nil, fmt.Errorf("object/store/packed/internal/ingest: %w", err)
+ }
+
+ content, err := ingestion.inflateRecord(cur)
+ if err != nil {
+ return zero, nil, err
+ }
+
+ base = content
+ baseType = objectType
+
+ break
+ }
+
+ if len(chain) >= delta.MaxChainDepth {
+ return zero, nil, fmt.Errorf("%w: entry at %d: delta chain too deep", ErrMalformedPack, rec.offset)
+ }
+
+ chain = append(chain, cur)
+
+ next, ok := ingestion.baseRecordIndex(rec)
+ if !ok {
+ return zero, nil, fmt.Errorf("%w: entry at %d: base unavailable while reconstructing", ErrMalformedPack, rec.offset)
+ }
+
+ cur = next
}
- if depth > delta.MaxChainDepth {
- return fmt.Errorf("%w: entry at %d: delta chain too deep", ErrMalformedPack, rec.offset)
+ for i := len(chain) - 1; i >= 0; i-- {
+ content, err := ingestion.applyDelta(chain[i], base)
+ if err != nil {
+ return zero, nil, err
+ }
+
+ ingestion.baseCache.Add(baseCacheKey{offset: ingestion.records[chain[i]].offset}, cachedContent{objectType: baseType, content: content})
+
+ base = content
}
+ return baseType, base, nil
+}
+
+func (ingestion *ingestion) applyDelta(index int, baseContent []byte) ([]byte, error) {
+ rec := &ingestion.records[index]
+
deltaPayload, err := ingestion.inflateRecord(index)
if err != nil {
- return err
+ return nil, err
}
baseSize, resultSize, _, err := delta.ParseHeaderSizes(deltaPayload)
if err != nil {
- return fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err)
+ return nil, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err)
+ }
+
+ limit := ingestion.opts.MaxObjectSize
+ if limit > 0 && resultSize > uint64(limit) {
+ return nil, fmt.Errorf("%w: entry at %d: result size %d exceeds limit %d", store.ErrObjectTooLarge, rec.offset, resultSize, limit)
}
if baseSize != uint64(len(baseContent)) {
- return fmt.Errorf("%w: entry at %d: delta base size mismatch", ErrMalformedPack, rec.offset)
+ return nil, fmt.Errorf("%w: entry at %d: delta base size mismatch", ErrMalformedPack, rec.offset)
}
content, err := delta.Apply(baseContent, deltaPayload)
if err != nil {
- return fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err)
+ return nil, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err)
}
if uint64(len(content)) != resultSize {
- return fmt.Errorf("%w: entry at %d: delta result size mismatch", ErrMalformedPack, rec.offset)
+ return nil, fmt.Errorf("%w: entry at %d: delta result size mismatch", ErrMalformedPack, rec.offset)
}
- oid, err := ingestion.hashObject(baseType, content)
- if err != nil {
- return err
- }
+ return content, nil
+}
- rec.objectType = baseType
- rec.oid = oid
- rec.resolved = true
- ingestion.byOID[oid] = index
+func (ingestion *ingestion) baseRecordIndex(rec *record) (int, bool) {
+ switch rec.packedType {
+ case packfile.EntryTypeOfsDelta:
+ index, ok := ingestion.byOffset[rec.baseOffset]
+
+ return index, ok
+ case packfile.EntryTypeRefDelta:
+ index, ok := ingestion.byOID.Load(rec.baseOID)
- ingestion.deltasResolved++
- meter.Set(ingestion.deltasResolved, 0)
+ return index, ok
+ case packfile.EntryTypeInvalid,
+ packfile.EntryTypeCommit,
+ packfile.EntryTypeTree,
+ packfile.EntryTypeBlob,
+ packfile.EntryTypeTag,
+ packfile.EntryTypeFuture:
+ }
- return ingestion.resolveSubtree(index, content, baseType, depth, adjacency, meter)
+ return 0, false
}
// inflateRecord inflates one record's payload from the temporary pack file.
@@ -213,20 +419,15 @@ func (ingestion *ingestion) inflateRecord(index int) ([]byte, error) {
}
// hashObject computes the object ID of one resolved object.
-func (ingestion *ingestion) hashObject(objectType packfile.EntryType, content []byte) (id.ObjectID, error) {
+func (ingestion *ingestion) hashObject(objectType typ.Type, content []byte) (id.ObjectID, error) {
var zero id.ObjectID
- ty, err := objectType.ObjectType()
- if err != nil {
- return zero, fmt.Errorf("object/store/packed/internal/ingest: %w", err)
- }
-
hashImpl, err := ingestion.objectFormat.New()
if err != nil {
return zero, fmt.Errorf("object/store/packed/internal/ingest: %w", err)
}
- _, _ = hashImpl.Write(header.Append(nil, ty, len(content)))
+ _, _ = hashImpl.Write(header.Append(nil, objectType, len(content)))
_, _ = hashImpl.Write(content)
oid, err := ingestion.objectFormat.FromBytes(hashImpl.Sum(nil))
@@ -261,7 +462,7 @@ func (ingestion *ingestion) countDeltas() int {
// so the unresolved records are exactly the unresolved deltas:
// the delta records minus those already resolved.
func (ingestion *ingestion) countUnresolved() int {
- return ingestion.deltaCount - ingestion.deltasResolved
+ return ingestion.deltaCount - int(ingestion.deltasResolved.Load())
}
// unresolvedExternalBases returns the unique base object IDs
@@ -270,7 +471,7 @@ func (ingestion *ingestion) countUnresolved() int {
func (ingestion *ingestion) unresolvedExternalBases() []id.ObjectID {
seen := make(map[id.ObjectID]struct{})
- out := make([]id.ObjectID, 0, ingestion.deltaCount-ingestion.deltasResolved)
+ out := make([]id.ObjectID, 0, ingestion.deltaCount-int(ingestion.deltasResolved.Load()))
for index := range ingestion.records {
rec := &ingestion.records[index]
@@ -278,7 +479,7 @@ func (ingestion *ingestion) unresolvedExternalBases() []id.ObjectID {
continue
}
- if _, ok := ingestion.byOID[rec.baseOID]; ok {
+ if _, ok := ingestion.byOID.Load(rec.baseOID); ok {
continue
}
diff --git a/object/store/packed/internal/ingest/result.go b/object/store/packed/internal/ingest/result.go
index 0ae5593a..9cd6ef1d 100644
--- a/object/store/packed/internal/ingest/result.go
+++ b/object/store/packed/internal/ingest/result.go
@@ -13,6 +13,9 @@ type Result struct {
// RevName is the destination-relative name of the written reverse index.
RevName string
+ // BloomName is the destination-relative name of the written Bloom filter.
+ BloomName string
+
// PackHash is the pack trailer hash
// shared by the pack, index, and reverse index.
PackHash id.ObjectID
diff --git a/object/store/packed/internal/ingest/scan.go b/object/store/packed/internal/ingest/scan.go
index 6b3b73b7..2cb5c135 100644
--- a/object/store/packed/internal/ingest/scan.go
+++ b/object/store/packed/internal/ingest/scan.go
@@ -13,6 +13,7 @@ import (
"lindenii.org/go/furgit/internal/progress"
"lindenii.org/go/furgit/object/header"
"lindenii.org/go/furgit/object/id"
+ "lindenii.org/go/furgit/object/store"
"lindenii.org/go/lgo/intconv"
)
@@ -294,13 +295,22 @@ func (ingestion *ingestion) streamAndScan() error {
Throughput: true,
})
- for done := range ingestion.headerCount {
- err := ingestion.scanEntry(ingestion.scanner.consumed)
+ prevConsumed := ingestion.scanner.consumed
+
+ for range ingestion.headerCount {
+ err := ingestion.ctx.Err()
+ if err != nil {
+ return fmt.Errorf("object/store/packed/internal/ingest: %w", err)
+ }
+
+ err = ingestion.scanEntry(ingestion.scanner.consumed)
if err != nil {
return err
}
- meter.Set(done+1, ingestion.scanner.consumed)
+ consumed := ingestion.scanner.consumed
+ meter.Add(1, int64(consumed-prevConsumed))
+ prevConsumed = consumed
}
meter.Stop("done")
@@ -345,7 +355,6 @@ func (ingestion *ingestion) scanEntry(start int) error {
rec.crc32 = ingestion.scanner.endCRC()
if rec.packedType.IsBase() {
- rec.objectType = rec.packedType
rec.oid = oid
rec.resolved = true
} else {
@@ -357,7 +366,7 @@ func (ingestion *ingestion) scanEntry(start int) error {
ingestion.byOffset[rec.offset] = index
if rec.resolved {
- ingestion.byOID[rec.oid] = index
+ ingestion.byOID.Store(rec.oid, index)
}
return nil
@@ -384,6 +393,11 @@ func (ingestion *ingestion) scanHeader(start int) (record, error) {
return rec, fmt.Errorf("%w: entry at %d: declared size overflows int: %w", ErrMalformedPack, start, err)
}
+ limit := ingestion.opts.MaxObjectSize
+ if limit > 0 && declaredSize > limit {
+ return rec, fmt.Errorf("%w: entry at %d: declared size %d exceeds limit %d", store.ErrObjectTooLarge, start, declaredSize, limit)
+ }
+
rec.packedType = entryHeader.Type
rec.declaredSize = declaredSize
rec.headerLen = entryHeader.HeaderLen
diff --git a/object/store/packed/internal/ingest/thin.go b/object/store/packed/internal/ingest/thin.go
index fa125f2f..15773a56 100644
--- a/object/store/packed/internal/ingest/thin.go
+++ b/object/store/packed/internal/ingest/thin.go
@@ -36,6 +36,11 @@ func (ingestion *ingestion) fixThin(external []id.ObjectID, adjacency adjacency,
appended := make([]int, 0, len(external))
for _, baseOID := range external {
+ err := ingestion.ctx.Err()
+ if err != nil {
+ return fmt.Errorf("object/store/packed/internal/ingest: %w", err)
+ }
+
ty, content, err := ingestion.opts.ThinBase.ReadBytesContent(baseOID)
if errors.Is(err, store.ErrObjectNotFound) {
continue
@@ -86,7 +91,7 @@ func (ingestion *ingestion) appendBaseObject(objectID id.ObjectID, objectType ty
return 0, fmt.Errorf("object/store/packed/internal/ingest: %w", err)
}
- computed, err := ingestion.hashObject(entryType, content)
+ computed, err := ingestion.hashObject(objectType, content)
if err != nil {
return 0, err
}
@@ -138,7 +143,6 @@ func (ingestion *ingestion) appendBaseObject(objectID id.ObjectID, objectType ty
declaredSize: len(content),
baseOffset: 0,
baseOID: id.ObjectID{},
- objectType: entryType,
oid: objectID,
resolved: true,
}
@@ -146,7 +150,7 @@ func (ingestion *ingestion) appendBaseObject(objectID id.ObjectID, objectType ty
index := len(ingestion.records)
ingestion.records = append(ingestion.records, rec)
ingestion.byOffset[start] = index
- ingestion.byOID[objectID] = index
+ ingestion.byOID.Store(objectID, index)
return index, nil
}
diff --git a/object/store/packed/internal/ingest/writepack_test.go b/object/store/packed/internal/ingest/writepack_test.go
index 394d8f6e..b2f4d2b8 100644
--- a/object/store/packed/internal/ingest/writepack_test.go
+++ b/object/store/packed/internal/ingest/writepack_test.go
@@ -2,12 +2,15 @@ package ingest_test
import (
"bytes"
+ "context"
"errors"
"io"
"os"
"path/filepath"
"testing"
+ "lindenii.org/go/furgit/internal/format/packidx"
+ "lindenii.org/go/furgit/internal/format/packidx/bloom"
"lindenii.org/go/furgit/internal/testgit"
"lindenii.org/go/furgit/object/id"
"lindenii.org/go/furgit/object/store"
@@ -89,6 +92,81 @@ func TestWritePackMatchesGit(t *testing.T) {
}
}
+// TestWritePackBloom verifies that ingesting a pack writes a Bloom filter
+// that reports every object in the pack as present.
+func TestWritePackBloom(t *testing.T) {
+ t.Parallel()
+
+ for _, objectFormat := range id.SupportedObjectFormats() {
+ t.Run(objectFormat.String(), func(t *testing.T) {
+ t.Parallel()
+
+ repo, err := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: objectFormat})
+ if err != nil {
+ t.Fatalf("NewRepo: %v", err)
+ }
+
+ seeded, err := repo.SeedHistory(t)
+ if err != nil {
+ t.Fatalf("SeedHistory: %v", err)
+ }
+
+ gitPrefix, err := repo.PackObjects(t, seeded.All(), testgit.PackObjectsOptions{
+ RevIndex: true,
+ Revs: false,
+ Exclude: nil,
+ })
+ if err != nil {
+ t.Fatalf("PackObjects: %v", err)
+ }
+
+ stream, err := os.ReadFile(gitPrefix + ".pack") //nolint:gosec
+ if err != nil {
+ t.Fatalf("ReadFile pack: %v", err)
+ }
+
+ dir, result := writePack(t, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{
+ ThinBase: nil,
+ Progress: nil,
+ })
+
+ if result.BloomName == "" {
+ t.Fatal("BloomName is empty")
+ }
+
+ bloomBytes, err := os.ReadFile(filepath.Join(dir, result.BloomName)) //nolint:gosec
+ if err != nil {
+ t.Fatalf("ReadFile bloom: %v", err)
+ }
+
+ filter, err := bloom.Parse(bloomBytes, objectFormat)
+ if err != nil {
+ t.Fatalf("bloom.Parse: %v", err)
+ }
+
+ idxBytes, err := os.ReadFile(filepath.Join(dir, result.IdxName)) //nolint:gosec
+ if err != nil {
+ t.Fatalf("ReadFile idx: %v", err)
+ }
+
+ index, err := packidx.Parse(idxBytes, objectFormat.Size())
+ if err != nil {
+ t.Fatalf("packidx.Parse: %v", err)
+ }
+
+ if !bytes.Equal(filter.PackHash(), index.PackHash()) {
+ t.Fatalf("filter pack hash %x, want %x", filter.PackHash(), index.PackHash())
+ }
+
+ for pos := range index.NumObjects() {
+ if !filter.MayContain(index.OIDAt(pos)) {
+ t.Fatalf("filter rejects object at index position %d", pos)
+ }
+ }
+ })
+ }
+}
+
// TestWritePackEmpty verifies that a zero-object pack
// succeeds without writing any artifacts.
func TestWritePackEmpty(t *testing.T) {
@@ -179,7 +257,7 @@ func TestWritePackIdempotent(t *testing.T) {
t.Cleanup(func() { _ = root.Close() })
- first, err := ingest.WritePack(root, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{
+ first, err := ingest.WritePack(t.Context(), root, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{
ThinBase: nil,
Progress: nil,
})
@@ -187,7 +265,7 @@ func TestWritePackIdempotent(t *testing.T) {
t.Fatalf("first WritePack: %v", err)
}
- second, err := ingest.WritePack(root, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{
+ second, err := ingest.WritePack(t.Context(), root, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{
ThinBase: nil,
Progress: nil,
})
@@ -228,7 +306,7 @@ func writePack(
t.Cleanup(func() { _ = root.Close() })
- result, err := ingest.WritePack(root, objectFormat, src, opts)
+ result, err := ingest.WritePack(t.Context(), root, objectFormat, src, opts)
if err != nil {
t.Fatalf("WritePack: %v", err)
}
@@ -278,7 +356,7 @@ func TestWritePackThinWithoutBase(t *testing.T) {
repo, seeded := seedHistory(t, objectFormat)
stream := thinStream(t, repo, seeded)
- _, err := ingest.WritePack(freshRoot(t), objectFormat, bytes.NewReader(stream), store.PackWriteOptions{
+ _, err := ingest.WritePack(t.Context(), freshRoot(t), objectFormat, bytes.NewReader(stream), store.PackWriteOptions{
ThinBase: nil,
Progress: nil,
})
@@ -304,7 +382,7 @@ func TestWritePackThinMissingBase(t *testing.T) {
emptyBase := emptyStore(t, objectFormat)
stream := thinStream(t, repo, seeded)
- _, err := ingest.WritePack(freshRoot(t), objectFormat, bytes.NewReader(stream), store.PackWriteOptions{
+ _, err := ingest.WritePack(t.Context(), freshRoot(t), objectFormat, bytes.NewReader(stream), store.PackWriteOptions{
ThinBase: emptyBase,
Progress: nil,
})
@@ -321,6 +399,118 @@ func TestWritePackThinMissingBase(t *testing.T) {
}
}
+// TestWritePackContextCancelled verifies that a cancelled context
+// aborts ingestion and publishes no artifacts.
+func TestWritePackContextCancelled(t *testing.T) {
+ t.Parallel()
+
+ for _, objectFormat := range id.SupportedObjectFormats() {
+ t.Run(objectFormat.String(), func(t *testing.T) {
+ t.Parallel()
+
+ repo, seeded := seedHistory(t, objectFormat)
+
+ gitPrefix, err := repo.PackObjects(t, seeded.All(), testgit.PackObjectsOptions{
+ RevIndex: false,
+ Revs: false,
+ Exclude: nil,
+ })
+ if err != nil {
+ t.Fatalf("PackObjects: %v", err)
+ }
+
+ stream, err := os.ReadFile(gitPrefix + ".pack") //nolint:gosec
+ if err != nil {
+ t.Fatalf("ReadFile pack: %v", err)
+ }
+
+ ctx, cancel := context.WithCancel(t.Context())
+ cancel()
+
+ dir := t.TempDir()
+
+ root, err := os.OpenRoot(dir)
+ if err != nil {
+ t.Fatalf("OpenRoot: %v", err)
+ }
+
+ t.Cleanup(func() { _ = root.Close() })
+
+ _, err = ingest.WritePack(ctx, root, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{
+ ThinBase: nil,
+ Progress: nil,
+ })
+ if !errors.Is(err, context.Canceled) {
+ t.Fatalf("err = %v, want context.Canceled", err)
+ }
+
+ entries, err := os.ReadDir(dir)
+ if err != nil {
+ t.Fatalf("ReadDir: %v", err)
+ }
+
+ if len(entries) != 0 {
+ t.Fatalf("cancelled ingestion left %d files behind", len(entries))
+ }
+ })
+ }
+}
+
+// TestWritePackObjectTooLarge verifies that an object exceeding MaxObjectSize
+// is rejected and no artifacts are published.
+func TestWritePackObjectTooLarge(t *testing.T) {
+ t.Parallel()
+
+ for _, objectFormat := range id.SupportedObjectFormats() {
+ t.Run(objectFormat.String(), func(t *testing.T) {
+ t.Parallel()
+
+ repo, seeded := seedHistory(t, objectFormat)
+
+ gitPrefix, err := repo.PackObjects(t, seeded.All(), testgit.PackObjectsOptions{
+ RevIndex: false,
+ Revs: false,
+ Exclude: nil,
+ })
+ if err != nil {
+ t.Fatalf("PackObjects: %v", err)
+ }
+
+ stream, err := os.ReadFile(gitPrefix + ".pack") //nolint:gosec
+ if err != nil {
+ t.Fatalf("ReadFile pack: %v", err)
+ }
+
+ dir := t.TempDir()
+
+ root, err := os.OpenRoot(dir)
+ if err != nil {
+ t.Fatalf("OpenRoot: %v", err)
+ }
+
+ t.Cleanup(func() { _ = root.Close() })
+
+ _, err = ingest.WritePack(t.Context(), root, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{
+ ThinBase: nil,
+ Progress: nil,
+ MaxObjectSize: 1,
+ })
+ if !errors.Is(err, store.ErrObjectTooLarge) {
+ t.Fatalf("err = %v, want ErrObjectTooLarge", err)
+ }
+
+ entries, err := os.ReadDir(dir)
+ if err != nil {
+ t.Fatalf("ReadDir: %v", err)
+ }
+
+ if len(entries) != 0 {
+ t.Fatalf("rejected ingestion left %d files behind", len(entries))
+ }
+ })
+ }
+}
+
// seedHistory creates one repository with a seeded history.
func seedHistory(t *testing.T, objectFormat id.ObjectFormat) (*testgit.Repo, testgit.Seeded) {
t.Helper()
diff --git a/object/store/packed/lookup.go b/object/store/packed/lookup.go
index e54d34b2..e06870a9 100644
--- a/object/store/packed/lookup.go
+++ b/object/store/packed/lookup.go
@@ -24,6 +24,10 @@ func (packed *Packed) lookup(objectID id.ObjectID) (*pack, int, error) {
oid := objectID.RawBytes()
for _, p := range packed.order.Keys() {
+ if p.filter != nil && !p.filter.MayContain(oid) {
+ continue
+ }
+
offsetU, found, err := p.idx.Lookup(oid)
if err != nil {
return nil, 0, fmt.Errorf("%w: pack %q: %w", ErrMalformedPackedStore, p.name, err)
diff --git a/object/store/packed/pack.go b/object/store/packed/pack.go
index dd43bc7a..9cd6162b 100644
--- a/object/store/packed/pack.go
+++ b/object/store/packed/pack.go
@@ -8,6 +8,7 @@ import (
"lindenii.org/go/furgit/internal/format/packfile"
"lindenii.org/go/furgit/internal/format/packidx"
+ "lindenii.org/go/furgit/internal/format/packidx/bloom"
"lindenii.org/go/furgit/internal/mmap"
"lindenii.org/go/furgit/object/id"
"lindenii.org/go/lgo/intconv"
@@ -36,6 +37,9 @@ type pack struct {
// and data aliases them.
dataMapping *mmap.Mmap
data []byte
+
+ bloomMapping *mmap.Mmap
+ filter *bloom.Bloom
}
// openPack opens, maps, and validates
@@ -69,15 +73,41 @@ func openPack(root *os.Root, name string, objectFormat id.ObjectFormat) (*pack,
return nil, fmt.Errorf("%w: pack %q: %w", ErrMalformedPackedStore, name, err)
}
+ bloomMapping, filter := openBloom(root, name, objectFormat, idx.PackHash())
+
return &pack{
- name: name,
- idxMapping: idxMapping,
- idx: idx,
- dataMapping: dataMapping,
- data: dataMapping.Data(),
+ name: name,
+ idxMapping: idxMapping,
+ idx: idx,
+ dataMapping: dataMapping,
+ data: dataMapping.Data(),
+ bloomMapping: bloomMapping,
+ filter: filter,
}, nil
}
+func openBloom(root *os.Root, name string, objectFormat id.ObjectFormat, packHash []byte) (*mmap.Mmap, *bloom.Bloom) {
+ mapping, err := mapFile(root, name+".bloom")
+ if err != nil {
+ return nil, nil
+ }
+
+ filter, err := bloom.Parse(mapping.Data(), objectFormat)
+ if err != nil {
+ _ = mapping.Close()
+
+ return nil, nil
+ }
+
+ if !bytes.Equal(filter.PackHash(), packHash) {
+ _ = mapping.Close()
+
+ return nil, nil
+ }
+
+ return mapping, &filter
+}
+
// mapFile opens and maps one file under root.
func mapFile(root *os.Root, name string) (*mmap.Mmap, error) {
file, err := root.Open(name)
@@ -125,10 +155,16 @@ func validatePackData(data []byte, idx *packidx.Packidx, hashSize int) error {
return nil
}
-// close releases the pack data and index mappings.
+// close releases the pack data, index, and filter mappings.
func (pack *pack) close() error {
- return errors.Join(
+ errs := []error{
pack.dataMapping.Close(),
pack.idxMapping.Close(),
- )
+ }
+
+ if pack.bloomMapping != nil {
+ errs = append(errs, pack.bloomMapping.Close())
+ }
+
+ return errors.Join(errs...)
}
diff --git a/object/store/packed/quarantine.go b/object/store/packed/quarantine.go
index 5e0b85cb..6f6a8c18 100644
--- a/object/store/packed/quarantine.go
+++ b/object/store/packed/quarantine.go
@@ -95,29 +95,46 @@ func (quarantine *packQuarantine) promoteAll() error {
return packPromotionPriority(left.Name()) - packPromotionPriority(right.Name())
})
+ var created []string
+
for _, entry := range entries {
- err := quarantine.promoteFile(entry.Name())
+ linked, err := quarantine.promoteFile(entry.Name())
if err != nil {
+ for i := len(created) - 1; i >= 0; i-- {
+ _ = quarantine.parent.root.Remove(created[i])
+ }
+
return err
}
+
+ if linked {
+ created = append(created, entry.Name())
+ }
}
return nil
}
-// promoteFile links one quarantined artifact into the parent store,
-// treating an already-present destination as success.
-func (quarantine *packQuarantine) promoteFile(name string) error {
+// promoteFile links one quarantined artifact into the parent store
+// and reports whether the destination was newly created.
+// A pre-existing destination is treated as success; rollback must not remove it.
+func (quarantine *packQuarantine) promoteFile(name string) (bool, error) {
src := quarantine.tempName + "/" + name
err := quarantine.parent.root.Link(src, name)
- if err != nil && !errors.Is(err, fs.ErrExist) {
- return fmt.Errorf("object/store/packed: promoting %q: %w", name, err)
- }
- _ = quarantine.parent.root.Remove(src)
+ switch {
+ case err == nil:
+ _ = quarantine.parent.root.Remove(src)
- return nil
+ return true, nil
+ case errors.Is(err, fs.ErrExist):
+ _ = quarantine.parent.root.Remove(src)
+
+ return false, nil
+ default:
+ return false, fmt.Errorf("object/store/packed: promoting %q: %w", name, err)
+ }
}
// createPackQuarantineRoot creates a private quarantine directory beneath parent
@@ -156,6 +173,8 @@ func packPromotionPriority(name string) int {
return 1
case strings.HasPrefix(name, "pack-") && strings.HasSuffix(name, ".rev"):
return 2
+ case strings.HasPrefix(name, "pack-") && strings.HasSuffix(name, ".bloom"):
+ return 2
case strings.HasPrefix(name, "pack-") && strings.HasSuffix(name, ".idx"):
return 3
default:
diff --git a/object/store/packed/writer.go b/object/store/packed/writer.go
index 59309c24..6476cc42 100644
--- a/object/store/packed/writer.go
+++ b/object/store/packed/writer.go
@@ -1,6 +1,7 @@
package packed
import (
+ "context"
"fmt"
"io"
@@ -23,8 +24,8 @@ var _ store.PackWriter = (*Packed)(nil)
// The pack must be the last thing the peer sends before that response:
// any bytes arriving immediately after the trailer
// are rejected as a malformed pack.
-func (packed *Packed) WritePack(src io.Reader, opts store.PackWriteOptions) error {
- _, err := ingest.WritePack(packed.root, packed.objectFormat, src, opts)
+func (packed *Packed) WritePack(ctx context.Context, src io.Reader, opts store.PackWriteOptions) error {
+ _, err := ingest.WritePack(ctx, packed.root, packed.objectFormat, src, opts)
if err != nil {
return err //nolint:wrapcheck
}
diff --git a/object/store/packed/writer_test.go b/object/store/packed/writer_test.go
index 8227caa7..d668647b 100644
--- a/object/store/packed/writer_test.go
+++ b/object/store/packed/writer_test.go
@@ -42,7 +42,7 @@ func TestWritePack(t *testing.T) {
packedStore := openEmptyStore(t, objectFormat)
- err = packedStore.WritePack(bytes.NewReader(stream), store.PackWriteOptions{
+ err = packedStore.WritePack(t.Context(), bytes.NewReader(stream), store.PackWriteOptions{
ThinBase: nil,
Progress: nil,
})
diff --git a/object/store/writer.go b/object/store/writer.go
index d83eec6a..0437505d 100644
--- a/object/store/writer.go
+++ b/object/store/writer.go
@@ -1,6 +1,7 @@
package store
import (
+ "context"
"errors"
"io"
@@ -12,6 +13,10 @@ import (
// ErrInvalidObject indicates a malformed object passed to a write.
var ErrInvalidObject = errors.New("object/store: invalid object")
+// ErrObjectTooLarge indicates that an object exceeds
+// the size limit configured for the write.
+var ErrObjectTooLarge = errors.New("object/store: object too large")
+
// ObjectWriter writes individual Git objects.
type ObjectWriter interface {
// WriteBytesFull writes one full serialized object byte slice as "type size\x00content".
@@ -32,7 +37,7 @@ type PackWriter interface {
// WritePack ingests one pack stream,
// such that the objects contained therein
// become available in the relevant store.
- WritePack(src io.Reader, opts PackWriteOptions) error
+ WritePack(ctx context.Context, src io.Reader, opts PackWriteOptions) error
}
// PackWriteOptions controls one pack write operation.
@@ -65,4 +70,11 @@ type PackWriteOptions struct {
//
// When nil, no progress output is emitted.
Progress iowrap.WriteFlusher
+
+ // MaxObjectSize rejects ingestion of any object
+ // whose declared inflated size or delta result size exceeds it,
+ // bounding the memory spent reconstructing a single object.
+ //
+ // Zero or negative means no limit.
+ MaxObjectSize int
}