diff options
| author | 2026-03-30 19:51:58 +0000 | |
|---|---|---|
| committer | 2026-03-30 19:57:33 +0000 | |
| commit | da621b97d0aa209e7e502e9e898e0a7a89857216 (patch) | |
| tree | c1484d878f85216ebf108e1e72136ccdabe4156f | |
| parent | repository: Use dual (diff) | |
| signature | No signature | |
network/receivepack: Use dual
| -rw-r--r-- | cmd/receivepack9418/conn.go | 53 | ||||
| -rw-r--r-- | network/receivepack/hooks/reject_force_push.go | 2 | ||||
| -rw-r--r-- | network/receivepack/int_test.go | 76 | ||||
| -rw-r--r-- | network/receivepack/options.go | 13 | ||||
| -rw-r--r-- | network/receivepack/permissions.go | 27 | ||||
| -rw-r--r-- | network/receivepack/receivepack.go | 7 | ||||
| -rw-r--r-- | network/receivepack/service/execute.go | 27 | ||||
| -rw-r--r-- | network/receivepack/service/ingest_quarantine.go | 104 | ||||
| -rw-r--r-- | network/receivepack/service/options.go | 26 | ||||
| -rw-r--r-- | network/receivepack/service/quarantine.go | 274 | ||||
| -rw-r--r-- | network/receivepack/service/quarantine_test.go | 184 | ||||
| -rw-r--r-- | network/receivepack/service/result.go | 5 | ||||
| -rw-r--r-- | network/receivepack/service/run_hook.go | 82 | ||||
| -rw-r--r-- | network/receivepack/service/service_test.go | 76 |
14 files changed, 216 insertions, 740 deletions
diff --git a/cmd/receivepack9418/conn.go b/cmd/receivepack9418/conn.go index e3b4d1ac..755cf022 100644 --- a/cmd/receivepack9418/conn.go +++ b/cmd/receivepack9418/conn.go @@ -6,9 +6,13 @@ import ( "fmt" "log" "net" + "os" "strings" "codeberg.org/lindenii/furgit/network/receivepack" + objectdual "codeberg.org/lindenii/furgit/object/store/dual" + objectloose "codeberg.org/lindenii/furgit/object/store/loose" + objectpacked "codeberg.org/lindenii/furgit/object/store/packed" ) func (srv *server) handleConn(conn net.Conn) { @@ -38,12 +42,24 @@ func (srv *server) handleConn(conn net.Conn) { gitProtocol := strings.Join(req.ExtraParameters, ":") + objectIngress, cleanupObjectIngress, err := srv.openObjectIngress() + if err != nil { + writeErrPkt(writer, fmt.Sprintf("object ingress unavailable: %v", err)) + _ = writer.Flush() + + log.Printf("receivepack9418: %s: object ingress unavailable: %v", conn.RemoteAddr(), err) + + return + } + + defer cleanupObjectIngress() + opts := receivepack.Options{ GitProtocol: gitProtocol, Algorithm: srv.repo.Algorithm(), Refs: srv.repo.Refs(), ExistingObjects: srv.repo.Objects(), - ObjectsRoot: srv.objectsRoot, + ObjectIngress: objectIngress, } err = receivepack.ReceivePack(context.Background(), writer, reader, opts) @@ -69,3 +85,38 @@ func (srv *server) handleConn(conn net.Conn) { return } } + +func (srv *server) openObjectIngress() (*objectdual.Dual, func(), error) { + err := srv.objectsRoot.Mkdir("pack", 0o755) + if err != nil && !os.IsExist(err) { + return nil, nil, err + } + + packRoot, err := srv.objectsRoot.OpenRoot("pack") + if err != nil { + return nil, nil, err + } + + looseStore, err := objectloose.New(srv.objectsRoot, srv.repo.Algorithm()) + if err != nil { + _ = packRoot.Close() + + return nil, nil, err + } + + packedStore, err := objectpacked.New(packRoot, srv.repo.Algorithm(), objectpacked.Options{WriteRev: true}) + if err != nil { + _ = looseStore.Close() + _ = packRoot.Close() + + return nil, nil, err + } + + cleanup := func() { + _ = packedStore.Close() + _ = looseStore.Close() + _ = packRoot.Close() + } + + return objectdual.New(looseStore, packedStore), cleanup, nil +} diff --git a/network/receivepack/hooks/reject_force_push.go b/network/receivepack/hooks/reject_force_push.go index ee71e96c..841af31f 100644 --- a/network/receivepack/hooks/reject_force_push.go +++ b/network/receivepack/hooks/reject_force_push.go @@ -23,8 +23,6 @@ func RejectForcePush() receivepack.Hook { objects := objectmix.New(req.QuarantinedObjects, req.ExistingObjects) - defer func() { _ = objects.Close() }() - queries := commitquery.New(fetch.New(objects), req.CommitGraph) decisions := make([]receivepack.UpdateDecision, len(req.Updates)) diff --git a/network/receivepack/int_test.go b/network/receivepack/int_test.go index 79662bc1..352bbe7b 100644 --- a/network/receivepack/int_test.go +++ b/network/receivepack/int_test.go @@ -15,6 +15,10 @@ import ( receivepack "codeberg.org/lindenii/furgit/network/receivepack" receivepackhooks "codeberg.org/lindenii/furgit/network/receivepack/hooks" objectid "codeberg.org/lindenii/furgit/object/id" + objectstore "codeberg.org/lindenii/furgit/object/store" + objectdual "codeberg.org/lindenii/furgit/object/store/dual" + objectloose "codeberg.org/lindenii/furgit/object/store/loose" + objectpacked "codeberg.org/lindenii/furgit/object/store/packed" ) func TestReceivePackDeleteOnlyAtomicDeleteSucceeds(t *testing.T) { @@ -301,7 +305,7 @@ func testReceivePackProtocolFallback(t *testing.T, gitProtocol string) { }) } -func TestReceivePackPackRequestWithoutObjectsRootReportsNotConfigured(t *testing.T) { +func TestReceivePackPackRequestWithoutObjectIngressReportsNotConfigured(t *testing.T) { t.Parallel() //nolint:thelper @@ -334,7 +338,7 @@ func TestReceivePackPackRequestWithoutObjectsRootReportsNotConfigured(t *testing } got := output.String() - if !strings.Contains(got, "unpack objects root not configured\n") { + if !strings.Contains(got, "unpack object ingress not configured\n") { t.Fatalf("unexpected receive-pack output %q", got) } }) @@ -352,7 +356,7 @@ func TestReceivePackPackCreatePromotesObjectsAndUpdatesRef(t *testing.T) { receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true}) repo := receiver.OpenRepository(t) - objectsRoot := receiver.OpenObjectsRoot(t) + objectIngress := openReceivePackIngress(t, receiver, algo) packStream := sender.PackObjectsReader(t, []string{commitID.String()}, false) t.Cleanup(func() { @@ -377,7 +381,7 @@ func TestReceivePackPackCreatePromotesObjectsAndUpdatesRef(t *testing.T) { Algorithm: algo, Refs: repo.Refs(), ExistingObjects: repo.Objects(), - ObjectsRoot: objectsRoot, + ObjectIngress: objectIngress, }, ) if err != nil { @@ -423,7 +427,7 @@ func TestReceivePackHookSeesQuarantinedObjectsAndCanRejectBeforePromotion(t *tes receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true}) repo := receiver.OpenRepository(t) - objectsRoot := receiver.OpenObjectsRoot(t) + objectIngress := openReceivePackIngress(t, receiver, algo) packStream := sender.PackObjectsReader(t, []string{commitID.String()}, false) t.Cleanup(func() { @@ -449,7 +453,7 @@ func TestReceivePackHookSeesQuarantinedObjectsAndCanRejectBeforePromotion(t *tes Algorithm: algo, Refs: repo.Refs(), ExistingObjects: repo.Objects(), - ObjectsRoot: objectsRoot, + ObjectIngress: objectIngress, Hook: func(ctx context.Context, req receivepack.HookRequest) ([]receivepack.UpdateDecision, error) { hookCalled = true @@ -658,7 +662,7 @@ func TestReceivePackPredefinedRejectForcePushHookRejectsNonFastForward(t *testin testRepo.UpdateRef(t, "refs/heads/main", currentID) repo := testRepo.OpenRepository(t) - objectsRoot := testRepo.OpenObjectsRoot(t) + objectIngress := openReceivePackIngress(t, testRepo, algo) packStream := testRepo.PackObjectsReader(t, []string{forcedID.String(), "^" + currentID.String()}, false) t.Cleanup(func() { _ = packStream.Close() @@ -682,7 +686,7 @@ func TestReceivePackPredefinedRejectForcePushHookRejectsNonFastForward(t *testin Algorithm: algo, Refs: repo.Refs(), ExistingObjects: repo.Objects(), - ObjectsRoot: objectsRoot, + ObjectIngress: objectIngress, Hook: receivepackhooks.RejectForcePush(), }, ) @@ -765,7 +769,7 @@ func TestReceivePackGitPushCreatesBranch(t *testing.T) { receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true}) repo := receiver.OpenRepository(t) - objectsRoot := receiver.OpenObjectsRoot(t) + objectIngress := openReceivePackIngress(t, receiver, algo) stdout, stderr, clientErr, serverErr := runGitPushFD( t, @@ -774,7 +778,7 @@ func TestReceivePackGitPushCreatesBranch(t *testing.T) { Algorithm: algo, Refs: repo.Refs(), ExistingObjects: repo.Objects(), - ObjectsRoot: objectsRoot, + ObjectIngress: objectIngress, }, "push", "--porcelain", "fd::3,4/test", "refs/heads/main:refs/heads/main", ) @@ -815,7 +819,7 @@ func TestReceivePackGitPushRefUpdateWithoutNewObjectsSucceeds(t *testing.T) { receiver.UpdateRef(t, "refs/heads/main", commitID) repo := receiver.OpenRepository(t) - objectsRoot := receiver.OpenObjectsRoot(t) + objectIngress := openReceivePackIngress(t, receiver, algo) stdout, stderr, clientErr, serverErr := runGitPushFD( t, @@ -824,7 +828,7 @@ func TestReceivePackGitPushRefUpdateWithoutNewObjectsSucceeds(t *testing.T) { Algorithm: algo, Refs: repo.Refs(), ExistingObjects: repo.Objects(), - ObjectsRoot: objectsRoot, + ObjectIngress: objectIngress, }, "push", "--porcelain", "fd::3,4/test", "refs/heads/main:refs/heads/topic", ) @@ -911,7 +915,7 @@ func TestReceivePackGitPushRejectsForcedUpdateViaHook(t *testing.T) { receiver.UpdateRef(t, "refs/heads/main", currentID) repo := receiver.OpenRepository(t) - objectsRoot := receiver.OpenObjectsRoot(t) + objectIngress := openReceivePackIngress(t, receiver, algo) stdout, stderr, clientErr, serverErr := runGitPushFD( t, @@ -920,7 +924,7 @@ func TestReceivePackGitPushRejectsForcedUpdateViaHook(t *testing.T) { Algorithm: algo, Refs: repo.Refs(), ExistingObjects: repo.Objects(), - ObjectsRoot: objectsRoot, + ObjectIngress: objectIngress, Hook: receivepackhooks.RejectForcePush(), }, "push", "--porcelain", "--force", "fd::3,4/test", "refs/heads/main:refs/heads/main", @@ -960,6 +964,50 @@ func pktlineData(payload string) string { return fmt.Sprintf("%04x%s", len(payload)+4, payload) } +func openReceivePackIngress( + tb testing.TB, + testRepo *testgit.TestRepo, + algo objectid.Algorithm, +) objectstore.Quarantiner { + tb.Helper() + + objectsRoot := testRepo.OpenObjectsRoot(tb) + + err := objectsRoot.Mkdir("pack", 0o755) + if err != nil && !os.IsExist(err) { + tb.Fatalf("Mkdir(pack): %v", err) + } + + packRoot, err := objectsRoot.OpenRoot("pack") + if err != nil { + tb.Fatalf("OpenRoot(pack): %v", err) + } + + tb.Cleanup(func() { + _ = packRoot.Close() + }) + + looseStore, err := objectloose.New(objectsRoot, algo) + if err != nil { + tb.Fatalf("loose.New: %v", err) + } + + tb.Cleanup(func() { + _ = looseStore.Close() + }) + + packedStore, err := objectpacked.New(packRoot, algo, objectpacked.Options{WriteRev: true}) + if err != nil { + tb.Fatalf("packed.New: %v", err) + } + + tb.Cleanup(func() { + _ = packedStore.Close() + }) + + return objectdual.New(looseStore, packedStore) +} + type fileWriteFlusher struct { *os.File } diff --git a/network/receivepack/options.go b/network/receivepack/options.go index 18d0e9b0..5806a26b 100644 --- a/network/receivepack/options.go +++ b/network/receivepack/options.go @@ -1,8 +1,6 @@ package receivepack import ( - "os" - commitgraphread "codeberg.org/lindenii/furgit/format/commitgraph/read" objectid "codeberg.org/lindenii/furgit/object/id" objectstore "codeberg.org/lindenii/furgit/object/store" @@ -14,7 +12,7 @@ import ( // ReceivePack borrows all configured dependencies. // // Refs and ExistingObjects are required and must be non-nil. -// ObjectsRoot is required if the invocation may need to ingest or promote a +// ObjectIngress is required if the invocation may need to ingest or quarantine a // pack. type Options struct { // GitProtocol is the raw Git protocol version string from the transport, @@ -31,15 +29,12 @@ type Options struct { // ExistingObjects is the object store visible to the push before any newly // uploaded quarantined objects are promoted. ExistingObjects objectstore.Reader + // ObjectIngress creates coordinated quarantines for quarantined object and + // pack ingestion during the push. + ObjectIngress objectstore.Quarantiner // CommitGraph is an optional commit-graph snapshot corresponding to // ExistingObjects. CommitGraph *commitgraphread.Reader - // ObjectsRoot is the permanent object storage root beneath which per-push - // quarantine directories are derived. - ObjectsRoot *os.Root - // PromotedObjectPermissions, when non-nil, is applied to objects and - // directories moved from quarantine into the permanent object store. - PromotedObjectPermissions *PromotedObjectPermissions // Hook, when non-nil, runs after pack ingestion into quarantine and before // quarantine promotion or ref updates. Hook is borrowed for the duration of // ReceivePack. diff --git a/network/receivepack/permissions.go b/network/receivepack/permissions.go deleted file mode 100644 index 1aaa5a0c..00000000 --- a/network/receivepack/permissions.go +++ /dev/null @@ -1,27 +0,0 @@ -package receivepack - -import ( - "io/fs" - - "codeberg.org/lindenii/furgit/network/receivepack/service" -) - -// PromotedObjectPermissions configures the destination permissions applied to -// objects and directories promoted out of quarantine. -type PromotedObjectPermissions struct { - DirMode fs.FileMode - FileMode fs.FileMode -} - -func translatePromotedObjectPermissions( - perms *PromotedObjectPermissions, -) *service.PromotedObjectPermissions { - if perms == nil { - return nil - } - - return &service.PromotedObjectPermissions{ - DirMode: perms.DirMode, - FileMode: perms.FileMode, - } -} diff --git a/network/receivepack/receivepack.go b/network/receivepack/receivepack.go index 1e6e6d5f..2a336cb1 100644 --- a/network/receivepack/receivepack.go +++ b/network/receivepack/receivepack.go @@ -108,13 +108,10 @@ func ReceivePack( Algorithm: opts.Algorithm, Refs: opts.Refs, ExistingObjects: opts.ExistingObjects, + ObjectIngress: opts.ObjectIngress, CommitGraph: opts.CommitGraph, - ObjectsRoot: opts.ObjectsRoot, Progress: progress, - PromotedObjectPermissions: translatePromotedObjectPermissions( - opts.PromotedObjectPermissions, - ), - Hook: translateHook(opts.Hook), + Hook: translateHook(opts.Hook), HookIO: service.HookIO{ Progress: progress, Error: protoSession.ErrorWriter(), diff --git a/network/receivepack/service/execute.go b/network/receivepack/service/execute.go index 5b00dba5..08177873 100644 --- a/network/receivepack/service/execute.go +++ b/network/receivepack/service/execute.go @@ -2,9 +2,9 @@ package service import ( "context" - "os" "codeberg.org/lindenii/furgit/internal/utils" + objectstore "codeberg.org/lindenii/furgit/object/store" ) // Execute validates one receive-pack request, optionally ingests its pack into @@ -15,23 +15,17 @@ func (service *Service) Execute(ctx context.Context, req *Request) (*Result, err result := &Result{ Commands: make([]CommandResult, 0, len(req.Commands)), } + var err error - var ( - quarantineName string - quarantineRoot *os.Root - err error - ) - - quarantineName, quarantineRoot, ok := service.ingestQuarantine(result, req.Commands, req) + quarantine, ok := service.ingestQuarantine(result, req.Commands, req) if !ok { return result, nil } - if quarantineRoot != nil { - defer func() { - _ = quarantineRoot.Close() - _ = service.opts.ObjectsRoot.RemoveAll(quarantineName) - }() + if quarantine != nil { + defer func(q objectstore.Quarantine) { + _ = q.Discard() + }(quarantine) } for _, command := range req.Commands { @@ -51,7 +45,7 @@ func (service *Service) Execute(ctx context.Context, req *Request) (*Result, err ctx, req, req.Commands, - quarantineName, + quarantine, ) if !ok { fillCommandErrors(result, req.Commands, errText) @@ -79,12 +73,12 @@ func (service *Service) Execute(ctx context.Context, req *Request) (*Result, err return result, nil } - if req.PackExpected && quarantineRoot != nil { + if req.PackExpected && quarantine != nil { // Git migrates quarantined objects into permanent storage immediately // before starting ref updates. utils.BestEffortFprintf(service.opts.Progress, "promoting quarantine...\r") - err = service.promoteQuarantine(quarantineName, quarantineRoot) + err := quarantine.Promote() if err != nil { utils.BestEffortFprintf(service.opts.Progress, "promoting quarantine: failed: %v.\n", err) @@ -94,6 +88,7 @@ func (service *Service) Execute(ctx context.Context, req *Request) (*Result, err return result, nil } + quarantine = nil utils.BestEffortFprintf(service.opts.Progress, "promoting quarantine: done.\n") } diff --git a/network/receivepack/service/ingest_quarantine.go b/network/receivepack/service/ingest_quarantine.go index 14083fe1..75f3a790 100644 --- a/network/receivepack/service/ingest_quarantine.go +++ b/network/receivepack/service/ingest_quarantine.go @@ -1,19 +1,17 @@ package service import ( - "os" - - "codeberg.org/lindenii/furgit/format/packfile/ingest" "codeberg.org/lindenii/furgit/internal/utils" + objectstore "codeberg.org/lindenii/furgit/object/store" ) func (service *Service) ingestQuarantine( result *Result, commands []Command, req *Request, -) (string, *os.Root, bool) { +) (objectstore.Quarantine, bool) { if !req.PackExpected { - return "", nil, true + return nil, true } if req.Pack == nil { @@ -22,16 +20,16 @@ func (service *Service) ingestQuarantine( result.UnpackError = "missing pack stream" fillCommandErrors(result, commands, "missing pack stream") - return "", nil, false + return nil, false } - if service.opts.ObjectsRoot == nil { - utils.BestEffortFprintf(service.opts.Progress, "unpack failed: objects root not configured.\n") + if service.opts.ObjectIngress == nil { + utils.BestEffortFprintf(service.opts.Progress, "unpack failed: object ingress not configured.\n") - result.UnpackError = "objects root not configured" - fillCommandErrors(result, commands, "objects root not configured") + result.UnpackError = "object ingress not configured" + fillCommandErrors(result, commands, "object ingress not configured") - return "", nil, false + return nil, false } var err error @@ -43,101 +41,41 @@ func (service *Service) ingestQuarantine( result.UnpackError = err.Error() fillCommandErrors(result, commands, err.Error()) - return "", nil, false - } - - pending, err := ingest.Ingest( - req.Pack, - service.opts.Algorithm, - ingest.Options{ - FixThin: true, - WriteRev: true, - Base: service.opts.ExistingObjects, - Progress: service.opts.Progress, - }, - ) - if err != nil { - utils.BestEffortFprintf(service.opts.Progress, "unpack failed: %v.\n", err) - - result.UnpackError = err.Error() - fillCommandErrors(result, commands, err.Error()) - - return "", nil, false - } - - if pending.Header().ObjectCount == 0 { - discarded, err := pending.Discard() - if err != nil { - utils.BestEffortFprintf(service.opts.Progress, "unpack failed: %v.\n", err) - - result.UnpackError = err.Error() - fillCommandErrors(result, commands, err.Error()) - - return "", nil, false - } - - result.Ingest = &ingest.Result{ - PackHash: discarded.PackHash, - ObjectCount: discarded.ObjectCount, - } - - utils.BestEffortFprintf( - service.opts.Progress, - "unpacking: done (%d objects, %s).\n", - discarded.ObjectCount, - discarded.PackHash, - ) - - return "", nil, true + return nil, false } utils.BestEffortFprintf(service.opts.Progress, "creating quarantine...\r") - quarantineName, quarantineRoot, err := service.createQuarantineRoot() - if err != nil { - utils.BestEffortFprintf(service.opts.Progress, "unpack failed: %v.\n", err) - - result.UnpackError = err.Error() - fillCommandErrors(result, commands, err.Error()) - - return "", nil, false - } - - quarantinePackRoot, err := service.openQuarantinePackRoot(quarantineRoot) + quarantine, err := service.opts.ObjectIngress.BeginQuarantine(objectstore.QuarantineOptions{}) if err != nil { utils.BestEffortFprintf(service.opts.Progress, "unpack failed: %v.\n", err) result.UnpackError = err.Error() fillCommandErrors(result, commands, err.Error()) - _ = quarantineRoot.Close() - _ = service.opts.ObjectsRoot.RemoveAll(quarantineName) - - return "", nil, false + return nil, false } utils.BestEffortFprintf(service.opts.Progress, "creating quarantine: done.\n") utils.BestEffortFprintf(service.opts.Progress, "unpacking...\r") - ingested, err := pending.Continue(quarantinePackRoot) - - _ = quarantinePackRoot.Close() - + err = quarantine.WritePack(req.Pack, objectstore.PackWriteOptions{ + ThinBase: service.opts.ExistingObjects, + Progress: service.opts.Progress, + RequireTrailingEOF: false, + }) if err != nil { utils.BestEffortFprintf(service.opts.Progress, "unpack failed: %v.\n", err) result.UnpackError = err.Error() fillCommandErrors(result, commands, err.Error()) - _ = quarantineRoot.Close() - _ = service.opts.ObjectsRoot.RemoveAll(quarantineName) + _ = quarantine.Discard() - return "", nil, false + return nil, false } - utils.BestEffortFprintf(service.opts.Progress, "unpacking: done (%d objects, %s).\n", ingested.ObjectCount, ingested.PackHash) - - result.Ingest = &ingested + utils.BestEffortFprintf(service.opts.Progress, "unpacking: done.\n") - return quarantineName, quarantineRoot, true + return quarantine, true } diff --git a/network/receivepack/service/options.go b/network/receivepack/service/options.go index 4d7b48c8..9e790bc0 100644 --- a/network/receivepack/service/options.go +++ b/network/receivepack/service/options.go @@ -1,9 +1,6 @@ package service import ( - "io/fs" - "os" - "codeberg.org/lindenii/furgit/common/iowrap" commitgraphread "codeberg.org/lindenii/furgit/format/commitgraph/read" objectid "codeberg.org/lindenii/furgit/object/id" @@ -11,18 +8,14 @@ import ( refstore "codeberg.org/lindenii/furgit/ref/store" ) -type PromotedObjectPermissions struct { - DirMode fs.FileMode - FileMode fs.FileMode -} - // Options configures one protocol-independent receive-pack service. // // Service borrows all configured dependencies. // // Refs and ExistingObjects are required and must be non-nil. -// ObjectsRoot is required if Execute may need to ingest or promote a pack. -// Progress, Hook, and HookIO are optional; when provided they are also +// ObjectIngress is required if Execute may need to ingest or quarantine a +// pack. +// CommitGraph, Progress, Hook, and HookIO are optional; when provided they are also // borrowed for the duration of Execute. type Options struct { Algorithm objectid.Algorithm @@ -31,11 +24,10 @@ type Options struct { refstore.TransactionalStore refstore.BatchStore } - ExistingObjects objectstore.Reader - CommitGraph *commitgraphread.Reader - ObjectsRoot *os.Root - Progress iowrap.WriteFlusher - PromotedObjectPermissions *PromotedObjectPermissions - Hook Hook - HookIO HookIO + ExistingObjects objectstore.Reader + ObjectIngress objectstore.Quarantiner + CommitGraph *commitgraphread.Reader + Progress iowrap.WriteFlusher + Hook Hook + HookIO HookIO } diff --git a/network/receivepack/service/quarantine.go b/network/receivepack/service/quarantine.go deleted file mode 100644 index 0bd98aeb..00000000 --- a/network/receivepack/service/quarantine.go +++ /dev/null @@ -1,274 +0,0 @@ -package service - -import ( - "bytes" - "crypto/rand" - "errors" - "fmt" - "io" - "io/fs" - "os" - "path" - "slices" -) - -// createQuarantineRoot creates one per-push quarantine directory beneath the -// permanent objects root. -// -// It returns both the quarantine directory name relative to ObjectsRoot and an -// opened root for that directory. Callers use the name for later promotion or -// removal relative to ObjectsRoot, and use the opened root for capability-based -// access within the quarantine itself. -func (service *Service) createQuarantineRoot() (string, *os.Root, error) { - name := "tmp_objdir-incoming-" + rand.Text() - - err := service.opts.ObjectsRoot.Mkdir(name, 0o700) - if err != nil { - return "", nil, err - } - - root, err := service.opts.ObjectsRoot.OpenRoot(name) - if err != nil { - _ = service.opts.ObjectsRoot.RemoveAll(name) - - return "", nil, err - } - - return name, root, nil -} - -func (service *Service) openQuarantinePackRoot(quarantineRoot *os.Root) (*os.Root, error) { - err := quarantineRoot.Mkdir("pack", 0o755) - if err != nil && !os.IsExist(err) { - return nil, err - } - - return quarantineRoot.OpenRoot("pack") -} - -func (service *Service) promoteQuarantine(quarantineName string, quarantineRoot *os.Root) error { - if quarantineName == "" || quarantineRoot == nil { - return nil - } - - return service.promoteQuarantineDir(quarantineName, quarantineRoot, ".") -} - -func (service *Service) promoteQuarantineDir(quarantineName string, quarantineRoot *os.Root, rel string) error { - entries, err := fs.ReadDir(quarantineRoot.FS(), rel) - if err != nil && !os.IsNotExist(err) { - return err - } - - slices.SortFunc(entries, func(left, right fs.DirEntry) int { - return packCopyPriority(left.Name()) - packCopyPriority(right.Name()) - }) - - for _, entry := range entries { - childRel := entry.Name() - if rel != "." { - childRel = path.Join(rel, entry.Name()) - } - - if entry.IsDir() { - err = service.opts.ObjectsRoot.Mkdir(childRel, 0o755) - if err != nil && !os.IsExist(err) { - return err - } - - err = service.applyPromotedDirectoryPermissions(childRel) - if err != nil { - return err - } - - err = service.promoteQuarantineDir(quarantineName, quarantineRoot, childRel) - if err != nil { - return err - } - - continue - } - - err = finalizeQuarantineFile( - service.opts.ObjectsRoot, - path.Join(quarantineName, childRel), - childRel, - isLooseObjectShardPath(rel), - service.opts.PromotedObjectPermissions, - ) - if err == nil { - continue - } - - return err - } - - return nil -} - -func packCopyPriority(name string) int { - if !pathHasPackPrefix(name) { - return 0 - } - - switch { - case path.Ext(name) == ".keep": - return 1 - case path.Ext(name) == ".pack": - return 2 - case path.Ext(name) == ".rev": - return 3 - case path.Ext(name) == ".idx": - return 4 - default: - return 5 - } -} - -func pathHasPackPrefix(name string) bool { - return len(name) >= 4 && name[:4] == "pack" -} - -func isLooseObjectShardPath(rel string) bool { - return len(rel) == 2 && isHex(rel[0]) && isHex(rel[1]) -} - -func isHex(ch byte) bool { - return ('0' <= ch && ch <= '9') || ('a' <= ch && ch <= 'f') || ('A' <= ch && ch <= 'F') -} - -func (service *Service) applyPromotedDirectoryPermissions(name string) error { - if service.opts.PromotedObjectPermissions == nil { - return nil - } - - return service.opts.ObjectsRoot.Chmod(name, service.opts.PromotedObjectPermissions.DirMode) -} - -func applyPromotedFilePermissions( - root *os.Root, - name string, - perms *PromotedObjectPermissions, -) error { - if perms == nil { - return nil - } - - return root.Chmod(name, perms.FileMode) -} - -func finalizeQuarantineFile( - root *os.Root, - src, dst string, - skipCollisionCheck bool, - perms *PromotedObjectPermissions, -) error { - const maxVanishedRetries = 5 - - for retries := 0; ; retries++ { - err := root.Link(src, dst) - switch { - case err == nil: - _ = root.Remove(src) - - return applyPromotedFilePermissions(root, dst, perms) - case !errors.Is(err, fs.ErrExist): - _, statErr := root.Stat(dst) - switch { - case statErr == nil: - err = fs.ErrExist - case errors.Is(statErr, fs.ErrNotExist): - renameErr := root.Rename(src, dst) - if renameErr == nil { - return applyPromotedFilePermissions(root, dst, perms) - } - - err = renameErr - default: - _ = root.Remove(src) - - return statErr - } - } - - if !errors.Is(err, fs.ErrExist) { - _ = root.Remove(src) - - return fmt.Errorf("promote quarantine %q -> %q: %w", src, dst, err) - } - - if skipCollisionCheck { - _ = root.Remove(src) - - return applyPromotedFilePermissions(root, dst, perms) - } - - equal, vanished, cmpErr := compareRootFiles(root, src, dst) - if vanished { - if retries >= maxVanishedRetries { - return fmt.Errorf("promote quarantine %q -> %q: destination repeatedly vanished", src, dst) - } - - continue - } - - if cmpErr != nil { - return cmpErr - } - - if !equal { - return fmt.Errorf("promote quarantine %q -> %q: files differ in contents", src, dst) - } - - _ = root.Remove(src) - - return applyPromotedFilePermissions(root, dst, perms) - } -} - -func compareRootFiles(root *os.Root, left, right string) (equal bool, vanished bool, err error) { - leftFile, err := root.Open(left) - if err != nil { - return false, false, err - } - - defer func() { - _ = leftFile.Close() - }() - - rightFile, err := root.Open(right) - if err != nil { - if errors.Is(err, fs.ErrNotExist) { - return false, true, nil - } - - return false, false, err - } - - defer func() { - _ = rightFile.Close() - }() - - var leftBuf, rightBuf [4096]byte - - for { - leftN, leftErr := leftFile.Read(leftBuf[:]) - rightN, rightErr := rightFile.Read(rightBuf[:]) - - if leftErr != nil && !errors.Is(leftErr, io.EOF) { - return false, false, leftErr - } - - if rightErr != nil && !errors.Is(rightErr, io.EOF) { - return false, false, rightErr - } - - if leftN != rightN || !bytes.Equal(leftBuf[:leftN], rightBuf[:rightN]) { - return false, false, nil - } - - if leftErr != nil || rightErr != nil { - return true, false, nil - } - } -} diff --git a/network/receivepack/service/quarantine_test.go b/network/receivepack/service/quarantine_test.go deleted file mode 100644 index e0389472..00000000 --- a/network/receivepack/service/quarantine_test.go +++ /dev/null @@ -1,184 +0,0 @@ -package service //nolint:testpackage - -// because we need access to quarantine internals - -import ( - "os" - "path" - "testing" - - objectid "codeberg.org/lindenii/furgit/object/id" - "codeberg.org/lindenii/furgit/object/store/memory" -) - -type quarantineFixture struct { - svc *Service - objectsRoot *os.Root - quarantineName string - quarantineRoot *os.Root -} - -func newQuarantineFixture(tb testing.TB, opts Options) *quarantineFixture { - tb.Helper() - - objectsRoot, err := os.OpenRoot(tb.TempDir()) - if err != nil { - tb.Fatalf("os.OpenRoot: %v", err) - } - - tb.Cleanup(func() { - _ = objectsRoot.Close() - }) - - opts.Algorithm = objectid.AlgorithmSHA1 - opts.ExistingObjects = memory.New(objectid.AlgorithmSHA1) - opts.ObjectsRoot = objectsRoot - - svc := New(opts) - - quarantineName, quarantineRoot, err := svc.createQuarantineRoot() - if err != nil { - tb.Fatalf("createQuarantineRoot: %v", err) - } - - tb.Cleanup(func() { - _ = quarantineRoot.Close() - _ = objectsRoot.RemoveAll(quarantineName) - }) - - return &quarantineFixture{ - svc: svc, - objectsRoot: objectsRoot, - quarantineName: quarantineName, - quarantineRoot: quarantineRoot, - } -} - -func writeMatchingPromotedFile( - tb testing.TB, - quarantineRoot, objectsRoot *os.Root, - dir, name, payload string, -) { - tb.Helper() - - err := quarantineRoot.Mkdir(dir, 0o755) - if err != nil { - tb.Fatalf("Mkdir(%s): %v", dir, err) - } - - err = objectsRoot.Mkdir(dir, 0o755) - if err != nil { - tb.Fatalf("Mkdir(dst %s): %v", dir, err) - } - - rel := path.Join(dir, name) - - err = quarantineRoot.WriteFile(rel, []byte(payload), 0o644) - if err != nil { - tb.Fatalf("WriteFile(quarantine %s): %v", rel, err) - } - - err = objectsRoot.WriteFile(rel, []byte(payload), 0o644) - if err != nil { - tb.Fatalf("WriteFile(permanent %s): %v", rel, err) - } -} - -func TestPromoteQuarantineAppliesConfiguredPermissions(t *testing.T) { - t.Parallel() - - fx := newQuarantineFixture(t, Options{ - PromotedObjectPermissions: &PromotedObjectPermissions{ - DirMode: 0o751, - FileMode: 0o640, - }, - }) - - err := fx.quarantineRoot.Mkdir("ab", 0o700) - if err != nil { - t.Fatalf("Mkdir(ab): %v", err) - } - - err = fx.quarantineRoot.WriteFile(path.Join("ab", "cdef"), []byte("payload"), 0o600) - if err != nil { - t.Fatalf("WriteFile(quarantine loose): %v", err) - } - - err = fx.svc.promoteQuarantine(fx.quarantineName, fx.quarantineRoot) - if err != nil { - t.Fatalf("promoteQuarantine: %v", err) - } - - dirInfo, err := fx.objectsRoot.Stat("ab") - if err != nil { - t.Fatalf("Stat(ab): %v", err) - } - - if got := dirInfo.Mode().Perm(); got != 0o751 { - t.Fatalf("dir mode = %o, want 751", got) - } - - fileInfo, err := fx.objectsRoot.Stat(path.Join("ab", "cdef")) - if err != nil { - t.Fatalf("Stat(ab/cdef): %v", err) - } - - if got := fileInfo.Mode().Perm(); got != 0o640 { - t.Fatalf("file mode = %o, want 640", got) - } -} - -func TestPromoteQuarantineTreatsExistingLooseObjectAsSuccess(t *testing.T) { - t.Parallel() - - fx := newQuarantineFixture(t, Options{}) - writeMatchingPromotedFile(t, fx.quarantineRoot, fx.objectsRoot, "ab", "cdef", "same object bytes") - - err := fx.svc.promoteQuarantine(fx.quarantineName, fx.quarantineRoot) - if err != nil { - t.Fatalf("promoteQuarantine: %v", err) - } -} - -func TestPromoteQuarantineRejectsDifferentExistingPackFile(t *testing.T) { - t.Parallel() - - fx := newQuarantineFixture(t, Options{}) - - err := fx.quarantineRoot.Mkdir("pack", 0o755) - if err != nil { - t.Fatalf("Mkdir(pack): %v", err) - } - - err = fx.objectsRoot.Mkdir("pack", 0o755) - if err != nil { - t.Fatalf("Mkdir(dst pack): %v", err) - } - - err = fx.quarantineRoot.WriteFile(path.Join("pack", "pack-a.pack"), []byte("new bytes"), 0o644) - if err != nil { - t.Fatalf("WriteFile(quarantine pack): %v", err) - } - - err = fx.objectsRoot.WriteFile(path.Join("pack", "pack-a.pack"), []byte("old bytes"), 0o644) - if err != nil { - t.Fatalf("WriteFile(permanent pack): %v", err) - } - - err = fx.svc.promoteQuarantine(fx.quarantineName, fx.quarantineRoot) - if err == nil { - t.Fatal("promoteQuarantine unexpectedly succeeded") - } -} - -func TestPromoteQuarantineAcceptsMatchingExistingPackFile(t *testing.T) { - t.Parallel() - - fx := newQuarantineFixture(t, Options{}) - writeMatchingPromotedFile(t, fx.quarantineRoot, fx.objectsRoot, "pack", "pack-a.pack", "identical pack bytes") - - err := fx.svc.promoteQuarantine(fx.quarantineName, fx.quarantineRoot) - if err != nil { - t.Fatalf("promoteQuarantine: %v", err) - } -} diff --git a/network/receivepack/service/result.go b/network/receivepack/service/result.go index c5ff3812..7a75be11 100644 --- a/network/receivepack/service/result.go +++ b/network/receivepack/service/result.go @@ -1,14 +1,9 @@ package service -import ( - "codeberg.org/lindenii/furgit/format/packfile/ingest" -) - // Result is one receive-pack execution result. type Result struct { UnpackError string Commands []CommandResult - Ingest *ingest.Result Planned []PlannedUpdate Applied bool } diff --git a/network/receivepack/service/run_hook.go b/network/receivepack/service/run_hook.go index c3b1a8c7..1943dfd7 100644 --- a/network/receivepack/service/run_hook.go +++ b/network/receivepack/service/run_hook.go @@ -2,20 +2,16 @@ package service import ( "context" - "os" "codeberg.org/lindenii/furgit/internal/utils" objectstore "codeberg.org/lindenii/furgit/object/store" - "codeberg.org/lindenii/furgit/object/store/loose" - objectmix "codeberg.org/lindenii/furgit/object/store/mix" - "codeberg.org/lindenii/furgit/object/store/packed" ) func (service *Service) runHook( ctx context.Context, req *Request, commands []Command, - quarantineName string, + quarantinedObjects objectstore.Reader, ) ( allowedCommands []Command, allowedIndices []int, @@ -37,82 +33,6 @@ func (service *Service) runHook( utils.BestEffortFprintf(service.opts.Progress, "running hooks...\r") - quarantinedObjects := service.opts.ExistingObjects - - var ( - quarantineObjectsStore objectstore.Reader - quarantineLooseStore *loose.Store - quarantinePackedStore *packed.Store - quarantineLooseRoot *os.Root - quarantinePackRoot *os.Root - err error - ) - - //nolint:nestif - if quarantineName != "" { - quarantineLooseRoot, err = service.opts.ObjectsRoot.OpenRoot(quarantineName) - if err != nil { - utils.BestEffortFprintf(service.opts.Progress, "running hooks: failed: %v.\n", err) - - return nil, nil, nil, false, err.Error() - } - - quarantineLooseStore, err = loose.New(quarantineLooseRoot, service.opts.Algorithm) - if err != nil { - _ = quarantineLooseRoot.Close() - - utils.BestEffortFprintf(service.opts.Progress, "running hooks: failed: %v.\n", err) - - return nil, nil, nil, false, err.Error() - } - - quarantinedObjects = quarantineLooseStore - - quarantinePackRoot, err = quarantineLooseRoot.OpenRoot("pack") - if err == nil { - var packedErr error - - quarantinePackedStore, packedErr = packed.New(quarantinePackRoot, service.opts.Algorithm, packed.Options{}) - if packedErr != nil { - _ = quarantineLooseStore.Close() - _ = quarantinePackRoot.Close() - _ = quarantineLooseRoot.Close() - - utils.BestEffortFprintf(service.opts.Progress, "running hooks: failed: %v.\n", packedErr) - - return nil, nil, nil, false, packedErr.Error() - } - - quarantineObjectsStore = objectmix.New(quarantineLooseStore, quarantinePackedStore) - quarantinedObjects = quarantineObjectsStore - } else if !os.IsNotExist(err) { - _ = quarantineLooseStore.Close() - _ = quarantineLooseRoot.Close() - - utils.BestEffortFprintf(service.opts.Progress, "running hooks: failed: %v.\n", err) - - return nil, nil, nil, false, err.Error() - } - - defer func() { - if quarantinePackedStore != nil { - _ = quarantinePackedStore.Close() - } - - if quarantineLooseStore != nil { - _ = quarantineLooseStore.Close() - } - - if quarantinePackRoot != nil { - _ = quarantinePackRoot.Close() - } - - if quarantineLooseRoot != nil { - _ = quarantineLooseRoot.Close() - } - }() - } - decisions, err := service.opts.Hook(ctx, HookRequest{ Refs: service.opts.Refs, ExistingObjects: service.opts.ExistingObjects, diff --git a/network/receivepack/service/service_test.go b/network/receivepack/service/service_test.go index ffb08ea4..50d02beb 100644 --- a/network/receivepack/service/service_test.go +++ b/network/receivepack/service/service_test.go @@ -2,7 +2,6 @@ package service_test import ( "context" - "io/fs" "os" "strings" "testing" @@ -10,10 +9,14 @@ import ( "codeberg.org/lindenii/furgit/internal/testgit" "codeberg.org/lindenii/furgit/network/receivepack/service" objectid "codeberg.org/lindenii/furgit/object/id" + objectstore "codeberg.org/lindenii/furgit/object/store" + objectdual "codeberg.org/lindenii/furgit/object/store/dual" + objectloose "codeberg.org/lindenii/furgit/object/store/loose" "codeberg.org/lindenii/furgit/object/store/memory" + objectpacked "codeberg.org/lindenii/furgit/object/store/packed" ) -func TestExecutePackExpectedWithoutObjectsRoot(t *testing.T) { +func TestExecutePackExpectedWithoutObjectIngress(t *testing.T) { t.Parallel() //nolint:thelper @@ -39,13 +42,13 @@ func TestExecutePackExpectedWithoutObjectsRoot(t *testing.T) { t.Fatalf("Execute: %v", err) } - if result.UnpackError != "objects root not configured" { + if result.UnpackError != "object ingress not configured" { t.Fatalf("unexpected unpack error %q", result.UnpackError) } }) } -func TestExecuteRemovesDerivedQuarantineAfterIngestFailure(t *testing.T) { +func TestExecuteDiscardedQuarantineAfterIngestFailure(t *testing.T) { t.Parallel() //nolint:thelper @@ -53,21 +56,12 @@ func TestExecuteRemovesDerivedQuarantineAfterIngestFailure(t *testing.T) { t.Parallel() store := memory.New(algo) - objectsDir := t.TempDir() - - objectsRoot, err := os.OpenRoot(objectsDir) - if err != nil { - t.Fatalf("os.OpenRoot: %v", err) - } - - t.Cleanup(func() { - _ = objectsRoot.Close() - }) + objectIngress := newDualIngress(t, algo) svc := service.New(service.Options{ Algorithm: algo, ExistingObjects: store, - ObjectsRoot: objectsRoot, + ObjectIngress: objectIngress, }) result, err := svc.Execute(context.Background(), &service.Request{ @@ -86,14 +80,52 @@ func TestExecuteRemovesDerivedQuarantineAfterIngestFailure(t *testing.T) { if result.UnpackError == "" { t.Fatal("Execute returned empty unpack error for invalid pack") } + }) +} - entries, err := fs.ReadDir(objectsRoot.FS(), ".") - if err != nil { - t.Fatalf("fs.ReadDir: %v", err) - } +func newDualIngress(tb testing.TB, algo objectid.Algorithm) objectstore.Quarantiner { + tb.Helper() - if len(entries) != 0 { - t.Fatalf("objects root still has entries after failed ingest: %d", len(entries)) - } + objectsRoot, err := os.OpenRoot(tb.TempDir()) + if err != nil { + tb.Fatalf("os.OpenRoot: %v", err) + } + + tb.Cleanup(func() { + _ = objectsRoot.Close() }) + + err = objectsRoot.Mkdir("pack", 0o755) + if err != nil { + tb.Fatalf("Mkdir(pack): %v", err) + } + + packRoot, err := objectsRoot.OpenRoot("pack") + if err != nil { + tb.Fatalf("OpenRoot(pack): %v", err) + } + + tb.Cleanup(func() { + _ = packRoot.Close() + }) + + looseStore, err := objectloose.New(objectsRoot, algo) + if err != nil { + tb.Fatalf("loose.New: %v", err) + } + + tb.Cleanup(func() { + _ = looseStore.Close() + }) + + packedStore, err := objectpacked.New(packRoot, algo, objectpacked.Options{WriteRev: true}) + if err != nil { + tb.Fatalf("packed.New: %v", err) + } + + tb.Cleanup(func() { + _ = packedStore.Close() + }) + + return objectdual.New(looseStore, packedStore) } |
