aboutsummaryrefslogtreecommitdiff
path: root/format/pack/ingest/stream.go
diff options
context:
space:
mode:
authorGravatar Runxi Yu2026-03-06 11:04:15 +0800
committerGravatar Runxi Yu2026-03-06 11:16:16 +0800
commit6945464a0438396de97b9bc28c1bd31c22456092 (patch)
tree94de06ad52d6fdff77c6d05dc5c415c65a8311e6 /format/pack/ingest/stream.go
parentreachability: Split walk files (diff)
signatureNo signature
format/pack/ingest: Split files
Diffstat (limited to 'format/pack/ingest/stream.go')
-rw-r--r--format/pack/ingest/stream.go158
1 files changed, 0 insertions, 158 deletions
diff --git a/format/pack/ingest/stream.go b/format/pack/ingest/stream.go
index 72f6c5a4..66a6fc5f 100644
--- a/format/pack/ingest/stream.go
+++ b/format/pack/ingest/stream.go
@@ -1,11 +1,8 @@
package ingest
import (
- "bytes"
"errors"
- "fmt"
"hash"
- "hash/crc32"
"io"
"os"
)
@@ -106,71 +103,6 @@ func (scanner *streamScanner) ReadByte() (byte, error) {
return b, nil
}
-// fill ensures at least min unread bytes are available in receiver's buffer.
-func (scanner *streamScanner) fill(minLen int) error {
- if minLen <= 0 {
- return nil
- }
-
- if minLen > len(scanner.buf) {
- return fmt.Errorf("format/pack/ingest: fill(%d) exceeds scanner buffer", minLen)
- }
-
- for scanner.n-scanner.off < minLen {
- err := scanner.flushConsumedPrefix()
- if err != nil {
- return err
- }
-
- readN, err := scanner.src.Read(scanner.buf[scanner.n:])
- if readN > 0 {
- scanner.n += readN
- }
-
- if err != nil {
- if errors.Is(err, io.EOF) && scanner.n-scanner.off >= minLen {
- return nil
- }
-
- return err
- }
-
- if readN == 0 {
- return io.ErrNoProgress
- }
- }
-
- return nil
-}
-
-// use consumes n unread bytes and updates accounting/checksum state.
-func (scanner *streamScanner) use(n int) error {
- if n < 0 || n > scanner.n-scanner.off {
- return fmt.Errorf("format/pack/ingest: invalid consume length %d", n)
- }
-
- if n == 0 {
- return nil
- }
-
- chunk := scanner.buf[scanner.off : scanner.off+n]
- if scanner.hashEnabled {
- _, err := scanner.hash.Write(chunk)
- if err != nil {
- return err
- }
- }
-
- if scanner.inEntryCRC {
- scanner.entryCRC = crc32.Update(scanner.entryCRC, crc32.IEEETable, chunk)
- }
-
- scanner.off += n
- scanner.consumed += uint64(n)
-
- return nil
-}
-
// readFull reads exactly len(dst) bytes through receiver.
func (scanner *streamScanner) readFull(dst []byte) error {
_, err := io.ReadFull(scanner, dst)
@@ -180,93 +112,3 @@ func (scanner *streamScanner) readFull(dst []byte) error {
return nil
}
-
-// flush writes all consumed-but-unflushed bytes to destination pack file.
-func (scanner *streamScanner) flush() error {
- return scanner.flushConsumedPrefix()
-}
-
-// finishAndFlushTrailer reads trailer hash bytes, verifies trailer checksum,
-// and ensures no trailing garbage remains in stream.
-func (scanner *streamScanner) finishAndFlushTrailer() error {
- if scanner.hashSize <= 0 {
- return fmt.Errorf("format/pack/ingest: invalid hash size")
- }
-
- trailer := make([]byte, scanner.hashSize)
-
- scanner.hashEnabled = false
-
- err := scanner.readFull(trailer)
- if err != nil {
- return &ErrPackTrailerMismatch{}
- }
-
- scanner.packTrailer = append(scanner.packTrailer[:0], trailer...)
-
- var probe [1]byte
-
- n, err := scanner.Read(probe[:])
- if n > 0 || err == nil {
- return fmt.Errorf("format/pack/ingest: pack has trailing garbage")
- }
-
- if !errors.Is(err, io.EOF) {
- return err
- }
-
- computed := scanner.hash.Sum(nil)
- if !bytes.Equal(computed, trailer) {
- return &ErrPackTrailerMismatch{}
- }
-
- return nil
-}
-
-// beginEntryCRC starts inline CRC accumulation for one packed entry.
-func (scanner *streamScanner) beginEntryCRC() {
- scanner.entryCRC = 0
- scanner.inEntryCRC = true
-}
-
-// endEntryCRC finishes inline CRC accumulation for one packed entry.
-func (scanner *streamScanner) endEntryCRC() (uint32, error) {
- if !scanner.inEntryCRC {
- return 0, fmt.Errorf("format/pack/ingest: entry CRC not started")
- }
-
- crc := scanner.entryCRC
- scanner.entryCRC = 0
- scanner.inEntryCRC = false
-
- return crc, nil
-}
-
-// flushConsumedPrefix writes scanner.buf[:scanner.off] and compacts unread
-// bytes to the start of buffer.
-func (scanner *streamScanner) flushConsumedPrefix() error {
- if scanner.off == 0 {
- return nil
- }
-
- written := 0
- for written < scanner.off {
- n, err := scanner.dstFile.Write(scanner.buf[written:scanner.off])
- if err != nil {
- return &ErrDestinationWrite{Op: fmt.Sprintf("write pack: %v", err)}
- }
-
- if n == 0 {
- return &ErrDestinationWrite{Op: "write pack: short write"}
- }
-
- written += n
- }
-
- unread := scanner.n - scanner.off
- copy(scanner.buf[:unread], scanner.buf[scanner.off:scanner.n])
- scanner.off = 0
- scanner.n = unread
-
- return nil
-}