diff options
| -rw-r--r-- | cmd/index-pack/main.go | 18 | ||||
| -rw-r--r-- | format/pack/ingest/api.go | 138 | ||||
| -rw-r--r-- | format/pack/ingest/errors.go | 9 | ||||
| -rw-r--r-- | format/pack/ingest/header.go | 41 | ||||
| -rw-r--r-- | format/pack/ingest/ingest_test.go | 96 | ||||
| -rw-r--r-- | format/pack/ingest/scan.go | 29 | ||||
| -rw-r--r-- | format/pack/ingest/state.go | 20 | ||||
| -rw-r--r-- | receivepack/int_test.go | 55 | ||||
| -rw-r--r-- | receivepack/service/execute.go | 2 | ||||
| -rw-r--r-- | receivepack/service/ingest_quarantine.go | 57 |
10 files changed, 415 insertions, 50 deletions
diff --git a/cmd/index-pack/main.go b/cmd/index-pack/main.go index 0715fa05..cfd80f6a 100644 --- a/cmd/index-pack/main.go +++ b/cmd/index-pack/main.go @@ -85,7 +85,7 @@ func run(repoPath, destinationPath, objectFormat string, fixThin, writeRev bool) defer func() { _ = destinationRoot.Close() }() - result, err := ingest.Ingest(os.Stdin, destinationRoot, algo, ingest.Options{ + pending, err := ingest.Ingest(os.Stdin, algo, ingest.Options{ FixThin: fixThin, WriteRev: writeRev, Base: base, @@ -95,6 +95,22 @@ func run(repoPath, destinationPath, objectFormat string, fixThin, writeRev bool) return err } + if pending.Header().ObjectCount == 0 { + discarded, err := pending.Discard() + if err != nil { + return err + } + + _, _ = fmt.Fprintf(os.Stdout, "pack\t%s\n", discarded.PackHash.String()) + + return nil + } + + result, err := pending.Continue(destinationRoot) + if err != nil { + return err + } + _, _ = fmt.Fprintf(os.Stdout, "pack\t%s\n", result.PackHash.String()) return nil diff --git a/format/pack/ingest/api.go b/format/pack/ingest/api.go index eb00ded3..227f6a23 100644 --- a/format/pack/ingest/api.go +++ b/format/pack/ingest/api.go @@ -1,6 +1,9 @@ package ingest import ( + "bufio" + "bytes" + "errors" "io" "os" @@ -48,23 +51,138 @@ type Result struct { ThinFixed bool } -// Ingest ingests one pack stream from src into destination. -// -// Ingest performs streaming pack read/write/verification, delta resolution, -// optional thin fixup, then writes .idx and optionally .rev. -// -// destination ownership and lifecycle are managed by the caller. -// Ingest does not perform quarantine promotion/migration. +// HeaderInfo describes the parsed PACK header. +type HeaderInfo struct { + Version uint32 + ObjectCount uint32 +} + +// DiscardResult describes one successful Discard call. +type DiscardResult struct { + PackHash objectid.ObjectID + ObjectCount uint32 +} + +// Pending is one started ingest operation awaiting Continue or Discard. +type Pending struct { + reader *bufio.Reader + algo objectid.Algorithm + opts Options + header HeaderInfo + headerRaw [packHeaderSize]byte + + finalized bool +} + +// Ingest reads and validates one PACK header, returning one pending operation. func Ingest( src io.Reader, - destination *os.Root, algo objectid.Algorithm, opts Options, -) (Result, error) { - state, err := newIngestState(src, destination, algo, opts) +) (*Pending, error) { + if algo.Size() == 0 { + return nil, objectid.ErrInvalidAlgorithm + } + + reader := bufio.NewReader(src) + + header, headerRaw, err := readAndValidatePackHeader(reader) + if err != nil { + return nil, err + } + + return &Pending{ + reader: reader, + algo: algo, + opts: opts, + header: header, + headerRaw: headerRaw, + }, nil +} + +// Header returns parsed PACK header info. +func (pending *Pending) Header() HeaderInfo { + return pending.header +} + +// Continue ingests the pack stream into destination and writes pack artifacts. +func (pending *Pending) Continue(destination *os.Root) (Result, error) { + if pending.finalized { + return Result{}, ErrAlreadyFinalized + } + + pending.finalized = true + + if pending.header.ObjectCount == 0 { + return Result{}, ErrZeroObjectContinue + } + + state, err := newIngestState( + pending.reader, + destination, + pending.algo, + pending.opts, + pending.header, + pending.headerRaw, + ) if err != nil { return Result{}, err } return ingest(state) } + +// Discard consumes and verifies one zero-object pack stream without writing files. +func (pending *Pending) Discard() (DiscardResult, error) { + if pending.finalized { + return DiscardResult{}, ErrAlreadyFinalized + } + + pending.finalized = true + + if pending.header.ObjectCount != 0 { + return DiscardResult{}, ErrNonZeroDiscard + } + + hashImpl, err := pending.algo.New() + if err != nil { + return DiscardResult{}, err + } + + _, _ = hashImpl.Write(pending.headerRaw[:]) + + trailer := make([]byte, pending.algo.Size()) + + _, err = io.ReadFull(pending.reader, trailer) + if err != nil { + return DiscardResult{}, &PackTrailerMismatchError{} + } + + computed := hashImpl.Sum(nil) + if !bytes.Equal(computed, trailer) { + return DiscardResult{}, &PackTrailerMismatchError{} + } + + if pending.opts.RequireTrailingEOF { + var probe [1]byte + + n, err := pending.reader.Read(probe[:]) + if n > 0 || err == nil { + return DiscardResult{}, errors.New("format/pack/ingest: pack has trailing garbage") + } + + if err != io.EOF { + return DiscardResult{}, err + } + } + + packHash, err := objectid.FromBytes(pending.algo, trailer) + if err != nil { + return DiscardResult{}, err + } + + return DiscardResult{ + PackHash: packHash, + ObjectCount: 0, + }, nil +} diff --git a/format/pack/ingest/errors.go b/format/pack/ingest/errors.go index 82b662b5..d5e6d703 100644 --- a/format/pack/ingest/errors.go +++ b/format/pack/ingest/errors.go @@ -66,3 +66,12 @@ func (err *DestinationWriteError) Error() string { } var errExternalThinBase = errors.New("format/pack/ingest: external thin base required") + +var ( + // ErrAlreadyFinalized indicates Continue/Discard already called. + ErrAlreadyFinalized = errors.New("format/pack/ingest: operation already finalized") + // ErrZeroObjectContinue indicates Continue was called for a zero-object pack. + ErrZeroObjectContinue = errors.New("format/pack/ingest: cannot continue zero-object pack") + // ErrNonZeroDiscard indicates Discard was called for a non-zero-object pack. + ErrNonZeroDiscard = errors.New("format/pack/ingest: cannot discard non-zero pack") +) diff --git a/format/pack/ingest/header.go b/format/pack/ingest/header.go index fba2b175..76d43bef 100644 --- a/format/pack/ingest/header.go +++ b/format/pack/ingest/header.go @@ -3,32 +3,47 @@ package ingest import ( "encoding/binary" "fmt" + "io" "codeberg.org/lindenii/furgit/format/pack" ) -// readAndValidatePackHeader reads and validates PACK header from the stream. -func readAndValidatePackHeader(state *ingestState) error { - var hdr [12]byte +const packHeaderSize = 12 - err := state.stream.readFull(hdr[:]) +// readAndValidatePackHeader reads one PACK header from src and validates it. +func readAndValidatePackHeader(src io.Reader) (HeaderInfo, [packHeaderSize]byte, error) { + var hdr [packHeaderSize]byte + + _, err := io.ReadFull(src, hdr[:]) + if err != nil { + return HeaderInfo{}, [packHeaderSize]byte{}, &InvalidPackHeaderError{ + Reason: fmt.Sprintf("read header: %v", err), + } + } + + header, err := parseAndValidatePackHeader(hdr) if err != nil { - return &InvalidPackHeaderError{Reason: fmt.Sprintf("read header: %v", err)} + return HeaderInfo{}, [packHeaderSize]byte{}, err } + return header, hdr, nil +} + +// parseAndValidatePackHeader validates one already-read PACK header. +func parseAndValidatePackHeader(hdr [packHeaderSize]byte) (HeaderInfo, error) { if binary.BigEndian.Uint32(hdr[:4]) != pack.Signature { - return &InvalidPackHeaderError{Reason: "signature mismatch"} + return HeaderInfo{}, &InvalidPackHeaderError{Reason: "signature mismatch"} } version := binary.BigEndian.Uint32(hdr[4:8]) if !pack.VersionSupported(version) { - return &InvalidPackHeaderError{Reason: fmt.Sprintf("unsupported version %d", version)} - } - - state.objectCountHeader = binary.BigEndian.Uint32(hdr[8:12]) - if state.objectCountHeader == 0 { - return &InvalidPackHeaderError{Reason: "zero objects"} + return HeaderInfo{}, &InvalidPackHeaderError{ + Reason: fmt.Sprintf("unsupported version %d", version), + } } - return nil + return HeaderInfo{ + Version: version, + ObjectCount: binary.BigEndian.Uint32(hdr[8:12]), + }, nil } diff --git a/format/pack/ingest/ingest_test.go b/format/pack/ingest/ingest_test.go index 8a88eb7f..95b6643c 100644 --- a/format/pack/ingest/ingest_test.go +++ b/format/pack/ingest/ingest_test.go @@ -2,7 +2,9 @@ package ingest_test import ( "bytes" + "encoding/binary" "errors" + "io" "io/fs" "os" "path/filepath" @@ -26,6 +28,20 @@ func (r *noExtraReadReader) Read(p []byte) (int, error) { return r.reader.Read(p) } +func beginAndContinue( + src io.Reader, + packRoot *os.Root, + algo objectid.Algorithm, + opts ingest.Options, +) (ingest.Result, error) { + pending, err := ingest.Ingest(src, algo, opts) + if err != nil { + return ingest.Result{}, err + } + + return pending.Continue(packRoot) +} + // fixturePath returns one fixture file path for the selected algorithm. func fixturePath(t *testing.T, algo objectid.Algorithm, name string) string { t.Helper() @@ -173,7 +189,7 @@ func TestIngestNonThinPackWritesPackIdxRev(t *testing.T) { packRoot := receiver.OpenPackRoot(t) - result, err := ingest.Ingest(bytes.NewReader(packBytes), packRoot, algo, ingest.Options{ + result, err := beginAndContinue(bytes.NewReader(packBytes), packRoot, algo, ingest.Options{ WriteRev: true, RequireTrailingEOF: true, }) @@ -221,7 +237,7 @@ func TestIngestThinPackWithoutFixReturnsUnresolved(t *testing.T) { receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true}) packRoot := receiver.OpenPackRoot(t) - _, err := ingest.Ingest(bytes.NewReader(thinPack), packRoot, algo, ingest.Options{ + _, err := beginAndContinue(bytes.NewReader(thinPack), packRoot, algo, ingest.Options{ WriteRev: true, RequireTrailingEOF: true, }) @@ -257,7 +273,7 @@ func TestIngestThinPackWithFixThin(t *testing.T) { packRoot := receiver.OpenPackRoot(t) - _, err := ingest.Ingest(bytes.NewReader(basePack), packRoot, algo, ingest.Options{ + _, err := beginAndContinue(bytes.NewReader(basePack), packRoot, algo, ingest.Options{ RequireTrailingEOF: true, }) if err != nil { @@ -266,7 +282,7 @@ func TestIngestThinPackWithFixThin(t *testing.T) { receiverRepo := receiver.OpenRepository(t) - result, err := ingest.Ingest(bytes.NewReader(thinPack), packRoot, algo, ingest.Options{ + result, err := beginAndContinue(bytes.NewReader(thinPack), packRoot, algo, ingest.Options{ FixThin: true, WriteRev: true, Base: receiverRepo.Objects(), @@ -301,7 +317,7 @@ func TestIngestPackTrailerMismatch(t *testing.T) { receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true}) packRoot := receiver.OpenPackRoot(t) - _, err := ingest.Ingest(bytes.NewReader(packBytes), packRoot, algo, ingest.Options{ + _, err := beginAndContinue(bytes.NewReader(packBytes), packRoot, algo, ingest.Options{ WriteRev: true, RequireTrailingEOF: true, }) @@ -326,6 +342,74 @@ func TestIngestPackTrailerMismatch(t *testing.T) { }) } +func zeroObjectPackBytes(t *testing.T, algo objectid.Algorithm) []byte { + t.Helper() + + hashImpl, err := algo.New() + if err != nil { + t.Fatalf("algo.New: %v", err) + } + + var header [12]byte + copy(header[:4], []byte{'P', 'A', 'C', 'K'}) + binary.BigEndian.PutUint32(header[4:8], 2) + binary.BigEndian.PutUint32(header[8:12], 0) + + _, _ = hashImpl.Write(header[:]) + + return append(header[:], hashImpl.Sum(nil)...) +} + +func TestIngestDiscardZeroObjectPack(t *testing.T) { + t.Parallel() + + testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { //nolint:thelper + packBytes := zeroObjectPackBytes(t, algo) + + pending, err := ingest.Ingest(bytes.NewReader(packBytes), algo, ingest.Options{ + RequireTrailingEOF: true, + }) + if err != nil { + t.Fatalf("Ingest: %v", err) + } + + if pending.Header().ObjectCount != 0 { + t.Fatalf("ObjectCount = %d, want 0", pending.Header().ObjectCount) + } + + discarded, err := pending.Discard() + if err != nil { + t.Fatalf("Discard: %v", err) + } + + if discarded.ObjectCount != 0 { + t.Fatalf("Discard.ObjectCount = %d, want 0", discarded.ObjectCount) + } + }) +} + +func TestIngestContinueRejectsZeroObjectPack(t *testing.T) { + t.Parallel() + + testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { //nolint:thelper + packBytes := zeroObjectPackBytes(t, algo) + receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true}) + packRoot := receiver.OpenPackRoot(t) + + pending, err := ingest.Ingest(bytes.NewReader(packBytes), algo, ingest.Options{ + RequireTrailingEOF: true, + }) + if err != nil { + t.Fatalf("Ingest: %v", err) + } + + _, err = pending.Continue(packRoot) + if !errors.Is(err, ingest.ErrZeroObjectContinue) { + t.Fatalf("Continue error = %v, want ErrZeroObjectContinue", err) + } + }) +} + func TestIngestCanFinishWithoutTrailingEOF(t *testing.T) { t.Parallel() @@ -336,7 +420,7 @@ func TestIngestCanFinishWithoutTrailingEOF(t *testing.T) { receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true}) packRoot := receiver.OpenPackRoot(t) - result, err := ingest.Ingest(&noExtraReadReader{reader: bytes.NewReader(packBytes)}, packRoot, algo, ingest.Options{ + result, err := beginAndContinue(&noExtraReadReader{reader: bytes.NewReader(packBytes)}, packRoot, algo, ingest.Options{ WriteRev: true, }) if err != nil { diff --git a/format/pack/ingest/scan.go b/format/pack/ingest/scan.go index 2fa88b51..b5d683a8 100644 --- a/format/pack/ingest/scan.go +++ b/format/pack/ingest/scan.go @@ -23,7 +23,7 @@ func streamPackAndScan(state *ingestState) error { utils.WriteProgressf(state.opts.Progress, "validating pack header...\r") - err = readAndValidatePackHeader(state) + err = seedStreamWithPackHeader(state) if err != nil { return err } @@ -75,3 +75,30 @@ func streamPackAndScan(state *ingestState) error { return state.stream.flush() } + +// seedStreamWithPackHeader writes the already-validated PACK header to output, +// seeds the running pack hash, and advances stream offset accounting. +func seedStreamWithPackHeader(state *ingestState) error { + written := 0 + for written < len(state.packHeaderRaw) { + n, err := state.packFile.Write(state.packHeaderRaw[written:]) + if err != nil { + return &DestinationWriteError{Op: fmt.Sprintf("write pack header: %v", err)} + } + + if n == 0 { + return &DestinationWriteError{Op: "write pack header: short write"} + } + + written += n + } + + _, err := state.stream.hash.Write(state.packHeaderRaw[:]) + if err != nil { + return err + } + + state.stream.consumed = packHeaderSize + + return nil +} diff --git a/format/pack/ingest/state.go b/format/pack/ingest/state.go index cbc412e3..d44b6e09 100644 --- a/format/pack/ingest/state.go +++ b/format/pack/ingest/state.go @@ -18,6 +18,8 @@ type ingestState struct { algo objectid.Algorithm opts Options + packHeaderRaw [packHeaderSize]byte + packFile *os.File packTmpName string idxFile *os.File @@ -47,18 +49,22 @@ func newIngestState( destination *os.Root, algo objectid.Algorithm, opts Options, + header HeaderInfo, + headerRaw [packHeaderSize]byte, ) (*ingestState, error) { if algo.Size() == 0 { return nil, objectid.ErrInvalidAlgorithm } return &ingestState{ - src: src, - destination: destination, - algo: algo, - opts: opts, - offsetToRecord: make(map[uint64]int), - objectToRecord: make(map[objectid.ObjectID]int), - baseCache: newDeltaBaseCache(defaultDeltaBaseCacheMaxBytes), + src: src, + destination: destination, + algo: algo, + opts: opts, + packHeaderRaw: headerRaw, + objectCountHeader: header.ObjectCount, + offsetToRecord: make(map[uint64]int), + objectToRecord: make(map[objectid.ObjectID]int), + baseCache: newDeltaBaseCache(defaultDeltaBaseCacheMaxBytes), }, nil } diff --git a/receivepack/int_test.go b/receivepack/int_test.go index a5cd29ab..b144c387 100644 --- a/receivepack/int_test.go +++ b/receivepack/int_test.go @@ -797,6 +797,61 @@ func TestReceivePackGitPushCreatesBranch(t *testing.T) { }) } +func TestReceivePackGitPushRefUpdateWithoutNewObjectsSucceeds(t *testing.T) { + t.Parallel() + + testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { //nolint:thelper + t.Parallel() + + sender := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo}) + blobID, treeID := sender.MakeSingleFileTree(t, "base.txt", []byte("base\n")) + commitID := sender.CommitTree(t, treeID, "base") + sender.UpdateRef(t, "refs/heads/main", commitID) + + receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true}) + receiver.HashObject(t, "blob", sender.RunBytes(t, "cat-file", "blob", blobID.String())) + receiver.HashObject(t, "tree", sender.RunBytes(t, "cat-file", "tree", treeID.String())) + receiver.HashObject(t, "commit", sender.RunBytes(t, "cat-file", "commit", commitID.String())) + receiver.UpdateRef(t, "refs/heads/main", commitID) + + repo := receiver.OpenRepository(t) + objectsRoot := receiver.OpenObjectsRoot(t) + + stdout, stderr, clientErr, serverErr := runGitPushFD( + t, + sender, + receivepack.Options{ + Algorithm: algo, + Refs: repo.Refs(), + ExistingObjects: repo.Objects(), + ObjectsRoot: objectsRoot, + }, + "push", "--porcelain", "fd::3,4/test", "refs/heads/main:refs/heads/topic", + ) + if clientErr != nil { + t.Fatalf("git push failed: %v\nstdout=%s\nstderr=%s", clientErr, stdout, stderr) + } + + if serverErr != nil { + t.Fatalf("ReceivePack: %v", serverErr) + } + + resolved, err := receiver.OpenRepository(t).Refs().ResolveFully("refs/heads/topic") + if err != nil { + t.Fatalf("ResolveFully(topic): %v", err) + } + + if resolved.ID != commitID { + t.Fatalf("refs/heads/topic = %s, want %s", resolved.ID, commitID) + } + + packs := receiver.Run(t, "count-objects", "-v") + if !strings.Contains(packs, "packs: 0") { + t.Fatalf("count-objects output shows unexpected promoted pack: %q", packs) + } + }) +} + func TestReceivePackGitPushAtomicDelete(t *testing.T) { t.Parallel() diff --git a/receivepack/service/execute.go b/receivepack/service/execute.go index faedff49..8f70fb83 100644 --- a/receivepack/service/execute.go +++ b/receivepack/service/execute.go @@ -77,7 +77,7 @@ func (service *Service) Execute(ctx context.Context, req *Request) (*Result, err return result, nil } - if req.PackExpected { + if req.PackExpected && quarantineRoot != nil { // Git migrates quarantined objects into permanent storage immediately // before starting ref updates. utils.WriteProgressf(service.opts.Progress, "promoting quarantine...\r") diff --git a/receivepack/service/ingest_quarantine.go b/receivepack/service/ingest_quarantine.go index ad6ce852..48815fa8 100644 --- a/receivepack/service/ingest_quarantine.go +++ b/receivepack/service/ingest_quarantine.go @@ -34,6 +34,51 @@ func (service *Service) ingestQuarantine( return "", nil, false } + pending, err := ingest.Ingest( + req.Pack, + service.opts.Algorithm, + ingest.Options{ + FixThin: true, + WriteRev: true, + Base: service.opts.ExistingObjects, + Progress: service.opts.Progress, + }, + ) + if err != nil { + utils.WriteProgressf(service.opts.Progress, "unpack failed: %v\n", err) + + result.UnpackError = err.Error() + fillCommandErrors(result, commands, err.Error()) + + return "", nil, false + } + + if pending.Header().ObjectCount == 0 { + discarded, err := pending.Discard() + if err != nil { + utils.WriteProgressf(service.opts.Progress, "unpack failed: %v\n", err) + + result.UnpackError = err.Error() + fillCommandErrors(result, commands, err.Error()) + + return "", nil, false + } + + result.Ingest = &ingest.Result{ + PackHash: discarded.PackHash, + ObjectCount: discarded.ObjectCount, + } + + utils.WriteProgressf( + service.opts.Progress, + "unpacking: done (%d objects, %s).\n", + discarded.ObjectCount, + discarded.PackHash, + ) + + return "", nil, true + } + utils.WriteProgressf(service.opts.Progress, "creating quarantine...\r") quarantineName, quarantineRoot, err := service.createQuarantineRoot() @@ -62,17 +107,7 @@ func (service *Service) ingestQuarantine( utils.WriteProgressf(service.opts.Progress, "creating quarantine: done.\n") utils.WriteProgressf(service.opts.Progress, "unpacking...\r") - ingested, err := ingest.Ingest( - req.Pack, - quarantinePackRoot, - service.opts.Algorithm, - ingest.Options{ - FixThin: true, - WriteRev: true, - Base: service.opts.ExistingObjects, - Progress: service.opts.Progress, - }, - ) + ingested, err := pending.Continue(quarantinePackRoot) _ = quarantinePackRoot.Close() |
