From 197fc54d0fe9e89345992b1efbfbfaf3185e3272 Mon Sep 17 00:00:00 2001 From: Runxi Yu Date: Thu, 5 Mar 2026 20:35:02 +0800 Subject: format/pack/ingest: Improve trailer stuff --- format/pack/ingest/state.go | 2 +- format/pack/ingest/stream.go | 250 +++++++++++++++++++++++++++++--------- format/pack/ingest/stream_scan.go | 53 +++----- format/pack/ingest/thin_fix.go | 8 +- format/pack/ingest/trailer.go | 65 ---------- 5 files changed, 215 insertions(+), 163 deletions(-) delete mode 100644 format/pack/ingest/trailer.go diff --git a/format/pack/ingest/state.go b/format/pack/ingest/state.go index 3fabe639..2263e8a1 100644 --- a/format/pack/ingest/state.go +++ b/format/pack/ingest/state.go @@ -28,7 +28,7 @@ type ingestState struct { revFile *os.File revTmpName string - stream *streamCopier + stream *streamScanner records []objectRecord ofsDeltas []ofsDeltaRef diff --git a/format/pack/ingest/stream.go b/format/pack/ingest/stream.go index 303e91b2..61f6079b 100644 --- a/format/pack/ingest/stream.go +++ b/format/pack/ingest/stream.go @@ -1,7 +1,7 @@ package ingest import ( - "bufio" + "bytes" "fmt" "hash" "hash/crc32" @@ -9,54 +9,156 @@ import ( "os" ) -// streamCopier reads bytes from src, writes them to packFile, and updates -// trailer verification state. -type streamCopier struct { - reader *bufio.Reader - writer *bufio.Writer - verifier *trailerVerifier - offset uint64 - entryCRC hash.Hash32 +const streamScannerBufferSize = 64 << 10 + +// streamScanner incrementally reads/consumes one pack stream while mirroring +// consumed bytes into one destination pack file. +type streamScanner struct { + src io.Reader + dstFile *os.File + + // Input buffer window: buf[off:n] is unread. + buf []byte + off int + n int + + // Absolute consumed stream bytes. + consumed uint64 + + // Running pack hash over consumed bytes while hashEnabled is true. + hash hash.Hash + hashSize int + hashEnabled bool + + // Entry CRC state while one entry is being consumed. + entryCRC uint32 + inEntryCRC bool + + packTrailer []byte } -// newStreamCopier constructs one stream copier. -func newStreamCopier(src io.Reader, packFile *os.File, verifier *trailerVerifier) *streamCopier { - return &streamCopier{ - reader: bufio.NewReaderSize(src, 64<<10), - writer: bufio.NewWriterSize(packFile, 256<<10), - verifier: verifier, +// newStreamScanner constructs one scanner with fixed input buffering. +func newStreamScanner(src io.Reader, dstFile *os.File, hash hash.Hash, hashSize int) *streamScanner { + return &streamScanner{ + src: src, + dstFile: dstFile, + buf: make([]byte, streamScannerBufferSize), + hash: hash, + hashSize: hashSize, + hashEnabled: true, } } -// Read implements io.Reader. -func (stream *streamCopier) Read(dst []byte) (int, error) { - n, err := stream.reader.Read(dst) - if n > 0 { - if writeErr := stream.writeChunk(dst[:n]); writeErr != nil { - return 0, writeErr +// fill ensures at least min unread bytes are available in receiver's buffer. +func (scanner *streamScanner) fill(min int) error { + if min <= 0 { + return nil + } + if min > len(scanner.buf) { + return fmt.Errorf("format/pack/ingest: fill(%d) exceeds scanner buffer", min) + } + + for scanner.n-scanner.off < min { + if err := scanner.flushConsumedPrefix(); err != nil { + return err + } + + readN, err := scanner.src.Read(scanner.buf[scanner.n:]) + if readN > 0 { + scanner.n += readN + } + if err != nil { + if err == io.EOF && scanner.n-scanner.off >= min { + return nil + } + + return err + } + if readN == 0 { + return io.ErrNoProgress + } + } + + return nil +} + +// use consumes n unread bytes and updates accounting/checksum state. +func (scanner *streamScanner) use(n int) error { + if n < 0 || n > scanner.n-scanner.off { + return fmt.Errorf("format/pack/ingest: invalid consume length %d", n) + } + if n == 0 { + return nil + } + + chunk := scanner.buf[scanner.off : scanner.off+n] + if scanner.hashEnabled { + if _, err := scanner.hash.Write(chunk); err != nil { + return err } } + if scanner.inEntryCRC { + scanner.entryCRC = crc32.Update(scanner.entryCRC, crc32.IEEETable, chunk) + } + + scanner.off += n + scanner.consumed += uint64(n) - return n, err + return nil } -// ReadByte implements io.ByteReader. -func (stream *streamCopier) ReadByte() (byte, error) { - b, err := stream.reader.ReadByte() - if err != nil { +// Read implements io.Reader. +func (scanner *streamScanner) Read(dst []byte) (int, error) { + if len(dst) == 0 { + return 0, nil + } + + if scanner.n-scanner.off == 0 { + if err := scanner.fill(1); err != nil { + if err == io.EOF { + return 0, io.EOF + } + + return 0, err + } + } + + unread := scanner.n - scanner.off + if unread == 0 { + return 0, io.EOF + } + + n := len(dst) + if n > unread { + n = unread + } + copy(dst, scanner.buf[scanner.off:scanner.off+n]) + if err := scanner.use(n); err != nil { return 0, err } - if writeErr := stream.writeChunk([]byte{b}); writeErr != nil { - return 0, writeErr + return n, nil +} + +// ReadByte implements io.ByteReader without allocation. +func (scanner *streamScanner) ReadByte() (byte, error) { + if scanner.n-scanner.off == 0 { + if err := scanner.fill(1); err != nil { + return 0, err + } + } + + b := scanner.buf[scanner.off] + if err := scanner.use(1); err != nil { + return 0, err } return b, nil } -// readFull reads exactly len(dst) bytes through stream. -func (stream *streamCopier) readFull(dst []byte) error { - _, err := io.ReadFull(stream, dst) +// readFull reads exactly len(dst) bytes through receiver. +func (scanner *streamScanner) readFull(dst []byte) error { + _, err := io.ReadFull(scanner, dst) if err != nil { return err } @@ -64,47 +166,83 @@ func (stream *streamCopier) readFull(dst []byte) error { return nil } -// writeChunk mirrors src bytes to destination artifacts and accounting. -func (stream *streamCopier) writeChunk(src []byte) error { - n, err := stream.writer.Write(src) - if err != nil { - return &ErrDestinationWrite{Op: fmt.Sprintf("write pack: %v", err)} - } - if n != len(src) { - return &ErrDestinationWrite{Op: "write pack: short write"} +// flush writes all consumed-but-unflushed bytes to destination pack file. +func (scanner *streamScanner) flush() error { + return scanner.flushConsumedPrefix() +} + +// finishAndFlushTrailer reads trailer hash bytes, verifies trailer checksum, +// and ensures no trailing garbage remains in stream. +func (scanner *streamScanner) finishAndFlushTrailer() error { + if scanner.hashSize <= 0 { + return fmt.Errorf("format/pack/ingest: invalid hash size") } - if stream.entryCRC != nil { - _, _ = stream.entryCRC.Write(src) + trailer := make([]byte, scanner.hashSize) + scanner.hashEnabled = false + if err := scanner.readFull(trailer); err != nil { + return &ErrPackTrailerMismatch{} } - stream.verifier.write(src) - stream.offset += uint64(len(src)) + scanner.packTrailer = append(scanner.packTrailer[:0], trailer...) - return nil -} + var probe [1]byte + n, err := scanner.Read(probe[:]) + if n > 0 || err == nil { + return fmt.Errorf("format/pack/ingest: pack has trailing garbage") + } + if err != io.EOF { + return err + } -// flush flushes buffered pack output bytes to the destination file. -func (stream *streamCopier) flush() error { - err := stream.writer.Flush() - if err != nil { - return &ErrDestinationWrite{Op: fmt.Sprintf("flush pack: %v", err)} + computed := scanner.hash.Sum(nil) + if !bytes.Equal(computed, trailer) { + return &ErrPackTrailerMismatch{} } return nil } // beginEntryCRC starts inline CRC accumulation for one packed entry. -func (stream *streamCopier) beginEntryCRC() { - stream.entryCRC = crc32.NewIEEE() +func (scanner *streamScanner) beginEntryCRC() { + scanner.entryCRC = 0 + scanner.inEntryCRC = true } // endEntryCRC finishes inline CRC accumulation for one packed entry. -func (stream *streamCopier) endEntryCRC() (uint32, error) { - if stream.entryCRC == nil { +func (scanner *streamScanner) endEntryCRC() (uint32, error) { + if !scanner.inEntryCRC { return 0, fmt.Errorf("format/pack/ingest: entry CRC not started") } - crc := stream.entryCRC.Sum32() - stream.entryCRC = nil + crc := scanner.entryCRC + scanner.entryCRC = 0 + scanner.inEntryCRC = false return crc, nil } + +// flushConsumedPrefix writes scanner.buf[:scanner.off] and compacts unread +// bytes to the start of buffer. +func (scanner *streamScanner) flushConsumedPrefix() error { + if scanner.off == 0 { + return nil + } + + written := 0 + for written < scanner.off { + n, err := scanner.dstFile.Write(scanner.buf[written:scanner.off]) + if err != nil { + return &ErrDestinationWrite{Op: fmt.Sprintf("write pack: %v", err)} + } + if n == 0 { + return &ErrDestinationWrite{Op: "write pack: short write"} + } + written += n + } + + unread := scanner.n - scanner.off + copy(scanner.buf[:unread], scanner.buf[scanner.off:scanner.n]) + scanner.off = 0 + scanner.n = unread + + return nil +} diff --git a/format/pack/ingest/stream_scan.go b/format/pack/ingest/stream_scan.go index 524467be..e80e9a3e 100644 --- a/format/pack/ingest/stream_scan.go +++ b/format/pack/ingest/stream_scan.go @@ -20,10 +20,11 @@ func streamPackAndScan(state *ingestState) error { return err } - state.stream = newStreamCopier( + state.stream = newStreamScanner( state.src, state.packFile, - newTrailerVerifier(hashImpl, state.algo.Size()), + hashImpl, + state.algo.Size(), ) if err := readAndValidatePackHeader(state); err != nil { @@ -35,19 +36,27 @@ func streamPackAndScan(state *ingestState) error { state.refDeltas = make([]refDeltaRef, 0, state.objectCountHeader) for range state.objectCountHeader { - nextOffset, err := scanOneEntry(state, state.stream.offset) + nextOffset, err := scanOneEntry(state, state.stream.consumed) if err != nil { return err } - if nextOffset != state.stream.offset { + if nextOffset != state.stream.consumed { return fmt.Errorf("format/pack/ingest: internal stream offset mismatch") } } - if err := finalizeStreamPackHash(state); err != nil { + if err := state.stream.finishAndFlushTrailer(); err != nil { return err } + if len(state.stream.packTrailer) != state.algo.Size() { + return fmt.Errorf("format/pack/ingest: invalid trailer size") + } + packHash, err := objectid.FromBytes(state.algo, state.stream.packTrailer) + if err != nil { + return err + } + state.packHash = packHash return state.stream.flush() } @@ -98,10 +107,10 @@ func scanOneEntry(state *ingestState, startOffset uint64) (uint64, error) { } endOffset := startOffset + uint64(record.headerLen) + consumedInput - if endOffset > state.stream.offset { + if endOffset > state.stream.consumed { return 0, &ErrMalformedPackEntry{ Offset: startOffset, - Reason: fmt.Sprintf("entry end offset overflow got %d > stream %d", endOffset, state.stream.offset), + Reason: fmt.Sprintf("entry end offset overflow got %d > stream %d", endOffset, state.stream.consumed), } } @@ -283,36 +292,6 @@ func readOfsDistanceFromStream(reader io.ByteReader) (uint64, int, error) { } // finalizeStreamPackHash consumes trailer bytes and verifies stream integrity. -func finalizeStreamPackHash(state *ingestState) error { - // We have already consumed object entries. Drain exactly the hash trailer. - trailer := make([]byte, state.algo.Size()) - if err := state.stream.readFull(trailer); err != nil { - return &ErrPackTrailerMismatch{} - } - - // Ensure no trailing garbage. - var probe [1]byte - n, err := state.stream.Read(probe[:]) - if n > 0 || err == nil { - return fmt.Errorf("format/pack/ingest: pack has trailing garbage") - } - if err != io.EOF { - return err - } - - if err := state.stream.verifier.verify(); err != nil { - return err - } - - packHash, err := objectid.FromBytes(state.algo, trailer) - if err != nil { - return err - } - state.packHash = packHash - - return nil -} - // readDeltaHeaderSizes reads source and destination sizes from one delta payload. func readDeltaHeaderSizes(payload []byte) (int, int, error) { reader := &byteSliceReader{data: payload} diff --git a/format/pack/ingest/thin_fix.go b/format/pack/ingest/thin_fix.go index 436d5c88..249fe136 100644 --- a/format/pack/ingest/thin_fix.go +++ b/format/pack/ingest/thin_fix.go @@ -37,7 +37,7 @@ func maybeFixThin(state *ingestState) error { if err := state.packFile.Truncate(newEnd); err != nil { return err } - state.stream.offset = uint64(newEnd) + state.stream.consumed = uint64(newEnd) baseIDs := unresolvedThinBaseIDs(state) for _, id := range baseIDs { @@ -60,7 +60,7 @@ func maybeFixThin(state *ingestState) error { // appendBaseObject appends one base object as a new packed non-delta entry. func appendBaseObject(state *ingestState, id objectid.ObjectID, realType objecttype.Type, content []byte) (int, error) { - start := state.stream.offset + start := state.stream.consumed header := encodePackEntryHeader(realType, int64(len(content))) if _, err := state.packFile.WriteAt(header, int64(start)); err != nil { return 0, err @@ -80,7 +80,7 @@ func appendBaseObject(state *ingestState, id objectid.ObjectID, realType objectt packedLen := uint64(len(header)) + uint64(counting.n) end := start + packedLen - state.stream.offset = end + state.stream.consumed = end record := objectRecord{ offset: start, @@ -186,7 +186,7 @@ func rewritePackHeaderAndTrailer(state *ingestState) error { } state.packHash = packHash state.objectCountHeader = uint32(len(state.records)) - state.stream.offset = uint64(endWithoutTrailer + int64(len(sum))) + state.stream.consumed = uint64(endWithoutTrailer + int64(len(sum))) return nil } diff --git a/format/pack/ingest/trailer.go b/format/pack/ingest/trailer.go deleted file mode 100644 index be8156d3..00000000 --- a/format/pack/ingest/trailer.go +++ /dev/null @@ -1,65 +0,0 @@ -package ingest - -import ( - "bytes" - "fmt" - "hash" -) - -// trailerVerifier incrementally verifies trailing hash bytes in a stream. -type trailerVerifier struct { - hash hash.Hash - hashSize int - tail []byte - seen int64 -} - -// newTrailerVerifier creates a trailing hash verifier. -func newTrailerVerifier(hash hash.Hash, hashSize int) *trailerVerifier { - return &trailerVerifier{ - hash: hash, - hashSize: hashSize, - tail: make([]byte, 0, hashSize), - } -} - -// write feeds one chunk of stream bytes into the verifier. -func (verifier *trailerVerifier) write(src []byte) { - if len(src) == 0 { - return - } - - verifier.seen += int64(len(src)) - if len(verifier.tail) == 0 && len(src) <= verifier.hashSize { - verifier.tail = append(verifier.tail, src...) - - return - } - - tmp := make([]byte, 0, len(verifier.tail)+len(src)) - tmp = append(tmp, verifier.tail...) - tmp = append(tmp, src...) - if len(tmp) <= verifier.hashSize { - verifier.tail = tmp - - return - } - - flushN := len(tmp) - verifier.hashSize - _, _ = verifier.hash.Write(tmp[:flushN]) - verifier.tail = append(verifier.tail[:0], tmp[flushN:]...) -} - -// verify finalizes verification against the stream trailer. -func (verifier *trailerVerifier) verify() error { - if len(verifier.tail) != verifier.hashSize { - return fmt.Errorf("format/pack/ingest: stream too short for trailer hash") - } - - computed := verifier.hash.Sum(nil) - if !bytes.Equal(computed, verifier.tail) { - return &ErrPackTrailerMismatch{} - } - - return nil -} -- cgit v1.3.1-10-gc9f91