aboutsummaryrefslogtreecommitdiff
path: root/object/store/packed/internal/ingest/stream.go
diff options
context:
space:
mode:
authorGravatar Runxi Yu2026-03-30 14:28:13 +0000
committerGravatar Runxi Yu2026-03-30 14:28:13 +0000
commita4eeb727468a178a4de0dfc718828f26740484ac (patch)
tree4318d38d49facc80e2e2186f5919fa656be3b31f /object/store/packed/internal/ingest/stream.go
parentobject/store/packed: Make store own root, algo, opts (diff)
signatureNo signature
object,store/packed{,/internal/ingest}: Move from format/packfile/ingest
Diffstat (limited to 'object/store/packed/internal/ingest/stream.go')
-rw-r--r--object/store/packed/internal/ingest/stream.go111
1 files changed, 111 insertions, 0 deletions
diff --git a/object/store/packed/internal/ingest/stream.go b/object/store/packed/internal/ingest/stream.go
new file mode 100644
index 00000000..a403087a
--- /dev/null
+++ b/object/store/packed/internal/ingest/stream.go
@@ -0,0 +1,111 @@
+package ingest
+
+import (
+ "errors"
+ "hash"
+ "io"
+ "os"
+)
+
+// streamScannerBufferSize is the length in bytes (64 KiB) of the input buffer
+// that newStreamScanner allocates for each scanner.
+const streamScannerBufferSize = 64 << 10
+
+// streamScanner incrementally reads/consumes one pack stream while mirroring
+// consumed bytes into one destination pack file.
+//
+// It is handed out as an io.Reader and io.ByteReader (see Read and ReadByte).
+// Construct it with newStreamScanner, which allocates buf and enables hashing.
+type streamScanner struct {
+ // src is the reader supplied to newStreamScanner.
+ // NOTE(review): presumably the refill helper pulls from it — confirm there.
+ src io.Reader
+ // dstFile is the destination file supplied to newStreamScanner.
+ // NOTE(review): presumably consumed bytes are written to it — confirm in
+ // the consume helper.
+ dstFile *os.File
+
+ // Input buffer window: buf[off:n] is unread.
+ // buf is allocated once with streamScannerBufferSize bytes.
+ buf []byte
+ off int
+ n int
+
+ // Absolute consumed stream bytes.
+ consumed uint64
+
+ // Running pack hash over consumed bytes while hashEnabled is true.
+ // hashEnabled starts out true in newStreamScanner.
+ // NOTE(review): hashSize is presumably the digest length in bytes of hash;
+ // verify against callers.
+ hash hash.Hash
+ hashSize int
+ hashEnabled bool
+
+ // Entry CRC state while one entry is being consumed.
+ entryCRC uint32
+ inEntryCRC bool
+
+ // NOTE(review): presumably holds the trailing pack checksum bytes once they
+ // have been read; it is not set in this part of the file — confirm.
+ packTrailer []byte
+}
+
+// newStreamScanner builds a scanner over src that is bound to dstFile.
+//
+// The input buffer is allocated once with streamScannerBufferSize bytes and
+// starts empty (off == n == 0). The supplied hash and its size are stored as
+// given, and hashing starts out enabled. All remaining fields keep their zero
+// values.
+func newStreamScanner(src io.Reader, dstFile *os.File, hash hash.Hash, hashSize int) *streamScanner {
+	scanner := new(streamScanner)
+
+	scanner.src = src
+	scanner.dstFile = dstFile
+	scanner.buf = make([]byte, streamScannerBufferSize)
+
+	scanner.hash = hash
+	scanner.hashSize = hashSize
+	scanner.hashEnabled = true
+
+	return scanner
+}
+
+// Read implements io.Reader.
+//
+// It hands out at most the bytes currently unread in the buffer window
+// (buf[off:n]), refilling the window first when it is empty. A zero-length
+// dst returns (0, nil) without touching the stream. Any refill error that
+// matches io.EOF is reported as the bare io.EOF sentinel; other errors are
+// returned unchanged. If marking the copied bytes as used fails, that error
+// is returned with a zero count.
+func (scanner *streamScanner) Read(dst []byte) (int, error) {
+	if len(dst) == 0 {
+		return 0, nil
+	}
+
+	if scanner.off == scanner.n {
+		if err := scanner.fill(1); err != nil {
+			if errors.Is(err, io.EOF) {
+				return 0, io.EOF
+			}
+
+			return 0, err
+		}
+	}
+
+	pending := scanner.buf[scanner.off:scanner.n]
+	if len(pending) == 0 {
+		return 0, io.EOF
+	}
+
+	// copy moves min(len(dst), len(pending)) bytes and reports that count.
+	copied := copy(dst, pending)
+
+	if err := scanner.use(copied); err != nil {
+		return 0, err
+	}
+
+	return copied, nil
+}
+
+// ReadByte implements io.ByteReader without allocation.
+//
+// It returns the next unread byte of the buffer window (buf[off:n]),
+// refilling the window first when it is empty, and then marks that byte as
+// used. Error handling matches Read: a refill error that matches io.EOF is
+// reported as the bare io.EOF sentinel, other errors are returned unchanged,
+// and an error from marking the byte as used is returned with a zero byte.
+func (scanner *streamScanner) ReadByte() (byte, error) {
+	if scanner.n-scanner.off == 0 {
+		err := scanner.fill(1)
+		if err != nil {
+			// Collapse wrapped EOFs to the sentinel, as Read does: callers of
+			// io.ByteReader commonly compare the error with == io.EOF.
+			if errors.Is(err, io.EOF) {
+				return 0, io.EOF
+			}
+
+			return 0, err
+		}
+	}
+
+	// Same guard as Read: if the window is still empty after a refill that
+	// reported no error, report EOF rather than returning a stale byte from
+	// outside the unread window.
+	if scanner.n-scanner.off == 0 {
+		return 0, io.EOF
+	}
+
+	b := scanner.buf[scanner.off]
+
+	err := scanner.use(1)
+	if err != nil {
+		return 0, err
+	}
+
+	return b, nil
+}
+
+// readFull fills dst completely by reading through the scanner's own Read
+// method, so every byte obtained passes through the scanner.
+//
+// It returns nil only when len(dst) bytes were read; otherwise it returns the
+// error from io.ReadFull unchanged (io.EOF when nothing was read,
+// io.ErrUnexpectedEOF when the stream ended part-way through).
+func (scanner *streamScanner) readFull(dst []byte) error {
+	_, err := io.ReadFull(scanner, dst)
+
+	return err
+}