diff options
| author | 2026-03-05 20:35:02 +0800 | |
|---|---|---|
| committer | 2026-03-05 20:35:02 +0800 | |
| commit | 197fc54d0fe9e89345992b1efbfbfaf3185e3272 (patch) | |
| tree | 45b41cdb16f405fa34268dab5347bbd0188dafe0 /format/pack/ingest/stream.go | |
| parent | format/pack/ingest: Temporary file purging (diff) | |
| signature | No signature | |
format/pack/ingest: Improve trailer stuff
Diffstat (limited to 'format/pack/ingest/stream.go')
| -rw-r--r-- | format/pack/ingest/stream.go | 250 |
1 files changed, 194 insertions, 56 deletions
diff --git a/format/pack/ingest/stream.go b/format/pack/ingest/stream.go index 303e91b2..61f6079b 100644 --- a/format/pack/ingest/stream.go +++ b/format/pack/ingest/stream.go @@ -1,7 +1,7 @@ package ingest import ( - "bufio" + "bytes" "fmt" "hash" "hash/crc32" @@ -9,54 +9,156 @@ import ( "os" ) -// streamCopier reads bytes from src, writes them to packFile, and updates -// trailer verification state. -type streamCopier struct { - reader *bufio.Reader - writer *bufio.Writer - verifier *trailerVerifier - offset uint64 - entryCRC hash.Hash32 +const streamScannerBufferSize = 64 << 10 + +// streamScanner incrementally reads/consumes one pack stream while mirroring +// consumed bytes into one destination pack file. +type streamScanner struct { + src io.Reader + dstFile *os.File + + // Input buffer window: buf[off:n] is unread. + buf []byte + off int + n int + + // Absolute consumed stream bytes. + consumed uint64 + + // Running pack hash over consumed bytes while hashEnabled is true. + hash hash.Hash + hashSize int + hashEnabled bool + + // Entry CRC state while one entry is being consumed. + entryCRC uint32 + inEntryCRC bool + + packTrailer []byte } -// newStreamCopier constructs one stream copier. -func newStreamCopier(src io.Reader, packFile *os.File, verifier *trailerVerifier) *streamCopier { - return &streamCopier{ - reader: bufio.NewReaderSize(src, 64<<10), - writer: bufio.NewWriterSize(packFile, 256<<10), - verifier: verifier, +// newStreamScanner constructs one scanner with fixed input buffering. +func newStreamScanner(src io.Reader, dstFile *os.File, hash hash.Hash, hashSize int) *streamScanner { + return &streamScanner{ + src: src, + dstFile: dstFile, + buf: make([]byte, streamScannerBufferSize), + hash: hash, + hashSize: hashSize, + hashEnabled: true, } } -// Read implements io.Reader. -func (stream *streamCopier) Read(dst []byte) (int, error) { - n, err := stream.reader.Read(dst) - if n > 0 { - if writeErr := stream.writeChunk(dst[:n]); writeErr != nil { - return 0, writeErr +// fill ensures at least min unread bytes are available in receiver's buffer. +func (scanner *streamScanner) fill(min int) error { + if min <= 0 { + return nil + } + if min > len(scanner.buf) { + return fmt.Errorf("format/pack/ingest: fill(%d) exceeds scanner buffer", min) + } + + for scanner.n-scanner.off < min { + if err := scanner.flushConsumedPrefix(); err != nil { + return err + } + + readN, err := scanner.src.Read(scanner.buf[scanner.n:]) + if readN > 0 { + scanner.n += readN + } + if err != nil { + if err == io.EOF && scanner.n-scanner.off >= min { + return nil + } + + return err + } + if readN == 0 { + return io.ErrNoProgress + } + } + + return nil +} + +// use consumes n unread bytes and updates accounting/checksum state. +func (scanner *streamScanner) use(n int) error { + if n < 0 || n > scanner.n-scanner.off { + return fmt.Errorf("format/pack/ingest: invalid consume length %d", n) + } + if n == 0 { + return nil + } + + chunk := scanner.buf[scanner.off : scanner.off+n] + if scanner.hashEnabled { + if _, err := scanner.hash.Write(chunk); err != nil { + return err } } + if scanner.inEntryCRC { + scanner.entryCRC = crc32.Update(scanner.entryCRC, crc32.IEEETable, chunk) + } + + scanner.off += n + scanner.consumed += uint64(n) - return n, err + return nil } -// ReadByte implements io.ByteReader. -func (stream *streamCopier) ReadByte() (byte, error) { - b, err := stream.reader.ReadByte() - if err != nil { +// Read implements io.Reader. +func (scanner *streamScanner) Read(dst []byte) (int, error) { + if len(dst) == 0 { + return 0, nil + } + + if scanner.n-scanner.off == 0 { + if err := scanner.fill(1); err != nil { + if err == io.EOF { + return 0, io.EOF + } + + return 0, err + } + } + + unread := scanner.n - scanner.off + if unread == 0 { + return 0, io.EOF + } + + n := len(dst) + if n > unread { + n = unread + } + copy(dst, scanner.buf[scanner.off:scanner.off+n]) + if err := scanner.use(n); err != nil { return 0, err } - if writeErr := stream.writeChunk([]byte{b}); writeErr != nil { - return 0, writeErr + return n, nil +} + +// ReadByte implements io.ByteReader without allocation. +func (scanner *streamScanner) ReadByte() (byte, error) { + if scanner.n-scanner.off == 0 { + if err := scanner.fill(1); err != nil { + return 0, err + } + } + + b := scanner.buf[scanner.off] + if err := scanner.use(1); err != nil { + return 0, err } return b, nil } -// readFull reads exactly len(dst) bytes through stream. -func (stream *streamCopier) readFull(dst []byte) error { - _, err := io.ReadFull(stream, dst) +// readFull reads exactly len(dst) bytes through receiver. +func (scanner *streamScanner) readFull(dst []byte) error { + _, err := io.ReadFull(scanner, dst) if err != nil { return err } @@ -64,47 +166,83 @@ func (stream *streamCopier) readFull(dst []byte) error { return nil } -// writeChunk mirrors src bytes to destination artifacts and accounting. -func (stream *streamCopier) writeChunk(src []byte) error { - n, err := stream.writer.Write(src) - if err != nil { - return &ErrDestinationWrite{Op: fmt.Sprintf("write pack: %v", err)} - } - if n != len(src) { - return &ErrDestinationWrite{Op: "write pack: short write"} +// flush writes all consumed-but-unflushed bytes to destination pack file. +func (scanner *streamScanner) flush() error { + return scanner.flushConsumedPrefix() +} + +// finishAndFlushTrailer reads trailer hash bytes, verifies trailer checksum, +// and ensures no trailing garbage remains in stream. +func (scanner *streamScanner) finishAndFlushTrailer() error { + if scanner.hashSize <= 0 { + return fmt.Errorf("format/pack/ingest: invalid hash size") } - if stream.entryCRC != nil { - _, _ = stream.entryCRC.Write(src) + trailer := make([]byte, scanner.hashSize) + scanner.hashEnabled = false + if err := scanner.readFull(trailer); err != nil { + return &ErrPackTrailerMismatch{} } - stream.verifier.write(src) - stream.offset += uint64(len(src)) + scanner.packTrailer = append(scanner.packTrailer[:0], trailer...) - return nil -} + var probe [1]byte + n, err := scanner.Read(probe[:]) + if n > 0 || err == nil { + return fmt.Errorf("format/pack/ingest: pack has trailing garbage") + } + if err != io.EOF { + return err + } -// flush flushes buffered pack output bytes to the destination file. -func (stream *streamCopier) flush() error { - err := stream.writer.Flush() - if err != nil { - return &ErrDestinationWrite{Op: fmt.Sprintf("flush pack: %v", err)} + computed := scanner.hash.Sum(nil) + if !bytes.Equal(computed, trailer) { + return &ErrPackTrailerMismatch{} } return nil } // beginEntryCRC starts inline CRC accumulation for one packed entry. -func (stream *streamCopier) beginEntryCRC() { - stream.entryCRC = crc32.NewIEEE() +func (scanner *streamScanner) beginEntryCRC() { + scanner.entryCRC = 0 + scanner.inEntryCRC = true } // endEntryCRC finishes inline CRC accumulation for one packed entry. -func (stream *streamCopier) endEntryCRC() (uint32, error) { - if stream.entryCRC == nil { +func (scanner *streamScanner) endEntryCRC() (uint32, error) { + if !scanner.inEntryCRC { return 0, fmt.Errorf("format/pack/ingest: entry CRC not started") } - crc := stream.entryCRC.Sum32() - stream.entryCRC = nil + crc := scanner.entryCRC + scanner.entryCRC = 0 + scanner.inEntryCRC = false return crc, nil } + +// flushConsumedPrefix writes scanner.buf[:scanner.off] and compacts unread +// bytes to the start of buffer. +func (scanner *streamScanner) flushConsumedPrefix() error { + if scanner.off == 0 { + return nil + } + + written := 0 + for written < scanner.off { + n, err := scanner.dstFile.Write(scanner.buf[written:scanner.off]) + if err != nil { + return &ErrDestinationWrite{Op: fmt.Sprintf("write pack: %v", err)} + } + if n == 0 { + return &ErrDestinationWrite{Op: "write pack: short write"} + } + written += n + } + + unread := scanner.n - scanner.off + copy(scanner.buf[:unread], scanner.buf[scanner.off:scanner.n]) + scanner.off = 0 + scanner.n = unread + + return nil +} |
