diff options
| author | 2026-03-05 19:39:31 +0800 | |
|---|---|---|
| committer | 2026-03-05 19:39:31 +0800 | |
| commit | 6ab570c7f847560d85999783e597a29790cbd529 (patch) | |
| tree | de52cb39639624b1e2f842277ccd8034a06a50ac /format | |
| parent | go.mod: Remove klauspost/compress dep (diff) | |
| signature | No signature | |
format/pack/ingest: Optimize writes
Diffstat (limited to 'format')
| -rw-r--r-- | format/pack/ingest/stream.go | 19 | ||||
| -rw-r--r-- | format/pack/ingest/stream_scan.go | 6 |
2 files changed, 21 insertions, 4 deletions
diff --git a/format/pack/ingest/stream.go b/format/pack/ingest/stream.go index 17a19d8d..303e91b2 100644 --- a/format/pack/ingest/stream.go +++ b/format/pack/ingest/stream.go @@ -13,7 +13,7 @@ import ( // trailer verification state. type streamCopier struct { reader *bufio.Reader - packFile *os.File + writer *bufio.Writer verifier *trailerVerifier offset uint64 entryCRC hash.Hash32 @@ -23,7 +23,7 @@ type streamCopier struct { func newStreamCopier(src io.Reader, packFile *os.File, verifier *trailerVerifier) *streamCopier { return &streamCopier{ reader: bufio.NewReaderSize(src, 64<<10), - packFile: packFile, + writer: bufio.NewWriterSize(packFile, 256<<10), verifier: verifier, } } @@ -66,10 +66,13 @@ func (stream *streamCopier) readFull(dst []byte) error { // writeChunk mirrors src bytes to destination artifacts and accounting. func (stream *streamCopier) writeChunk(src []byte) error { - _, err := stream.packFile.WriteAt(src, int64(stream.offset)) + n, err := stream.writer.Write(src) if err != nil { return &ErrDestinationWrite{Op: fmt.Sprintf("write pack: %v", err)} } + if n != len(src) { + return &ErrDestinationWrite{Op: "write pack: short write"} + } if stream.entryCRC != nil { _, _ = stream.entryCRC.Write(src) @@ -80,6 +83,16 @@ func (stream *streamCopier) writeChunk(src []byte) error { return nil } +// flush flushes buffered pack output bytes to the destination file. +func (stream *streamCopier) flush() error { + err := stream.writer.Flush() + if err != nil { + return &ErrDestinationWrite{Op: fmt.Sprintf("flush pack: %v", err)} + } + + return nil +} + // beginEntryCRC starts inline CRC accumulation for one packed entry. func (stream *streamCopier) beginEntryCRC() { stream.entryCRC = crc32.NewIEEE() diff --git a/format/pack/ingest/stream_scan.go b/format/pack/ingest/stream_scan.go index 2c2389d8..49290c0e 100644 --- a/format/pack/ingest/stream_scan.go +++ b/format/pack/ingest/stream_scan.go @@ -45,7 +45,11 @@ func streamPackAndScan(state *ingestState) error { } } - return finalizeStreamPackHash(state) + if err := finalizeStreamPackHash(state); err != nil { + return err + } + + return state.stream.flush() } // readAndValidatePackHeader reads and validates PACK header from the stream. |
