diff options
| author | 2026-06-24 05:21:43 +0000 | |
|---|---|---|
| committer | 2026-06-24 05:24:18 +0000 | |
| commit | 937ccce18d9909967d1dea501bd01451cef4eb7a (patch) | |
| tree | 292ce7149689f9cd8c37c236a39e34a6aecccd83 | |
| parent | object/store/packed/internal/ingest: Use new progress meter (diff) | |
object/store/packed/internal/ingest: Concurrent resolver
| -rw-r--r-- | object/store/packed/internal/ingest/basecache.go | 4 | ||||
| -rw-r--r-- | object/store/packed/internal/ingest/ingest.go | 51 | ||||
| -rw-r--r-- | object/store/packed/internal/ingest/resolve.go | 181 | ||||
| -rw-r--r-- | object/store/packed/internal/ingest/scan.go | 2 | ||||
| -rw-r--r-- | object/store/packed/internal/ingest/thin.go | 2 |
5 files changed, 176 insertions, 64 deletions
diff --git a/object/store/packed/internal/ingest/basecache.go b/object/store/packed/internal/ingest/basecache.go index a916ed92..208f3f4f 100644 --- a/object/store/packed/internal/ingest/basecache.go +++ b/object/store/packed/internal/ingest/basecache.go @@ -16,8 +16,8 @@ type cachedContent struct { content []byte } -func newBaseCache() *clock.Clock[baseCacheKey, cachedContent] { - return clock.New(baseCacheMaxWeight, baseContentWeight) +func newBaseCache(workers int) *clock.Clock[baseCacheKey, cachedContent] { + return clock.New(baseCacheMaxWeight*uint64(workers), baseContentWeight) } func baseContentWeight(_ baseCacheKey, base cachedContent) uint64 { diff --git a/object/store/packed/internal/ingest/ingest.go b/object/store/packed/internal/ingest/ingest.go index 95b87455..1c47399a 100644 --- a/object/store/packed/internal/ingest/ingest.go +++ b/object/store/packed/internal/ingest/ingest.go @@ -9,11 +9,14 @@ import ( "io" "io/fs" "os" + "runtime" + "sync/atomic" "lindenii.org/go/furgit/internal/cache/clock" "lindenii.org/go/furgit/internal/format/packfile" "lindenii.org/go/furgit/object/id" "lindenii.org/go/furgit/object/store" + "lindenii.org/go/lgo/sync" ) var errTempNamesExhausted = errors.New("object/store/packed/internal/ingest: exhausted temporary file names") @@ -51,18 +54,21 @@ type ingestion struct { // byOffset maps an entry offset to its record index, // and byOID maps a resolved object ID to its record index. byOffset map[int]int - byOID map[id.ObjectID]int + byOID sync.Map[id.ObjectID, int] baseCache *clock.Clock[baseCacheKey, cachedContent] + // workers is the delta-resolution concurrency. + workers int + // headerCount is the object count declared by the pack header. headerCount int // deltaCount counts delta records, accumulated during scanning. deltaCount int - // deltasResolved counts resolved delta records, for progress. - deltasResolved int + // deltasResolved counts resolved delta records. 
+ deltasResolved atomic.Int64 // packHash is the final pack trailer hash. packHash id.ObjectID @@ -97,26 +103,27 @@ func WritePack(ctx context.Context, root *os.Root, objectFormat id.ObjectFormat, return Result{}, err } + workers := runtime.GOMAXPROCS(0) + ingestion := &ingestion{ - ctx: ctx, - root: root, - objectFormat: objectFormat, - opts: opts, - src: src, - packFile: nil, - packTmp: "", - temps: nil, - scanner: nil, - records: nil, - byOffset: make(map[int]int), - byOID: make(map[id.ObjectID]int), - baseCache: newBaseCache(), - headerCount: count, - deltaCount: 0, - deltasResolved: 0, - packHash: id.ObjectID{}, - thinFixed: false, - committed: false, + ctx: ctx, + root: root, + objectFormat: objectFormat, + opts: opts, + src: src, + packFile: nil, + packTmp: "", + temps: nil, + scanner: nil, + records: nil, + byOffset: make(map[int]int), + baseCache: newBaseCache(workers), + workers: workers, + headerCount: count, + deltaCount: 0, + packHash: id.ObjectID{}, + thinFixed: false, + committed: false, } defer ingestion.cleanup() diff --git a/object/store/packed/internal/ingest/resolve.go b/object/store/packed/internal/ingest/resolve.go index 0f745c4f..0d0207b6 100644 --- a/object/store/packed/internal/ingest/resolve.go +++ b/object/store/packed/internal/ingest/resolve.go @@ -3,6 +3,7 @@ package ingest import ( "fmt" "io" + "sync" "lindenii.org/go/furgit/internal/compress/zlib" "lindenii.org/go/furgit/internal/format/packfile" @@ -85,62 +86,166 @@ func (ingestion *ingestion) buildAdjacency() adjacency { return out } -// resolveFrame is a resolved record whose delta children remain to be resolved. -type resolveFrame struct { +// item is a delta record awaiting resolution, with its delta-chain depth. +type item struct { index int depth int } +// resolver resolves deltas concurrently over a shared LIFO work stack. 
+// +// Each item is one delta child; +// a worker materializes its base from the cache, re-deriving on a miss, +// resolves the child, +// and pushes the child's own delta children. +// Workers park while the stack is empty but others are still working, +// and exit once it is empty and none are. +type resolver struct { + ingestion *ingestion + adjacency adjacency + meter *progress.Meter + + mu sync.Mutex + cond *sync.Cond + stack []item + active int + firstErr error +} + func (ingestion *ingestion) resolveFrom(roots []int, adjacency adjacency, meter *progress.Meter) error { - stack := make([]resolveFrame, 0, len(roots)) + var seed []item + for _, root := range roots { - stack = append(stack, resolveFrame{index: root, depth: 0}) + rec := &ingestion.records[root] + for _, group := range [2][]int{adjacency.byOffset[rec.offset], adjacency.byOID[rec.oid]} { + for _, child := range group { + seed = append(seed, item{index: child, depth: 1}) + } + } } - for len(stack) > 0 { - err := ingestion.ctx.Err() - if err != nil { - return fmt.Errorf("object/store/packed/internal/ingest: %w", err) + if len(seed) == 0 { + return nil + } + + res := &resolver{ + ingestion: ingestion, + adjacency: adjacency, + meter: meter, + stack: seed, + } + res.cond = sync.NewCond(&res.mu) + + return res.run(ingestion.workers) +} + +func (res *resolver) run(workers int) error { + if workers <= 1 { + res.worker() + + return res.firstErr + } + + var wg sync.WaitGroup + + for range workers { + wg.Add(1) + + go func() { + defer wg.Done() + res.worker() + }() + } + + wg.Wait() + + return res.firstErr +} + +func (res *resolver) worker() { + for { + res.mu.Lock() + + for len(res.stack) == 0 && res.active > 0 && res.firstErr == nil { + res.cond.Wait() + } + + if res.firstErr != nil || len(res.stack) == 0 { + res.mu.Unlock() + + return } - frame := stack[len(stack)-1] - stack = stack[:len(stack)-1] + it := res.stack[len(res.stack)-1] + res.stack = res.stack[:len(res.stack)-1] + res.active++ + 
res.mu.Unlock() - rec := &ingestion.records[frame.index] + kids, err := res.process(it) - children := [2][]int{adjacency.byOffset[rec.offset], adjacency.byOID[rec.oid]} - if len(children[0]) == 0 && len(children[1]) == 0 { - continue + res.mu.Lock() + res.active-- + + if err != nil && res.firstErr == nil { + res.firstErr = err } - baseType, baseContent, err := ingestion.materialize(frame.index) - if err != nil { - return err + if res.firstErr == nil { + res.stack = append(res.stack, kids...) + } + + if res.firstErr != nil || len(kids) > 0 || (res.active == 0 && len(res.stack) == 0) { + res.cond.Broadcast() } - childDepth := frame.depth + 1 + res.mu.Unlock() + } +} - for _, group := range children { - for _, child := range group { - if ingestion.records[child].resolved { - continue - } +// process resolves one delta child and returns its own delta children. +func (res *resolver) process(it item) ([]item, error) { + err := res.ingestion.ctx.Err() + if err != nil { + return nil, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + rec := &res.ingestion.records[it.index] - if childDepth > delta.MaxChainDepth { - return fmt.Errorf("%w: entry at %d: delta chain too deep", ErrMalformedPack, ingestion.records[child].offset) - } + parent, ok := res.ingestion.baseRecordIndex(rec) + if !ok { + return nil, fmt.Errorf("%w: entry at %d: base unavailable while resolving", ErrMalformedPack, rec.offset) + } + + baseType, baseContent, err := res.ingestion.materialize(parent) + if err != nil { + return nil, err + } + + err = res.ingestion.resolveOneChild(it.index, baseType, baseContent, res.meter) + if err != nil { + return nil, err + } + + return res.childItems(it.index, it.depth+1) +} - err = ingestion.resolveOneChild(child, baseType, baseContent, meter) - if err != nil { - return err - } +// childItems returns the delta children of a just-resolved record at depth. 
+func (res *resolver) childItems(index, depth int) ([]item, error) { + rec := &res.ingestion.records[index] - stack = append(stack, resolveFrame{index: child, depth: childDepth}) + var kids []item + + for _, group := range [2][]int{res.adjacency.byOffset[rec.offset], res.adjacency.byOID[rec.oid]} { + for _, child := range group { + if depth > delta.MaxChainDepth { + return nil, fmt.Errorf("%w: entry at %d: delta chain too deep", ErrMalformedPack, res.ingestion.records[child].offset) } + + kids = append(kids, item{index: child, depth: depth}) } } - return nil + return kids, nil } func (ingestion *ingestion) resolveOneChild(index int, baseType typ.Type, baseContent []byte, meter *progress.Meter) error { @@ -158,10 +263,10 @@ func (ingestion *ingestion) resolveOneChild(index int, baseType typ.Type, baseCo rec.oid = oid rec.resolved = true - ingestion.byOID[oid] = index + ingestion.byOID.Store(oid, index) ingestion.baseCache.Add(baseCacheKey{offset: rec.offset}, cachedContent{objectType: baseType, content: content}) - ingestion.deltasResolved++ + ingestion.deltasResolved.Add(1) meter.Add(1, 0) return nil @@ -276,7 +381,7 @@ func (ingestion *ingestion) baseRecordIndex(rec *record) (int, bool) { return index, ok case packfile.EntryTypeRefDelta: - index, ok := ingestion.byOID[rec.baseOID] + index, ok := ingestion.byOID.Load(rec.baseOID) return index, ok case packfile.EntryTypeInvalid, @@ -359,7 +464,7 @@ func (ingestion *ingestion) countDeltas() int { // so the unresolved records are exactly the unresolved deltas: // the delta records minus those already resolved. 
func (ingestion *ingestion) countUnresolved() int { - return ingestion.deltaCount - ingestion.deltasResolved + return ingestion.deltaCount - int(ingestion.deltasResolved.Load()) } // unresolvedExternalBases returns the unique base object IDs @@ -368,7 +473,7 @@ func (ingestion *ingestion) countUnresolved() int { func (ingestion *ingestion) unresolvedExternalBases() []id.ObjectID { seen := make(map[id.ObjectID]struct{}) - out := make([]id.ObjectID, 0, ingestion.deltaCount-ingestion.deltasResolved) + out := make([]id.ObjectID, 0, ingestion.deltaCount-int(ingestion.deltasResolved.Load())) for index := range ingestion.records { rec := &ingestion.records[index] @@ -376,7 +481,7 @@ func (ingestion *ingestion) unresolvedExternalBases() []id.ObjectID { continue } - if _, ok := ingestion.byOID[rec.baseOID]; ok { + if _, ok := ingestion.byOID.Load(rec.baseOID); ok { continue } diff --git a/object/store/packed/internal/ingest/scan.go b/object/store/packed/internal/ingest/scan.go index dc8041c1..2cb5c135 100644 --- a/object/store/packed/internal/ingest/scan.go +++ b/object/store/packed/internal/ingest/scan.go @@ -366,7 +366,7 @@ func (ingestion *ingestion) scanEntry(start int) error { ingestion.byOffset[rec.offset] = index if rec.resolved { - ingestion.byOID[rec.oid] = index + ingestion.byOID.Store(rec.oid, index) } return nil diff --git a/object/store/packed/internal/ingest/thin.go b/object/store/packed/internal/ingest/thin.go index 70aa9a63..15773a56 100644 --- a/object/store/packed/internal/ingest/thin.go +++ b/object/store/packed/internal/ingest/thin.go @@ -150,7 +150,7 @@ func (ingestion *ingestion) appendBaseObject(objectID id.ObjectID, objectType ty index := len(ingestion.records) ingestion.records = append(ingestion.records, rec) ingestion.byOffset[start] = index - ingestion.byOID[objectID] = index + ingestion.byOID.Store(objectID, index) return index, nil } |
