aboutsummaryrefslogtreecommitdiff
path: root/object/store/packed/internal/ingest/flush.go
diff options
context:
space:
mode:
authorGravatar Runxi Yu2026-03-30 14:28:13 +0000
committerGravatar Runxi Yu2026-03-30 14:28:13 +0000
commita4eeb727468a178a4de0dfc718828f26740484ac (patch)
tree4318d38d49facc80e2e2186f5919fa656be3b31f /object/store/packed/internal/ingest/flush.go
parentobject/store/packed: Make store own root, algo, opts (diff)
object,store/packed{,/internal/ingest}: Move from format/packfile/ingest
Diffstat (limited to 'object/store/packed/internal/ingest/flush.go')
-rw-r--r--object/store/packed/internal/ingest/flush.go37
1 files changed, 37 insertions, 0 deletions
diff --git a/object/store/packed/internal/ingest/flush.go b/object/store/packed/internal/ingest/flush.go
new file mode 100644
index 00000000..96753170
--- /dev/null
+++ b/object/store/packed/internal/ingest/flush.go
@@ -0,0 +1,37 @@
+package ingest
+
+import "fmt"
+
+// flush writes all consumed-but-unflushed bytes to destination pack file.
+func (scanner *streamScanner) flush() error {
+ return scanner.flushConsumedPrefix()
+}
+
+// flushConsumedPrefix writes scanner.buf[:scanner.off] and compacts unread
+// bytes to the start of buffer.
+func (scanner *streamScanner) flushConsumedPrefix() error {
+ if scanner.off == 0 {
+ return nil
+ }
+
+ written := 0
+ for written < scanner.off {
+ n, err := scanner.dstFile.Write(scanner.buf[written:scanner.off])
+ if err != nil {
+ return &DestinationWriteError{Op: fmt.Sprintf("write pack: %v", err)}
+ }
+
+ if n == 0 {
+ return &DestinationWriteError{Op: "write pack: short write"}
+ }
+
+ written += n
+ }
+
+ unread := scanner.n - scanner.off
+ copy(scanner.buf[:unread], scanner.buf[scanner.off:scanner.n])
+ scanner.off = 0
+ scanner.n = unread
+
+ return nil
+}