diff options
| author | 2026-03-07 19:40:27 +0800 | |
|---|---|---|
| committer | 2026-03-07 20:53:17 +0800 | |
| commit | 8aa2e9f0903a80c90a9d8308138439d6f8732050 (patch) | |
| tree | d2fecbf29b0eaa78da87b017005139960783f669 /receivepack/internal | |
| parent | refstore/files: Implement batching (diff) | |
| signature | No signature | |
receivepack: Use refs
Diffstat (limited to 'receivepack/internal')
| -rw-r--r-- | receivepack/internal/service/apply.go | 107 | ||||
| -rw-r--r-- | receivepack/internal/service/command.go | 13 | ||||
| -rw-r--r-- | receivepack/internal/service/command_result.go | 10 | ||||
| -rw-r--r-- | receivepack/internal/service/execute.go | 61 | ||||
| -rw-r--r-- | receivepack/internal/service/options.go | 2 | ||||
| -rw-r--r-- | receivepack/internal/service/quarantine.go | 202 | ||||
| -rw-r--r-- | receivepack/internal/service/quarantine_test.go | 163 | ||||
| -rw-r--r-- | receivepack/internal/service/request.go | 1 |
8 files changed, 542 insertions, 17 deletions
diff --git a/receivepack/internal/service/apply.go b/receivepack/internal/service/apply.go new file mode 100644 index 00000000..c24d2a95 --- /dev/null +++ b/receivepack/internal/service/apply.go @@ -0,0 +1,107 @@ +package service + +import ( + "codeberg.org/lindenii/furgit/objectid" + "codeberg.org/lindenii/furgit/refstore" +) + +func (service *Service) applyAtomic(result *Result, commands []Command) error { + tx, err := service.opts.Refs.BeginTransaction() + if err != nil { + return err + } + + for _, command := range commands { + err = queueWriteTransaction(tx, command) + if err != nil { + _ = tx.Abort() + fillCommandErrors(result, commands, err.Error()) + + return nil + } + } + + err = tx.Commit() + if err != nil { + fillCommandErrors(result, commands, err.Error()) + + return nil + } + + result.Applied = true + for _, command := range commands { + result.Commands = append(result.Commands, successCommandResult(command)) + } + + return nil +} + +func (service *Service) applyBatch(result *Result, commands []Command) error { + batch, err := service.opts.Refs.BeginBatch() + if err != nil { + return err + } + + for _, command := range commands { + queueWriteBatch(batch, command) + } + + batchResults, err := batch.Apply() + if err != nil && len(batchResults) == 0 { + return err + } + + appliedAny := false + + for i, command := range commands { + item := successCommandResult(command) + if i < len(batchResults) && batchResults[i].Error != nil { + item.Error = batchResults[i].Error.Error() + } else { + appliedAny = true + } + + result.Commands = append(result.Commands, item) + } + + result.Applied = appliedAny + + return nil +} + +func queueWriteTransaction(tx refstore.Transaction, command Command) error { + if isDelete(command) { + return tx.Delete(command.Name, command.OldID) + } + + if command.OldID == objectid.Zero(command.OldID.Algorithm()) { + return tx.Create(command.Name, command.NewID) + } + + return tx.Update(command.Name, command.NewID, command.OldID) +} + +func queueWriteBatch(batch refstore.Batch, command Command) { + if isDelete(command) { + batch.Delete(command.Name, command.OldID) + + return + } + + if command.OldID == objectid.Zero(command.OldID.Algorithm()) { + batch.Create(command.Name, command.NewID) + + return + } + + batch.Update(command.Name, command.NewID, command.OldID) +} + +func successCommandResult(command Command) CommandResult { + return CommandResult{ + Name: command.Name, + RefName: command.Name, + OldID: objectIDPointer(command.OldID), + NewID: objectIDPointer(command.NewID), + } +} diff --git a/receivepack/internal/service/command.go b/receivepack/internal/service/command.go index f51461ff..33342e41 100644 --- a/receivepack/internal/service/command.go +++ b/receivepack/internal/service/command.go @@ -12,8 +12,11 @@ type Command struct { func fillCommandErrors(result *Result, commands []Command, errText string) { for _, command := range commands { result.Commands = append(result.Commands, CommandResult{ - Name: command.Name, - Error: errText, + Name: command.Name, + Error: errText, + RefName: command.Name, + OldID: objectIDPointer(command.OldID), + NewID: objectIDPointer(command.NewID), }) } } @@ -21,3 +24,9 @@ func fillCommandErrors(result *Result, commands []Command, errText string) { func isDelete(command Command) bool { return command.NewID == objectid.Zero(command.NewID.Algorithm()) } + +func objectIDPointer(id objectid.ObjectID) *objectid.ObjectID { + out := id + + return &out +} diff --git a/receivepack/internal/service/command_result.go b/receivepack/internal/service/command_result.go index 1234c8ef..18e39acc 100644 --- a/receivepack/internal/service/command_result.go +++ b/receivepack/internal/service/command_result.go @@ -1,7 +1,13 @@ package service +import "codeberg.org/lindenii/furgit/objectid" + // CommandResult is one per-command execution result. type CommandResult struct { - Name string - Error string + Name string + Error string + RefName string + OldID *objectid.ObjectID + NewID *objectid.ObjectID + ForcedUpdate bool } diff --git a/receivepack/internal/service/execute.go b/receivepack/internal/service/execute.go index b3d47d29..ebba9003 100644 --- a/receivepack/internal/service/execute.go +++ b/receivepack/internal/service/execute.go @@ -2,7 +2,7 @@ package service import ( "context" - "log" + "os" "codeberg.org/lindenii/furgit/format/pack/ingest" ) @@ -12,14 +12,17 @@ import ( // // TODO: Invoke hook or policy callbacks to decide whether each planned update // should be allowed. -// TODO: Apply planned ref updates with one atomic compare-and-swap ref -// transaction once ref writing exists. func (service *Service) Execute(ctx context.Context, req *Request) (*Result, error) { _ = ctx result := &Result{ Commands: make([]CommandResult, 0, len(req.Commands)), } + var ( + quarantineName string + quarantineRoot *os.Root + err error + ) if req.PackExpected { if req.Pack == nil { @@ -36,7 +39,7 @@ func (service *Service) Execute(ctx context.Context, req *Request) (*Result, err return result, nil } - quarantineName, quarantineRoot, err := service.createQuarantineRoot() + quarantineName, quarantineRoot, err = service.createQuarantineRoot() if err != nil { result.UnpackError = err.Error() fillCommandErrors(result, req.Commands, err.Error()) @@ -46,14 +49,24 @@ func (service *Service) Execute(ctx context.Context, req *Request) (*Result, err defer func() { _ = quarantineRoot.Close() - // TODO: Promote accepted quarantined objects into the permanent object - // store once atomic ref application exists. _ = service.opts.ObjectsRoot.RemoveAll(quarantineName) }() + quarantinePackRoot, err := service.openQuarantinePackRoot(quarantineRoot) + if err != nil { + result.UnpackError = err.Error() + fillCommandErrors(result, req.Commands, err.Error()) + + return result, nil + } + + defer func() { + _ = quarantinePackRoot.Close() + }() + ingested, err := ingest.Ingest( req.Pack, - quarantineRoot, + quarantinePackRoot, service.opts.Algorithm, true, true, @@ -78,11 +91,35 @@ func (service *Service) Execute(ctx context.Context, req *Request) (*Result, err }) } - fillCommandErrors(result, req.Commands, "ref updates not implemented yet") - log.Printf( - "receivepack: planned %d ref updates, but hook/policy checks and atomic ref writes are not implemented yet", - len(result.Planned), - ) + if len(req.Commands) == 0 { + return result, nil + } + + if req.PackExpected { + // Git migrates quarantined objects into permanent storage immediately + // before starting ref updates. + err = service.promoteQuarantine(quarantineName, quarantineRoot) + if err != nil { + result.UnpackError = err.Error() + fillCommandErrors(result, req.Commands, err.Error()) + + return result, nil + } + } + + if req.Atomic { + err := service.applyAtomic(result, req.Commands) + if err != nil { + return result, err + } + + return result, nil + } + + err = service.applyBatch(result, req.Commands) + if err != nil { + return result, err + } return result, nil } diff --git a/receivepack/internal/service/options.go b/receivepack/internal/service/options.go index 2bc70058..4ba06827 100644 --- a/receivepack/internal/service/options.go +++ b/receivepack/internal/service/options.go @@ -11,7 +11,7 @@ import ( // Options configures one protocol-independent receive-pack service. type Options struct { Algorithm objectid.Algorithm - Refs refstore.ReadingStore + Refs refstore.ReadWriteStore ExistingObjects objectstore.Store ObjectsRoot *os.Root // TODO: Hook and such callbacks. diff --git a/receivepack/internal/service/quarantine.go b/receivepack/internal/service/quarantine.go index 17ff6279..101cadc7 100644 --- a/receivepack/internal/service/quarantine.go +++ b/receivepack/internal/service/quarantine.go @@ -1,8 +1,15 @@ package service import ( + "bytes" "crypto/rand" + "errors" + "fmt" + "io" + "io/fs" "os" + "path" + "slices" ) // createQuarantineRoot creates one per-push quarantine directory beneath the @@ -24,3 +31,198 @@ func (service *Service) createQuarantineRoot() (string, *os.Root, error) { return name, root, nil } + +func (service *Service) openQuarantinePackRoot(quarantineRoot *os.Root) (*os.Root, error) { + err := quarantineRoot.Mkdir("pack", 0o755) + if err != nil && !os.IsExist(err) { + return nil, err + } + + return quarantineRoot.OpenRoot("pack") +} + +func (service *Service) promoteQuarantine(quarantineName string, quarantineRoot *os.Root) error { + if quarantineName == "" || quarantineRoot == nil { + return nil + } + + return service.promoteQuarantineDir(quarantineName, quarantineRoot, ".") +} + +func (service *Service) promoteQuarantineDir(quarantineName string, quarantineRoot *os.Root, rel string) error { + entries, err := fs.ReadDir(quarantineRoot.FS(), rel) + if err != nil && !os.IsNotExist(err) { + return err + } + + slices.SortFunc(entries, func(left, right fs.DirEntry) int { + return packCopyPriority(left.Name()) - packCopyPriority(right.Name()) + }) + + for _, entry := range entries { + childRel := entry.Name() + if rel != "." { + childRel = path.Join(rel, entry.Name()) + } + + if entry.IsDir() { + err = service.opts.ObjectsRoot.Mkdir(childRel, 0o755) + if err != nil && !os.IsExist(err) { + return err + } + + err = service.promoteQuarantineDir(quarantineName, quarantineRoot, childRel) + if err != nil { + return err + } + + continue + } + + err = finalizeQuarantineFile( + service.opts.ObjectsRoot, + path.Join(quarantineName, childRel), + childRel, + isLooseObjectShardPath(rel), + ) + if err == nil { + continue + } + + return err + } + + return nil +} + +func packCopyPriority(name string) int { + if !pathHasPackPrefix(name) { + return 0 + } + + switch { + case path.Ext(name) == ".keep": + return 1 + case path.Ext(name) == ".pack": + return 2 + case path.Ext(name) == ".rev": + return 3 + case path.Ext(name) == ".idx": + return 4 + default: + return 5 + } +} + +func pathHasPackPrefix(name string) bool { + return len(name) >= 4 && name[:4] == "pack" +} + +func isLooseObjectShardPath(rel string) bool { + return len(rel) == 2 && isHex(rel[0]) && isHex(rel[1]) +} + +func isHex(ch byte) bool { + return ('0' <= ch && ch <= '9') || ('a' <= ch && ch <= 'f') || ('A' <= ch && ch <= 'F') +} + +func finalizeQuarantineFile(root *os.Root, src, dst string, skipCollisionCheck bool) error { + const maxVanishedRetries = 5 + + for retries := 0; ; retries++ { + err := root.Link(src, dst) + switch { + case err == nil: + _ = root.Remove(src) + + return nil + case !errors.Is(err, fs.ErrExist): + _, statErr := root.Stat(dst) + if statErr == nil { + err = fs.ErrExist + } else if errors.Is(statErr, fs.ErrNotExist) { + if renameErr := root.Rename(src, dst); renameErr == nil { + return nil + } + + return fmt.Errorf("promote quarantine %q -> %q: %w", src, dst, err) + } else { + return statErr + } + } + + if skipCollisionCheck { + _ = root.Remove(src) + + return nil + } + + equal, vanished, cmpErr := compareRootFiles(root, src, dst) + if vanished { + if retries >= maxVanishedRetries { + return fmt.Errorf("promote quarantine %q -> %q: destination repeatedly vanished", src, dst) + } + + continue + } + + if cmpErr != nil { + return cmpErr + } + + if !equal { + return fmt.Errorf("promote quarantine %q -> %q: files differ in contents", src, dst) + } + + _ = root.Remove(src) + + return nil + } +} + +func compareRootFiles(root *os.Root, left, right string) (equal bool, vanished bool, err error) { + leftFile, err := root.Open(left) + if err != nil { + return false, false, err + } + + defer func() { + _ = leftFile.Close() + }() + + rightFile, err := root.Open(right) + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + return false, true, nil + } + + return false, false, err + } + + defer func() { + _ = rightFile.Close() + }() + + var leftBuf, rightBuf [4096]byte + + for { + leftN, leftErr := leftFile.Read(leftBuf[:]) + rightN, rightErr := rightFile.Read(rightBuf[:]) + + if leftErr != nil && !errors.Is(leftErr, io.EOF) { + return false, false, leftErr + } + + if rightErr != nil && !errors.Is(rightErr, io.EOF) { + return false, false, rightErr + } + + if leftN != rightN || !bytes.Equal(leftBuf[:leftN], rightBuf[:rightN]) { + return false, false, nil + } + + if leftErr != nil || rightErr != nil { + return true, false, nil + } + } +} diff --git a/receivepack/internal/service/quarantine_test.go b/receivepack/internal/service/quarantine_test.go new file mode 100644 index 00000000..795fb35d --- /dev/null +++ b/receivepack/internal/service/quarantine_test.go @@ -0,0 +1,163 @@ +package service + +import ( + "os" + "path" + "testing" + + "codeberg.org/lindenii/furgit/objectid" + "codeberg.org/lindenii/furgit/objectstore/memory" +) + +func TestPromoteQuarantineTreatsExistingLooseObjectAsSuccess(t *testing.T) { + t.Parallel() + + objectsDir := t.TempDir() + objectsRoot, err := os.OpenRoot(objectsDir) + if err != nil { + t.Fatalf("os.OpenRoot: %v", err) + } + + t.Cleanup(func() { + _ = objectsRoot.Close() + }) + + svc := New(Options{ + Algorithm: objectid.AlgorithmSHA1, + ExistingObjects: memory.New(objectid.AlgorithmSHA1), + ObjectsRoot: objectsRoot, + }) + + quarantineName, quarantineRoot, err := svc.createQuarantineRoot() + if err != nil { + t.Fatalf("createQuarantineRoot: %v", err) + } + + t.Cleanup(func() { + _ = quarantineRoot.Close() + _ = objectsRoot.RemoveAll(quarantineName) + }) + + if err := quarantineRoot.Mkdir("ab", 0o755); err != nil { + t.Fatalf("Mkdir(ab): %v", err) + } + + if err := objectsRoot.Mkdir("ab", 0o755); err != nil { + t.Fatalf("Mkdir(dst ab): %v", err) + } + + const payload = "same object bytes" + if err := quarantineRoot.WriteFile(path.Join("ab", "cdef"), []byte(payload), 0o644); err != nil { + t.Fatalf("WriteFile(quarantine loose): %v", err) + } + + if err := objectsRoot.WriteFile(path.Join("ab", "cdef"), []byte(payload), 0o644); err != nil { + t.Fatalf("WriteFile(permanent loose): %v", err) + } + + if err := svc.promoteQuarantine(quarantineName, quarantineRoot); err != nil { + t.Fatalf("promoteQuarantine: %v", err) + } +} + +func TestPromoteQuarantineRejectsDifferentExistingPackFile(t *testing.T) { + t.Parallel() + + objectsDir := t.TempDir() + objectsRoot, err := os.OpenRoot(objectsDir) + if err != nil { + t.Fatalf("os.OpenRoot: %v", err) + } + + t.Cleanup(func() { + _ = objectsRoot.Close() + }) + + svc := New(Options{ + Algorithm: objectid.AlgorithmSHA1, + ExistingObjects: memory.New(objectid.AlgorithmSHA1), + ObjectsRoot: objectsRoot, + }) + + quarantineName, quarantineRoot, err := svc.createQuarantineRoot() + if err != nil { + t.Fatalf("createQuarantineRoot: %v", err) + } + + t.Cleanup(func() { + _ = quarantineRoot.Close() + _ = objectsRoot.RemoveAll(quarantineName) + }) + + if err := quarantineRoot.Mkdir("pack", 0o755); err != nil { + t.Fatalf("Mkdir(pack): %v", err) + } + + if err := objectsRoot.Mkdir("pack", 0o755); err != nil { + t.Fatalf("Mkdir(dst pack): %v", err) + } + + if err := quarantineRoot.WriteFile(path.Join("pack", "pack-a.pack"), []byte("new bytes"), 0o644); err != nil { + t.Fatalf("WriteFile(quarantine pack): %v", err) + } + + if err := objectsRoot.WriteFile(path.Join("pack", "pack-a.pack"), []byte("old bytes"), 0o644); err != nil { + t.Fatalf("WriteFile(permanent pack): %v", err) + } + + err = svc.promoteQuarantine(quarantineName, quarantineRoot) + if err == nil { + t.Fatal("promoteQuarantine unexpectedly succeeded") + } +} + +func TestPromoteQuarantineAcceptsMatchingExistingPackFile(t *testing.T) { + t.Parallel() + + objectsDir := t.TempDir() + objectsRoot, err := os.OpenRoot(objectsDir) + if err != nil { + t.Fatalf("os.OpenRoot: %v", err) + } + + t.Cleanup(func() { + _ = objectsRoot.Close() + }) + + svc := New(Options{ + Algorithm: objectid.AlgorithmSHA1, + ExistingObjects: memory.New(objectid.AlgorithmSHA1), + ObjectsRoot: objectsRoot, + }) + + quarantineName, quarantineRoot, err := svc.createQuarantineRoot() + if err != nil { + t.Fatalf("createQuarantineRoot: %v", err) + } + + t.Cleanup(func() { + _ = quarantineRoot.Close() + _ = objectsRoot.RemoveAll(quarantineName) + }) + + if err := quarantineRoot.Mkdir("pack", 0o755); err != nil { + t.Fatalf("Mkdir(pack): %v", err) + } + + if err := objectsRoot.Mkdir("pack", 0o755); err != nil { + t.Fatalf("Mkdir(dst pack): %v", err) + } + + const payload = "identical pack bytes" + if err := quarantineRoot.WriteFile(path.Join("pack", "pack-a.pack"), []byte(payload), 0o644); err != nil { + t.Fatalf("WriteFile(quarantine pack): %v", err) + } + + if err := objectsRoot.WriteFile(path.Join("pack", "pack-a.pack"), []byte(payload), 0o644); err != nil { + t.Fatalf("WriteFile(permanent pack): %v", err) + } + + if err := svc.promoteQuarantine(quarantineName, quarantineRoot); err != nil { + t.Fatalf("promoteQuarantine: %v", err) + } +} diff --git a/receivepack/internal/service/request.go b/receivepack/internal/service/request.go index 62764501..7a0b1f33 100644 --- a/receivepack/internal/service/request.go +++ b/receivepack/internal/service/request.go @@ -6,6 +6,7 @@ import "io" type Request struct { Commands []Command PushOptions []string + Atomic bool DeleteOnly bool PackExpected bool Pack io.Reader |
