aboutsummaryrefslogtreecommitdiff
path: root/object
diff options
context:
space:
mode:
authorGravatar Runxi Yu2026-06-24 05:21:43 +0000
committerGravatar Runxi Yu2026-06-24 05:24:18 +0000
commit937ccce18d9909967d1dea501bd01451cef4eb7a (patch)
tree292ce7149689f9cd8c37c236a39e34a6aecccd83 /object
parentobject/store/packed/internal/ingest: Use new progrss meter (diff)
object/store/packed/internal/ingest: Concurrent resolver
Diffstat (limited to 'object')
-rw-r--r--object/store/packed/internal/ingest/basecache.go4
-rw-r--r--object/store/packed/internal/ingest/ingest.go51
-rw-r--r--object/store/packed/internal/ingest/resolve.go181
-rw-r--r--object/store/packed/internal/ingest/scan.go2
-rw-r--r--object/store/packed/internal/ingest/thin.go2
5 files changed, 176 insertions, 64 deletions
diff --git a/object/store/packed/internal/ingest/basecache.go b/object/store/packed/internal/ingest/basecache.go
index a916ed92..208f3f4f 100644
--- a/object/store/packed/internal/ingest/basecache.go
+++ b/object/store/packed/internal/ingest/basecache.go
@@ -16,8 +16,8 @@ type cachedContent struct {
content []byte
}
-func newBaseCache() *clock.Clock[baseCacheKey, cachedContent] {
- return clock.New(baseCacheMaxWeight, baseContentWeight)
+func newBaseCache(workers int) *clock.Clock[baseCacheKey, cachedContent] {
+ return clock.New(baseCacheMaxWeight*uint64(workers), baseContentWeight)
}
func baseContentWeight(_ baseCacheKey, base cachedContent) uint64 {
diff --git a/object/store/packed/internal/ingest/ingest.go b/object/store/packed/internal/ingest/ingest.go
index 95b87455..1c47399a 100644
--- a/object/store/packed/internal/ingest/ingest.go
+++ b/object/store/packed/internal/ingest/ingest.go
@@ -9,11 +9,14 @@ import (
"io"
"io/fs"
"os"
+ "runtime"
+ "sync/atomic"
"lindenii.org/go/furgit/internal/cache/clock"
"lindenii.org/go/furgit/internal/format/packfile"
"lindenii.org/go/furgit/object/id"
"lindenii.org/go/furgit/object/store"
+ "lindenii.org/go/lgo/sync"
)
var errTempNamesExhausted = errors.New("object/store/packed/internal/ingest: exhausted temporary file names")
@@ -51,18 +54,21 @@ type ingestion struct {
// byOffset maps an entry offset to its record index,
// and byOID maps a resolved object ID to its record index.
byOffset map[int]int
- byOID map[id.ObjectID]int
+ byOID sync.Map[id.ObjectID, int]
baseCache *clock.Clock[baseCacheKey, cachedContent]
+ // workers is the number of concurrent delta-resolution workers.
+ workers int
+
// headerCount is the object count declared by the pack header.
headerCount int
// deltaCount counts delta records, accumulated during scanning.
deltaCount int
- // deltasResolved counts resolved delta records, for progress.
- deltasResolved int
+ // deltasResolved counts resolved delta records.
+ deltasResolved atomic.Int64
// packHash is the final pack trailer hash.
packHash id.ObjectID
@@ -97,26 +103,27 @@ func WritePack(ctx context.Context, root *os.Root, objectFormat id.ObjectFormat,
return Result{}, err
}
+ workers := runtime.GOMAXPROCS(0)
+
ingestion := &ingestion{
- ctx: ctx,
- root: root,
- objectFormat: objectFormat,
- opts: opts,
- src: src,
- packFile: nil,
- packTmp: "",
- temps: nil,
- scanner: nil,
- records: nil,
- byOffset: make(map[int]int),
- byOID: make(map[id.ObjectID]int),
- baseCache: newBaseCache(),
- headerCount: count,
- deltaCount: 0,
- deltasResolved: 0,
- packHash: id.ObjectID{},
- thinFixed: false,
- committed: false,
+ ctx: ctx,
+ root: root,
+ objectFormat: objectFormat,
+ opts: opts,
+ src: src,
+ packFile: nil,
+ packTmp: "",
+ temps: nil,
+ scanner: nil,
+ records: nil,
+ byOffset: make(map[int]int),
+ baseCache: newBaseCache(workers),
+ workers: workers,
+ headerCount: count,
+ deltaCount: 0,
+ packHash: id.ObjectID{},
+ thinFixed: false,
+ committed: false,
}
defer ingestion.cleanup()
diff --git a/object/store/packed/internal/ingest/resolve.go b/object/store/packed/internal/ingest/resolve.go
index 0f745c4f..0d0207b6 100644
--- a/object/store/packed/internal/ingest/resolve.go
+++ b/object/store/packed/internal/ingest/resolve.go
@@ -3,6 +3,7 @@ package ingest
import (
"fmt"
"io"
+ "sync"
"lindenii.org/go/furgit/internal/compress/zlib"
"lindenii.org/go/furgit/internal/format/packfile"
@@ -85,62 +86,166 @@ func (ingestion *ingestion) buildAdjacency() adjacency {
return out
}
-// resolveFrame is a resolved record whose delta children remain to be resolved.
-type resolveFrame struct {
+// item is a delta record awaiting resolution, with its delta-chain depth.
+type item struct {
index int
depth int
}
+// resolver resolves deltas concurrently over a shared LIFO work stack.
+//
+// Each item is one delta child;
+// a worker materializes its base from the cache, re-deriving on a miss,
+// resolves the child,
+// and pushes the child's own delta children.
+// Workers park while the stack is empty but others are still working,
+// and exit once it is empty and none are, or once any worker has failed.
+type resolver struct {
+ ingestion *ingestion
+ adjacency adjacency
+ meter *progress.Meter
+
+ mu sync.Mutex
+ cond *sync.Cond
+ stack []item
+ active int
+ firstErr error
+}
+
func (ingestion *ingestion) resolveFrom(roots []int, adjacency adjacency, meter *progress.Meter) error {
- stack := make([]resolveFrame, 0, len(roots))
+ var seed []item
+
for _, root := range roots {
- stack = append(stack, resolveFrame{index: root, depth: 0})
+ rec := &ingestion.records[root]
+ for _, group := range [2][]int{adjacency.byOffset[rec.offset], adjacency.byOID[rec.oid]} {
+ for _, child := range group {
+ seed = append(seed, item{index: child, depth: 1})
+ }
+ }
}
- for len(stack) > 0 {
- err := ingestion.ctx.Err()
- if err != nil {
- return fmt.Errorf("object/store/packed/internal/ingest: %w", err)
+ if len(seed) == 0 {
+ return nil
+ }
+
+ res := &resolver{
+ ingestion: ingestion,
+ adjacency: adjacency,
+ meter: meter,
+ stack: seed,
+ }
+ res.cond = sync.NewCond(&res.mu)
+
+ return res.run(ingestion.workers)
+}
+
+func (res *resolver) run(workers int) error {
+ if workers <= 1 {
+ res.worker()
+
+ return res.firstErr
+ }
+
+ var wg sync.WaitGroup
+
+ for range workers {
+ wg.Add(1)
+
+ go func() {
+ defer wg.Done()
+ res.worker()
+ }()
+ }
+
+ wg.Wait()
+
+ return res.firstErr
+}
+
+func (res *resolver) worker() {
+ for {
+ res.mu.Lock()
+
+ for len(res.stack) == 0 && res.active > 0 && res.firstErr == nil {
+ res.cond.Wait()
+ }
+
+ if res.firstErr != nil || len(res.stack) == 0 {
+ res.mu.Unlock()
+
+ return
}
- frame := stack[len(stack)-1]
- stack = stack[:len(stack)-1]
+ it := res.stack[len(res.stack)-1]
+ res.stack = res.stack[:len(res.stack)-1]
+ res.active++
+ res.mu.Unlock()
- rec := &ingestion.records[frame.index]
+ kids, err := res.process(it)
- children := [2][]int{adjacency.byOffset[rec.offset], adjacency.byOID[rec.oid]}
- if len(children[0]) == 0 && len(children[1]) == 0 {
- continue
+ res.mu.Lock()
+ res.active--
+
+ if err != nil && res.firstErr == nil {
+ res.firstErr = err
}
- baseType, baseContent, err := ingestion.materialize(frame.index)
- if err != nil {
- return err
+ if res.firstErr == nil {
+ res.stack = append(res.stack, kids...)
+ }
+
+ if res.firstErr != nil || len(kids) > 0 || (res.active == 0 && len(res.stack) == 0) {
+ res.cond.Broadcast()
}
- childDepth := frame.depth + 1
+ res.mu.Unlock()
+ }
+}
- for _, group := range children {
- for _, child := range group {
- if ingestion.records[child].resolved {
- continue
- }
+// process resolves one delta child and returns its own delta children.
+func (res *resolver) process(it item) ([]item, error) {
+ err := res.ingestion.ctx.Err()
+ if err != nil {
+ return nil, fmt.Errorf("object/store/packed/internal/ingest: %w", err)
+ }
+
+ rec := &res.ingestion.records[it.index]
- if childDepth > delta.MaxChainDepth {
- return fmt.Errorf("%w: entry at %d: delta chain too deep", ErrMalformedPack, ingestion.records[child].offset)
- }
+ parent, ok := res.ingestion.baseRecordIndex(rec)
+ if !ok {
+ return nil, fmt.Errorf("%w: entry at %d: base unavailable while resolving", ErrMalformedPack, rec.offset)
+ }
+
+ baseType, baseContent, err := res.ingestion.materialize(parent)
+ if err != nil {
+ return nil, err
+ }
+
+ err = res.ingestion.resolveOneChild(it.index, baseType, baseContent, res.meter)
+ if err != nil {
+ return nil, err
+ }
+
+ return res.childItems(it.index, it.depth+1)
+}
- err = ingestion.resolveOneChild(child, baseType, baseContent, meter)
- if err != nil {
- return err
- }
+// childItems returns the delta children of the just-resolved record at index, each tagged with depth; a child deeper than delta.MaxChainDepth is an error.
+func (res *resolver) childItems(index, depth int) ([]item, error) {
+ rec := &res.ingestion.records[index]
- stack = append(stack, resolveFrame{index: child, depth: childDepth})
+ var kids []item
+
+ for _, group := range [2][]int{res.adjacency.byOffset[rec.offset], res.adjacency.byOID[rec.oid]} {
+ for _, child := range group {
+ if depth > delta.MaxChainDepth {
+ return nil, fmt.Errorf("%w: entry at %d: delta chain too deep", ErrMalformedPack, res.ingestion.records[child].offset)
}
+
+ kids = append(kids, item{index: child, depth: depth})
}
}
- return nil
+ return kids, nil
}
func (ingestion *ingestion) resolveOneChild(index int, baseType typ.Type, baseContent []byte, meter *progress.Meter) error {
@@ -158,10 +263,10 @@ func (ingestion *ingestion) resolveOneChild(index int, baseType typ.Type, baseCo
rec.oid = oid
rec.resolved = true
- ingestion.byOID[oid] = index
+ ingestion.byOID.Store(oid, index)
ingestion.baseCache.Add(baseCacheKey{offset: rec.offset}, cachedContent{objectType: baseType, content: content})
- ingestion.deltasResolved++
+ ingestion.deltasResolved.Add(1)
meter.Add(1, 0)
return nil
@@ -276,7 +381,7 @@ func (ingestion *ingestion) baseRecordIndex(rec *record) (int, bool) {
return index, ok
case packfile.EntryTypeRefDelta:
- index, ok := ingestion.byOID[rec.baseOID]
+ index, ok := ingestion.byOID.Load(rec.baseOID)
return index, ok
case packfile.EntryTypeInvalid,
@@ -359,7 +464,7 @@ func (ingestion *ingestion) countDeltas() int {
// so the unresolved records are exactly the unresolved deltas:
// the delta records minus those already resolved.
func (ingestion *ingestion) countUnresolved() int {
- return ingestion.deltaCount - ingestion.deltasResolved
+ return ingestion.deltaCount - int(ingestion.deltasResolved.Load())
}
// unresolvedExternalBases returns the unique base object IDs
@@ -368,7 +473,7 @@ func (ingestion *ingestion) countUnresolved() int {
func (ingestion *ingestion) unresolvedExternalBases() []id.ObjectID {
seen := make(map[id.ObjectID]struct{})
- out := make([]id.ObjectID, 0, ingestion.deltaCount-ingestion.deltasResolved)
+ out := make([]id.ObjectID, 0, ingestion.deltaCount-int(ingestion.deltasResolved.Load()))
for index := range ingestion.records {
rec := &ingestion.records[index]
@@ -376,7 +481,7 @@ func (ingestion *ingestion) unresolvedExternalBases() []id.ObjectID {
continue
}
- if _, ok := ingestion.byOID[rec.baseOID]; ok {
+ if _, ok := ingestion.byOID.Load(rec.baseOID); ok {
continue
}
diff --git a/object/store/packed/internal/ingest/scan.go b/object/store/packed/internal/ingest/scan.go
index dc8041c1..2cb5c135 100644
--- a/object/store/packed/internal/ingest/scan.go
+++ b/object/store/packed/internal/ingest/scan.go
@@ -366,7 +366,7 @@ func (ingestion *ingestion) scanEntry(start int) error {
ingestion.byOffset[rec.offset] = index
if rec.resolved {
- ingestion.byOID[rec.oid] = index
+ ingestion.byOID.Store(rec.oid, index)
}
return nil
diff --git a/object/store/packed/internal/ingest/thin.go b/object/store/packed/internal/ingest/thin.go
index 70aa9a63..15773a56 100644
--- a/object/store/packed/internal/ingest/thin.go
+++ b/object/store/packed/internal/ingest/thin.go
@@ -150,7 +150,7 @@ func (ingestion *ingestion) appendBaseObject(objectID id.ObjectID, objectType ty
index := len(ingestion.records)
ingestion.records = append(ingestion.records, rec)
ingestion.byOffset[start] = index
- ingestion.byOID[objectID] = index
+ ingestion.byOID.Store(objectID, index)
return index, nil
}