From 19944cf9cb7a014115422cf6be9292aa3adfe09e Mon Sep 17 00:00:00 2001 From: Runxi Yu Date: Tue, 23 Jun 2026 19:11:32 +0000 Subject: object/store/packed/internal/ingest: Use iterative resolver --- object/store/packed/internal/ingest/basecache.go | 25 +++ object/store/packed/internal/ingest/ingest.go | 4 + object/store/packed/internal/ingest/record.go | 6 +- object/store/packed/internal/ingest/resolve.go | 215 ++++++++++++++++------- object/store/packed/internal/ingest/scan.go | 1 - object/store/packed/internal/ingest/thin.go | 3 +- 6 files changed, 182 insertions(+), 72 deletions(-) create mode 100644 object/store/packed/internal/ingest/basecache.go diff --git a/object/store/packed/internal/ingest/basecache.go b/object/store/packed/internal/ingest/basecache.go new file mode 100644 index 00000000..a916ed92 --- /dev/null +++ b/object/store/packed/internal/ingest/basecache.go @@ -0,0 +1,25 @@ +package ingest + +import ( + "lindenii.org/go/furgit/internal/cache/clock" + "lindenii.org/go/furgit/object/typ" +) + +const baseCacheMaxWeight = 96 << 20 + +type baseCacheKey struct { + offset int +} + +type cachedContent struct { + objectType typ.Type + content []byte +} + +func newBaseCache() *clock.Clock[baseCacheKey, cachedContent] { + return clock.New(baseCacheMaxWeight, baseContentWeight) +} + +func baseContentWeight(_ baseCacheKey, base cachedContent) uint64 { + return uint64(len(base.content)) + 32 +} diff --git a/object/store/packed/internal/ingest/ingest.go b/object/store/packed/internal/ingest/ingest.go index 5422b4af..3e802324 100644 --- a/object/store/packed/internal/ingest/ingest.go +++ b/object/store/packed/internal/ingest/ingest.go @@ -9,6 +9,7 @@ import ( "io/fs" "os" + "lindenii.org/go/furgit/internal/cache/clock" "lindenii.org/go/furgit/internal/format/packfile" "lindenii.org/go/furgit/object/id" "lindenii.org/go/furgit/object/store" @@ -49,6 +50,8 @@ type ingestion struct { byOffset map[int]int byOID map[id.ObjectID]int + baseCache *clock.Clock[baseCacheKey, cachedContent] + // headerCount is the object count declared by the pack header. headerCount int @@ -103,6 +106,7 @@ func WritePack(root *os.Root, objectFormat id.ObjectFormat, src io.Reader, opts records: nil, byOffset: make(map[int]int), byOID: make(map[id.ObjectID]int), + baseCache: newBaseCache(), headerCount: count, deltaCount: 0, deltasResolved: 0, diff --git a/object/store/packed/internal/ingest/record.go b/object/store/packed/internal/ingest/record.go index 69101293..4031a246 100644 --- a/object/store/packed/internal/ingest/record.go +++ b/object/store/packed/internal/ingest/record.go @@ -37,15 +37,11 @@ type record struct { // baseOID is the base object ID for a ref-delta. baseOID id.ObjectID - // objectType is the resolved object type, - // meaningful once resolved is true. - objectType packfile.EntryType - // oid is the resolved object ID, // meaningful once resolved is true. oid id.ObjectID - // resolved reports whether oid and objectType are final. + // resolved reports whether oid is final. resolved bool } diff --git a/object/store/packed/internal/ingest/resolve.go b/object/store/packed/internal/ingest/resolve.go index 77b0fa0f..bfc4289b 100644 --- a/object/store/packed/internal/ingest/resolve.go +++ b/object/store/packed/internal/ingest/resolve.go @@ -10,6 +10,7 @@ import ( "lindenii.org/go/furgit/internal/progress" "lindenii.org/go/furgit/object/header" "lindenii.org/go/furgit/object/id" + "lindenii.org/go/furgit/object/typ" ) // adjacency maps each resolvable base to its delta children: @@ -83,108 +84,199 @@ func (ingestion *ingestion) buildAdjacency() adjacency { return out } -// resolveFrom resolves the delta subtree rooted at each resolved record. +// resolveFrame is a resolved record whose delta children remain to be resolved. +type resolveFrame struct { + index int + depth int +} + func (ingestion *ingestion) resolveFrom(roots []int, adjacency adjacency, meter *progress.Meter) error { + stack := make([]resolveFrame, 0, len(roots)) for _, root := range roots { - content, err := ingestion.inflateRecord(root) - if err != nil { - return err + stack = append(stack, resolveFrame{index: root, depth: 0}) + } + + for len(stack) > 0 { + frame := stack[len(stack)-1] + stack = stack[:len(stack)-1] + + rec := &ingestion.records[frame.index] + + children := [2][]int{adjacency.byOffset[rec.offset], adjacency.byOID[rec.oid]} + if len(children[0]) == 0 && len(children[1]) == 0 { + continue } - err = ingestion.resolveSubtree(root, content, ingestion.records[root].objectType, 0, adjacency, meter) + baseType, baseContent, err := ingestion.materialize(frame.index) if err != nil { return err } + + childDepth := frame.depth + 1 + + for _, group := range children { + for _, child := range group { + if ingestion.records[child].resolved { + continue + } + + if childDepth > delta.MaxChainDepth { + return fmt.Errorf("%w: entry at %d: delta chain too deep", ErrMalformedPack, ingestion.records[child].offset) + } + + err = ingestion.resolveOneChild(child, baseType, baseContent, meter) + if err != nil { + return err + } + + stack = append(stack, resolveFrame{index: child, depth: childDepth}) + } + } } return nil } -// resolveSubtree resolves every delta child of one resolved record at depth, -// holding the record's content as the base for its children. -func (ingestion *ingestion) resolveSubtree( - index int, - content []byte, - objectType packfile.EntryType, - depth int, - adjacency adjacency, - meter *progress.Meter, -) error { +func (ingestion *ingestion) resolveOneChild(index int, baseType typ.Type, baseContent []byte, meter *progress.Meter) error { rec := &ingestion.records[index] - for _, child := range adjacency.byOffset[rec.offset] { - err := ingestion.resolveChild(child, content, objectType, depth+1, adjacency, meter) - if err != nil { - return err - } + content, err := ingestion.applyDelta(index, baseContent) + if err != nil { + return err } - for _, child := range adjacency.byOID[rec.oid] { - err := ingestion.resolveChild(child, content, objectType, depth+1, adjacency, meter) - if err != nil { - return err - } + oid, err := ingestion.hashObject(baseType, content) + if err != nil { + return err } + rec.oid = oid + rec.resolved = true + ingestion.byOID[oid] = index + ingestion.baseCache.Add(baseCacheKey{offset: rec.offset}, cachedContent{objectType: baseType, content: content}) + + ingestion.deltasResolved++ + meter.Set(ingestion.deltasResolved, 0) + return nil } -// resolveChild applies one delta record at depth against its base content, -// finalizes the record, and recurses into its own children. -func (ingestion *ingestion) resolveChild( - index int, - baseContent []byte, - baseType packfile.EntryType, - depth int, - adjacency adjacency, - meter *progress.Meter, -) error { - rec := &ingestion.records[index] - if rec.resolved { - return nil +// materialize returns the inflated content of an already-resolved record, +// from the base cache, +// or re-derived from the nearest cached or base ancestor on a miss. +func (ingestion *ingestion) materialize(index int) (typ.Type, []byte, error) { + var ( + zero typ.Type + chain []int + base []byte + baseType typ.Type + ) + + cur := index + + for { + rec := &ingestion.records[cur] + + if cached, ok := ingestion.baseCache.Get(baseCacheKey{offset: rec.offset}); ok { + base = cached.content + baseType = cached.objectType + + break + } + + if rec.packedType.IsBase() { + objectType, err := rec.packedType.ObjectType() + if err != nil { + return zero, nil, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + content, err := ingestion.inflateRecord(cur) + if err != nil { + return zero, nil, err + } + + base = content + baseType = objectType + + break + } + + if len(chain) >= delta.MaxChainDepth { + return zero, nil, fmt.Errorf("%w: entry at %d: delta chain too deep", ErrMalformedPack, rec.offset) + } + + chain = append(chain, cur) + + next, ok := ingestion.baseRecordIndex(rec) + if !ok { + return zero, nil, fmt.Errorf("%w: entry at %d: base unavailable while reconstructing", ErrMalformedPack, rec.offset) + } + + cur = next } - if depth > delta.MaxChainDepth { - return fmt.Errorf("%w: entry at %d: delta chain too deep", ErrMalformedPack, rec.offset) + for i := len(chain) - 1; i >= 0; i-- { + content, err := ingestion.applyDelta(chain[i], base) + if err != nil { + return zero, nil, err + } + + ingestion.baseCache.Add(baseCacheKey{offset: ingestion.records[chain[i]].offset}, cachedContent{objectType: baseType, content: content}) + + base = content } + return baseType, base, nil +} + +func (ingestion *ingestion) applyDelta(index int, baseContent []byte) ([]byte, error) { + rec := &ingestion.records[index] + deltaPayload, err := ingestion.inflateRecord(index) if err != nil { - return err + return nil, err } baseSize, resultSize, _, err := delta.ParseHeaderSizes(deltaPayload) if err != nil { - return fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err) + return nil, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err) } if baseSize != uint64(len(baseContent)) { - return fmt.Errorf("%w: entry at %d: delta base size mismatch", ErrMalformedPack, rec.offset) + return nil, fmt.Errorf("%w: entry at %d: delta base size mismatch", ErrMalformedPack, rec.offset) } content, err := delta.Apply(baseContent, deltaPayload) if err != nil { - return fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err) + return nil, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err) } if uint64(len(content)) != resultSize { - return fmt.Errorf("%w: entry at %d: delta result size mismatch", ErrMalformedPack, rec.offset) - } - - oid, err := ingestion.hashObject(baseType, content) - if err != nil { - return err + return nil, fmt.Errorf("%w: entry at %d: delta result size mismatch", ErrMalformedPack, rec.offset) } - rec.objectType = baseType - rec.oid = oid - rec.resolved = true - ingestion.byOID[oid] = index + return content, nil +} - ingestion.deltasResolved++ - meter.Set(ingestion.deltasResolved, 0) +func (ingestion *ingestion) baseRecordIndex(rec *record) (int, bool) { + switch rec.packedType { + case packfile.EntryTypeOfsDelta: + index, ok := ingestion.byOffset[rec.baseOffset] + + return index, ok + case packfile.EntryTypeRefDelta: + index, ok := ingestion.byOID[rec.baseOID] + + return index, ok + case packfile.EntryTypeInvalid, + packfile.EntryTypeCommit, + packfile.EntryTypeTree, + packfile.EntryTypeBlob, + packfile.EntryTypeTag, + packfile.EntryTypeFuture: + } - return ingestion.resolveSubtree(index, content, baseType, depth, adjacency, meter) + return 0, false } // inflateRecord inflates one record's payload from the temporary pack file. @@ -213,20 +305,15 @@ func (ingestion *ingestion) inflateRecord(index int) ([]byte, error) { } // hashObject computes the object ID of one resolved object. -func (ingestion *ingestion) hashObject(objectType packfile.EntryType, content []byte) (id.ObjectID, error) { +func (ingestion *ingestion) hashObject(objectType typ.Type, content []byte) (id.ObjectID, error) { var zero id.ObjectID - ty, err := objectType.ObjectType() - if err != nil { - return zero, fmt.Errorf("object/store/packed/internal/ingest: %w", err) - } - hashImpl, err := ingestion.objectFormat.New() if err != nil { return zero, fmt.Errorf("object/store/packed/internal/ingest: %w", err) } - _, _ = hashImpl.Write(header.Append(nil, ty, len(content))) + _, _ = hashImpl.Write(header.Append(nil, objectType, len(content))) _, _ = hashImpl.Write(content) oid, err := ingestion.objectFormat.FromBytes(hashImpl.Sum(nil)) diff --git a/object/store/packed/internal/ingest/scan.go b/object/store/packed/internal/ingest/scan.go index 6b3b73b7..62dd6104 100644 --- a/object/store/packed/internal/ingest/scan.go +++ b/object/store/packed/internal/ingest/scan.go @@ -345,7 +345,6 @@ func (ingestion *ingestion) scanEntry(start int) error { rec.crc32 = ingestion.scanner.endCRC() if rec.packedType.IsBase() { - rec.objectType = rec.packedType rec.oid = oid rec.resolved = true } else { diff --git a/object/store/packed/internal/ingest/thin.go b/object/store/packed/internal/ingest/thin.go index fa125f2f..dceddac9 100644 --- a/object/store/packed/internal/ingest/thin.go +++ b/object/store/packed/internal/ingest/thin.go @@ -86,7 +86,7 @@ func (ingestion *ingestion) appendBaseObject(objectID id.ObjectID, objectType ty return 0, fmt.Errorf("object/store/packed/internal/ingest: %w", err) } - computed, err := ingestion.hashObject(entryType, content) + computed, err := ingestion.hashObject(objectType, content) if err != nil { return 0, err } @@ -138,7 +138,6 @@ func (ingestion *ingestion) appendBaseObject(objectID id.ObjectID, objectType ty declaredSize: len(content), baseOffset: 0, baseOID: id.ObjectID{}, - objectType: entryType, oid: objectID, resolved: true, } -- cgit v1.3.1-10-gc9f91