diff options
| author | 2026-03-26 09:17:14 +0000 | |
|---|---|---|
| committer | 2026-03-26 09:18:30 +0000 | |
| commit | 3e884f5f3d42cbc4874a04da31dde10314b0cfad (patch) | |
| tree | f5e1e325fd1a2a0801791c054010213214475d80 /format/packfile/ingest/stream.go | |
| parent | network/receivepack: Rename from receivepack (diff) | |
| signature | No signature | |
format: Move commitgraph and packfile here
Diffstat (limited to 'format/packfile/ingest/stream.go')
| -rw-r--r-- | format/packfile/ingest/stream.go | 111 |
1 files changed, 111 insertions, 0 deletions
diff --git a/format/packfile/ingest/stream.go b/format/packfile/ingest/stream.go new file mode 100644 index 00000000..a403087a --- /dev/null +++ b/format/packfile/ingest/stream.go @@ -0,0 +1,111 @@ +package ingest + +import ( + "errors" + "hash" + "io" + "os" +) + +const streamScannerBufferSize = 64 << 10 + +// streamScanner incrementally reads/consumes one pack stream while mirroring +// consumed bytes into one destination pack file. +type streamScanner struct { + src io.Reader + dstFile *os.File + + // Input buffer window: buf[off:n] is unread. + buf []byte + off int + n int + + // Absolute consumed stream bytes. + consumed uint64 + + // Running pack hash over consumed bytes while hashEnabled is true. + hash hash.Hash + hashSize int + hashEnabled bool + + // Entry CRC state while one entry is being consumed. + entryCRC uint32 + inEntryCRC bool + + packTrailer []byte +} + +// newStreamScanner constructs one scanner with fixed input buffering. +func newStreamScanner(src io.Reader, dstFile *os.File, hash hash.Hash, hashSize int) *streamScanner { + return &streamScanner{ + src: src, + dstFile: dstFile, + buf: make([]byte, streamScannerBufferSize), + hash: hash, + hashSize: hashSize, + hashEnabled: true, + } +} + +// Read implements io.Reader. +func (scanner *streamScanner) Read(dst []byte) (int, error) { + if len(dst) == 0 { + return 0, nil + } + + if scanner.n-scanner.off == 0 { + err := scanner.fill(1) + if err != nil { + if errors.Is(err, io.EOF) { + return 0, io.EOF + } + + return 0, err + } + } + + unread := scanner.n - scanner.off + if unread == 0 { + return 0, io.EOF + } + + n := min(len(dst), unread) + + copy(dst, scanner.buf[scanner.off:scanner.off+n]) + + err := scanner.use(n) + if err != nil { + return 0, err + } + + return n, nil +} + +// ReadByte implements io.ByteReader without allocation. +func (scanner *streamScanner) ReadByte() (byte, error) { + if scanner.n-scanner.off == 0 { + err := scanner.fill(1) + if err != nil { + return 0, err + } + } + + b := scanner.buf[scanner.off] + + err := scanner.use(1) + if err != nil { + return 0, err + } + + return b, nil +} + +// readFull reads exactly len(dst) bytes through receiver. +func (scanner *streamScanner) readFull(dst []byte) error { + _, err := io.ReadFull(scanner, dst) + if err != nil { + return err + } + + return nil +} |
