diff options
| author | 2026-03-06 11:04:15 +0800 | |
|---|---|---|
| committer | 2026-03-06 11:16:16 +0800 | |
| commit | 6945464a0438396de97b9bc28c1bd31c22456092 (patch) | |
| tree | 94de06ad52d6fdff77c6d05dc5c415c65a8311e6 /format/pack/ingest/stream.go | |
| parent | reachability: Split walk files (diff) | |
| signature | No signature | |
format/pack/ingest: Split files
Diffstat (limited to 'format/pack/ingest/stream.go')
| -rw-r--r-- | format/pack/ingest/stream.go | 158 |
1 files changed, 0 insertions, 158 deletions
diff --git a/format/pack/ingest/stream.go b/format/pack/ingest/stream.go index 72f6c5a4..66a6fc5f 100644 --- a/format/pack/ingest/stream.go +++ b/format/pack/ingest/stream.go @@ -1,11 +1,8 @@ package ingest import ( - "bytes" "errors" - "fmt" "hash" - "hash/crc32" "io" "os" ) @@ -106,71 +103,6 @@ func (scanner *streamScanner) ReadByte() (byte, error) { return b, nil } -// fill ensures at least min unread bytes are available in receiver's buffer. -func (scanner *streamScanner) fill(minLen int) error { - if minLen <= 0 { - return nil - } - - if minLen > len(scanner.buf) { - return fmt.Errorf("format/pack/ingest: fill(%d) exceeds scanner buffer", minLen) - } - - for scanner.n-scanner.off < minLen { - err := scanner.flushConsumedPrefix() - if err != nil { - return err - } - - readN, err := scanner.src.Read(scanner.buf[scanner.n:]) - if readN > 0 { - scanner.n += readN - } - - if err != nil { - if errors.Is(err, io.EOF) && scanner.n-scanner.off >= minLen { - return nil - } - - return err - } - - if readN == 0 { - return io.ErrNoProgress - } - } - - return nil -} - -// use consumes n unread bytes and updates accounting/checksum state. -func (scanner *streamScanner) use(n int) error { - if n < 0 || n > scanner.n-scanner.off { - return fmt.Errorf("format/pack/ingest: invalid consume length %d", n) - } - - if n == 0 { - return nil - } - - chunk := scanner.buf[scanner.off : scanner.off+n] - if scanner.hashEnabled { - _, err := scanner.hash.Write(chunk) - if err != nil { - return err - } - } - - if scanner.inEntryCRC { - scanner.entryCRC = crc32.Update(scanner.entryCRC, crc32.IEEETable, chunk) - } - - scanner.off += n - scanner.consumed += uint64(n) - - return nil -} - // readFull reads exactly len(dst) bytes through receiver. func (scanner *streamScanner) readFull(dst []byte) error { _, err := io.ReadFull(scanner, dst) @@ -180,93 +112,3 @@ func (scanner *streamScanner) readFull(dst []byte) error { return nil } - -// flush writes all consumed-but-unflushed bytes to destination pack file. -func (scanner *streamScanner) flush() error { - return scanner.flushConsumedPrefix() -} - -// finishAndFlushTrailer reads trailer hash bytes, verifies trailer checksum, -// and ensures no trailing garbage remains in stream. -func (scanner *streamScanner) finishAndFlushTrailer() error { - if scanner.hashSize <= 0 { - return fmt.Errorf("format/pack/ingest: invalid hash size") - } - - trailer := make([]byte, scanner.hashSize) - - scanner.hashEnabled = false - - err := scanner.readFull(trailer) - if err != nil { - return &ErrPackTrailerMismatch{} - } - - scanner.packTrailer = append(scanner.packTrailer[:0], trailer...) - - var probe [1]byte - - n, err := scanner.Read(probe[:]) - if n > 0 || err == nil { - return fmt.Errorf("format/pack/ingest: pack has trailing garbage") - } - - if !errors.Is(err, io.EOF) { - return err - } - - computed := scanner.hash.Sum(nil) - if !bytes.Equal(computed, trailer) { - return &ErrPackTrailerMismatch{} - } - - return nil -} - -// beginEntryCRC starts inline CRC accumulation for one packed entry. -func (scanner *streamScanner) beginEntryCRC() { - scanner.entryCRC = 0 - scanner.inEntryCRC = true -} - -// endEntryCRC finishes inline CRC accumulation for one packed entry. -func (scanner *streamScanner) endEntryCRC() (uint32, error) { - if !scanner.inEntryCRC { - return 0, fmt.Errorf("format/pack/ingest: entry CRC not started") - } - - crc := scanner.entryCRC - scanner.entryCRC = 0 - scanner.inEntryCRC = false - - return crc, nil -} - -// flushConsumedPrefix writes scanner.buf[:scanner.off] and compacts unread -// bytes to the start of buffer. -func (scanner *streamScanner) flushConsumedPrefix() error { - if scanner.off == 0 { - return nil - } - - written := 0 - for written < scanner.off { - n, err := scanner.dstFile.Write(scanner.buf[written:scanner.off]) - if err != nil { - return &ErrDestinationWrite{Op: fmt.Sprintf("write pack: %v", err)} - } - - if n == 0 { - return &ErrDestinationWrite{Op: "write pack: short write"} - } - - written += n - } - - unread := scanner.n - scanner.off - copy(scanner.buf[:unread], scanner.buf[scanner.off:scanner.n]) - scanner.off = 0 - scanner.n = unread - - return nil -} |
