aboutsummaryrefslogtreecommitdiff
path: root/receivepack/internal
diff options
context:
space:
mode:
authorGravatar Runxi Yu2026-03-07 19:40:27 +0800
committerGravatar Runxi Yu2026-03-07 20:53:17 +0800
commit8aa2e9f0903a80c90a9d8308138439d6f8732050 (patch)
treed2fecbf29b0eaa78da87b017005139960783f669 /receivepack/internal
parentrefstore/files: Implement batching (diff)
signatureNo signature
receivepack: Use refs
Diffstat (limited to 'receivepack/internal')
-rw-r--r--receivepack/internal/service/apply.go107
-rw-r--r--receivepack/internal/service/command.go13
-rw-r--r--receivepack/internal/service/command_result.go10
-rw-r--r--receivepack/internal/service/execute.go61
-rw-r--r--receivepack/internal/service/options.go2
-rw-r--r--receivepack/internal/service/quarantine.go202
-rw-r--r--receivepack/internal/service/quarantine_test.go163
-rw-r--r--receivepack/internal/service/request.go1
8 files changed, 542 insertions, 17 deletions
diff --git a/receivepack/internal/service/apply.go b/receivepack/internal/service/apply.go
new file mode 100644
index 00000000..c24d2a95
--- /dev/null
+++ b/receivepack/internal/service/apply.go
@@ -0,0 +1,107 @@
+package service
+
+import (
+ "codeberg.org/lindenii/furgit/objectid"
+ "codeberg.org/lindenii/furgit/refstore"
+)
+
+func (service *Service) applyAtomic(result *Result, commands []Command) error {
+ tx, err := service.opts.Refs.BeginTransaction()
+ if err != nil {
+ return err
+ }
+
+ for _, command := range commands {
+ err = queueWriteTransaction(tx, command)
+ if err != nil {
+ _ = tx.Abort()
+ fillCommandErrors(result, commands, err.Error())
+
+ return nil
+ }
+ }
+
+ err = tx.Commit()
+ if err != nil {
+ fillCommandErrors(result, commands, err.Error())
+
+ return nil
+ }
+
+ result.Applied = true
+ for _, command := range commands {
+ result.Commands = append(result.Commands, successCommandResult(command))
+ }
+
+ return nil
+}
+
+func (service *Service) applyBatch(result *Result, commands []Command) error {
+ batch, err := service.opts.Refs.BeginBatch()
+ if err != nil {
+ return err
+ }
+
+ for _, command := range commands {
+ queueWriteBatch(batch, command)
+ }
+
+ batchResults, err := batch.Apply()
+ if err != nil && len(batchResults) == 0 {
+ return err
+ }
+
+ appliedAny := false
+
+ for i, command := range commands {
+ item := successCommandResult(command)
+ if i < len(batchResults) && batchResults[i].Error != nil {
+ item.Error = batchResults[i].Error.Error()
+ } else {
+ appliedAny = true
+ }
+
+ result.Commands = append(result.Commands, item)
+ }
+
+ result.Applied = appliedAny
+
+ return nil
+}
+
+func queueWriteTransaction(tx refstore.Transaction, command Command) error {
+ if isDelete(command) {
+ return tx.Delete(command.Name, command.OldID)
+ }
+
+ if command.OldID == objectid.Zero(command.OldID.Algorithm()) {
+ return tx.Create(command.Name, command.NewID)
+ }
+
+ return tx.Update(command.Name, command.NewID, command.OldID)
+}
+
+func queueWriteBatch(batch refstore.Batch, command Command) {
+ if isDelete(command) {
+ batch.Delete(command.Name, command.OldID)
+
+ return
+ }
+
+ if command.OldID == objectid.Zero(command.OldID.Algorithm()) {
+ batch.Create(command.Name, command.NewID)
+
+ return
+ }
+
+ batch.Update(command.Name, command.NewID, command.OldID)
+}
+
+func successCommandResult(command Command) CommandResult {
+ return CommandResult{
+ Name: command.Name,
+ RefName: command.Name,
+ OldID: objectIDPointer(command.OldID),
+ NewID: objectIDPointer(command.NewID),
+ }
+}
diff --git a/receivepack/internal/service/command.go b/receivepack/internal/service/command.go
index f51461ff..33342e41 100644
--- a/receivepack/internal/service/command.go
+++ b/receivepack/internal/service/command.go
@@ -12,8 +12,11 @@ type Command struct {
func fillCommandErrors(result *Result, commands []Command, errText string) {
for _, command := range commands {
result.Commands = append(result.Commands, CommandResult{
- Name: command.Name,
- Error: errText,
+ Name: command.Name,
+ Error: errText,
+ RefName: command.Name,
+ OldID: objectIDPointer(command.OldID),
+ NewID: objectIDPointer(command.NewID),
})
}
}
@@ -21,3 +24,9 @@ func fillCommandErrors(result *Result, commands []Command, errText string) {
func isDelete(command Command) bool {
return command.NewID == objectid.Zero(command.NewID.Algorithm())
}
+
+func objectIDPointer(id objectid.ObjectID) *objectid.ObjectID {
+ out := id
+
+ return &out
+}
diff --git a/receivepack/internal/service/command_result.go b/receivepack/internal/service/command_result.go
index 1234c8ef..18e39acc 100644
--- a/receivepack/internal/service/command_result.go
+++ b/receivepack/internal/service/command_result.go
@@ -1,7 +1,13 @@
package service
+import "codeberg.org/lindenii/furgit/objectid"
+
// CommandResult is one per-command execution result.
type CommandResult struct {
- Name string
- Error string
+ Name string
+ Error string
+ RefName string
+ OldID *objectid.ObjectID
+ NewID *objectid.ObjectID
+ ForcedUpdate bool
}
diff --git a/receivepack/internal/service/execute.go b/receivepack/internal/service/execute.go
index b3d47d29..ebba9003 100644
--- a/receivepack/internal/service/execute.go
+++ b/receivepack/internal/service/execute.go
@@ -2,7 +2,7 @@ package service
import (
"context"
- "log"
+ "os"
"codeberg.org/lindenii/furgit/format/pack/ingest"
)
@@ -12,14 +12,17 @@ import (
//
// TODO: Invoke hook or policy callbacks to decide whether each planned update
// should be allowed.
-// TODO: Apply planned ref updates with one atomic compare-and-swap ref
-// transaction once ref writing exists.
func (service *Service) Execute(ctx context.Context, req *Request) (*Result, error) {
_ = ctx
result := &Result{
Commands: make([]CommandResult, 0, len(req.Commands)),
}
+ var (
+ quarantineName string
+ quarantineRoot *os.Root
+ err error
+ )
if req.PackExpected {
if req.Pack == nil {
@@ -36,7 +39,7 @@ func (service *Service) Execute(ctx context.Context, req *Request) (*Result, err
return result, nil
}
- quarantineName, quarantineRoot, err := service.createQuarantineRoot()
+ quarantineName, quarantineRoot, err = service.createQuarantineRoot()
if err != nil {
result.UnpackError = err.Error()
fillCommandErrors(result, req.Commands, err.Error())
@@ -46,14 +49,24 @@ func (service *Service) Execute(ctx context.Context, req *Request) (*Result, err
defer func() {
_ = quarantineRoot.Close()
- // TODO: Promote accepted quarantined objects into the permanent object
- // store once atomic ref application exists.
_ = service.opts.ObjectsRoot.RemoveAll(quarantineName)
}()
+ quarantinePackRoot, err := service.openQuarantinePackRoot(quarantineRoot)
+ if err != nil {
+ result.UnpackError = err.Error()
+ fillCommandErrors(result, req.Commands, err.Error())
+
+ return result, nil
+ }
+
+ defer func() {
+ _ = quarantinePackRoot.Close()
+ }()
+
ingested, err := ingest.Ingest(
req.Pack,
- quarantineRoot,
+ quarantinePackRoot,
service.opts.Algorithm,
true,
true,
@@ -78,11 +91,35 @@ func (service *Service) Execute(ctx context.Context, req *Request) (*Result, err
})
}
- fillCommandErrors(result, req.Commands, "ref updates not implemented yet")
- log.Printf(
- "receivepack: planned %d ref updates, but hook/policy checks and atomic ref writes are not implemented yet",
- len(result.Planned),
- )
+ if len(req.Commands) == 0 {
+ return result, nil
+ }
+
+ if req.PackExpected {
+ // Git migrates quarantined objects into permanent storage immediately
+ // before starting ref updates.
+ err = service.promoteQuarantine(quarantineName, quarantineRoot)
+ if err != nil {
+ result.UnpackError = err.Error()
+ fillCommandErrors(result, req.Commands, err.Error())
+
+ return result, nil
+ }
+ }
+
+ if req.Atomic {
+ err := service.applyAtomic(result, req.Commands)
+ if err != nil {
+ return result, err
+ }
+
+ return result, nil
+ }
+
+ err = service.applyBatch(result, req.Commands)
+ if err != nil {
+ return result, err
+ }
return result, nil
}
diff --git a/receivepack/internal/service/options.go b/receivepack/internal/service/options.go
index 2bc70058..4ba06827 100644
--- a/receivepack/internal/service/options.go
+++ b/receivepack/internal/service/options.go
@@ -11,7 +11,7 @@ import (
// Options configures one protocol-independent receive-pack service.
type Options struct {
Algorithm objectid.Algorithm
- Refs refstore.ReadingStore
+ Refs refstore.ReadWriteStore
ExistingObjects objectstore.Store
ObjectsRoot *os.Root
// TODO: Hook and such callbacks.
diff --git a/receivepack/internal/service/quarantine.go b/receivepack/internal/service/quarantine.go
index 17ff6279..101cadc7 100644
--- a/receivepack/internal/service/quarantine.go
+++ b/receivepack/internal/service/quarantine.go
@@ -1,8 +1,15 @@
package service
import (
+ "bytes"
"crypto/rand"
+ "errors"
+ "fmt"
+ "io"
+ "io/fs"
"os"
+ "path"
+ "slices"
)
// createQuarantineRoot creates one per-push quarantine directory beneath the
@@ -24,3 +31,198 @@ func (service *Service) createQuarantineRoot() (string, *os.Root, error) {
return name, root, nil
}
+
+func (service *Service) openQuarantinePackRoot(quarantineRoot *os.Root) (*os.Root, error) {
+ err := quarantineRoot.Mkdir("pack", 0o755)
+ if err != nil && !os.IsExist(err) {
+ return nil, err
+ }
+
+ return quarantineRoot.OpenRoot("pack")
+}
+
+func (service *Service) promoteQuarantine(quarantineName string, quarantineRoot *os.Root) error {
+ if quarantineName == "" || quarantineRoot == nil {
+ return nil
+ }
+
+ return service.promoteQuarantineDir(quarantineName, quarantineRoot, ".")
+}
+
+func (service *Service) promoteQuarantineDir(quarantineName string, quarantineRoot *os.Root, rel string) error {
+ entries, err := fs.ReadDir(quarantineRoot.FS(), rel)
+ if err != nil && !os.IsNotExist(err) {
+ return err
+ }
+
+ slices.SortFunc(entries, func(left, right fs.DirEntry) int {
+ return packCopyPriority(left.Name()) - packCopyPriority(right.Name())
+ })
+
+ for _, entry := range entries {
+ childRel := entry.Name()
+ if rel != "." {
+ childRel = path.Join(rel, entry.Name())
+ }
+
+ if entry.IsDir() {
+ err = service.opts.ObjectsRoot.Mkdir(childRel, 0o755)
+ if err != nil && !os.IsExist(err) {
+ return err
+ }
+
+ err = service.promoteQuarantineDir(quarantineName, quarantineRoot, childRel)
+ if err != nil {
+ return err
+ }
+
+ continue
+ }
+
+ err = finalizeQuarantineFile(
+ service.opts.ObjectsRoot,
+ path.Join(quarantineName, childRel),
+ childRel,
+ isLooseObjectShardPath(rel),
+ )
+ if err == nil {
+ continue
+ }
+
+ return err
+ }
+
+ return nil
+}
+
+func packCopyPriority(name string) int {
+ if !pathHasPackPrefix(name) {
+ return 0
+ }
+
+ switch {
+ case path.Ext(name) == ".keep":
+ return 1
+ case path.Ext(name) == ".pack":
+ return 2
+ case path.Ext(name) == ".rev":
+ return 3
+ case path.Ext(name) == ".idx":
+ return 4
+ default:
+ return 5
+ }
+}
+
+func pathHasPackPrefix(name string) bool {
+ return len(name) >= 4 && name[:4] == "pack"
+}
+
+func isLooseObjectShardPath(rel string) bool {
+ return len(rel) == 2 && isHex(rel[0]) && isHex(rel[1])
+}
+
+func isHex(ch byte) bool {
+ return ('0' <= ch && ch <= '9') || ('a' <= ch && ch <= 'f') || ('A' <= ch && ch <= 'F')
+}
+
+func finalizeQuarantineFile(root *os.Root, src, dst string, skipCollisionCheck bool) error {
+ const maxVanishedRetries = 5
+
+ for retries := 0; ; retries++ {
+ err := root.Link(src, dst)
+ switch {
+ case err == nil:
+ _ = root.Remove(src)
+
+ return nil
+ case !errors.Is(err, fs.ErrExist):
+ _, statErr := root.Stat(dst)
+ if statErr == nil {
+ err = fs.ErrExist
+ } else if errors.Is(statErr, fs.ErrNotExist) {
+ if renameErr := root.Rename(src, dst); renameErr == nil {
+ return nil
+ }
+
+ return fmt.Errorf("promote quarantine %q -> %q: %w", src, dst, err)
+ } else {
+ return statErr
+ }
+ }
+
+ if skipCollisionCheck {
+ _ = root.Remove(src)
+
+ return nil
+ }
+
+ equal, vanished, cmpErr := compareRootFiles(root, src, dst)
+ if vanished {
+ if retries >= maxVanishedRetries {
+ return fmt.Errorf("promote quarantine %q -> %q: destination repeatedly vanished", src, dst)
+ }
+
+ continue
+ }
+
+ if cmpErr != nil {
+ return cmpErr
+ }
+
+ if !equal {
+ return fmt.Errorf("promote quarantine %q -> %q: files differ in contents", src, dst)
+ }
+
+ _ = root.Remove(src)
+
+ return nil
+ }
+}
+
+func compareRootFiles(root *os.Root, left, right string) (equal bool, vanished bool, err error) {
+ leftFile, err := root.Open(left)
+ if err != nil {
+ return false, false, err
+ }
+
+ defer func() {
+ _ = leftFile.Close()
+ }()
+
+ rightFile, err := root.Open(right)
+ if err != nil {
+ if errors.Is(err, fs.ErrNotExist) {
+ return false, true, nil
+ }
+
+ return false, false, err
+ }
+
+ defer func() {
+ _ = rightFile.Close()
+ }()
+
+ var leftBuf, rightBuf [4096]byte
+
+ for {
+ leftN, leftErr := leftFile.Read(leftBuf[:])
+ rightN, rightErr := rightFile.Read(rightBuf[:])
+
+ if leftErr != nil && !errors.Is(leftErr, io.EOF) {
+ return false, false, leftErr
+ }
+
+ if rightErr != nil && !errors.Is(rightErr, io.EOF) {
+ return false, false, rightErr
+ }
+
+ if leftN != rightN || !bytes.Equal(leftBuf[:leftN], rightBuf[:rightN]) {
+ return false, false, nil
+ }
+
+ if leftErr != nil || rightErr != nil {
+ return true, false, nil
+ }
+ }
+}
diff --git a/receivepack/internal/service/quarantine_test.go b/receivepack/internal/service/quarantine_test.go
new file mode 100644
index 00000000..795fb35d
--- /dev/null
+++ b/receivepack/internal/service/quarantine_test.go
@@ -0,0 +1,163 @@
+package service
+
+import (
+ "os"
+ "path"
+ "testing"
+
+ "codeberg.org/lindenii/furgit/objectid"
+ "codeberg.org/lindenii/furgit/objectstore/memory"
+)
+
+func TestPromoteQuarantineTreatsExistingLooseObjectAsSuccess(t *testing.T) {
+ t.Parallel()
+
+ objectsDir := t.TempDir()
+ objectsRoot, err := os.OpenRoot(objectsDir)
+ if err != nil {
+ t.Fatalf("os.OpenRoot: %v", err)
+ }
+
+ t.Cleanup(func() {
+ _ = objectsRoot.Close()
+ })
+
+ svc := New(Options{
+ Algorithm: objectid.AlgorithmSHA1,
+ ExistingObjects: memory.New(objectid.AlgorithmSHA1),
+ ObjectsRoot: objectsRoot,
+ })
+
+ quarantineName, quarantineRoot, err := svc.createQuarantineRoot()
+ if err != nil {
+ t.Fatalf("createQuarantineRoot: %v", err)
+ }
+
+ t.Cleanup(func() {
+ _ = quarantineRoot.Close()
+ _ = objectsRoot.RemoveAll(quarantineName)
+ })
+
+ if err := quarantineRoot.Mkdir("ab", 0o755); err != nil {
+ t.Fatalf("Mkdir(ab): %v", err)
+ }
+
+ if err := objectsRoot.Mkdir("ab", 0o755); err != nil {
+ t.Fatalf("Mkdir(dst ab): %v", err)
+ }
+
+ const payload = "same object bytes"
+ if err := quarantineRoot.WriteFile(path.Join("ab", "cdef"), []byte(payload), 0o644); err != nil {
+ t.Fatalf("WriteFile(quarantine loose): %v", err)
+ }
+
+ if err := objectsRoot.WriteFile(path.Join("ab", "cdef"), []byte(payload), 0o644); err != nil {
+ t.Fatalf("WriteFile(permanent loose): %v", err)
+ }
+
+ if err := svc.promoteQuarantine(quarantineName, quarantineRoot); err != nil {
+ t.Fatalf("promoteQuarantine: %v", err)
+ }
+}
+
+func TestPromoteQuarantineRejectsDifferentExistingPackFile(t *testing.T) {
+ t.Parallel()
+
+ objectsDir := t.TempDir()
+ objectsRoot, err := os.OpenRoot(objectsDir)
+ if err != nil {
+ t.Fatalf("os.OpenRoot: %v", err)
+ }
+
+ t.Cleanup(func() {
+ _ = objectsRoot.Close()
+ })
+
+ svc := New(Options{
+ Algorithm: objectid.AlgorithmSHA1,
+ ExistingObjects: memory.New(objectid.AlgorithmSHA1),
+ ObjectsRoot: objectsRoot,
+ })
+
+ quarantineName, quarantineRoot, err := svc.createQuarantineRoot()
+ if err != nil {
+ t.Fatalf("createQuarantineRoot: %v", err)
+ }
+
+ t.Cleanup(func() {
+ _ = quarantineRoot.Close()
+ _ = objectsRoot.RemoveAll(quarantineName)
+ })
+
+ if err := quarantineRoot.Mkdir("pack", 0o755); err != nil {
+ t.Fatalf("Mkdir(pack): %v", err)
+ }
+
+ if err := objectsRoot.Mkdir("pack", 0o755); err != nil {
+ t.Fatalf("Mkdir(dst pack): %v", err)
+ }
+
+ if err := quarantineRoot.WriteFile(path.Join("pack", "pack-a.pack"), []byte("new bytes"), 0o644); err != nil {
+ t.Fatalf("WriteFile(quarantine pack): %v", err)
+ }
+
+ if err := objectsRoot.WriteFile(path.Join("pack", "pack-a.pack"), []byte("old bytes"), 0o644); err != nil {
+ t.Fatalf("WriteFile(permanent pack): %v", err)
+ }
+
+ err = svc.promoteQuarantine(quarantineName, quarantineRoot)
+ if err == nil {
+ t.Fatal("promoteQuarantine unexpectedly succeeded")
+ }
+}
+
+func TestPromoteQuarantineAcceptsMatchingExistingPackFile(t *testing.T) {
+ t.Parallel()
+
+ objectsDir := t.TempDir()
+ objectsRoot, err := os.OpenRoot(objectsDir)
+ if err != nil {
+ t.Fatalf("os.OpenRoot: %v", err)
+ }
+
+ t.Cleanup(func() {
+ _ = objectsRoot.Close()
+ })
+
+ svc := New(Options{
+ Algorithm: objectid.AlgorithmSHA1,
+ ExistingObjects: memory.New(objectid.AlgorithmSHA1),
+ ObjectsRoot: objectsRoot,
+ })
+
+ quarantineName, quarantineRoot, err := svc.createQuarantineRoot()
+ if err != nil {
+ t.Fatalf("createQuarantineRoot: %v", err)
+ }
+
+ t.Cleanup(func() {
+ _ = quarantineRoot.Close()
+ _ = objectsRoot.RemoveAll(quarantineName)
+ })
+
+ if err := quarantineRoot.Mkdir("pack", 0o755); err != nil {
+ t.Fatalf("Mkdir(pack): %v", err)
+ }
+
+ if err := objectsRoot.Mkdir("pack", 0o755); err != nil {
+ t.Fatalf("Mkdir(dst pack): %v", err)
+ }
+
+ const payload = "identical pack bytes"
+ if err := quarantineRoot.WriteFile(path.Join("pack", "pack-a.pack"), []byte(payload), 0o644); err != nil {
+ t.Fatalf("WriteFile(quarantine pack): %v", err)
+ }
+
+ if err := objectsRoot.WriteFile(path.Join("pack", "pack-a.pack"), []byte(payload), 0o644); err != nil {
+ t.Fatalf("WriteFile(permanent pack): %v", err)
+ }
+
+ if err := svc.promoteQuarantine(quarantineName, quarantineRoot); err != nil {
+ t.Fatalf("promoteQuarantine: %v", err)
+ }
+}
diff --git a/receivepack/internal/service/request.go b/receivepack/internal/service/request.go
index 62764501..7a0b1f33 100644
--- a/receivepack/internal/service/request.go
+++ b/receivepack/internal/service/request.go
@@ -6,6 +6,7 @@ import "io"
type Request struct {
Commands []Command
PushOptions []string
+ Atomic bool
DeleteOnly bool
PackExpected bool
Pack io.Reader