diff options
| author | 2026-03-05 18:24:40 +0800 | |
|---|---|---|
| committer | 2026-03-05 19:05:47 +0800 | |
| commit | 57f1818d547f2f1dca38033b4e29f62d89ef80f9 (patch) | |
| tree | 88d55ac38e2427860bf380c8cce42fcb3bb1e9ee /format/pack/ingest/stream.go | |
| parent | internal/compress/zlib: Use flate's compression consumed counter (diff) | |
| signature | No signature | |
format/pack/ingest: Init
Diffstat (limited to 'format/pack/ingest/stream.go')
| -rw-r--r-- | format/pack/ingest/stream.go | 97 |
1 files changed, 97 insertions, 0 deletions
diff --git a/format/pack/ingest/stream.go b/format/pack/ingest/stream.go new file mode 100644 index 00000000..17a19d8d --- /dev/null +++ b/format/pack/ingest/stream.go @@ -0,0 +1,97 @@ +package ingest + +import ( + "bufio" + "fmt" + "hash" + "hash/crc32" + "io" + "os" +) + +// streamCopier reads bytes from src, writes them to packFile, and updates +// trailer verification state. +type streamCopier struct { + reader *bufio.Reader + packFile *os.File + verifier *trailerVerifier + offset uint64 + entryCRC hash.Hash32 +} + +// newStreamCopier constructs one stream copier. +func newStreamCopier(src io.Reader, packFile *os.File, verifier *trailerVerifier) *streamCopier { + return &streamCopier{ + reader: bufio.NewReaderSize(src, 64<<10), + packFile: packFile, + verifier: verifier, + } +} + +// Read implements io.Reader. +func (stream *streamCopier) Read(dst []byte) (int, error) { + n, err := stream.reader.Read(dst) + if n > 0 { + if writeErr := stream.writeChunk(dst[:n]); writeErr != nil { + return 0, writeErr + } + } + + return n, err +} + +// ReadByte implements io.ByteReader. +func (stream *streamCopier) ReadByte() (byte, error) { + b, err := stream.reader.ReadByte() + if err != nil { + return 0, err + } + + if writeErr := stream.writeChunk([]byte{b}); writeErr != nil { + return 0, writeErr + } + + return b, nil +} + +// readFull reads exactly len(dst) bytes through stream. +func (stream *streamCopier) readFull(dst []byte) error { + _, err := io.ReadFull(stream, dst) + if err != nil { + return err + } + + return nil +} + +// writeChunk mirrors src bytes to destination artifacts and accounting. +func (stream *streamCopier) writeChunk(src []byte) error { + _, err := stream.packFile.WriteAt(src, int64(stream.offset)) + if err != nil { + return &ErrDestinationWrite{Op: fmt.Sprintf("write pack: %v", err)} + } + + if stream.entryCRC != nil { + _, _ = stream.entryCRC.Write(src) + } + stream.verifier.write(src) + stream.offset += uint64(len(src)) + + return nil +} + +// beginEntryCRC starts inline CRC accumulation for one packed entry. +func (stream *streamCopier) beginEntryCRC() { + stream.entryCRC = crc32.NewIEEE() +} + +// endEntryCRC finishes inline CRC accumulation for one packed entry. +func (stream *streamCopier) endEntryCRC() (uint32, error) { + if stream.entryCRC == nil { + return 0, fmt.Errorf("format/pack/ingest: entry CRC not started") + } + crc := stream.entryCRC.Sum32() + stream.entryCRC = nil + + return crc, nil +} |
