aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Runxi Yu2026-06-23 19:24:14 +0000
committerGravatar Runxi Yu2026-06-24 04:29:01 +0000
commitfd585f724ab2eb267d32902297f40bae54a7435c (patch)
tree8a01517cfc8baa95457d79fba2204f10c7178c75
parentobject/store/packed/internal/ingest: Use iterative resolver (diff)
object/store: Add context to WritePack
-rw-r--r--object/store/dual/quarantine.go5
-rw-r--r--object/store/dual/writer.go5
-rw-r--r--object/store/packed/internal/ingest/finalize.go5
-rw-r--r--object/store/packed/internal/ingest/ingest.go6
-rw-r--r--object/store/packed/internal/ingest/resolve.go5
-rw-r--r--object/store/packed/internal/ingest/scan.go7
-rw-r--r--object/store/packed/internal/ingest/thin.go5
-rw-r--r--object/store/packed/internal/ingest/writepack_test.go68
-rw-r--r--object/store/packed/writer.go5
-rw-r--r--object/store/packed/writer_test.go2
-rw-r--r--object/store/writer.go3
11 files changed, 101 insertions, 15 deletions
diff --git a/object/store/dual/quarantine.go b/object/store/dual/quarantine.go
index b73e48fe..168ba059 100644
--- a/object/store/dual/quarantine.go
+++ b/object/store/dual/quarantine.go
@@ -1,6 +1,7 @@
package dual
import (
+ "context"
"errors"
"fmt"
"io"
@@ -111,8 +112,8 @@ func (quarantine *coordinatedQuarantine) WriteReaderContent(ty typ.Type, size in
return quarantine.objectQ.WriteReaderContent(ty, size, src) //nolint:wrapcheck
}
-func (quarantine *coordinatedQuarantine) WritePack(src io.Reader, opts store.PackWriteOptions) error {
- return quarantine.packQ.WritePack(src, opts) //nolint:wrapcheck
+func (quarantine *coordinatedQuarantine) WritePack(ctx context.Context, src io.Reader, opts store.PackWriteOptions) error {
+ return quarantine.packQ.WritePack(ctx, src, opts) //nolint:wrapcheck
}
// Promote publishes both halves and joins their errors.
diff --git a/object/store/dual/writer.go b/object/store/dual/writer.go
index f75f49e1..fb59adbe 100644
--- a/object/store/dual/writer.go
+++ b/object/store/dual/writer.go
@@ -1,6 +1,7 @@
package dual
import (
+ "context"
"io"
"lindenii.org/go/furgit/object/id"
@@ -29,6 +30,6 @@ func (dual *Dual) WriteReaderContent(ty typ.Type, size int, src io.Reader) (id.O
}
// WritePack ingests one pack stream into the pack side.
-func (dual *Dual) WritePack(src io.Reader, opts store.PackWriteOptions) error {
- return dual.pack.WritePack(src, opts) //nolint:wrapcheck
+func (dual *Dual) WritePack(ctx context.Context, src io.Reader, opts store.PackWriteOptions) error {
+ return dual.pack.WritePack(ctx, src, opts) //nolint:wrapcheck
}
diff --git a/object/store/packed/internal/ingest/finalize.go b/object/store/packed/internal/ingest/finalize.go
index afed996c..fe0d770c 100644
--- a/object/store/packed/internal/ingest/finalize.go
+++ b/object/store/packed/internal/ingest/finalize.go
@@ -18,6 +18,11 @@ import (
// then links the pack, reverse index, and index
// to their content-addressed names.
func (ingestion *ingestion) finalize() (Result, error) {
+ err := ingestion.ctx.Err()
+ if err != nil {
+ return Result{}, fmt.Errorf("object/store/packed/internal/ingest: %w", err)
+ }
+
entries, positions, err := ingestion.indexEntries()
if err != nil {
return Result{}, err
diff --git a/object/store/packed/internal/ingest/ingest.go b/object/store/packed/internal/ingest/ingest.go
index 3e802324..95b87455 100644
--- a/object/store/packed/internal/ingest/ingest.go
+++ b/object/store/packed/internal/ingest/ingest.go
@@ -2,6 +2,7 @@ package ingest
import (
"bytes"
+ "context"
"crypto/rand"
"errors"
"fmt"
@@ -19,6 +20,8 @@ var errTempNamesExhausted = errors.New("object/store/packed/internal/ingest: exh
// ingestion holds the state for one WritePack call.
type ingestion struct {
+ ctx context.Context
+
// root is the destination objects/pack directory.
root *os.Root
@@ -84,7 +87,7 @@ type ingestion struct {
// The pack must be the last thing the peer sends before that response:
// any bytes arriving immediately after the trailer
// are rejected as a malformed pack.
-func WritePack(root *os.Root, objectFormat id.ObjectFormat, src io.Reader, opts store.PackWriteOptions) (Result, error) {
+func WritePack(ctx context.Context, root *os.Root, objectFormat id.ObjectFormat, src io.Reader, opts store.PackWriteOptions) (Result, error) {
if objectFormat.Size() == 0 {
return Result{}, id.ErrInvalidObjectFormat
}
@@ -95,6 +98,7 @@ func WritePack(root *os.Root, objectFormat id.ObjectFormat, src io.Reader, opts
}
ingestion := &ingestion{
+ ctx: ctx,
root: root,
objectFormat: objectFormat,
opts: opts,
diff --git a/object/store/packed/internal/ingest/resolve.go b/object/store/packed/internal/ingest/resolve.go
index bfc4289b..4e2adc13 100644
--- a/object/store/packed/internal/ingest/resolve.go
+++ b/object/store/packed/internal/ingest/resolve.go
@@ -97,6 +97,11 @@ func (ingestion *ingestion) resolveFrom(roots []int, adjacency adjacency, meter
}
for len(stack) > 0 {
+ err := ingestion.ctx.Err()
+ if err != nil {
+ return fmt.Errorf("object/store/packed/internal/ingest: %w", err)
+ }
+
frame := stack[len(stack)-1]
stack = stack[:len(stack)-1]
diff --git a/object/store/packed/internal/ingest/scan.go b/object/store/packed/internal/ingest/scan.go
index 62dd6104..31a47152 100644
--- a/object/store/packed/internal/ingest/scan.go
+++ b/object/store/packed/internal/ingest/scan.go
@@ -295,7 +295,12 @@ func (ingestion *ingestion) streamAndScan() error {
})
for done := range ingestion.headerCount {
- err := ingestion.scanEntry(ingestion.scanner.consumed)
+ err := ingestion.ctx.Err()
+ if err != nil {
+ return fmt.Errorf("object/store/packed/internal/ingest: %w", err)
+ }
+
+ err = ingestion.scanEntry(ingestion.scanner.consumed)
if err != nil {
return err
}
diff --git a/object/store/packed/internal/ingest/thin.go b/object/store/packed/internal/ingest/thin.go
index dceddac9..70aa9a63 100644
--- a/object/store/packed/internal/ingest/thin.go
+++ b/object/store/packed/internal/ingest/thin.go
@@ -36,6 +36,11 @@ func (ingestion *ingestion) fixThin(external []id.ObjectID, adjacency adjacency,
appended := make([]int, 0, len(external))
for _, baseOID := range external {
+ err := ingestion.ctx.Err()
+ if err != nil {
+ return fmt.Errorf("object/store/packed/internal/ingest: %w", err)
+ }
+
ty, content, err := ingestion.opts.ThinBase.ReadBytesContent(baseOID)
if errors.Is(err, store.ErrObjectNotFound) {
continue
diff --git a/object/store/packed/internal/ingest/writepack_test.go b/object/store/packed/internal/ingest/writepack_test.go
index adc0ba35..b5a53a4f 100644
--- a/object/store/packed/internal/ingest/writepack_test.go
+++ b/object/store/packed/internal/ingest/writepack_test.go
@@ -2,6 +2,7 @@ package ingest_test
import (
"bytes"
+ "context"
"errors"
"io"
"os"
@@ -256,7 +257,7 @@ func TestWritePackIdempotent(t *testing.T) {
t.Cleanup(func() { _ = root.Close() })
- first, err := ingest.WritePack(root, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{
+ first, err := ingest.WritePack(t.Context(), root, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{
ThinBase: nil,
Progress: nil,
})
@@ -264,7 +265,7 @@ func TestWritePackIdempotent(t *testing.T) {
t.Fatalf("first WritePack: %v", err)
}
- second, err := ingest.WritePack(root, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{
+ second, err := ingest.WritePack(t.Context(), root, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{
ThinBase: nil,
Progress: nil,
})
@@ -305,7 +306,7 @@ func writePack(
t.Cleanup(func() { _ = root.Close() })
- result, err := ingest.WritePack(root, objectFormat, src, opts)
+ result, err := ingest.WritePack(t.Context(), root, objectFormat, src, opts)
if err != nil {
t.Fatalf("WritePack: %v", err)
}
@@ -355,7 +356,7 @@ func TestWritePackThinWithoutBase(t *testing.T) {
repo, seeded := seedHistory(t, objectFormat)
stream := thinStream(t, repo, seeded)
- _, err := ingest.WritePack(freshRoot(t), objectFormat, bytes.NewReader(stream), store.PackWriteOptions{
+ _, err := ingest.WritePack(t.Context(), freshRoot(t), objectFormat, bytes.NewReader(stream), store.PackWriteOptions{
ThinBase: nil,
Progress: nil,
})
@@ -381,7 +382,7 @@ func TestWritePackThinMissingBase(t *testing.T) {
emptyBase := emptyStore(t, objectFormat)
stream := thinStream(t, repo, seeded)
- _, err := ingest.WritePack(freshRoot(t), objectFormat, bytes.NewReader(stream), store.PackWriteOptions{
+ _, err := ingest.WritePack(t.Context(), freshRoot(t), objectFormat, bytes.NewReader(stream), store.PackWriteOptions{
ThinBase: emptyBase,
Progress: nil,
})
@@ -398,6 +399,63 @@ func TestWritePackThinMissingBase(t *testing.T) {
}
}
+// TestWritePackContextCancelled verifies that a cancelled context
+// aborts ingestion and publishes no artifacts.
+func TestWritePackContextCancelled(t *testing.T) {
+ t.Parallel()
+
+ for _, objectFormat := range id.SupportedObjectFormats() {
+ t.Run(objectFormat.String(), func(t *testing.T) {
+ t.Parallel()
+
+ repo, seeded := seedHistory(t, objectFormat)
+
+ gitPrefix, err := repo.PackObjects(t, seeded.All(), testgit.PackObjectsOptions{
+ RevIndex: false,
+ Revs: false,
+ Exclude: nil,
+ })
+ if err != nil {
+ t.Fatalf("PackObjects: %v", err)
+ }
+
+ stream, err := os.ReadFile(gitPrefix + ".pack") //nolint:gosec
+ if err != nil {
+ t.Fatalf("ReadFile pack: %v", err)
+ }
+
+ ctx, cancel := context.WithCancel(t.Context())
+ cancel()
+
+ dir := t.TempDir()
+
+ root, err := os.OpenRoot(dir)
+ if err != nil {
+ t.Fatalf("OpenRoot: %v", err)
+ }
+
+ t.Cleanup(func() { _ = root.Close() })
+
+ _, err = ingest.WritePack(ctx, root, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{
+ ThinBase: nil,
+ Progress: nil,
+ })
+ if !errors.Is(err, context.Canceled) {
+ t.Fatalf("err = %v, want context.Canceled", err)
+ }
+
+ entries, err := os.ReadDir(dir)
+ if err != nil {
+ t.Fatalf("ReadDir: %v", err)
+ }
+
+ if len(entries) != 0 {
+ t.Fatalf("cancelled ingestion left %d files behind", len(entries))
+ }
+ })
+ }
+}
+
// seedHistory creates one repository with a seeded history.
func seedHistory(t *testing.T, objectFormat id.ObjectFormat) (*testgit.Repo, testgit.Seeded) {
t.Helper()
diff --git a/object/store/packed/writer.go b/object/store/packed/writer.go
index 59309c24..6476cc42 100644
--- a/object/store/packed/writer.go
+++ b/object/store/packed/writer.go
@@ -1,6 +1,7 @@
package packed
import (
+ "context"
"fmt"
"io"
@@ -23,8 +24,8 @@ var _ store.PackWriter = (*Packed)(nil)
// The pack must be the last thing the peer sends before that response:
// any bytes arriving immediately after the trailer
// are rejected as a malformed pack.
-func (packed *Packed) WritePack(src io.Reader, opts store.PackWriteOptions) error {
- _, err := ingest.WritePack(packed.root, packed.objectFormat, src, opts)
+func (packed *Packed) WritePack(ctx context.Context, src io.Reader, opts store.PackWriteOptions) error {
+ _, err := ingest.WritePack(ctx, packed.root, packed.objectFormat, src, opts)
if err != nil {
return err //nolint:wrapcheck
}
diff --git a/object/store/packed/writer_test.go b/object/store/packed/writer_test.go
index 8227caa7..d668647b 100644
--- a/object/store/packed/writer_test.go
+++ b/object/store/packed/writer_test.go
@@ -42,7 +42,7 @@ func TestWritePack(t *testing.T) {
packedStore := openEmptyStore(t, objectFormat)
- err = packedStore.WritePack(bytes.NewReader(stream), store.PackWriteOptions{
+ err = packedStore.WritePack(t.Context(), bytes.NewReader(stream), store.PackWriteOptions{
ThinBase: nil,
Progress: nil,
})
diff --git a/object/store/writer.go b/object/store/writer.go
index d83eec6a..eeff071c 100644
--- a/object/store/writer.go
+++ b/object/store/writer.go
@@ -1,6 +1,7 @@
package store
import (
+ "context"
"errors"
"io"
@@ -32,7 +33,7 @@ type PackWriter interface {
// WritePack ingests one pack stream,
// such that the objects contained therein
// become available in the relevant store.
- WritePack(src io.Reader, opts PackWriteOptions) error
+ WritePack(ctx context.Context, src io.Reader, opts PackWriteOptions) error
}
// PackWriteOptions controls one pack write operation.