From dd027e1e5379019bfeffc48ff1274b5e05581ff3 Mon Sep 17 00:00:00 2001 From: Runxi Yu Date: Sun, 8 Mar 2026 15:33:36 +0800 Subject: objectstore: Refresh * Add manual Refresh for various objectstore's * RefreshPolicy option * Refreshable MRU and atomic snapshotting --- format/pack/ingest/thin_fix.go | 5 +- objectstore/chain/refresh.go | 21 +++++ objectstore/loose/refresh.go | 6 ++ objectstore/memory/refresh.go | 6 ++ objectstore/mix/refresh.go | 30 +++++++ objectstore/objectstore.go | 4 + objectstore/packed/helpers_test.go | 2 +- objectstore/packed/idx_candidate.go | 10 --- objectstore/packed/idx_candidates_mru.go | 118 +++++++++++++++++++++------- objectstore/packed/idx_lookup_candidates.go | 74 +++++++++-------- objectstore/packed/new.go | 23 ++++-- objectstore/packed/options.go | 16 ++++ objectstore/packed/read_test.go | 2 +- objectstore/packed/store.go | 29 +++---- objectstore/packed/store_lookup.go | 73 ++++++++++++++--- objectstore/packed/trailer_match.go | 4 +- receivepack/service/ingest_quarantine.go | 12 +++ receivepack/service/quarantine_objects.go | 2 +- repository/objects.go | 6 +- 19 files changed, 327 insertions(+), 116 deletions(-) create mode 100644 objectstore/chain/refresh.go create mode 100644 objectstore/loose/refresh.go create mode 100644 objectstore/memory/refresh.go create mode 100644 objectstore/mix/refresh.go delete mode 100644 objectstore/packed/idx_candidate.go create mode 100644 objectstore/packed/options.go diff --git a/format/pack/ingest/thin_fix.go b/format/pack/ingest/thin_fix.go index 9dc65475..9013dbc8 100644 --- a/format/pack/ingest/thin_fix.go +++ b/format/pack/ingest/thin_fix.go @@ -64,6 +64,7 @@ func maybeFixThin(state *ingestState) error { Title: "fixing thin pack", Total: uint64(total), }) + meter.Set(0, 0) var appended uint64 @@ -93,9 +94,7 @@ func maybeFixThin(state *ingestState) error { return err } - if state.thinFixed { - meter.Stop("done") - } + meter.Stop(fmt.Sprintf("appended %d/%d, done", appended, total)) return nil } diff --git a/objectstore/chain/refresh.go b/objectstore/chain/refresh.go new file mode 100644 index 00000000..66c6f0a0 --- /dev/null +++ b/objectstore/chain/refresh.go @@ -0,0 +1,21 @@ +package chain + +import "errors" + +// Refresh forwards refresh calls to all backends. +func (chain *Chain) Refresh() error { + var errs []error + + for _, backend := range chain.backends { + if backend == nil { + continue + } + + err := backend.Refresh() + if err != nil { + errs = append(errs, err) + } + } + + return errors.Join(errs...) +} diff --git a/objectstore/loose/refresh.go b/objectstore/loose/refresh.go new file mode 100644 index 00000000..b720ebc6 --- /dev/null +++ b/objectstore/loose/refresh.go @@ -0,0 +1,6 @@ +package loose + +// Refresh is a no-op for loose object stores. +func (store *Store) Refresh() error { + return nil +} diff --git a/objectstore/memory/refresh.go b/objectstore/memory/refresh.go new file mode 100644 index 00000000..1e18eef3 --- /dev/null +++ b/objectstore/memory/refresh.go @@ -0,0 +1,6 @@ +package memory + +// Refresh is a no-op for in-memory object stores. +func (store *Store) Refresh() error { + return nil +} diff --git a/objectstore/mix/refresh.go b/objectstore/mix/refresh.go new file mode 100644 index 00000000..a9418a62 --- /dev/null +++ b/objectstore/mix/refresh.go @@ -0,0 +1,30 @@ +package mix + +import ( + "errors" + + "codeberg.org/lindenii/furgit/objectstore" +) + +// Refresh forwards refresh calls to refresh-capable backends. +func (mix *Mix) Refresh() error { + mix.mu.RLock() + + backends := make([]objectstore.Store, 0, len(mix.backendNodeByStore)) + for node := mix.backendHead; node != nil; node = node.next { + backends = append(backends, node.backend) + } + + mix.mu.RUnlock() + + var errs []error + + for _, backend := range backends { + err := backend.Refresh() + if err != nil { + errs = append(errs, err) + } + } + + return errors.Join(errs...) +} diff --git a/objectstore/objectstore.go b/objectstore/objectstore.go index a68175ac..58967147 100644 --- a/objectstore/objectstore.go +++ b/objectstore/objectstore.go @@ -38,6 +38,10 @@ type Store interface { ReadSize(id objectid.ObjectID) (int64, error) // ReadHeader reads an object's type and declared content length. ReadHeader(id objectid.ObjectID) (objecttype.Type, int64, error) + // Refresh updates any backend-local discovery/cache view of on-disk objects. + // + // Backends without dynamic discovery should return nil. + Refresh() error // Close releases resources associated with the backend. Close() error } diff --git a/objectstore/packed/helpers_test.go b/objectstore/packed/helpers_test.go index 581c0dd7..f07ff4e3 100644 --- a/objectstore/packed/helpers_test.go +++ b/objectstore/packed/helpers_test.go @@ -19,7 +19,7 @@ func openPackedStore(t *testing.T, testRepo *testgit.TestRepo, algo objectid.Alg root := testRepo.OpenPackRoot(t) - store, err := packed.New(root, algo) + store, err := packed.New(root, algo, packed.Options{}) if err != nil { t.Fatalf("packed.New: %v", err) } diff --git a/objectstore/packed/idx_candidate.go b/objectstore/packed/idx_candidate.go deleted file mode 100644 index 2f2ad7a9..00000000 --- a/objectstore/packed/idx_candidate.go +++ /dev/null @@ -1,10 +0,0 @@ -package packed - -// candidateForPack returns one discovered candidate for a pack basename. -func (store *Store) candidateForPack(packName string) (packCandidate, bool) { - store.candidatesMu.RLock() - candidate, ok := store.candidateByPack[packName] - store.candidatesMu.RUnlock() - - return candidate, ok -} diff --git a/objectstore/packed/idx_candidates_mru.go b/objectstore/packed/idx_candidates_mru.go index 593e84f4..b0960df5 100644 --- a/objectstore/packed/idx_candidates_mru.go +++ b/objectstore/packed/idx_candidates_mru.go @@ -2,21 +2,76 @@ package packed // packCandidateNode is one node in the candidate MRU order list. type packCandidateNode struct { - candidate packCandidate - prev *packCandidateNode - next *packCandidateNode + packName string + prev *packCandidateNode + next *packCandidateNode +} + +func (store *Store) reconcileMRU(candidates []packCandidate) { + store.mruMu.Lock() + defer store.mruMu.Unlock() + + if store.mruNodeByPack == nil { + store.mruNodeByPack = make(map[string]*packCandidateNode, len(candidates)) + } + + present := make(map[string]struct{}, len(candidates)) + for _, candidate := range candidates { + present[candidate.packName] = struct{}{} + } + + ordered := make([]string, 0, len(candidates)) + + for node := store.mruHead; node != nil; node = node.next { + if _, ok := present[node.packName]; !ok { + continue + } + + ordered = append(ordered, node.packName) + delete(present, node.packName) + } + + for _, candidate := range candidates { + if _, ok := present[candidate.packName]; !ok { + continue + } + + ordered = append(ordered, candidate.packName) + delete(present, candidate.packName) + } + + store.mruHead = nil + store.mruTail = nil + store.mruNodeByPack = make(map[string]*packCandidateNode, len(ordered)) + + for _, packName := range ordered { + node := &packCandidateNode{ + packName: packName, + prev: store.mruTail, + } + if store.mruTail != nil { + store.mruTail.next = node + } + + if store.mruHead == nil { + store.mruHead = node + } + + store.mruTail = node + store.mruNodeByPack[packName] = node + } } // touchCandidate moves one candidate to the front of the lookup order. // This is done on a best-effort basis. func (store *Store) touchCandidate(packName string) { - if !store.candidatesMu.TryLock() { + if !store.mruMu.TryLock() { return } - defer store.candidatesMu.Unlock() + defer store.mruMu.Unlock() - node := store.candidateNodeByPack[packName] - if node == nil || node == store.candidateHead { + node := store.mruNodeByPack[packName] + if node == nil || node == store.mruHead { return } @@ -28,46 +83,53 @@ func (store *Store) touchCandidate(packName string) { node.next.prev = node.prev } - if store.candidateTail == node { - store.candidateTail = node.prev + if store.mruTail == node { + store.mruTail = node.prev } node.prev = nil - - node.next = store.candidateHead - if store.candidateHead != nil { - store.candidateHead.prev = node + node.next = store.mruHead + if store.mruHead != nil { + store.mruHead.prev = node } - store.candidateHead = node - if store.candidateTail == nil { - store.candidateTail = node + store.mruHead = node + if store.mruTail == nil { + store.mruTail = node } } // firstCandidatePackName returns the current head pack name, or "" when none // are available. -func (store *Store) firstCandidatePackName() string { - store.candidatesMu.RLock() - defer store.candidatesMu.RUnlock() +func (store *Store) firstCandidatePackName(snapshot *candidateSnapshot) string { + store.mruMu.RLock() + defer store.mruMu.RUnlock() - if store.candidateHead == nil { - return "" + for node := store.mruHead; node != nil; node = node.next { + if _, ok := snapshot.candidateByPack[node.packName]; ok { + return node.packName + } } - return store.candidateHead.candidate.packName + return "" } // nextCandidatePackName returns the pack name after currentPack in current MRU // order, or "" at end / when currentPack is not present. -func (store *Store) nextCandidatePackName(currentPack string) string { - store.candidatesMu.RLock() - defer store.candidatesMu.RUnlock() +func (store *Store) nextCandidatePackName(currentPack string, snapshot *candidateSnapshot) string { + store.mruMu.RLock() + defer store.mruMu.RUnlock() - node := store.candidateNodeByPack[currentPack] - if node == nil || node.next == nil { + node := store.mruNodeByPack[currentPack] + if node == nil { return "" } - return node.next.candidate.packName + for node = node.next; node != nil; node = node.next { + if _, ok := snapshot.candidateByPack[node.packName]; ok { + return node.packName + } + } + + return "" } diff --git a/objectstore/packed/idx_lookup_candidates.go b/objectstore/packed/idx_lookup_candidates.go index 9534476a..e69d07d7 100644 --- a/objectstore/packed/idx_lookup_candidates.go +++ b/objectstore/packed/idx_lookup_candidates.go @@ -17,50 +17,48 @@ type packCandidate struct { mtime int64 } -// ensureCandidates discovers pack/index pairs once. -func (store *Store) ensureCandidates() error { - store.discoverOnce.Do(func() { - candidates, err := store.discoverCandidates() - candidateByPack := make(map[string]packCandidate, len(candidates)) - nodeByPack := make(map[string]*packCandidateNode, len(candidates)) - - var ( - head *packCandidateNode - tail *packCandidateNode - ) - - for _, candidate := range candidates { - node := &packCandidateNode{ - candidate: candidate, - prev: tail, - } - if tail != nil { - tail.next = node - } +type candidateSnapshot struct { + candidates []packCandidate + candidateByPack map[string]packCandidate +} - if head == nil { - head = node - } +// Refresh rescans objects/pack and atomically installs a fresh candidate list. +func (store *Store) Refresh() error { + store.refreshMu.Lock() + defer store.refreshMu.Unlock() - tail = node - candidateByPack[candidate.packName] = candidate - nodeByPack[candidate.packName] = node - } + candidates, err := store.discoverCandidates() + if err != nil { + return err + } + + candidateByPack := make(map[string]packCandidate, len(candidates)) + for _, candidate := range candidates { + candidateByPack[candidate.packName] = candidate + } - store.candidatesMu.Lock() - store.candidateHead = head - store.candidateTail = tail - store.candidateByPack = candidateByPack - store.candidateNodeByPack = nodeByPack - store.discoverErr = err - store.candidatesMu.Unlock() + store.reconcileMRU(candidates) + + store.candidates.Store(&candidateSnapshot{ + candidates: candidates, + candidateByPack: candidateByPack, }) - store.candidatesMu.RLock() - err := store.discoverErr - store.candidatesMu.RUnlock() + return nil +} + +func (store *Store) ensureCandidates() (*candidateSnapshot, error) { + snapshot := store.candidates.Load() + if snapshot != nil { + return snapshot, nil + } + + err := store.Refresh() + if err != nil { + return nil, err + } - return err + return store.candidates.Load(), nil } // discoverCandidates scans the objects/pack root and returns sorted pack/index diff --git a/objectstore/packed/new.go b/objectstore/packed/new.go index 407bc1d0..c8e7338e 100644 --- a/objectstore/packed/new.go +++ b/objectstore/packed/new.go @@ -1,24 +1,31 @@ package packed import ( + "fmt" "os" "codeberg.org/lindenii/furgit/objectid" ) // New creates a packed-object store rooted at an objects/pack directory. -func New(root *os.Root, algo objectid.Algorithm) (*Store, error) { +func New(root *os.Root, algo objectid.Algorithm, opts Options) (*Store, error) { if algo.Size() == 0 { return nil, objectid.ErrInvalidAlgorithm } + switch opts.RefreshPolicy { + case RefreshPolicyOnMissing, RefreshPolicyNever: + default: + return nil, fmt.Errorf("objectstore/packed: invalid refresh policy %d", opts.RefreshPolicy) + } + return &Store{ - root: root, - algo: algo, - candidateByPack: make(map[string]packCandidate), - candidateNodeByPack: make(map[string]*packCandidateNode), - idxByPack: make(map[string]*idxFile), - packs: make(map[string]*packFile), - deltaCache: newDeltaCache(defaultDeltaCacheMaxBytes), + root: root, + algo: algo, + refreshPolicy: opts.RefreshPolicy, + mruNodeByPack: make(map[string]*packCandidateNode), + idxByPack: make(map[string]*idxFile), + packs: make(map[string]*packFile), + deltaCache: newDeltaCache(defaultDeltaCacheMaxBytes), }, nil } diff --git a/objectstore/packed/options.go b/objectstore/packed/options.go new file mode 100644 index 00000000..05cbee30 --- /dev/null +++ b/objectstore/packed/options.go @@ -0,0 +1,16 @@ +package packed + +// RefreshPolicy configures when candidate pack/index discovery refreshes. +type RefreshPolicy uint8 + +const ( + // RefreshPolicyOnMissing refreshes candidates once after a lookup miss. + RefreshPolicyOnMissing RefreshPolicy = iota + // RefreshPolicyNever disables automatic refresh after lookup misses. + RefreshPolicyNever +) + +// Options configures a packed object store. +type Options struct { + RefreshPolicy RefreshPolicy +} diff --git a/objectstore/packed/read_test.go b/objectstore/packed/read_test.go index 435bc350..0e5da5d8 100644 --- a/objectstore/packed/read_test.go +++ b/objectstore/packed/read_test.go @@ -191,7 +191,7 @@ func TestPackedStoreInvalidAlgorithm(t *testing.T) { root := testRepo.OpenPackRoot(t) - _, err := packed.New(root, objectid.AlgorithmUnknown) + _, err := packed.New(root, objectid.AlgorithmUnknown, packed.Options{}) if !errors.Is(err, objectid.ErrInvalidAlgorithm) { t.Fatalf("packed.New invalid algorithm error = %v", err) } diff --git a/objectstore/packed/store.go b/objectstore/packed/store.go index 000e04f2..1c6082f6 100644 --- a/objectstore/packed/store.go +++ b/objectstore/packed/store.go @@ -4,6 +4,7 @@ package packed import ( "os" "sync" + "sync/atomic" "codeberg.org/lindenii/furgit/objectid" "codeberg.org/lindenii/furgit/objectstore" @@ -17,26 +18,26 @@ type Store struct { root *os.Root // algo is the expected object ID algorithm for lookups. algo objectid.Algorithm + // refreshPolicy controls automatic candidate refresh on lookup misses. + refreshPolicy RefreshPolicy - // discoverOnce guards one-time pack candidate discovery. - discoverOnce sync.Once - // discoverErr stores candidate discovery failures. - discoverErr error - // candidateHead is the first candidate in lookup priority order. - candidateHead *packCandidateNode - // candidateTail is the last candidate in lookup priority order. - candidateTail *packCandidateNode - // candidateByPack maps pack basename to discovered candidate. - candidateByPack map[string]packCandidate - // candidateNodeByPack maps pack basename to linked-list node. - candidateNodeByPack map[string]*packCandidateNode + // candidates stores the latest immutable candidate snapshot. + candidates atomic.Pointer[candidateSnapshot] + // refreshMu serializes candidate refresh. + refreshMu sync.Mutex + // mruMu guards candidate MRU linked-list state. + mruMu sync.RWMutex + // mruHead is the first pack in MRU order. + mruHead *packCandidateNode + // mruTail is the last pack in MRU order. + mruTail *packCandidateNode + // mruNodeByPack maps pack basename to MRU node. + mruNodeByPack map[string]*packCandidateNode // idxByPack caches opened and parsed indexes by pack basename. idxByPack map[string]*idxFile // stateMu guards pack cache and close state. stateMu sync.RWMutex - // candidatesMu guards discovered candidates and MRU order. - candidatesMu sync.RWMutex // idxMu guards parsed index cache. idxMu sync.RWMutex // cacheMu guards delta cache operations. diff --git a/objectstore/packed/store_lookup.go b/objectstore/packed/store_lookup.go index cf5a580d..a1bd9b85 100644 --- a/objectstore/packed/store_lookup.go +++ b/objectstore/packed/store_lookup.go @@ -14,38 +14,93 @@ func (store *Store) lookup(id objectid.ObjectID) (location, error) { return zero, errors.New("objectstore/packed: object id algorithm mismatch") } - err := store.ensureCandidates() + snapshot, err := store.ensureCandidates() if err != nil { return zero, err } - nextPackName := store.firstCandidatePackName() + loc, ok, err := store.lookupInCandidates(id, snapshot) + if err != nil { + return zero, err + } + + if ok { + return loc, nil + } + + if store.refreshPolicy == RefreshPolicyOnMissing { + err = store.Refresh() + if err != nil { + return zero, err + } + + refreshed := store.candidates.Load() + if refreshed != nil && refreshed != snapshot { + loc, ok, err = store.lookupInCandidates(id, refreshed) + if err != nil { + return zero, err + } + + if ok { + return loc, nil + } + } + } + + return zero, objectstore.ErrObjectNotFound +} + +func (store *Store) lookupInCandidates( + id objectid.ObjectID, + snapshot *candidateSnapshot, +) (location, bool, error) { + var zero location + + nextPackName := store.firstCandidatePackName(snapshot) for nextPackName != "" { - candidate, ok := store.candidateForPack(nextPackName) + candidate, ok := snapshot.candidateByPack[nextPackName] if !ok { - nextPackName = store.firstCandidatePackName() + nextPackName = store.firstCandidatePackName(snapshot) continue } - nextPackName = store.nextCandidatePackName(candidate.packName) + nextPackName = store.nextCandidatePackName(candidate.packName, snapshot) index, err := store.openIndex(candidate) if err != nil { - return zero, err + return zero, false, err } offset, ok, err := index.lookup(id) if err != nil { - return zero, err + return zero, false, err } if ok { store.touchCandidate(candidate.packName) - return location{packName: index.packName, offset: offset}, nil + return location{packName: index.packName, offset: offset}, true, nil } } - return zero, objectstore.ErrObjectNotFound + for _, candidate := range snapshot.candidates { + index, err := store.openIndex(candidate) + if err != nil { + return zero, false, err + } + + offset, ok, err := index.lookup(id) + if err != nil { + return zero, false, err + } + + if ok { + store.touchCandidate(candidate.packName) + + return location{packName: index.packName, offset: offset}, true, nil + } + } + + return zero, false, nil } diff --git a/objectstore/packed/trailer_match.go b/objectstore/packed/trailer_match.go index 25337cd7..dc43e37d 100644 --- a/objectstore/packed/trailer_match.go +++ b/objectstore/packed/trailer_match.go @@ -5,12 +5,12 @@ import "fmt" // verifyPackMatchesIndexes checks that one opened pack's trailer hash matches // every loaded index that references the same pack name. func (store *Store) verifyPackMatchesIndexes(pack *packFile) error { - err := store.ensureCandidates() + snapshot, err := store.ensureCandidates() if err != nil { return err } - candidate, ok := store.candidateForPack(pack.name) + candidate, ok := snapshot.candidateByPack[pack.name] if !ok { return fmt.Errorf("objectstore/packed: missing index for pack %q", pack.name) } diff --git a/receivepack/service/ingest_quarantine.go b/receivepack/service/ingest_quarantine.go index d4819447..5b2b706b 100644 --- a/receivepack/service/ingest_quarantine.go +++ b/receivepack/service/ingest_quarantine.go @@ -34,6 +34,18 @@ func (service *Service) ingestQuarantine( return "", nil, false } + var err error + + err = service.opts.ExistingObjects.Refresh() + if err != nil { + utils.BestEffortFprintf(service.opts.Progress, "unpack failed: refresh existing objects: %v.\n", err) + + result.UnpackError = err.Error() + fillCommandErrors(result, commands, err.Error()) + + return "", nil, false + } + pending, err := ingest.Ingest( req.Pack, service.opts.Algorithm, diff --git a/receivepack/service/quarantine_objects.go b/receivepack/service/quarantine_objects.go index 69e07a1d..0b267531 100644 --- a/receivepack/service/quarantine_objects.go +++ b/receivepack/service/quarantine_objects.go @@ -29,7 +29,7 @@ func (service *Service) openQuarantinedObjects(quarantineName string) (objectsto packRoot, err := looseRoot.OpenRoot("pack") if err == nil { - packedStore, packedErr := packed.New(packRoot, service.opts.Algorithm) + packedStore, packedErr := packed.New(packRoot, service.opts.Algorithm, packed.Options{}) if packedErr != nil { _ = packRoot.Close() _ = looseStore.Close() diff --git a/repository/objects.go b/repository/objects.go index 63cad4ed..8e1394b0 100644 --- a/repository/objects.go +++ b/repository/objects.go @@ -30,7 +30,11 @@ func openObjectStore(root *os.Root, algo objectid.Algorithm) (objectstore.Store, if err == nil { var packedStore *objectpacked.Store - packedStore, err = objectpacked.New(packRoot, algo) + packedStore, err = objectpacked.New( + packRoot, + algo, + objectpacked.Options{RefreshPolicy: objectpacked.RefreshPolicyNever}, + ) if err != nil { _ = looseStore.Close() -- cgit v1.3.1-10-gc9f91