diff options
| author | 2026-02-21 04:55:02 +0800 | |
|---|---|---|
| committer | 2026-02-21 04:55:02 +0800 | |
| commit | 564c8ecc84ed1bfc28ea3a0251020051906b8548 (patch) | |
| tree | 1961ff46e328a997ffc2d2f19f53ee16f0952d3a /objectstore/loose | |
| parent | objectstore/loose: Add loose writer in bytes (diff) | |
| signature | No signature | |
objectstore/loose: Add streaming writer
Diffstat (limited to 'objectstore/loose')
| -rw-r--r-- | objectstore/loose/write_bytes.go | 109 | ||||
| -rw-r--r-- | objectstore/loose/write_test.go | 145 | ||||
| -rw-r--r-- | objectstore/loose/write_writer.go | 282 |
3 files changed, 411 insertions, 125 deletions
diff --git a/objectstore/loose/write_bytes.go b/objectstore/loose/write_bytes.go index fe2bafb9..1f7ab59f 100644 --- a/objectstore/loose/write_bytes.go +++ b/objectstore/loose/write_bytes.go @@ -1,123 +1,44 @@ package loose import ( - "compress/zlib" - "crypto/rand" - "errors" - "fmt" - "io/fs" - "os" - "path/filepath" + "bytes" - "codeberg.org/lindenii/furgit/objectheader" "codeberg.org/lindenii/furgit/objectid" "codeberg.org/lindenii/furgit/objecttype" ) -const tempObjectFilePrefix = "tmp_obj_" - // WriteBytesFull writes a full serialized object as "type size\\x00content". func (store *Store) WriteBytesFull(raw []byte) (objectid.ObjectID, error) { var zero objectid.ObjectID - if _, _, err := parseRaw(raw); err != nil { + writer, finalize, err := store.WriteWriterFull() + if err != nil { return zero, err } - - id := store.algo.Sum(raw) - relPath, err := store.objectPath(id) - if err != nil { + if _, err := bytes.NewReader(raw).WriteTo(writer); err != nil { + _ = writer.Close() return zero, err } - if err := store.writeCompressedAtomic(relPath, raw); err != nil { + if err := writer.Close(); err != nil { return zero, err } - return id, nil + return finalize() } // WriteBytesContent writes typed content bytes as a loose object. func (store *Store) WriteBytesContent(ty objecttype.Type, content []byte) (objectid.ObjectID, error) { var zero objectid.ObjectID - header, ok := objectheader.Encode(ty, int64(len(content))) - if !ok { - return zero, fmt.Errorf("objectstore/loose: failed to encode object header for type %d", ty) - } - - raw := make([]byte, len(header)+len(content)) - copy(raw, header) - copy(raw[len(header):], content) - return store.WriteBytesFull(raw) -} - -// writeCompressedAtomic compresses raw and writes it to relPath atomically. -func (store *Store) writeCompressedAtomic(relPath string, raw []byte) error { - if _, err := store.root.Stat(relPath); err == nil { - return nil - } else if !errors.Is(err, fs.ErrNotExist) { - return err - } - - dir := filepath.Dir(relPath) - if err := store.root.MkdirAll(dir, 0o755); err != nil { - return err - } - - tmpRelPath, tmpFile, err := store.createTempObjectFile(dir) + writer, finalize, err := store.WriteWriterContent(ty, int64(len(content))) if err != nil { - return err - } - - cleanup := true - defer func() { - if tmpFile != nil { - _ = tmpFile.Close() - } - if cleanup { - _ = store.root.Remove(tmpRelPath) - } - }() - - zw := zlib.NewWriter(tmpFile) - if _, err := zw.Write(raw); err != nil { - _ = zw.Close() - return err - } - if err := zw.Close(); err != nil { - return err - } - if err := tmpFile.Sync(); err != nil { - return err - } - if err := tmpFile.Close(); err != nil { - return err + return zero, err } - tmpFile = nil - - if err := store.root.Rename(tmpRelPath, relPath); err != nil { - if errors.Is(err, fs.ErrExist) { - return nil - } - return err + if _, err := bytes.NewReader(content).WriteTo(writer); err != nil { + _ = writer.Close() + return zero, err } - - cleanup = false - return nil -} - -// createTempObjectFile creates a unique temporary object file within dir. -func (store *Store) createTempObjectFile(dir string) (string, *os.File, error) { - for range 16 { - relPath := filepath.Join(dir, tempObjectFilePrefix+rand.Text()) - file, err := store.root.OpenFile(relPath, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0o644) - if err == nil { - return relPath, file, nil - } - if errors.Is(err, fs.ErrExist) { - continue - } - return "", nil, err + if err := writer.Close(); err != nil { + return zero, err } - - return "", nil, errors.New("objectstore/loose: failed to create temporary object file") + return finalize() } diff --git a/objectstore/loose/write_test.go b/objectstore/loose/write_test.go index 835d451f..0dcb3a5f 100644 --- a/objectstore/loose/write_test.go +++ b/objectstore/loose/write_test.go @@ -2,6 +2,7 @@ package loose_test import ( "bytes" + "io" "testing" "codeberg.org/lindenii/furgit/internal/testgit" @@ -10,47 +11,68 @@ import ( "codeberg.org/lindenii/furgit/objecttype" ) -func TestLooseStoreWriteBytesContentAgainstGit(t *testing.T) { +func TestLooseStoreWriteWriterContentAgainstGit(t *testing.T) { testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { testRepo := testgit.NewBareRepo(t, algo) store := openLooseStore(t, testRepo.Dir(), algo) - content := []byte("written-by-loose-store\n") + content := []byte("written-by-content-writer\n") expectedHex := testRepo.RunInput(t, content, "hash-object", "-t", "blob", "--stdin") expectedID, err := objectid.ParseHex(algo, expectedHex) if err != nil { t.Fatalf("ParseHex(expected): %v", err) } - writtenID, err := store.WriteBytesContent(objecttype.TypeBlob, content) + writer, finalize, err := store.WriteWriterContent(objecttype.TypeBlob, int64(len(content))) if err != nil { - t.Fatalf("WriteBytesContent: %v", err) + t.Fatalf("WriteWriterContent: %v", err) } - if writtenID != expectedID { - t.Fatalf("WriteBytesContent id = %s, want %s", writtenID, expectedID) + if _, err := io.Copy(writer, bytes.NewReader(content)); err != nil { + t.Fatalf("WriteWriterContent write: %v", err) } - - gotBody := testRepo.CatFile(t, "blob", writtenID) - if !bytes.Equal(gotBody, content) { - t.Fatalf("git cat-file body mismatch") + if err := writer.Close(); err != nil { + t.Fatalf("WriteWriterContent close: %v", err) } - - writtenID2, err := store.WriteBytesContent(objecttype.TypeBlob, content) + writtenID, err := finalize() if err != nil { - t.Fatalf("WriteBytesContent second write: %v", err) + t.Fatalf("WriteWriterContent finalize: %v", err) } - if writtenID2 != expectedID { - t.Fatalf("WriteBytesContent second id = %s, want %s", writtenID2, expectedID) + if writtenID != expectedID { + t.Fatalf("WriteWriterContent id = %s, want %s", writtenID, expectedID) } - }) + + gotBody := testRepo.CatFile(t, "blob", writtenID) + if !bytes.Equal(gotBody, content) { + t.Fatalf("git cat-file body mismatch") + } + + // Writing the same object again should succeed and return the same ID. + writer, finalize, err = store.WriteWriterContent(objecttype.TypeBlob, int64(len(content))) + if err != nil { + t.Fatalf("WriteWriterContent second: %v", err) + } + if _, err := io.Copy(writer, bytes.NewReader(content)); err != nil { + t.Fatalf("WriteWriterContent second write: %v", err) + } + if err := writer.Close(); err != nil { + t.Fatalf("WriteWriterContent second close: %v", err) + } + writtenID2, err := finalize() + if err != nil { + t.Fatalf("WriteWriterContent second finalize: %v", err) + } + if writtenID2 != expectedID { + t.Fatalf("WriteWriterContent second id = %s, want %s", writtenID2, expectedID) + } + }) } -func TestLooseStoreWriteBytesFullAgainstGit(t *testing.T) { +func TestLooseStoreWriteWriterFullAgainstGit(t *testing.T) { testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { testRepo := testgit.NewBareRepo(t, algo) store := openLooseStore(t, testRepo.Dir(), algo) - body := []byte("full-write-body\n") + body := []byte("full-writer-body\n") header, ok := objectheader.Encode(objecttype.TypeBlob, int64(len(body))) if !ok { t.Fatalf("objectheader.Encode failed") @@ -60,12 +82,22 @@ func TestLooseStoreWriteBytesFullAgainstGit(t *testing.T) { copy(raw[len(header):], body) wantID := algo.Sum(raw) - gotID, err := store.WriteBytesFull(raw) + writer, finalize, err := store.WriteWriterFull() + if err != nil { + t.Fatalf("WriteWriterFull: %v", err) + } + if _, err := io.Copy(writer, bytes.NewReader(raw)); err != nil { + t.Fatalf("WriteWriterFull write: %v", err) + } + if err := writer.Close(); err != nil { + t.Fatalf("WriteWriterFull close: %v", err) + } + gotID, err := finalize() if err != nil { - t.Fatalf("WriteBytesFull: %v", err) + t.Fatalf("WriteWriterFull finalize: %v", err) } if gotID != wantID { - t.Fatalf("WriteBytesFull id = %s, want %s", gotID, wantID) + t.Fatalf("WriteWriterFull id = %s, want %s", gotID, wantID) } gotBody := testRepo.CatFile(t, "blob", gotID) @@ -75,19 +107,70 @@ func TestLooseStoreWriteBytesFullAgainstGit(t *testing.T) { }) } -func TestLooseStoreWriteValidationErrors(t *testing.T) { +func TestLooseStoreWriterValidationErrors(t *testing.T) { testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { testRepo := testgit.NewBareRepo(t, algo) store := openLooseStore(t, testRepo.Dir(), algo) - if _, err := store.WriteBytesFull([]byte("blob 1\x00hello")); err == nil { - t.Fatalf("WriteBytesFull expected size/content mismatch error") - } - if _, err := store.WriteBytesFull([]byte("not-a-header")); err == nil { - t.Fatalf("WriteBytesFull expected malformed header error") - } - if _, err := store.WriteBytesContent(objecttype.TypeInvalid, []byte("x")); err == nil { - t.Fatalf("WriteBytesContent expected invalid type error") - } + t.Run("content overflow", func(t *testing.T) { + writer, finalize, err := store.WriteWriterContent(objecttype.TypeBlob, 1) + if err != nil { + t.Fatalf("WriteWriterContent: %v", err) + } + if _, err := writer.Write([]byte("hello")); err == nil { + t.Fatalf("expected overflow error") + } + _ = writer.Close() + if _, err := finalize(); err == nil { + t.Fatalf("expected finalize error after overflow") + } + }) + + t.Run("content short", func(t *testing.T) { + writer, finalize, err := store.WriteWriterContent(objecttype.TypeBlob, 5) + if err != nil { + t.Fatalf("WriteWriterContent: %v", err) + } + if _, err := writer.Write([]byte("x")); err != nil { + t.Fatalf("write short: %v", err) + } + if err := writer.Close(); err != nil { + t.Fatalf("close short: %v", err) + } + if _, err := finalize(); err == nil { + t.Fatalf("expected finalize error for short content") + } + }) + + t.Run("full malformed header", func(t *testing.T) { + writer, finalize, err := store.WriteWriterFull() + if err != nil { + t.Fatalf("WriteWriterFull: %v", err) + } + if _, err := writer.Write([]byte("not-a-header")); err != nil { + t.Fatalf("write malformed header bytes unexpectedly failed: %v", err) + } + if err := writer.Close(); err != nil { + t.Fatalf("close malformed header: %v", err) + } + if _, err := finalize(); err == nil { + t.Fatalf("expected finalize error for malformed header") + } + }) + + t.Run("full size mismatch", func(t *testing.T) { + writer, finalize, err := store.WriteWriterFull() + if err != nil { + t.Fatalf("WriteWriterFull: %v", err) + } + raw := []byte("blob 1\x00hello") + if _, err := io.Copy(writer, bytes.NewReader(raw)); err == nil { + t.Fatalf("expected overflow error") + } + _ = writer.Close() + if _, err := finalize(); err == nil { + t.Fatalf("expected finalize error after mismatch") + } + }) }) } diff --git a/objectstore/loose/write_writer.go b/objectstore/loose/write_writer.go new file mode 100644 index 00000000..abbbae31 --- /dev/null +++ b/objectstore/loose/write_writer.go @@ -0,0 +1,282 @@ +package loose + +import ( + "bytes" + "compress/zlib" + "crypto/rand" + "errors" + "fmt" + "hash" + "io" + "io/fs" + "os" + "path/filepath" + + "codeberg.org/lindenii/furgit/objectheader" + "codeberg.org/lindenii/furgit/objectid" + "codeberg.org/lindenii/furgit/objecttype" +) + +const tempObjectFilePrefix = "tmp_obj_" + +// WriteWriterContent returns a writer for object content bytes. +// The writer accepts exactly size bytes. After closing the writer, +// call finalize to atomically publish the loose object and get its ID. +func (store *Store) WriteWriterContent(ty objecttype.Type, size int64) (io.WriteCloser, func() (objectid.ObjectID, error), error) { + if size < 0 { + return nil, nil, errors.New("objectstore/loose: negative content size") + } + + header, ok := objectheader.Encode(ty, size) + if !ok { + return nil, nil, fmt.Errorf("objectstore/loose: failed to encode object header for type %d", ty) + } + + writer, err := store.newStreamWriter(false) + if err != nil { + return nil, nil, err + } + writer.headerDone = true + writer.expectedContentLeft = size + if err := writer.writeRawChunk(header); err != nil { + _ = writer.Close() + return nil, nil, err + } + + return writer, writer.Finalize, nil +} + +// WriteWriterFull returns a writer for full raw object bytes: +// "type size\0content". After closing the writer, call finalize +// to atomically publish the loose object and get its ID. +func (store *Store) WriteWriterFull() (io.WriteCloser, func() (objectid.ObjectID, error), error) { + writer, err := store.newStreamWriter(true) + if err != nil { + return nil, nil, err + } + return writer, writer.Finalize, nil +} + +// streamWriter incrementally hashes and deflates an object into a temp file. +// Finalize validates size accounting and atomically renames the temp file. +type streamWriter struct { + // store owns path and root operations used by this write session. + store *Store + // file is the temporary destination file under objects/. + file *os.File + // zw compresses raw object bytes into file. + zw *zlib.Writer + // hash receives the same raw bytes used to compute the resulting object ID. + hash hash.Hash + + // tmpRelPath is the relative path of file under the objects root. + tmpRelPath string + + // fullMode selects full-object input ("type size\0content") as opposed to content-only input. + fullMode bool + + // headerBuf accumulates header bytes while fullMode parses up to the first NUL. + headerBuf []byte + // headerDone reports whether the full-object header has been parsed. + headerDone bool + // expectedContentLeft tracks remaining declared content bytes. + expectedContentLeft int64 + + closed bool + finalized bool + finalID objectid.ObjectID + finalErr error +} + +// newStreamWriter creates a stream writer with a temp file rooted in objects/. +func (store *Store) newStreamWriter(fullMode bool) (*streamWriter, error) { + hashFn, err := store.algo.New() + if err != nil { + return nil, err + } + + tmpRelPath, file, err := store.createTempObjectFile(".") + if err != nil { + return nil, err + } + + return &streamWriter{ + store: store, + file: file, + zw: zlib.NewWriter(file), + hash: hashFn, + tmpRelPath: tmpRelPath, + fullMode: fullMode, + headerBuf: make([]byte, 0, 64), + }, nil +} + +// Write validates and writes raw bytes into the stream. +// In full mode, it parses and enforces the streamed header-declared content size. +func (writer *streamWriter) Write(src []byte) (int, error) { + if writer.finalized { + return 0, errors.New("objectstore/loose: write after finalize") + } + if writer.closed { + return 0, errors.New("objectstore/loose: write after close") + } + + if writer.fullMode { + if err := writer.acceptFull(src); err != nil { + return 0, err + } + } else { + if err := writer.acceptContent(int64(len(src))); err != nil { + return 0, err + } + } + + if err := writer.writeRawChunk(src); err != nil { + return 0, err + } + return len(src), nil +} + +// acceptFull validates and accounts raw full-object input. +func (writer *streamWriter) acceptFull(src []byte) error { + if !writer.headerDone { + if nul := bytes.IndexByte(src, 0); nul >= 0 { + headerChunkLen := nul + 1 + writer.headerBuf = append(writer.headerBuf, src[:headerChunkLen]...) + _, size, _, ok := objectheader.Parse(writer.headerBuf) + if !ok { + return errors.New("objectstore/loose: malformed object header") + } + writer.headerDone = true + writer.expectedContentLeft = size + return writer.acceptContent(int64(len(src) - headerChunkLen)) + } + + writer.headerBuf = append(writer.headerBuf, src...) + return nil + } + + return writer.acceptContent(int64(len(src))) +} + +// acceptContent validates and accounts content byte counts. +func (writer *streamWriter) acceptContent(n int64) error { + if n > writer.expectedContentLeft { + return errors.New("objectstore/loose: object content exceeds declared size") + } + writer.expectedContentLeft -= n + return nil +} + +// writeRawChunk forwards raw bytes to the hash and deflate pipeline. +func (writer *streamWriter) writeRawChunk(src []byte) error { + if _, err := writer.hash.Write(src); err != nil { + return err + } + if _, err := writer.zw.Write(src); err != nil { + return err + } + return nil +} + +// Close flushes and closes the underlying zlib stream and temp file. +// It is safe to call multiple times. +func (writer *streamWriter) Close() error { + if writer.closed { + return nil + } + writer.closed = true + + errZlib := writer.zw.Close() + errSync := writer.file.Sync() + errFile := writer.file.Close() + writer.file = nil + return errors.Join(errZlib, errSync, errFile) +} + +// Finalize validates write completeness and atomically publishes the object. +// Publication is no-clobber: it links tmpRelPath to the object path and treats +// existing destination objects as success. +func (writer *streamWriter) Finalize() (objectid.ObjectID, error) { + if writer.finalized { + return writer.finalID, writer.finalErr + } + writer.finalized = true + + var zero objectid.ObjectID + + if !writer.closed { + if err := writer.Close(); err != nil { + writer.finalErr = err + return zero, err + } + } + + if writer.fullMode && !writer.headerDone { + writer.finalErr = errors.New("objectstore/loose: missing full object header") + return zero, writer.finalErr + } + if writer.expectedContentLeft != 0 { + writer.finalErr = errors.New("objectstore/loose: object content shorter than declared size") + return zero, writer.finalErr + } + + idBytes := writer.hash.Sum(nil) + id, err := objectid.FromBytes(writer.store.algo, idBytes) + if err != nil { + writer.finalErr = err + return zero, err + } + + relPath, err := writer.store.objectPath(id) + if err != nil { + writer.finalErr = err + return zero, err + } + + dir := filepath.Dir(relPath) + if err := writer.store.root.MkdirAll(dir, 0o755); err != nil { + writer.finalErr = err + return zero, err + } + + cleanup := true + defer func() { + if cleanup { + _ = writer.store.root.Remove(writer.tmpRelPath) + } + }() + + if err := writer.store.root.Link(writer.tmpRelPath, relPath); err != nil { + if errors.Is(err, fs.ErrExist) { + writer.finalID = id + cleanup = false + _ = writer.store.root.Remove(writer.tmpRelPath) + return id, nil + } + writer.finalErr = err + return zero, err + } + + writer.finalID = id + cleanup = false + return id, nil +} + +// createTempObjectFile creates a unique temporary object file within dir. +// The returned path is relative to the objects root. +func (store *Store) createTempObjectFile(dir string) (string, *os.File, error) { + for range 16 { + relPath := filepath.Join(dir, tempObjectFilePrefix+rand.Text()) + file, err := store.root.OpenFile(relPath, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0o644) + if err == nil { + return relPath, file, nil + } + if errors.Is(err, fs.ErrExist) { + continue + } + return "", nil, err + } + + return "", nil, errors.New("objectstore/loose: failed to create temporary object file") +} |
