aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Runxi Yu, 2026-03-05 20:35:02 +0800
committer: Runxi Yu, 2026-03-05 20:35:02 +0800
commit: 197fc54d0fe9e89345992b1efbfbfaf3185e3272 (patch)
tree: 45b41cdb16f405fa34268dab5347bbd0188dafe0
parent: format/pack/ingest: Temporary file purging (diff)
signature: No signature
format/pack/ingest: Improve trailer stuff
-rw-r--r-- format/pack/ingest/state.go 2
-rw-r--r-- format/pack/ingest/stream.go 250
-rw-r--r-- format/pack/ingest/stream_scan.go 53
-rw-r--r-- format/pack/ingest/thin_fix.go 8
-rw-r--r-- format/pack/ingest/trailer.go 65
5 files changed, 215 insertions, 163 deletions
diff --git a/format/pack/ingest/state.go b/format/pack/ingest/state.go
index 3fabe639..2263e8a1 100644
--- a/format/pack/ingest/state.go
+++ b/format/pack/ingest/state.go
@@ -28,7 +28,7 @@ type ingestState struct {
revFile *os.File
revTmpName string
- stream *streamCopier
+ stream *streamScanner
records []objectRecord
ofsDeltas []ofsDeltaRef
diff --git a/format/pack/ingest/stream.go b/format/pack/ingest/stream.go
index 303e91b2..61f6079b 100644
--- a/format/pack/ingest/stream.go
+++ b/format/pack/ingest/stream.go
@@ -1,7 +1,7 @@
package ingest
import (
- "bufio"
+ "bytes"
"fmt"
"hash"
"hash/crc32"
@@ -9,54 +9,156 @@ import (
"os"
)
-// streamCopier reads bytes from src, writes them to packFile, and updates
-// trailer verification state.
-type streamCopier struct {
- reader *bufio.Reader
- writer *bufio.Writer
- verifier *trailerVerifier
- offset uint64
- entryCRC hash.Hash32
+const streamScannerBufferSize = 64 << 10 // 64 KiB; length of the scanner's input buffer
+
+// streamScanner reads one pack stream through a fixed-size buffer, hashing and
+// CRC-ing bytes as they are consumed and writing consumed bytes to dstFile.
+type streamScanner struct {
+ src io.Reader
+ dstFile *os.File
+
+ // Input buffer window: buf[:off] is consumed but not yet written to dstFile; buf[off:n] is unread.
+ buf []byte
+ off int
+ n int
+
+ // Total number of stream bytes consumed so far; advanced by use.
+ consumed uint64
+
+ // Running pack hash; use feeds consumed bytes into it only while hashEnabled is true.
+ hash hash.Hash
+ hashSize int // number of trailer bytes finishAndFlushTrailer reads
+ hashEnabled bool
+
+ // CRC-32 (IEEE) of the bytes consumed since beginEntryCRC; updated only while inEntryCRC is true.
+ entryCRC uint32
+ inEntryCRC bool
+
+ packTrailer []byte // trailer bytes as read from the stream by finishAndFlushTrailer
}
-// newStreamCopier constructs one stream copier.
-func newStreamCopier(src io.Reader, packFile *os.File, verifier *trailerVerifier) *streamCopier {
- return &streamCopier{
- reader: bufio.NewReaderSize(src, 64<<10),
- writer: bufio.NewWriterSize(packFile, 256<<10),
- verifier: verifier,
+// newStreamScanner returns a scanner that reads src and mirrors to dstFile, with a streamScannerBufferSize-byte buffer and hashing enabled.
+func newStreamScanner(src io.Reader, dstFile *os.File, hash hash.Hash, hashSize int) *streamScanner {
+ return &streamScanner{
+ src: src,
+ dstFile: dstFile,
+ buf: make([]byte, streamScannerBufferSize),
+ hash: hash,
+ hashSize: hashSize,
+ hashEnabled: true,
}
}
-// Read implements io.Reader.
-func (stream *streamCopier) Read(dst []byte) (int, error) {
- n, err := stream.reader.Read(dst)
- if n > 0 {
- if writeErr := stream.writeChunk(dst[:n]); writeErr != nil {
- return 0, writeErr
+// fill reads from src until at least min unread bytes are buffered; if src fails or ends first, that error (including io.EOF) is returned.
+func (scanner *streamScanner) fill(min int) error {
+ if min <= 0 {
+ return nil
+ }
+ if min > len(scanner.buf) {
+ return fmt.Errorf("format/pack/ingest: fill(%d) exceeds scanner buffer", min)
+ }
+
+ for scanner.n-scanner.off < min {
+ if err := scanner.flushConsumedPrefix(); err != nil { // write out consumed bytes and compact, freeing room at buf[n:]
+ return err
+ }
+
+ readN, err := scanner.src.Read(scanner.buf[scanner.n:])
+ if readN > 0 {
+ scanner.n += readN // keep bytes delivered alongside an error
+ }
+ if err != nil {
+ if err == io.EOF && scanner.n-scanner.off >= min { // the final read satisfied the request, so EOF is not reported by this call
+ return nil
+ }
+
+ return err
+ }
+ if readN == 0 {
+ return io.ErrNoProgress // src returned (0, nil): fail rather than spin
+ }
+ }
+
+ return nil
+}
+
+// use marks the next n unread bytes as consumed, feeding them to the pack hash and the entry CRC when those are enabled.
+func (scanner *streamScanner) use(n int) error {
+ if n < 0 || n > scanner.n-scanner.off {
+ return fmt.Errorf("format/pack/ingest: invalid consume length %d", n)
+ }
+ if n == 0 {
+ return nil
+ }
+
+ chunk := scanner.buf[scanner.off : scanner.off+n]
+ if scanner.hashEnabled {
+ if _, err := scanner.hash.Write(chunk); err != nil {
+ return err
}
}
+ if scanner.inEntryCRC {
+ scanner.entryCRC = crc32.Update(scanner.entryCRC, crc32.IEEETable, chunk)
+ }
+
+ scanner.off += n // the bytes stay in buf[:off] until flushConsumedPrefix writes them out
+ scanner.consumed += uint64(n)
- return n, err
+ return nil
}
-// ReadByte implements io.ByteReader.
-func (stream *streamCopier) ReadByte() (byte, error) {
- b, err := stream.reader.ReadByte()
- if err != nil {
+// Read implements io.Reader: it copies up to len(dst) already-buffered bytes into dst, refilling from src only when the buffer is empty.
+func (scanner *streamScanner) Read(dst []byte) (int, error) {
+ if len(dst) == 0 {
+ return 0, nil
+ }
+
+ if scanner.n-scanner.off == 0 {
+ if err := scanner.fill(1); err != nil {
+ if err == io.EOF { // same result as the fall-through below; the EOF case is only spelled out
+ return 0, io.EOF
+ }
+
+ return 0, err
+ }
+ }
+
+ unread := scanner.n - scanner.off
+ if unread == 0 { // defensive: a nil return from fill(1) already implies at least one unread byte
+ return 0, io.EOF
+ }
+
+ n := len(dst)
+ if n > unread {
+ n = unread
+ }
+ copy(dst, scanner.buf[scanner.off:scanner.off+n])
+ if err := scanner.use(n); err != nil {
return 0, err
}
- if writeErr := stream.writeChunk([]byte{b}); writeErr != nil {
- return 0, writeErr
+ return n, nil
+}
+
+// ReadByte implements io.ByteReader, consuming one byte straight from the scanner buffer (refilling it first when empty).
+func (scanner *streamScanner) ReadByte() (byte, error) {
+ if scanner.n-scanner.off == 0 {
+ if err := scanner.fill(1); err != nil {
+ return 0, err
+ }
+ }
+
+ b := scanner.buf[scanner.off]
+ if err := scanner.use(1); err != nil {
+ return 0, err
}
return b, nil
}
-// readFull reads exactly len(dst) bytes through stream.
-func (stream *streamCopier) readFull(dst []byte) error {
- _, err := io.ReadFull(stream, dst)
+// readFull reads exactly len(dst) bytes through receiver.
+func (scanner *streamScanner) readFull(dst []byte) error {
+ _, err := io.ReadFull(scanner, dst)
if err != nil {
return err
}
@@ -64,47 +166,83 @@ func (stream *streamCopier) readFull(dst []byte) error {
return nil
}
-// writeChunk mirrors src bytes to destination artifacts and accounting.
-func (stream *streamCopier) writeChunk(src []byte) error {
- n, err := stream.writer.Write(src)
- if err != nil {
- return &ErrDestinationWrite{Op: fmt.Sprintf("write pack: %v", err)}
- }
- if n != len(src) {
- return &ErrDestinationWrite{Op: "write pack: short write"}
+// flush writes every consumed-but-unwritten byte to the destination pack file; unread bytes stay buffered.
+func (scanner *streamScanner) flush() error {
+ return scanner.flushConsumedPrefix()
+}
+
+// finishAndFlushTrailer reads the hashSize-byte trailer with hashing disabled, rejects any byte
+// following it, then compares it with the running hash. It makes no explicit flush call itself.
+func (scanner *streamScanner) finishAndFlushTrailer() error {
+ if scanner.hashSize <= 0 {
+ return fmt.Errorf("format/pack/ingest: invalid hash size")
}
- if stream.entryCRC != nil {
- _, _ = stream.entryCRC.Write(src)
+ trailer := make([]byte, scanner.hashSize)
+ scanner.hashEnabled = false // from here on use no longer feeds consumed bytes into the hash
+ if err := scanner.readFull(trailer); err != nil {
+ return &ErrPackTrailerMismatch{} // any read failure, including a short stream, is reported as a mismatch
}
- stream.verifier.write(src)
- stream.offset += uint64(len(src))
+ scanner.packTrailer = append(scanner.packTrailer[:0], trailer...)
- return nil
-}
+ var probe [1]byte
+ n, err := scanner.Read(probe[:])
+ if n > 0 || err == nil {
+ return fmt.Errorf("format/pack/ingest: pack has trailing garbage")
+ }
+ if err != io.EOF {
+ return err
+ }
-// flush flushes buffered pack output bytes to the destination file.
-func (stream *streamCopier) flush() error {
- err := stream.writer.Flush()
- if err != nil {
- return &ErrDestinationWrite{Op: fmt.Sprintf("flush pack: %v", err)}
+ computed := scanner.hash.Sum(nil)
+ if !bytes.Equal(computed, trailer) {
+ return &ErrPackTrailerMismatch{}
}
return nil
}
// beginEntryCRC starts inline CRC accumulation for one packed entry.
-func (stream *streamCopier) beginEntryCRC() {
- stream.entryCRC = crc32.NewIEEE()
+func (scanner *streamScanner) beginEntryCRC() {
+ scanner.entryCRC = 0
+ scanner.inEntryCRC = true // use updates entryCRC only while this is set
}
// endEntryCRC finishes inline CRC accumulation for one packed entry.
-func (stream *streamCopier) endEntryCRC() (uint32, error) {
- if stream.entryCRC == nil {
+func (scanner *streamScanner) endEntryCRC() (uint32, error) {
+ if !scanner.inEntryCRC {
return 0, fmt.Errorf("format/pack/ingest: entry CRC not started")
}
- crc := stream.entryCRC.Sum32()
- stream.entryCRC = nil
+ crc := scanner.entryCRC
+ scanner.entryCRC = 0
+ scanner.inEntryCRC = false // stop accumulating until the next beginEntryCRC
return crc, nil
}
+
+// flushConsumedPrefix writes the consumed bytes scanner.buf[:scanner.off] to dstFile, then moves the unread bytes to the start of the buffer.
+// NOTE(review): on a write error off is left unchanged, so bytes already written in this call would be written again on a retry.
+func (scanner *streamScanner) flushConsumedPrefix() error {
+ if scanner.off == 0 {
+ return nil
+ }
+
+ written := 0
+ for written < scanner.off {
+ n, err := scanner.dstFile.Write(scanner.buf[written:scanner.off])
+ if err != nil {
+ return &ErrDestinationWrite{Op: fmt.Sprintf("write pack: %v", err)}
+ }
+ if n == 0 { // zero-byte write with a nil error: bail out instead of looping forever
+ return &ErrDestinationWrite{Op: "write pack: short write"}
+ }
+ written += n
+ }
+
+ unread := scanner.n - scanner.off
+ copy(scanner.buf[:unread], scanner.buf[scanner.off:scanner.n])
+ scanner.off = 0
+ scanner.n = unread
+
+ return nil
+}
diff --git a/format/pack/ingest/stream_scan.go b/format/pack/ingest/stream_scan.go
index 524467be..e80e9a3e 100644
--- a/format/pack/ingest/stream_scan.go
+++ b/format/pack/ingest/stream_scan.go
@@ -20,10 +20,11 @@ func streamPackAndScan(state *ingestState) error {
return err
}
- state.stream = newStreamCopier(
+ state.stream = newStreamScanner(
state.src,
state.packFile,
- newTrailerVerifier(hashImpl, state.algo.Size()),
+ hashImpl,
+ state.algo.Size(),
)
if err := readAndValidatePackHeader(state); err != nil {
@@ -35,19 +36,27 @@ func streamPackAndScan(state *ingestState) error {
state.refDeltas = make([]refDeltaRef, 0, state.objectCountHeader)
for range state.objectCountHeader {
- nextOffset, err := scanOneEntry(state, state.stream.offset)
+ nextOffset, err := scanOneEntry(state, state.stream.consumed)
if err != nil {
return err
}
- if nextOffset != state.stream.offset {
+ if nextOffset != state.stream.consumed {
return fmt.Errorf("format/pack/ingest: internal stream offset mismatch")
}
}
- if err := finalizeStreamPackHash(state); err != nil {
+ if err := state.stream.finishAndFlushTrailer(); err != nil {
return err
}
+ if len(state.stream.packTrailer) != state.algo.Size() {
+ return fmt.Errorf("format/pack/ingest: invalid trailer size")
+ }
+ packHash, err := objectid.FromBytes(state.algo, state.stream.packTrailer)
+ if err != nil {
+ return err
+ }
+ state.packHash = packHash
return state.stream.flush()
}
@@ -98,10 +107,10 @@ func scanOneEntry(state *ingestState, startOffset uint64) (uint64, error) {
}
endOffset := startOffset + uint64(record.headerLen) + consumedInput
- if endOffset > state.stream.offset {
+ if endOffset > state.stream.consumed {
return 0, &ErrMalformedPackEntry{
Offset: startOffset,
- Reason: fmt.Sprintf("entry end offset overflow got %d > stream %d", endOffset, state.stream.offset),
+ Reason: fmt.Sprintf("entry end offset overflow got %d > stream %d", endOffset, state.stream.consumed),
}
}
@@ -283,36 +292,6 @@ func readOfsDistanceFromStream(reader io.ByteReader) (uint64, int, error) {
}
// finalizeStreamPackHash consumes trailer bytes and verifies stream integrity.
-func finalizeStreamPackHash(state *ingestState) error {
- // We have already consumed object entries. Drain exactly the hash trailer.
- trailer := make([]byte, state.algo.Size())
- if err := state.stream.readFull(trailer); err != nil {
- return &ErrPackTrailerMismatch{}
- }
-
- // Ensure no trailing garbage.
- var probe [1]byte
- n, err := state.stream.Read(probe[:])
- if n > 0 || err == nil {
- return fmt.Errorf("format/pack/ingest: pack has trailing garbage")
- }
- if err != io.EOF {
- return err
- }
-
- if err := state.stream.verifier.verify(); err != nil {
- return err
- }
-
- packHash, err := objectid.FromBytes(state.algo, trailer)
- if err != nil {
- return err
- }
- state.packHash = packHash
-
- return nil
-}
-
// readDeltaHeaderSizes reads source and destination sizes from one delta payload.
func readDeltaHeaderSizes(payload []byte) (int, int, error) {
reader := &byteSliceReader{data: payload}
diff --git a/format/pack/ingest/thin_fix.go b/format/pack/ingest/thin_fix.go
index 436d5c88..249fe136 100644
--- a/format/pack/ingest/thin_fix.go
+++ b/format/pack/ingest/thin_fix.go
@@ -37,7 +37,7 @@ func maybeFixThin(state *ingestState) error {
if err := state.packFile.Truncate(newEnd); err != nil {
return err
}
- state.stream.offset = uint64(newEnd)
+ state.stream.consumed = uint64(newEnd)
baseIDs := unresolvedThinBaseIDs(state)
for _, id := range baseIDs {
@@ -60,7 +60,7 @@ func maybeFixThin(state *ingestState) error {
// appendBaseObject appends one base object as a new packed non-delta entry.
func appendBaseObject(state *ingestState, id objectid.ObjectID, realType objecttype.Type, content []byte) (int, error) {
- start := state.stream.offset
+ start := state.stream.consumed
header := encodePackEntryHeader(realType, int64(len(content)))
if _, err := state.packFile.WriteAt(header, int64(start)); err != nil {
return 0, err
@@ -80,7 +80,7 @@ func appendBaseObject(state *ingestState, id objectid.ObjectID, realType objectt
packedLen := uint64(len(header)) + uint64(counting.n)
end := start + packedLen
- state.stream.offset = end
+ state.stream.consumed = end
record := objectRecord{
offset: start,
@@ -186,7 +186,7 @@ func rewritePackHeaderAndTrailer(state *ingestState) error {
}
state.packHash = packHash
state.objectCountHeader = uint32(len(state.records))
- state.stream.offset = uint64(endWithoutTrailer + int64(len(sum)))
+ state.stream.consumed = uint64(endWithoutTrailer + int64(len(sum)))
return nil
}
diff --git a/format/pack/ingest/trailer.go b/format/pack/ingest/trailer.go
deleted file mode 100644
index be8156d3..00000000
--- a/format/pack/ingest/trailer.go
+++ /dev/null
@@ -1,65 +0,0 @@
-package ingest
-
-import (
- "bytes"
- "fmt"
- "hash"
-)
-
-// trailerVerifier incrementally verifies trailing hash bytes in a stream.
-type trailerVerifier struct {
- hash hash.Hash
- hashSize int
- tail []byte
- seen int64
-}
-
-// newTrailerVerifier creates a trailing hash verifier.
-func newTrailerVerifier(hash hash.Hash, hashSize int) *trailerVerifier {
- return &trailerVerifier{
- hash: hash,
- hashSize: hashSize,
- tail: make([]byte, 0, hashSize),
- }
-}
-
-// write feeds one chunk of stream bytes into the verifier.
-func (verifier *trailerVerifier) write(src []byte) {
- if len(src) == 0 {
- return
- }
-
- verifier.seen += int64(len(src))
- if len(verifier.tail) == 0 && len(src) <= verifier.hashSize {
- verifier.tail = append(verifier.tail, src...)
-
- return
- }
-
- tmp := make([]byte, 0, len(verifier.tail)+len(src))
- tmp = append(tmp, verifier.tail...)
- tmp = append(tmp, src...)
- if len(tmp) <= verifier.hashSize {
- verifier.tail = tmp
-
- return
- }
-
- flushN := len(tmp) - verifier.hashSize
- _, _ = verifier.hash.Write(tmp[:flushN])
- verifier.tail = append(verifier.tail[:0], tmp[flushN:]...)
-}
-
-// verify finalizes verification against the stream trailer.
-func (verifier *trailerVerifier) verify() error {
- if len(verifier.tail) != verifier.hashSize {
- return fmt.Errorf("format/pack/ingest: stream too short for trailer hash")
- }
-
- computed := verifier.hash.Sum(nil)
- if !bytes.Equal(computed, verifier.tail) {
- return &ErrPackTrailerMismatch{}
- }
-
- return nil
-}