diff options
| author | 2026-06-23 19:24:14 +0000 | |
|---|---|---|
| committer | 2026-06-24 04:29:01 +0000 | |
| commit | fd585f724ab2eb267d32902297f40bae54a7435c (patch) | |
| tree | 8a01517cfc8baa95457d79fba2204f10c7178c75 | |
| parent | object/store/packed/internal/ingest: Use iterative resolver (diff) | |
object/store: Add context to WritePack
| -rw-r--r-- | object/store/dual/quarantine.go | 5 | ||||
| -rw-r--r-- | object/store/dual/writer.go | 5 | ||||
| -rw-r--r-- | object/store/packed/internal/ingest/finalize.go | 5 | ||||
| -rw-r--r-- | object/store/packed/internal/ingest/ingest.go | 6 | ||||
| -rw-r--r-- | object/store/packed/internal/ingest/resolve.go | 5 | ||||
| -rw-r--r-- | object/store/packed/internal/ingest/scan.go | 7 | ||||
| -rw-r--r-- | object/store/packed/internal/ingest/thin.go | 5 | ||||
| -rw-r--r-- | object/store/packed/internal/ingest/writepack_test.go | 68 | ||||
| -rw-r--r-- | object/store/packed/writer.go | 5 | ||||
| -rw-r--r-- | object/store/packed/writer_test.go | 2 | ||||
| -rw-r--r-- | object/store/writer.go | 3 |
11 files changed, 101 insertions, 15 deletions
diff --git a/object/store/dual/quarantine.go b/object/store/dual/quarantine.go index b73e48fe..168ba059 100644 --- a/object/store/dual/quarantine.go +++ b/object/store/dual/quarantine.go @@ -1,6 +1,7 @@ package dual import ( + "context" "errors" "fmt" "io" @@ -111,8 +112,8 @@ func (quarantine *coordinatedQuarantine) WriteReaderContent(ty typ.Type, size in return quarantine.objectQ.WriteReaderContent(ty, size, src) //nolint:wrapcheck } -func (quarantine *coordinatedQuarantine) WritePack(src io.Reader, opts store.PackWriteOptions) error { - return quarantine.packQ.WritePack(src, opts) //nolint:wrapcheck +func (quarantine *coordinatedQuarantine) WritePack(ctx context.Context, src io.Reader, opts store.PackWriteOptions) error { + return quarantine.packQ.WritePack(ctx, src, opts) //nolint:wrapcheck } // Promote publishes both halves and joins their errors. diff --git a/object/store/dual/writer.go b/object/store/dual/writer.go index f75f49e1..fb59adbe 100644 --- a/object/store/dual/writer.go +++ b/object/store/dual/writer.go @@ -1,6 +1,7 @@ package dual import ( + "context" "io" "lindenii.org/go/furgit/object/id" @@ -29,6 +30,6 @@ func (dual *Dual) WriteReaderContent(ty typ.Type, size int, src io.Reader) (id.O } // WritePack ingests one pack stream into the pack side. -func (dual *Dual) WritePack(src io.Reader, opts store.PackWriteOptions) error { - return dual.pack.WritePack(src, opts) //nolint:wrapcheck +func (dual *Dual) WritePack(ctx context.Context, src io.Reader, opts store.PackWriteOptions) error { + return dual.pack.WritePack(ctx, src, opts) //nolint:wrapcheck } diff --git a/object/store/packed/internal/ingest/finalize.go b/object/store/packed/internal/ingest/finalize.go index afed996c..fe0d770c 100644 --- a/object/store/packed/internal/ingest/finalize.go +++ b/object/store/packed/internal/ingest/finalize.go @@ -18,6 +18,11 @@ import ( // then links the pack, reverse index, and index // to their content-addressed names. func (ingestion *ingestion) finalize() (Result, error) { + err := ingestion.ctx.Err() + if err != nil { + return Result{}, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + entries, positions, err := ingestion.indexEntries() if err != nil { return Result{}, err diff --git a/object/store/packed/internal/ingest/ingest.go b/object/store/packed/internal/ingest/ingest.go index 3e802324..95b87455 100644 --- a/object/store/packed/internal/ingest/ingest.go +++ b/object/store/packed/internal/ingest/ingest.go @@ -2,6 +2,7 @@ package ingest import ( "bytes" + "context" "crypto/rand" "errors" "fmt" @@ -19,6 +20,8 @@ var errTempNamesExhausted = errors.New("object/store/packed/internal/ingest: exh // ingestion holds the state for one WritePack call. type ingestion struct { + ctx context.Context + // root is the destination objects/pack directory. root *os.Root @@ -84,7 +87,7 @@ type ingestion struct { // The pack must be the last thing the peer sends before that response: // any bytes arriving immediately after the trailer // are rejected as a malformed pack. -func WritePack(root *os.Root, objectFormat id.ObjectFormat, src io.Reader, opts store.PackWriteOptions) (Result, error) { +func WritePack(ctx context.Context, root *os.Root, objectFormat id.ObjectFormat, src io.Reader, opts store.PackWriteOptions) (Result, error) { if objectFormat.Size() == 0 { return Result{}, id.ErrInvalidObjectFormat } @@ -95,6 +98,7 @@ func WritePack(root *os.Root, objectFormat id.ObjectFormat, src io.Reader, opts } ingestion := &ingestion{ + ctx: ctx, root: root, objectFormat: objectFormat, opts: opts, diff --git a/object/store/packed/internal/ingest/resolve.go b/object/store/packed/internal/ingest/resolve.go index bfc4289b..4e2adc13 100644 --- a/object/store/packed/internal/ingest/resolve.go +++ b/object/store/packed/internal/ingest/resolve.go @@ -97,6 +97,11 @@ func (ingestion *ingestion) resolveFrom(roots []int, adjacency adjacency, meter } for len(stack) > 0 { + err := ingestion.ctx.Err() + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + frame := stack[len(stack)-1] stack = stack[:len(stack)-1] diff --git a/object/store/packed/internal/ingest/scan.go b/object/store/packed/internal/ingest/scan.go index 62dd6104..31a47152 100644 --- a/object/store/packed/internal/ingest/scan.go +++ b/object/store/packed/internal/ingest/scan.go @@ -295,7 +295,12 @@ func (ingestion *ingestion) streamAndScan() error { }) for done := range ingestion.headerCount { - err := ingestion.scanEntry(ingestion.scanner.consumed) + err := ingestion.ctx.Err() + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + err = ingestion.scanEntry(ingestion.scanner.consumed) if err != nil { return err } diff --git a/object/store/packed/internal/ingest/thin.go b/object/store/packed/internal/ingest/thin.go index dceddac9..70aa9a63 100644 --- a/object/store/packed/internal/ingest/thin.go +++ b/object/store/packed/internal/ingest/thin.go @@ -36,6 +36,11 @@ func (ingestion *ingestion) fixThin(external []id.ObjectID, adjacency adjacency, appended := make([]int, 0, len(external)) for _, baseOID := range external { + err := ingestion.ctx.Err() + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + ty, content, err := ingestion.opts.ThinBase.ReadBytesContent(baseOID) if errors.Is(err, store.ErrObjectNotFound) { continue diff --git a/object/store/packed/internal/ingest/writepack_test.go b/object/store/packed/internal/ingest/writepack_test.go index adc0ba35..b5a53a4f 100644 --- a/object/store/packed/internal/ingest/writepack_test.go +++ b/object/store/packed/internal/ingest/writepack_test.go @@ -2,6 +2,7 @@ package ingest_test import ( "bytes" + "context" "errors" "io" "os" @@ -256,7 +257,7 @@ func TestWritePackIdempotent(t *testing.T) { t.Cleanup(func() { _ = root.Close() }) - first, err := ingest.WritePack(root, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{ + first, err := ingest.WritePack(t.Context(), root, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{ ThinBase: nil, Progress: nil, }) @@ -264,7 +265,7 @@ func TestWritePackIdempotent(t *testing.T) { t.Fatalf("first WritePack: %v", err) } - second, err := ingest.WritePack(root, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{ + second, err := ingest.WritePack(t.Context(), root, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{ ThinBase: nil, Progress: nil, }) @@ -305,7 +306,7 @@ func writePack( t.Cleanup(func() { _ = root.Close() }) - result, err := ingest.WritePack(root, objectFormat, src, opts) + result, err := ingest.WritePack(t.Context(), root, objectFormat, src, opts) if err != nil { t.Fatalf("WritePack: %v", err) } @@ -355,7 +356,7 @@ func TestWritePackThinWithoutBase(t *testing.T) { repo, seeded := seedHistory(t, objectFormat) stream := thinStream(t, repo, seeded) - _, err := ingest.WritePack(freshRoot(t), objectFormat, bytes.NewReader(stream), store.PackWriteOptions{ + _, err := ingest.WritePack(t.Context(), freshRoot(t), objectFormat, bytes.NewReader(stream), store.PackWriteOptions{ ThinBase: nil, Progress: nil, }) @@ -381,7 +382,7 @@ func TestWritePackThinMissingBase(t *testing.T) { emptyBase := emptyStore(t, objectFormat) stream := thinStream(t, repo, seeded) - _, err := ingest.WritePack(freshRoot(t), objectFormat, bytes.NewReader(stream), store.PackWriteOptions{ + _, err := ingest.WritePack(t.Context(), freshRoot(t), objectFormat, bytes.NewReader(stream), store.PackWriteOptions{ ThinBase: emptyBase, Progress: nil, }) @@ -398,6 +399,63 @@ func TestWritePackThinMissingBase(t *testing.T) { } } +// TestWritePackContextCancelled verifies that a cancelled context +// aborts ingestion and publishes no artifacts. +func TestWritePackContextCancelled(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + repo, seeded := seedHistory(t, objectFormat) + + gitPrefix, err := repo.PackObjects(t, seeded.All(), testgit.PackObjectsOptions{ + RevIndex: false, + Revs: false, + Exclude: nil, + }) + if err != nil { + t.Fatalf("PackObjects: %v", err) + } + + stream, err := os.ReadFile(gitPrefix + ".pack") //nolint:gosec + if err != nil { + t.Fatalf("ReadFile pack: %v", err) + } + + ctx, cancel := context.WithCancel(t.Context()) + cancel() + + dir := t.TempDir() + + root, err := os.OpenRoot(dir) + if err != nil { + t.Fatalf("OpenRoot: %v", err) + } + + t.Cleanup(func() { _ = root.Close() }) + + _, err = ingest.WritePack(ctx, root, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{ + ThinBase: nil, + Progress: nil, + }) + if !errors.Is(err, context.Canceled) { + t.Fatalf("err = %v, want context.Canceled", err) + } + + entries, err := os.ReadDir(dir) + if err != nil { + t.Fatalf("ReadDir: %v", err) + } + + if len(entries) != 0 { + t.Fatalf("cancelled ingestion left %d files behind", len(entries)) + } + }) + } +} + // seedHistory creates one repository with a seeded history. func seedHistory(t *testing.T, objectFormat id.ObjectFormat) (*testgit.Repo, testgit.Seeded) { t.Helper() diff --git a/object/store/packed/writer.go b/object/store/packed/writer.go index 59309c24..6476cc42 100644 --- a/object/store/packed/writer.go +++ b/object/store/packed/writer.go @@ -1,6 +1,7 @@ package packed import ( + "context" "fmt" "io" @@ -23,8 +24,8 @@ var _ store.PackWriter = (*Packed)(nil) // The pack must be the last thing the peer sends before that response: // any bytes arriving immediately after the trailer // are rejected as a malformed pack. -func (packed *Packed) WritePack(src io.Reader, opts store.PackWriteOptions) error { - _, err := ingest.WritePack(packed.root, packed.objectFormat, src, opts) +func (packed *Packed) WritePack(ctx context.Context, src io.Reader, opts store.PackWriteOptions) error { + _, err := ingest.WritePack(ctx, packed.root, packed.objectFormat, src, opts) if err != nil { return err //nolint:wrapcheck } diff --git a/object/store/packed/writer_test.go b/object/store/packed/writer_test.go index 8227caa7..d668647b 100644 --- a/object/store/packed/writer_test.go +++ b/object/store/packed/writer_test.go @@ -42,7 +42,7 @@ func TestWritePack(t *testing.T) { packedStore := openEmptyStore(t, objectFormat) - err = packedStore.WritePack(bytes.NewReader(stream), store.PackWriteOptions{ + err = packedStore.WritePack(t.Context(), bytes.NewReader(stream), store.PackWriteOptions{ ThinBase: nil, Progress: nil, }) diff --git a/object/store/writer.go b/object/store/writer.go index d83eec6a..eeff071c 100644 --- a/object/store/writer.go +++ b/object/store/writer.go @@ -1,6 +1,7 @@ package store import ( + "context" "errors" "io" @@ -32,7 +33,7 @@ type PackWriter interface { // WritePack ingests one pack stream, // such that the objects contained therein // become available in the relevant store. - WritePack(src io.Reader, opts PackWriteOptions) error + WritePack(ctx context.Context, src io.Reader, opts PackWriteOptions) error } // PackWriteOptions controls one pack write operation. |
