aboutsummaryrefslogtreecommitdiff
path: root/format
diff options
context:
space:
mode:
Diffstat (limited to 'format')
-rw-r--r--format/pack/ingest/stream.go19
-rw-r--r--format/pack/ingest/stream_scan.go6
2 files changed, 21 insertions, 4 deletions
diff --git a/format/pack/ingest/stream.go b/format/pack/ingest/stream.go
index 17a19d8d..303e91b2 100644
--- a/format/pack/ingest/stream.go
+++ b/format/pack/ingest/stream.go
@@ -13,7 +13,7 @@ import (
// trailer verification state.
type streamCopier struct {
reader *bufio.Reader
- packFile *os.File
+ writer *bufio.Writer
verifier *trailerVerifier
offset uint64
entryCRC hash.Hash32
@@ -23,7 +23,7 @@ type streamCopier struct {
func newStreamCopier(src io.Reader, packFile *os.File, verifier *trailerVerifier) *streamCopier {
return &streamCopier{
reader: bufio.NewReaderSize(src, 64<<10),
- packFile: packFile,
+ writer: bufio.NewWriterSize(packFile, 256<<10),
verifier: verifier,
}
}
@@ -66,10 +66,13 @@ func (stream *streamCopier) readFull(dst []byte) error {
// writeChunk mirrors src bytes to destination artifacts and accounting.
func (stream *streamCopier) writeChunk(src []byte) error {
- _, err := stream.packFile.WriteAt(src, int64(stream.offset))
+ n, err := stream.writer.Write(src)
if err != nil {
return &ErrDestinationWrite{Op: fmt.Sprintf("write pack: %v", err)}
}
+ if n != len(src) {
+ return &ErrDestinationWrite{Op: "write pack: short write"}
+ }
if stream.entryCRC != nil {
_, _ = stream.entryCRC.Write(src)
@@ -80,6 +83,16 @@ func (stream *streamCopier) writeChunk(src []byte) error {
return nil
}
+// flush flushes buffered pack output bytes to the destination file.
+func (stream *streamCopier) flush() error {
+ err := stream.writer.Flush()
+ if err != nil {
+ return &ErrDestinationWrite{Op: fmt.Sprintf("flush pack: %v", err)}
+ }
+
+ return nil
+}
+
// beginEntryCRC starts inline CRC accumulation for one packed entry.
func (stream *streamCopier) beginEntryCRC() {
stream.entryCRC = crc32.NewIEEE()
diff --git a/format/pack/ingest/stream_scan.go b/format/pack/ingest/stream_scan.go
index 2c2389d8..49290c0e 100644
--- a/format/pack/ingest/stream_scan.go
+++ b/format/pack/ingest/stream_scan.go
@@ -45,7 +45,11 @@ func streamPackAndScan(state *ingestState) error {
}
}
- return finalizeStreamPackHash(state)
+ if err := finalizeStreamPackHash(state); err != nil {
+ return err
+ }
+
+ return state.stream.flush()
}
// readAndValidatePackHeader reads and validates PACK header from the stream.