aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Runxi Yu2026-06-24 04:09:31 +0000
committerGravatar Runxi Yu2026-06-24 04:29:16 +0000
commit44047867cee760410fe85abf9956750dfc4faad5 (patch)
tree7b295583f4bc3b0c38a2a9fcce9dca9e865e8069
parentobject/store{,/packed}: Ingest allocation guard (diff)
object/store/packed: fix promotion
-rw-r--r--object/store/packed/internal/ingest/finalize.go64
-rw-r--r--object/store/packed/quarantine.go35
2 files changed, 63 insertions, 36 deletions
diff --git a/object/store/packed/internal/ingest/finalize.go b/object/store/packed/internal/ingest/finalize.go
index fe0d770c..c6b1e2c9 100644
--- a/object/store/packed/internal/ingest/finalize.go
+++ b/object/store/packed/internal/ingest/finalize.go
@@ -58,37 +58,41 @@ func (ingestion *ingestion) finalize() (Result, error) {
return Result{}, err
}
+ objectCount, err := intconv.IntToUint32(len(ingestion.records))
+ if err != nil {
+ return Result{}, fmt.Errorf("object/store/packed/internal/ingest: %w", err)
+ }
+
base := "pack-" + ingestion.packHash.String()
packFinal := base + ".pack"
idxFinal := base + ".idx"
revFinal := base + ".rev"
bloomFinal := base + ".bloom"
- // Link the pack, reverse index, and Bloom filter before the index,
+ // Link the data files before the index,
// since the index is what publishes the pack to readers.
- err = ingestion.link(ingestion.packTmp, packFinal)
- if err != nil {
- return Result{}, err
+ artifacts := [...]struct{ tmp, final string }{
+ {ingestion.packTmp, packFinal},
+ {revTmp, revFinal},
+ {bloomTmp, bloomFinal},
+ {idxTmp, idxFinal},
}
- err = ingestion.link(revTmp, revFinal)
- if err != nil {
- return Result{}, err
- }
+ var created []string
- err = ingestion.link(bloomTmp, bloomFinal)
- if err != nil {
- return Result{}, err
- }
+ for _, artifact := range artifacts {
+ linked, err := ingestion.promote(artifact.tmp, artifact.final)
+ if err != nil {
+ for i := len(created) - 1; i >= 0; i-- {
+ _ = ingestion.root.Remove(created[i])
+ }
- err = ingestion.link(idxTmp, idxFinal)
- if err != nil {
- return Result{}, err
- }
+ return Result{}, err
+ }
- objectCount, err := intconv.IntToUint32(len(ingestion.records))
- if err != nil {
- return Result{}, fmt.Errorf("object/store/packed/internal/ingest: %w", err)
+ if linked {
+ created = append(created, artifact.final)
+ }
}
return Result{
@@ -189,15 +193,21 @@ func (ingestion *ingestion) writeTemp(prefix string, write func(io.Writer) error
return name, nil
}
-// link hard-links tmp to final,
-// treating an already-present destination as success.
-func (ingestion *ingestion) link(tmp, final string) error {
+// promote hard-links tmp to final and reports whether final was newly created.
+// A pre-existing final is treated as success; rollback must not remove it.
+func (ingestion *ingestion) promote(tmp, final string) (bool, error) {
err := ingestion.root.Link(tmp, final)
- if err != nil && !errors.Is(err, fs.ErrExist) {
- return fmt.Errorf("object/store/packed/internal/ingest: linking %q: %w", final, err)
- }
- _ = ingestion.root.Remove(tmp)
+ switch {
+ case err == nil:
+ _ = ingestion.root.Remove(tmp)
+
+ return true, nil
+ case errors.Is(err, fs.ErrExist):
+ _ = ingestion.root.Remove(tmp)
- return nil
+ return false, nil
+ default:
+ return false, fmt.Errorf("object/store/packed/internal/ingest: linking %q: %w", final, err)
+ }
}
diff --git a/object/store/packed/quarantine.go b/object/store/packed/quarantine.go
index 977a9543..6f6a8c18 100644
--- a/object/store/packed/quarantine.go
+++ b/object/store/packed/quarantine.go
@@ -95,29 +95,46 @@ func (quarantine *packQuarantine) promoteAll() error {
return packPromotionPriority(left.Name()) - packPromotionPriority(right.Name())
})
+ var created []string
+
for _, entry := range entries {
- err := quarantine.promoteFile(entry.Name())
+ linked, err := quarantine.promoteFile(entry.Name())
if err != nil {
+ for i := len(created) - 1; i >= 0; i-- {
+ _ = quarantine.parent.root.Remove(created[i])
+ }
+
return err
}
+
+ if linked {
+ created = append(created, entry.Name())
+ }
}
return nil
}
-// promoteFile links one quarantined artifact into the parent store,
-// treating an already-present destination as success.
-func (quarantine *packQuarantine) promoteFile(name string) error {
+// promoteFile links one quarantined artifact into the parent store
+// and reports whether the destination was newly created.
+// A pre-existing destination is treated as success; rollback must not remove it.
+func (quarantine *packQuarantine) promoteFile(name string) (bool, error) {
src := quarantine.tempName + "/" + name
err := quarantine.parent.root.Link(src, name)
- if err != nil && !errors.Is(err, fs.ErrExist) {
- return fmt.Errorf("object/store/packed: promoting %q: %w", name, err)
- }
- _ = quarantine.parent.root.Remove(src)
+ switch {
+ case err == nil:
+ _ = quarantine.parent.root.Remove(src)
- return nil
+ return true, nil
+ case errors.Is(err, fs.ErrExist):
+ _ = quarantine.parent.root.Remove(src)
+
+ return false, nil
+ default:
+ return false, fmt.Errorf("object/store/packed: promoting %q: %w", name, err)
+ }
}
// createPackQuarantineRoot creates a private quarantine directory beneath parent