aboutsummaryrefslogtreecommitdiff
path: root/format/pack
diff options
context:
space:
mode:
authorGravatar Runxi Yu2026-03-08 12:03:26 +0800
committerGravatar Runxi Yu2026-03-08 12:03:26 +0800
commitae5c818674e2c9ca950ca7a9bf93f1283e7411b7 (patch)
tree25d1702260993a8066690c93b3da81adea6d4258 /format/pack
parentreceivepack: Trivial caps (diff)
signatureNo signature
receivepack, format/pack/ingest: Two-stage ingestion
Diffstat (limited to 'format/pack')
-rw-r--r--format/pack/ingest/api.go138
-rw-r--r--format/pack/ingest/errors.go9
-rw-r--r--format/pack/ingest/header.go41
-rw-r--r--format/pack/ingest/ingest_test.go96
-rw-r--r--format/pack/ingest/scan.go29
-rw-r--r--format/pack/ingest/state.go20
6 files changed, 296 insertions, 37 deletions
diff --git a/format/pack/ingest/api.go b/format/pack/ingest/api.go
index eb00ded3..227f6a23 100644
--- a/format/pack/ingest/api.go
+++ b/format/pack/ingest/api.go
@@ -1,6 +1,9 @@
package ingest
import (
+ "bufio"
+ "bytes"
+ "errors"
"io"
"os"
@@ -48,23 +51,138 @@ type Result struct {
ThinFixed bool
}
-// Ingest ingests one pack stream from src into destination.
-//
-// Ingest performs streaming pack read/write/verification, delta resolution,
-// optional thin fixup, then writes .idx and optionally .rev.
-//
-// destination ownership and lifecycle are managed by the caller.
-// Ingest does not perform quarantine promotion/migration.
+// HeaderInfo describes the parsed PACK header.
+type HeaderInfo struct {
+ Version uint32
+ ObjectCount uint32
+}
+
+// DiscardResult describes one successful Discard call.
+type DiscardResult struct {
+ PackHash objectid.ObjectID
+ ObjectCount uint32
+}
+
+// Pending is one started ingest operation awaiting Continue or Discard.
+type Pending struct {
+ reader *bufio.Reader
+ algo objectid.Algorithm
+ opts Options
+ header HeaderInfo
+ headerRaw [packHeaderSize]byte
+
+ finalized bool
+}
+
+// Ingest reads and validates one PACK header, returning one pending operation.
func Ingest(
src io.Reader,
- destination *os.Root,
algo objectid.Algorithm,
opts Options,
-) (Result, error) {
- state, err := newIngestState(src, destination, algo, opts)
+) (*Pending, error) {
+ if algo.Size() == 0 {
+ return nil, objectid.ErrInvalidAlgorithm
+ }
+
+ reader := bufio.NewReader(src)
+
+ header, headerRaw, err := readAndValidatePackHeader(reader)
+ if err != nil {
+ return nil, err
+ }
+
+ return &Pending{
+ reader: reader,
+ algo: algo,
+ opts: opts,
+ header: header,
+ headerRaw: headerRaw,
+ }, nil
+}
+
+// Header returns parsed PACK header info.
+func (pending *Pending) Header() HeaderInfo {
+ return pending.header
+}
+
+// Continue ingests the pack stream into destination and writes pack artifacts.
+func (pending *Pending) Continue(destination *os.Root) (Result, error) {
+ if pending.finalized {
+ return Result{}, ErrAlreadyFinalized
+ }
+
+ pending.finalized = true
+
+ if pending.header.ObjectCount == 0 {
+ return Result{}, ErrZeroObjectContinue
+ }
+
+ state, err := newIngestState(
+ pending.reader,
+ destination,
+ pending.algo,
+ pending.opts,
+ pending.header,
+ pending.headerRaw,
+ )
if err != nil {
return Result{}, err
}
return ingest(state)
}
+
+// Discard consumes and verifies one zero-object pack stream without writing files.
+func (pending *Pending) Discard() (DiscardResult, error) {
+ if pending.finalized {
+ return DiscardResult{}, ErrAlreadyFinalized
+ }
+
+ pending.finalized = true
+
+ if pending.header.ObjectCount != 0 {
+ return DiscardResult{}, ErrNonZeroDiscard
+ }
+
+ hashImpl, err := pending.algo.New()
+ if err != nil {
+ return DiscardResult{}, err
+ }
+
+ _, _ = hashImpl.Write(pending.headerRaw[:])
+
+ trailer := make([]byte, pending.algo.Size())
+
+ _, err = io.ReadFull(pending.reader, trailer)
+ if err != nil {
+ return DiscardResult{}, &PackTrailerMismatchError{}
+ }
+
+ computed := hashImpl.Sum(nil)
+ if !bytes.Equal(computed, trailer) {
+ return DiscardResult{}, &PackTrailerMismatchError{}
+ }
+
+ if pending.opts.RequireTrailingEOF {
+ var probe [1]byte
+
+ n, err := pending.reader.Read(probe[:])
+ if n > 0 || err == nil {
+ return DiscardResult{}, errors.New("format/pack/ingest: pack has trailing garbage")
+ }
+
+ if err != io.EOF {
+ return DiscardResult{}, err
+ }
+ }
+
+ packHash, err := objectid.FromBytes(pending.algo, trailer)
+ if err != nil {
+ return DiscardResult{}, err
+ }
+
+ return DiscardResult{
+ PackHash: packHash,
+ ObjectCount: 0,
+ }, nil
+}
diff --git a/format/pack/ingest/errors.go b/format/pack/ingest/errors.go
index 82b662b5..d5e6d703 100644
--- a/format/pack/ingest/errors.go
+++ b/format/pack/ingest/errors.go
@@ -66,3 +66,12 @@ func (err *DestinationWriteError) Error() string {
}
var errExternalThinBase = errors.New("format/pack/ingest: external thin base required")
+
+var (
+ // ErrAlreadyFinalized indicates Continue/Discard already called.
+ ErrAlreadyFinalized = errors.New("format/pack/ingest: operation already finalized")
+ // ErrZeroObjectContinue indicates Continue was called for a zero-object pack.
+ ErrZeroObjectContinue = errors.New("format/pack/ingest: cannot continue zero-object pack")
+ // ErrNonZeroDiscard indicates Discard was called for a non-zero-object pack.
+ ErrNonZeroDiscard = errors.New("format/pack/ingest: cannot discard non-zero pack")
+)
diff --git a/format/pack/ingest/header.go b/format/pack/ingest/header.go
index fba2b175..76d43bef 100644
--- a/format/pack/ingest/header.go
+++ b/format/pack/ingest/header.go
@@ -3,32 +3,47 @@ package ingest
import (
"encoding/binary"
"fmt"
+ "io"
"codeberg.org/lindenii/furgit/format/pack"
)
-// readAndValidatePackHeader reads and validates PACK header from the stream.
-func readAndValidatePackHeader(state *ingestState) error {
- var hdr [12]byte
+const packHeaderSize = 12
- err := state.stream.readFull(hdr[:])
+// readAndValidatePackHeader reads one PACK header from src and validates it.
+func readAndValidatePackHeader(src io.Reader) (HeaderInfo, [packHeaderSize]byte, error) {
+ var hdr [packHeaderSize]byte
+
+ _, err := io.ReadFull(src, hdr[:])
+ if err != nil {
+ return HeaderInfo{}, [packHeaderSize]byte{}, &InvalidPackHeaderError{
+ Reason: fmt.Sprintf("read header: %v", err),
+ }
+ }
+
+ header, err := parseAndValidatePackHeader(hdr)
if err != nil {
- return &InvalidPackHeaderError{Reason: fmt.Sprintf("read header: %v", err)}
+ return HeaderInfo{}, [packHeaderSize]byte{}, err
}
+ return header, hdr, nil
+}
+
+// parseAndValidatePackHeader validates one already-read PACK header.
+func parseAndValidatePackHeader(hdr [packHeaderSize]byte) (HeaderInfo, error) {
if binary.BigEndian.Uint32(hdr[:4]) != pack.Signature {
- return &InvalidPackHeaderError{Reason: "signature mismatch"}
+ return HeaderInfo{}, &InvalidPackHeaderError{Reason: "signature mismatch"}
}
version := binary.BigEndian.Uint32(hdr[4:8])
if !pack.VersionSupported(version) {
- return &InvalidPackHeaderError{Reason: fmt.Sprintf("unsupported version %d", version)}
- }
-
- state.objectCountHeader = binary.BigEndian.Uint32(hdr[8:12])
- if state.objectCountHeader == 0 {
- return &InvalidPackHeaderError{Reason: "zero objects"}
+ return HeaderInfo{}, &InvalidPackHeaderError{
+ Reason: fmt.Sprintf("unsupported version %d", version),
+ }
}
- return nil
+ return HeaderInfo{
+ Version: version,
+ ObjectCount: binary.BigEndian.Uint32(hdr[8:12]),
+ }, nil
}
diff --git a/format/pack/ingest/ingest_test.go b/format/pack/ingest/ingest_test.go
index 8a88eb7f..95b6643c 100644
--- a/format/pack/ingest/ingest_test.go
+++ b/format/pack/ingest/ingest_test.go
@@ -2,7 +2,9 @@ package ingest_test
import (
"bytes"
+ "encoding/binary"
"errors"
+ "io"
"io/fs"
"os"
"path/filepath"
@@ -26,6 +28,20 @@ func (r *noExtraReadReader) Read(p []byte) (int, error) {
return r.reader.Read(p)
}
+func beginAndContinue(
+ src io.Reader,
+ packRoot *os.Root,
+ algo objectid.Algorithm,
+ opts ingest.Options,
+) (ingest.Result, error) {
+ pending, err := ingest.Ingest(src, algo, opts)
+ if err != nil {
+ return ingest.Result{}, err
+ }
+
+ return pending.Continue(packRoot)
+}
+
// fixturePath returns one fixture file path for the selected algorithm.
func fixturePath(t *testing.T, algo objectid.Algorithm, name string) string {
t.Helper()
@@ -173,7 +189,7 @@ func TestIngestNonThinPackWritesPackIdxRev(t *testing.T) {
packRoot := receiver.OpenPackRoot(t)
- result, err := ingest.Ingest(bytes.NewReader(packBytes), packRoot, algo, ingest.Options{
+ result, err := beginAndContinue(bytes.NewReader(packBytes), packRoot, algo, ingest.Options{
WriteRev: true,
RequireTrailingEOF: true,
})
@@ -221,7 +237,7 @@ func TestIngestThinPackWithoutFixReturnsUnresolved(t *testing.T) {
receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true})
packRoot := receiver.OpenPackRoot(t)
- _, err := ingest.Ingest(bytes.NewReader(thinPack), packRoot, algo, ingest.Options{
+ _, err := beginAndContinue(bytes.NewReader(thinPack), packRoot, algo, ingest.Options{
WriteRev: true,
RequireTrailingEOF: true,
})
@@ -257,7 +273,7 @@ func TestIngestThinPackWithFixThin(t *testing.T) {
packRoot := receiver.OpenPackRoot(t)
- _, err := ingest.Ingest(bytes.NewReader(basePack), packRoot, algo, ingest.Options{
+ _, err := beginAndContinue(bytes.NewReader(basePack), packRoot, algo, ingest.Options{
RequireTrailingEOF: true,
})
if err != nil {
@@ -266,7 +282,7 @@ func TestIngestThinPackWithFixThin(t *testing.T) {
receiverRepo := receiver.OpenRepository(t)
- result, err := ingest.Ingest(bytes.NewReader(thinPack), packRoot, algo, ingest.Options{
+ result, err := beginAndContinue(bytes.NewReader(thinPack), packRoot, algo, ingest.Options{
FixThin: true,
WriteRev: true,
Base: receiverRepo.Objects(),
@@ -301,7 +317,7 @@ func TestIngestPackTrailerMismatch(t *testing.T) {
receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true})
packRoot := receiver.OpenPackRoot(t)
- _, err := ingest.Ingest(bytes.NewReader(packBytes), packRoot, algo, ingest.Options{
+ _, err := beginAndContinue(bytes.NewReader(packBytes), packRoot, algo, ingest.Options{
WriteRev: true,
RequireTrailingEOF: true,
})
@@ -326,6 +342,74 @@ func TestIngestPackTrailerMismatch(t *testing.T) {
})
}
+func zeroObjectPackBytes(t *testing.T, algo objectid.Algorithm) []byte {
+ t.Helper()
+
+ hashImpl, err := algo.New()
+ if err != nil {
+ t.Fatalf("algo.New: %v", err)
+ }
+
+ var header [12]byte
+ copy(header[:4], []byte{'P', 'A', 'C', 'K'})
+ binary.BigEndian.PutUint32(header[4:8], 2)
+ binary.BigEndian.PutUint32(header[8:12], 0)
+
+ _, _ = hashImpl.Write(header[:])
+
+ return append(header[:], hashImpl.Sum(nil)...)
+}
+
+func TestIngestDiscardZeroObjectPack(t *testing.T) {
+ t.Parallel()
+
+ testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { //nolint:thelper
+ packBytes := zeroObjectPackBytes(t, algo)
+
+ pending, err := ingest.Ingest(bytes.NewReader(packBytes), algo, ingest.Options{
+ RequireTrailingEOF: true,
+ })
+ if err != nil {
+ t.Fatalf("Ingest: %v", err)
+ }
+
+ if pending.Header().ObjectCount != 0 {
+ t.Fatalf("ObjectCount = %d, want 0", pending.Header().ObjectCount)
+ }
+
+ discarded, err := pending.Discard()
+ if err != nil {
+ t.Fatalf("Discard: %v", err)
+ }
+
+ if discarded.ObjectCount != 0 {
+ t.Fatalf("Discard.ObjectCount = %d, want 0", discarded.ObjectCount)
+ }
+ })
+}
+
+func TestIngestContinueRejectsZeroObjectPack(t *testing.T) {
+ t.Parallel()
+
+ testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { //nolint:thelper
+ packBytes := zeroObjectPackBytes(t, algo)
+ receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true})
+ packRoot := receiver.OpenPackRoot(t)
+
+ pending, err := ingest.Ingest(bytes.NewReader(packBytes), algo, ingest.Options{
+ RequireTrailingEOF: true,
+ })
+ if err != nil {
+ t.Fatalf("Ingest: %v", err)
+ }
+
+ _, err = pending.Continue(packRoot)
+ if !errors.Is(err, ingest.ErrZeroObjectContinue) {
+ t.Fatalf("Continue error = %v, want ErrZeroObjectContinue", err)
+ }
+ })
+}
+
func TestIngestCanFinishWithoutTrailingEOF(t *testing.T) {
t.Parallel()
@@ -336,7 +420,7 @@ func TestIngestCanFinishWithoutTrailingEOF(t *testing.T) {
receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true})
packRoot := receiver.OpenPackRoot(t)
- result, err := ingest.Ingest(&noExtraReadReader{reader: bytes.NewReader(packBytes)}, packRoot, algo, ingest.Options{
+ result, err := beginAndContinue(&noExtraReadReader{reader: bytes.NewReader(packBytes)}, packRoot, algo, ingest.Options{
WriteRev: true,
})
if err != nil {
diff --git a/format/pack/ingest/scan.go b/format/pack/ingest/scan.go
index 2fa88b51..b5d683a8 100644
--- a/format/pack/ingest/scan.go
+++ b/format/pack/ingest/scan.go
@@ -23,7 +23,7 @@ func streamPackAndScan(state *ingestState) error {
utils.WriteProgressf(state.opts.Progress, "validating pack header...\r")
- err = readAndValidatePackHeader(state)
+ err = seedStreamWithPackHeader(state)
if err != nil {
return err
}
@@ -75,3 +75,30 @@ func streamPackAndScan(state *ingestState) error {
return state.stream.flush()
}
+
+// seedStreamWithPackHeader writes the already-validated PACK header to output,
+// seeds the running pack hash, and advances stream offset accounting.
+func seedStreamWithPackHeader(state *ingestState) error {
+ written := 0
+ for written < len(state.packHeaderRaw) {
+ n, err := state.packFile.Write(state.packHeaderRaw[written:])
+ if err != nil {
+ return &DestinationWriteError{Op: fmt.Sprintf("write pack header: %v", err)}
+ }
+
+ if n == 0 {
+ return &DestinationWriteError{Op: "write pack header: short write"}
+ }
+
+ written += n
+ }
+
+ _, err := state.stream.hash.Write(state.packHeaderRaw[:])
+ if err != nil {
+ return err
+ }
+
+ state.stream.consumed = packHeaderSize
+
+ return nil
+}
diff --git a/format/pack/ingest/state.go b/format/pack/ingest/state.go
index cbc412e3..d44b6e09 100644
--- a/format/pack/ingest/state.go
+++ b/format/pack/ingest/state.go
@@ -18,6 +18,8 @@ type ingestState struct {
algo objectid.Algorithm
opts Options
+ packHeaderRaw [packHeaderSize]byte
+
packFile *os.File
packTmpName string
idxFile *os.File
@@ -47,18 +49,22 @@ func newIngestState(
destination *os.Root,
algo objectid.Algorithm,
opts Options,
+ header HeaderInfo,
+ headerRaw [packHeaderSize]byte,
) (*ingestState, error) {
if algo.Size() == 0 {
return nil, objectid.ErrInvalidAlgorithm
}
return &ingestState{
- src: src,
- destination: destination,
- algo: algo,
- opts: opts,
- offsetToRecord: make(map[uint64]int),
- objectToRecord: make(map[objectid.ObjectID]int),
- baseCache: newDeltaBaseCache(defaultDeltaBaseCacheMaxBytes),
+ src: src,
+ destination: destination,
+ algo: algo,
+ opts: opts,
+ packHeaderRaw: headerRaw,
+ objectCountHeader: header.ObjectCount,
+ offsetToRecord: make(map[uint64]int),
+ objectToRecord: make(map[objectid.ObjectID]int),
+ baseCache: newDeltaBaseCache(defaultDeltaBaseCacheMaxBytes),
}, nil
}