diff options
| author | 2026-03-08 12:03:26 +0800 | |
|---|---|---|
| committer | 2026-03-08 12:03:26 +0800 | |
| commit | ae5c818674e2c9ca950ca7a9bf93f1283e7411b7 (patch) | |
| tree | 25d1702260993a8066690c93b3da81adea6d4258 /format | |
| parent | receivepack: Trivial caps (diff) | |
| signature | No signature | |
receivepack, format/pack/ingest: Two-stage ingestion
Diffstat (limited to 'format')
| -rw-r--r-- | format/pack/ingest/api.go | 138 | ||||
| -rw-r--r-- | format/pack/ingest/errors.go | 9 | ||||
| -rw-r--r-- | format/pack/ingest/header.go | 41 | ||||
| -rw-r--r-- | format/pack/ingest/ingest_test.go | 96 | ||||
| -rw-r--r-- | format/pack/ingest/scan.go | 29 | ||||
| -rw-r--r-- | format/pack/ingest/state.go | 20 |
6 files changed, 296 insertions, 37 deletions
diff --git a/format/pack/ingest/api.go b/format/pack/ingest/api.go index eb00ded3..227f6a23 100644 --- a/format/pack/ingest/api.go +++ b/format/pack/ingest/api.go @@ -1,6 +1,9 @@ package ingest import ( + "bufio" + "bytes" + "errors" "io" "os" @@ -48,23 +51,138 @@ type Result struct { ThinFixed bool } -// Ingest ingests one pack stream from src into destination. -// -// Ingest performs streaming pack read/write/verification, delta resolution, -// optional thin fixup, then writes .idx and optionally .rev. -// -// destination ownership and lifecycle are managed by the caller. -// Ingest does not perform quarantine promotion/migration. +// HeaderInfo describes the parsed PACK header. +type HeaderInfo struct { + Version uint32 + ObjectCount uint32 +} + +// DiscardResult describes one successful Discard call. +type DiscardResult struct { + PackHash objectid.ObjectID + ObjectCount uint32 +} + +// Pending is one started ingest operation awaiting Continue or Discard. +type Pending struct { + reader *bufio.Reader + algo objectid.Algorithm + opts Options + header HeaderInfo + headerRaw [packHeaderSize]byte + + finalized bool +} + +// Ingest reads and validates one PACK header, returning one pending operation. func Ingest( src io.Reader, - destination *os.Root, algo objectid.Algorithm, opts Options, -) (Result, error) { - state, err := newIngestState(src, destination, algo, opts) +) (*Pending, error) { + if algo.Size() == 0 { + return nil, objectid.ErrInvalidAlgorithm + } + + reader := bufio.NewReader(src) + + header, headerRaw, err := readAndValidatePackHeader(reader) + if err != nil { + return nil, err + } + + return &Pending{ + reader: reader, + algo: algo, + opts: opts, + header: header, + headerRaw: headerRaw, + }, nil +} + +// Header returns parsed PACK header info. +func (pending *Pending) Header() HeaderInfo { + return pending.header +} + +// Continue ingests the pack stream into destination and writes pack artifacts. +func (pending *Pending) Continue(destination *os.Root) (Result, error) { + if pending.finalized { + return Result{}, ErrAlreadyFinalized + } + + pending.finalized = true + + if pending.header.ObjectCount == 0 { + return Result{}, ErrZeroObjectContinue + } + + state, err := newIngestState( + pending.reader, + destination, + pending.algo, + pending.opts, + pending.header, + pending.headerRaw, + ) if err != nil { return Result{}, err } return ingest(state) } + +// Discard consumes and verifies one zero-object pack stream without writing files. +func (pending *Pending) Discard() (DiscardResult, error) { + if pending.finalized { + return DiscardResult{}, ErrAlreadyFinalized + } + + pending.finalized = true + + if pending.header.ObjectCount != 0 { + return DiscardResult{}, ErrNonZeroDiscard + } + + hashImpl, err := pending.algo.New() + if err != nil { + return DiscardResult{}, err + } + + _, _ = hashImpl.Write(pending.headerRaw[:]) + + trailer := make([]byte, pending.algo.Size()) + + _, err = io.ReadFull(pending.reader, trailer) + if err != nil { + return DiscardResult{}, &PackTrailerMismatchError{} + } + + computed := hashImpl.Sum(nil) + if !bytes.Equal(computed, trailer) { + return DiscardResult{}, &PackTrailerMismatchError{} + } + + if pending.opts.RequireTrailingEOF { + var probe [1]byte + + n, err := pending.reader.Read(probe[:]) + if n > 0 || err == nil { + return DiscardResult{}, errors.New("format/pack/ingest: pack has trailing garbage") + } + + if err != io.EOF { + return DiscardResult{}, err + } + } + + packHash, err := objectid.FromBytes(pending.algo, trailer) + if err != nil { + return DiscardResult{}, err + } + + return DiscardResult{ + PackHash: packHash, + ObjectCount: 0, + }, nil +} diff --git a/format/pack/ingest/errors.go b/format/pack/ingest/errors.go index 82b662b5..d5e6d703 100644 --- a/format/pack/ingest/errors.go +++ b/format/pack/ingest/errors.go @@ -66,3 +66,12 @@ func (err *DestinationWriteError) Error() string { } var errExternalThinBase = errors.New("format/pack/ingest: external thin base required") + +var ( + // ErrAlreadyFinalized indicates Continue/Discard already called. + ErrAlreadyFinalized = errors.New("format/pack/ingest: operation already finalized") + // ErrZeroObjectContinue indicates Continue was called for a zero-object pack. + ErrZeroObjectContinue = errors.New("format/pack/ingest: cannot continue zero-object pack") + // ErrNonZeroDiscard indicates Discard was called for a non-zero-object pack. + ErrNonZeroDiscard = errors.New("format/pack/ingest: cannot discard non-zero pack") +) diff --git a/format/pack/ingest/header.go b/format/pack/ingest/header.go index fba2b175..76d43bef 100644 --- a/format/pack/ingest/header.go +++ b/format/pack/ingest/header.go @@ -3,32 +3,47 @@ package ingest import ( "encoding/binary" "fmt" + "io" "codeberg.org/lindenii/furgit/format/pack" ) -// readAndValidatePackHeader reads and validates PACK header from the stream. -func readAndValidatePackHeader(state *ingestState) error { - var hdr [12]byte +const packHeaderSize = 12 - err := state.stream.readFull(hdr[:]) +// readAndValidatePackHeader reads one PACK header from src and validates it. +func readAndValidatePackHeader(src io.Reader) (HeaderInfo, [packHeaderSize]byte, error) { + var hdr [packHeaderSize]byte + + _, err := io.ReadFull(src, hdr[:]) + if err != nil { + return HeaderInfo{}, [packHeaderSize]byte{}, &InvalidPackHeaderError{ + Reason: fmt.Sprintf("read header: %v", err), + } + } + + header, err := parseAndValidatePackHeader(hdr) if err != nil { - return &InvalidPackHeaderError{Reason: fmt.Sprintf("read header: %v", err)} + return HeaderInfo{}, [packHeaderSize]byte{}, err } + return header, hdr, nil +} + +// parseAndValidatePackHeader validates one already-read PACK header. +func parseAndValidatePackHeader(hdr [packHeaderSize]byte) (HeaderInfo, error) { if binary.BigEndian.Uint32(hdr[:4]) != pack.Signature { - return &InvalidPackHeaderError{Reason: "signature mismatch"} + return HeaderInfo{}, &InvalidPackHeaderError{Reason: "signature mismatch"} } version := binary.BigEndian.Uint32(hdr[4:8]) if !pack.VersionSupported(version) { - return &InvalidPackHeaderError{Reason: fmt.Sprintf("unsupported version %d", version)} - } - - state.objectCountHeader = binary.BigEndian.Uint32(hdr[8:12]) - if state.objectCountHeader == 0 { - return &InvalidPackHeaderError{Reason: "zero objects"} + return HeaderInfo{}, &InvalidPackHeaderError{ + Reason: fmt.Sprintf("unsupported version %d", version), + } } - return nil + return HeaderInfo{ + Version: version, + ObjectCount: binary.BigEndian.Uint32(hdr[8:12]), + }, nil } diff --git a/format/pack/ingest/ingest_test.go b/format/pack/ingest/ingest_test.go index 8a88eb7f..95b6643c 100644 --- a/format/pack/ingest/ingest_test.go +++ b/format/pack/ingest/ingest_test.go @@ -2,7 +2,9 @@ package ingest_test import ( "bytes" + "encoding/binary" "errors" + "io" "io/fs" "os" "path/filepath" @@ -26,6 +28,20 @@ func (r *noExtraReadReader) Read(p []byte) (int, error) { return r.reader.Read(p) } +func beginAndContinue( + src io.Reader, + packRoot *os.Root, + algo objectid.Algorithm, + opts ingest.Options, +) (ingest.Result, error) { + pending, err := ingest.Ingest(src, algo, opts) + if err != nil { + return ingest.Result{}, err + } + + return pending.Continue(packRoot) +} + // fixturePath returns one fixture file path for the selected algorithm. func fixturePath(t *testing.T, algo objectid.Algorithm, name string) string { t.Helper() @@ -173,7 +189,7 @@ func TestIngestNonThinPackWritesPackIdxRev(t *testing.T) { packRoot := receiver.OpenPackRoot(t) - result, err := ingest.Ingest(bytes.NewReader(packBytes), packRoot, algo, ingest.Options{ + result, err := beginAndContinue(bytes.NewReader(packBytes), packRoot, algo, ingest.Options{ WriteRev: true, RequireTrailingEOF: true, }) @@ -221,7 +237,7 @@ func TestIngestThinPackWithoutFixReturnsUnresolved(t *testing.T) { receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true}) packRoot := receiver.OpenPackRoot(t) - _, err := ingest.Ingest(bytes.NewReader(thinPack), packRoot, algo, ingest.Options{ + _, err := beginAndContinue(bytes.NewReader(thinPack), packRoot, algo, ingest.Options{ WriteRev: true, RequireTrailingEOF: true, }) @@ -257,7 +273,7 @@ func TestIngestThinPackWithFixThin(t *testing.T) { packRoot := receiver.OpenPackRoot(t) - _, err := ingest.Ingest(bytes.NewReader(basePack), packRoot, algo, ingest.Options{ + _, err := beginAndContinue(bytes.NewReader(basePack), packRoot, algo, ingest.Options{ RequireTrailingEOF: true, }) if err != nil { @@ -266,7 +282,7 @@ func TestIngestThinPackWithFixThin(t *testing.T) { receiverRepo := receiver.OpenRepository(t) - result, err := ingest.Ingest(bytes.NewReader(thinPack), packRoot, algo, ingest.Options{ + result, err := beginAndContinue(bytes.NewReader(thinPack), packRoot, algo, ingest.Options{ FixThin: true, WriteRev: true, Base: receiverRepo.Objects(), @@ -301,7 +317,7 @@ func TestIngestPackTrailerMismatch(t *testing.T) { receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true}) packRoot := receiver.OpenPackRoot(t) - _, err := ingest.Ingest(bytes.NewReader(packBytes), packRoot, algo, ingest.Options{ + _, err := beginAndContinue(bytes.NewReader(packBytes), packRoot, algo, ingest.Options{ WriteRev: true, RequireTrailingEOF: true, }) @@ -326,6 +342,74 @@ func TestIngestPackTrailerMismatch(t *testing.T) { }) } +func zeroObjectPackBytes(t *testing.T, algo objectid.Algorithm) []byte { + t.Helper() + + hashImpl, err := algo.New() + if err != nil { + t.Fatalf("algo.New: %v", err) + } + + var header [12]byte + copy(header[:4], []byte{'P', 'A', 'C', 'K'}) + binary.BigEndian.PutUint32(header[4:8], 2) + binary.BigEndian.PutUint32(header[8:12], 0) + + _, _ = hashImpl.Write(header[:]) + + return append(header[:], hashImpl.Sum(nil)...) +} + +func TestIngestDiscardZeroObjectPack(t *testing.T) { + t.Parallel() + + testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { //nolint:thelper + packBytes := zeroObjectPackBytes(t, algo) + + pending, err := ingest.Ingest(bytes.NewReader(packBytes), algo, ingest.Options{ + RequireTrailingEOF: true, + }) + if err != nil { + t.Fatalf("Ingest: %v", err) + } + + if pending.Header().ObjectCount != 0 { + t.Fatalf("ObjectCount = %d, want 0", pending.Header().ObjectCount) + } + + discarded, err := pending.Discard() + if err != nil { + t.Fatalf("Discard: %v", err) + } + + if discarded.ObjectCount != 0 { + t.Fatalf("Discard.ObjectCount = %d, want 0", discarded.ObjectCount) + } + }) +} + +func TestIngestContinueRejectsZeroObjectPack(t *testing.T) { + t.Parallel() + + testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { //nolint:thelper + packBytes := zeroObjectPackBytes(t, algo) + receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true}) + packRoot := receiver.OpenPackRoot(t) + + pending, err := ingest.Ingest(bytes.NewReader(packBytes), algo, ingest.Options{ + RequireTrailingEOF: true, + }) + if err != nil { + t.Fatalf("Ingest: %v", err) + } + + _, err = pending.Continue(packRoot) + if !errors.Is(err, ingest.ErrZeroObjectContinue) { + t.Fatalf("Continue error = %v, want ErrZeroObjectContinue", err) + } + }) +} + func TestIngestCanFinishWithoutTrailingEOF(t *testing.T) { t.Parallel() @@ -336,7 +420,7 @@ func TestIngestCanFinishWithoutTrailingEOF(t *testing.T) { receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true}) packRoot := receiver.OpenPackRoot(t) - result, err := ingest.Ingest(&noExtraReadReader{reader: bytes.NewReader(packBytes)}, packRoot, algo, ingest.Options{ + result, err := beginAndContinue(&noExtraReadReader{reader: bytes.NewReader(packBytes)}, packRoot, algo, ingest.Options{ WriteRev: true, }) if err != nil { diff --git a/format/pack/ingest/scan.go b/format/pack/ingest/scan.go index 2fa88b51..b5d683a8 100644 --- a/format/pack/ingest/scan.go +++ b/format/pack/ingest/scan.go @@ -23,7 +23,7 @@ func streamPackAndScan(state *ingestState) error { utils.WriteProgressf(state.opts.Progress, "validating pack header...\r") - err = readAndValidatePackHeader(state) + err = seedStreamWithPackHeader(state) if err != nil { return err } @@ -75,3 +75,30 @@ func streamPackAndScan(state *ingestState) error { return state.stream.flush() } + +// seedStreamWithPackHeader writes the already-validated PACK header to output, +// seeds the running pack hash, and advances stream offset accounting. +func seedStreamWithPackHeader(state *ingestState) error { + written := 0 + for written < len(state.packHeaderRaw) { + n, err := state.packFile.Write(state.packHeaderRaw[written:]) + if err != nil { + return &DestinationWriteError{Op: fmt.Sprintf("write pack header: %v", err)} + } + + if n == 0 { + return &DestinationWriteError{Op: "write pack header: short write"} + } + + written += n + } + + _, err := state.stream.hash.Write(state.packHeaderRaw[:]) + if err != nil { + return err + } + + state.stream.consumed = packHeaderSize + + return nil +} diff --git a/format/pack/ingest/state.go b/format/pack/ingest/state.go index cbc412e3..d44b6e09 100644 --- a/format/pack/ingest/state.go +++ b/format/pack/ingest/state.go @@ -18,6 +18,8 @@ type ingestState struct { algo objectid.Algorithm opts Options + packHeaderRaw [packHeaderSize]byte + packFile *os.File packTmpName string idxFile *os.File @@ -47,18 +49,22 @@ func newIngestState( destination *os.Root, algo objectid.Algorithm, opts Options, + header HeaderInfo, + headerRaw [packHeaderSize]byte, ) (*ingestState, error) { if algo.Size() == 0 { return nil, objectid.ErrInvalidAlgorithm } return &ingestState{ - src: src, - destination: destination, - algo: algo, - opts: opts, - offsetToRecord: make(map[uint64]int), - objectToRecord: make(map[objectid.ObjectID]int), - baseCache: newDeltaBaseCache(defaultDeltaBaseCacheMaxBytes), + src: src, + destination: destination, + algo: algo, + opts: opts, + packHeaderRaw: headerRaw, + objectCountHeader: header.ObjectCount, + offsetToRecord: make(map[uint64]int), + objectToRecord: make(map[objectid.ObjectID]int), + baseCache: newDeltaBaseCache(defaultDeltaBaseCacheMaxBytes), }, nil } |
