diff options
Diffstat (limited to 'object/store')
54 files changed, 5515 insertions, 64 deletions
diff --git a/object/store/chain/bytes.go b/object/store/chain/bytes.go index fe045d0f..1739fcaa 100644 --- a/object/store/chain/bytes.go +++ b/object/store/chain/bytes.go @@ -41,8 +41,8 @@ func (chain *Chain) ReadBytesContent(id id.ObjectID) (typ.Type, []byte, error) { continue } - return typ.TypeUnknown, nil, fmt.Errorf("object/store/chain: read bytes content: %w", err) + return typ.Unknown, nil, fmt.Errorf("object/store/chain: read bytes content: %w", err) } - return typ.TypeUnknown, nil, store.ErrObjectNotFound + return typ.Unknown, nil, store.ErrObjectNotFound } diff --git a/object/store/chain/header.go b/object/store/chain/header.go index 2efd16a4..3a5ad815 100644 --- a/object/store/chain/header.go +++ b/object/store/chain/header.go @@ -11,7 +11,7 @@ import ( // ReadHeader reads object header data // from the first backend that has it. -func (chain *Chain) ReadHeader(id id.ObjectID) (typ.Type, uint64, error) { +func (chain *Chain) ReadHeader(id id.ObjectID) (typ.Type, int, error) { for _, backend := range chain.backends { ty, size, err := backend.ReadHeader(id) if err == nil { @@ -22,15 +22,15 @@ func (chain *Chain) ReadHeader(id id.ObjectID) (typ.Type, uint64, error) { continue } - return typ.TypeUnknown, 0, fmt.Errorf("object/store/chain: read header: %w", err) + return typ.Unknown, 0, fmt.Errorf("object/store/chain: read header: %w", err) } - return typ.TypeUnknown, 0, store.ErrObjectNotFound + return typ.Unknown, 0, store.ErrObjectNotFound } // ReadSize reads object content length // from the first backend that has it. 
-func (chain *Chain) ReadSize(id id.ObjectID) (uint64, error) { +func (chain *Chain) ReadSize(id id.ObjectID) (int, error) { for _, backend := range chain.backends { size, err := backend.ReadSize(id) if err == nil { diff --git a/object/store/chain/reader.go b/object/store/chain/reader.go index 2e0f317e..e7f07c33 100644 --- a/object/store/chain/reader.go +++ b/object/store/chain/reader.go @@ -31,7 +31,7 @@ func (chain *Chain) ReadReaderFull(id id.ObjectID) (io.ReadCloser, error) { // ReadReaderContent reads an object's type, declared content length, // and content stream from the first backend that has it. -func (chain *Chain) ReadReaderContent(id id.ObjectID) (typ.Type, uint64, io.ReadCloser, error) { +func (chain *Chain) ReadReaderContent(id id.ObjectID) (typ.Type, int, io.ReadCloser, error) { for _, backend := range chain.backends { ty, size, reader, err := backend.ReadReaderContent(id) if err == nil { @@ -42,8 +42,8 @@ func (chain *Chain) ReadReaderContent(id id.ObjectID) (typ.Type, uint64, io.Read continue } - return typ.TypeUnknown, 0, nil, fmt.Errorf("object/store/chain: read reader content: %w", err) + return typ.Unknown, 0, nil, fmt.Errorf("object/store/chain: read reader content: %w", err) } - return typ.TypeUnknown, 0, nil, store.ErrObjectNotFound + return typ.Unknown, 0, nil, store.ErrObjectNotFound } diff --git a/object/store/dual/quarantine.go b/object/store/dual/quarantine.go index eb1fca21..b73e48fe 100644 --- a/object/store/dual/quarantine.go +++ b/object/store/dual/quarantine.go @@ -79,15 +79,15 @@ func (quarantine *coordinatedQuarantine) ReadReaderFull(id id.ObjectID) (io.Read return quarantine.reader.ReadReaderFull(id) //nolint:wrapcheck } -func (quarantine *coordinatedQuarantine) ReadReaderContent(id id.ObjectID) (typ.Type, uint64, io.ReadCloser, error) { +func (quarantine *coordinatedQuarantine) ReadReaderContent(id id.ObjectID) (typ.Type, int, io.ReadCloser, error) { return quarantine.reader.ReadReaderContent(id) //nolint:wrapcheck } -func 
(quarantine *coordinatedQuarantine) ReadSize(id id.ObjectID) (uint64, error) { +func (quarantine *coordinatedQuarantine) ReadSize(id id.ObjectID) (int, error) { return quarantine.reader.ReadSize(id) //nolint:wrapcheck } -func (quarantine *coordinatedQuarantine) ReadHeader(id id.ObjectID) (typ.Type, uint64, error) { +func (quarantine *coordinatedQuarantine) ReadHeader(id id.ObjectID) (typ.Type, int, error) { return quarantine.reader.ReadHeader(id) //nolint:wrapcheck } @@ -107,7 +107,7 @@ func (quarantine *coordinatedQuarantine) WriteReaderFull(src io.Reader) (id.Obje return quarantine.objectQ.WriteReaderFull(src) //nolint:wrapcheck } -func (quarantine *coordinatedQuarantine) WriteReaderContent(ty typ.Type, size uint64, src io.Reader) (id.ObjectID, error) { +func (quarantine *coordinatedQuarantine) WriteReaderContent(ty typ.Type, size int, src io.Reader) (id.ObjectID, error) { return quarantine.objectQ.WriteReaderContent(ty, size, src) //nolint:wrapcheck } diff --git a/object/store/dual/reader.go b/object/store/dual/reader.go index a51cfbd0..7e5c8d6b 100644 --- a/object/store/dual/reader.go +++ b/object/store/dual/reader.go @@ -24,17 +24,17 @@ func (dual *Dual) ReadReaderFull(id id.ObjectID) (io.ReadCloser, error) { // ReadReaderContent reads an object's type, declared content length, // and content stream from the combined view. -func (dual *Dual) ReadReaderContent(id id.ObjectID) (typ.Type, uint64, io.ReadCloser, error) { +func (dual *Dual) ReadReaderContent(id id.ObjectID) (typ.Type, int, io.ReadCloser, error) { return dual.reader.ReadReaderContent(id) //nolint:wrapcheck } // ReadSize reads an object's declared content length from the combined view. -func (dual *Dual) ReadSize(id id.ObjectID) (uint64, error) { +func (dual *Dual) ReadSize(id id.ObjectID) (int, error) { return dual.reader.ReadSize(id) //nolint:wrapcheck } // ReadHeader reads an object's type and declared content length from the combined view. 
-func (dual *Dual) ReadHeader(id id.ObjectID) (typ.Type, uint64, error) { +func (dual *Dual) ReadHeader(id id.ObjectID) (typ.Type, int, error) { return dual.reader.ReadHeader(id) //nolint:wrapcheck } diff --git a/object/store/dual/writer.go b/object/store/dual/writer.go index 5961e7c7..f75f49e1 100644 --- a/object/store/dual/writer.go +++ b/object/store/dual/writer.go @@ -24,7 +24,7 @@ func (dual *Dual) WriteReaderFull(src io.Reader) (id.ObjectID, error) { } // WriteReaderContent writes one typed object content stream to the object side. -func (dual *Dual) WriteReaderContent(ty typ.Type, size uint64, src io.Reader) (id.ObjectID, error) { +func (dual *Dual) WriteReaderContent(ty typ.Type, size int, src io.Reader) (id.ObjectID, error) { return dual.object.WriteReaderContent(ty, size, src) //nolint:wrapcheck } diff --git a/object/store/loose/doc.go b/object/store/loose/doc.go new file mode 100644 index 00000000..ce6a0269 --- /dev/null +++ b/object/store/loose/doc.go @@ -0,0 +1,2 @@ +// Package loose provides a loose object backend (objects/XX/YYYYY..). +package loose diff --git a/object/store/loose/helpers_test.go b/object/store/loose/helpers_test.go new file mode 100644 index 00000000..d1e9a50d --- /dev/null +++ b/object/store/loose/helpers_test.go @@ -0,0 +1,137 @@ +package loose_test + +import ( + "os" + "testing" + + "lindenii.org/go/furgit/internal/testgit" + "lindenii.org/go/furgit/object/header" + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/furgit/object/store/loose" + "lindenii.org/go/furgit/object/typ" +) + +// gitOracleObject is one object created by git, +// paired with its expected content body and full serialized form. +type gitOracleObject struct { + name string + ty typ.Type + id id.ObjectID + body []byte + raw []byte +} + +// openLooseStore opens a loose store over the repository's objects directory. 
+func openLooseStore(t *testing.T, repo *testgit.Repo) *loose.Loose { + t.Helper() + + repoRoot := repo.Root(t) + + objectsRoot, err := repoRoot.OpenRoot(".git/objects") + if err != nil { + _ = repoRoot.Close() + + t.Fatalf("OpenRoot(.git/objects): %v", err) + } + + _ = repoRoot.Close() + + t.Cleanup(func() { _ = objectsRoot.Close() }) + + looseStore, err := loose.New(objectsRoot, repo.ObjectFormat(t)) + if err != nil { + t.Fatalf("loose.New: %v", err) + } + + return looseStore +} + +// gitOracleObjects seeds the repository with history +// and precomputes every seeded object's +// expected content body and full serialized form. +func gitOracleObjects(t *testing.T, repo *testgit.Repo) []gitOracleObject { + t.Helper() + + seeded, err := repo.SeedHistory(t) + if err != nil { + t.Fatalf("SeedHistory: %v", err) + } + + groups := []struct { + name string + ty typ.Type + oids []id.ObjectID + }{ + {name: "blob", ty: typ.Blob, oids: seeded.Blobs}, + {name: "tree", ty: typ.Tree, oids: seeded.Trees}, + {name: "commit", ty: typ.Commit, oids: seeded.Commits}, + {name: "tag", ty: typ.Tag, oids: seeded.Tags}, + } + + objects := make([]gitOracleObject, 0, len(seeded.All())) + + for _, group := range groups { + for _, oid := range group.oids { + body, err := repo.CatFile(t, group.ty, oid) + if err != nil { + t.Fatalf("CatFile(%s %s): %v", group.name, oid, err) + } + + raw := header.Append(nil, group.ty, len(body)) + raw = append(raw, body...) + + objects = append(objects, gitOracleObject{ + name: group.name + " " + oid.String(), + ty: group.ty, + id: oid, + body: body, + raw: raw, + }) + } + } + + return objects +} + +// corruptLooseObjectTrailer flips the final byte of a loose object file, +// damaging the zlib Adler-32 trailer. 
+func corruptLooseObjectTrailer(t *testing.T, repo *testgit.Repo, objectID id.ObjectID) { + t.Helper() + + root := repo.Root(t) + + defer func() { _ = root.Close() }() + + hex := objectID.String() + relPath := ".git/objects/" + hex[:2] + "/" + hex[2:] + + file, err := root.OpenFile(relPath, os.O_RDWR, 0) + if err != nil { + t.Fatalf("OpenFile(%q): %v", relPath, err) + } + + defer func() { _ = file.Close() }() + + info, err := file.Stat() + if err != nil { + t.Fatalf("Stat(%q): %v", relPath, err) + } + + if info.Size() == 0 { + t.Fatalf("corrupt trailer on empty file %q", relPath) + } + + last := make([]byte, 1) + + _, err = file.ReadAt(last, info.Size()-1) + if err != nil { + t.Fatalf("ReadAt(%q): %v", relPath, err) + } + + last[0] ^= 0xff + + _, err = file.WriteAt(last, info.Size()-1) + if err != nil { + t.Fatalf("WriteAt(%q): %v", relPath, err) + } +} diff --git a/object/store/loose/loose.go b/object/store/loose/loose.go new file mode 100644 index 00000000..02a63df1 --- /dev/null +++ b/object/store/loose/loose.go @@ -0,0 +1,62 @@ +package loose + +import ( + "fmt" + "os" + "path/filepath" + + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/furgit/object/store" +) + +// Loose reads loose Git objects from an objects directory root. +// +// Loose objects are zlib streams whose trailer uses Adler-32. +// Which reads consume enough of the stream +// to reach and verify that trailer +// is documented on the individual methods. +// +// Labels: Close-Caller. +type Loose struct { + // root is the objects directory capability used for all object file access. + // Object files are opened by relative paths like "<first2>/<rest>". + // Loose borrows this root. + root *os.Root + // objectFormat is the expected object format for lookups. + objectFormat id.ObjectFormat +} + +var ( + _ store.ObjectReader = (*Loose)(nil) + _ store.ObjectWriter = (*Loose)(nil) +) + +// New creates a loose-object store rooted at an objects directory for objectFormat. 
+// +// Labels: Deps-Borrowed, Life-Parent. +func New(root *os.Root, objectFormat id.ObjectFormat) (*Loose, error) { + if objectFormat.Size() == 0 { + return nil, id.ErrInvalidObjectFormat + } + + return &Loose{ + root: root, + objectFormat: objectFormat, + }, nil +} + +// Close releases resources associated with the backend. +// +// Labels: MT-Unsafe. +func (loose *Loose) Close() error { return nil } + +// objectPath returns the loose object path for objectID relative to the objects root. +func (loose *Loose) objectPath(objectID id.ObjectID) (string, error) { + if objectID.ObjectFormat() != loose.objectFormat { + return "", fmt.Errorf("%w: got %s want %s", id.ErrInvalidObjectFormat, objectID.ObjectFormat(), loose.objectFormat) + } + + hex := objectID.String() + + return filepath.Join(hex[:2], hex[2:]), nil +} diff --git a/object/store/loose/loose_test.go b/object/store/loose/loose_test.go new file mode 100644 index 00000000..d55c87c5 --- /dev/null +++ b/object/store/loose/loose_test.go @@ -0,0 +1,26 @@ +package loose_test + +import ( + "errors" + "os" + "testing" + + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/furgit/object/store/loose" +) + +func TestNewRejectsUnknownObjectFormat(t *testing.T) { + t.Parallel() + + root, err := os.OpenRoot(t.TempDir()) + if err != nil { + t.Fatalf("OpenRoot: %v", err) + } + + defer func() { _ = root.Close() }() + + _, err = loose.New(root, id.ObjectFormatUnknown) + if !errors.Is(err, id.ErrInvalidObjectFormat) { + t.Fatalf("loose.New(unknown) = %v, want ErrInvalidObjectFormat", err) + } +} diff --git a/object/store/loose/parse.go b/object/store/loose/parse.go new file mode 100644 index 00000000..c3af6159 --- /dev/null +++ b/object/store/loose/parse.go @@ -0,0 +1,61 @@ +package loose + +import ( + "bufio" + "fmt" + "io" + "os" + + "lindenii.org/go/furgit/internal/compress/zlib" + "lindenii.org/go/furgit/object/header" + "lindenii.org/go/furgit/object/store" + "lindenii.org/go/furgit/object/typ" +) + +// decodeAll inflates 
the full loose object payload from file. +func decodeAll(file *os.File) ([]byte, error) { + zr, err := zlib.NewReader(file) + if err != nil { + return nil, fmt.Errorf("object/store/loose: %w", err) + } + + defer func() { _ = zr.Close() }() + + data, err := io.ReadAll(zr) + if err != nil { + return nil, fmt.Errorf("object/store/loose: %w", err) + } + + return data, nil +} + +// parseRaw parses a loose object payload in "type size\x00content" format. +func parseRaw(raw []byte) (typ.Type, []byte, error) { + ty, size, consumed, err := header.Parse(raw) + if err != nil { + return typ.Unknown, nil, fmt.Errorf("%w: %w", store.ErrInvalidObject, err) + } + + content := raw[consumed:] + if len(content) != size { + return typ.Unknown, nil, fmt.Errorf("%w: header size/content mismatch", store.ErrInvalidObject) + } + + return ty, content, nil +} + +// readHeader reads and parses a loose object header from br, +// and returns the raw header bytes including the trailing NUL. +func readHeader(br *bufio.Reader) ([]byte, typ.Type, int, error) { + headerBytes, err := br.ReadSlice(0) + if err != nil { + return nil, typ.Unknown, 0, fmt.Errorf("object/store/loose: %w", err) + } + + ty, size, _, err := header.Parse(headerBytes) + if err != nil { + return nil, typ.Unknown, 0, fmt.Errorf("%w: %w", store.ErrInvalidObject, err) + } + + return headerBytes, ty, size, nil +} diff --git a/object/store/loose/quarantine.go b/object/store/loose/quarantine.go new file mode 100644 index 00000000..214f7219 --- /dev/null +++ b/object/store/loose/quarantine.go @@ -0,0 +1,203 @@ +package loose + +import ( + "crypto/rand" + "errors" + "fmt" + "io/fs" + "os" + "path/filepath" + + "lindenii.org/go/furgit/object/store" +) + +var ( + _ store.ObjectQuarantiner = (*Loose)(nil) + _ store.ObjectQuarantine = (*objectQuarantine)(nil) +) + +// objectQuarantine is one quarantined loose store +// rooted privately beneath a destination loose root. 
+type objectQuarantine struct { + *Loose + + parent *Loose + tempName string + tempRoot *os.Root +} + +// BeginObjectQuarantine creates one quarantined loose store rooted privately +// beneath the destination loose root. +// +// Labels: Deps-Borrowed, Life-Parent, Close-No. +func (loose *Loose) BeginObjectQuarantine(_ store.ObjectQuarantineOptions) (store.ObjectQuarantine, error) { //nolint:ireturn + tempName, tempRoot, err := createLooseQuarantineRoot(loose.root) + if err != nil { + return nil, err + } + + quarantineStore, err := New(tempRoot, loose.objectFormat) + if err != nil { + _ = tempRoot.Close() + _ = loose.root.RemoveAll(tempName) + + return nil, err + } + + return &objectQuarantine{ + Loose: quarantineStore, + parent: loose, + tempName: tempName, + tempRoot: tempRoot, + }, nil +} + +// Discard removes the quarantine and invalidates the receiver. +func (quarantine *objectQuarantine) Discard() error { + closeErr := quarantine.Close() + tempRootErr := quarantine.tempRoot.Close() + removeErr := quarantine.parent.root.RemoveAll(quarantine.tempName) + + if closeErr != nil { + return closeErr + } + + if tempRootErr != nil { + return fmt.Errorf("object/store/loose: %w", tempRootErr) + } + + if removeErr != nil { + return fmt.Errorf("object/store/loose: %w", removeErr) + } + + return nil +} + +// Promote publishes all quarantined loose objects into the parent loose store +// and invalidates the receiver. 
+func (quarantine *objectQuarantine) Promote() error { + closeErr := quarantine.Close() + promoteErr := promoteLooseQuarantine(quarantine.parent, quarantine.tempName, quarantine.tempRoot) + tempRootErr := quarantine.tempRoot.Close() + removeErr := quarantine.parent.root.RemoveAll(quarantine.tempName) + + if closeErr != nil { + return closeErr + } + + if promoteErr != nil { + return promoteErr + } + + if tempRootErr != nil { + return fmt.Errorf("object/store/loose: %w", tempRootErr) + } + + if removeErr != nil { + return fmt.Errorf("object/store/loose: %w", removeErr) + } + + return nil +} + +func createLooseQuarantineRoot(parent *os.Root) (string, *os.Root, error) { + var lastErr error + + for range 32 { + name := "tmp_looseq_" + rand.Text() + + err := parent.Mkdir(name, 0o700) + if err == nil { + root, err := parent.OpenRoot(name) + if err == nil { + return name, root, nil + } + + _ = parent.RemoveAll(name) + + return "", nil, fmt.Errorf("object/store/loose: %w", err) + } + + lastErr = err + + if errors.Is(err, fs.ErrExist) { + continue + } + + return "", nil, fmt.Errorf("object/store/loose: %w", err) + } + + return "", nil, fmt.Errorf("object/store/loose: failed to create quarantine directory: %w", lastErr) +} + +func promoteLooseQuarantine(parent *Loose, tempName string, tempRoot *os.Root) error { + entries, err := fs.ReadDir(tempRoot.FS(), ".") + if err != nil && !errors.Is(err, fs.ErrNotExist) { + return fmt.Errorf("object/store/loose: %w", err) + } + + for _, entry := range entries { + if !entry.IsDir() { + return fmt.Errorf("%w: quarantine contains unexpected file %q", store.ErrInvalidObject, entry.Name()) + } + + err := promoteLooseQuarantineShard(parent, tempName, tempRoot, entry.Name()) + if err != nil { + return err + } + } + + return nil +} + +func promoteLooseQuarantineShard(parent *Loose, tempName string, tempRoot *os.Root, shard string) error { + entries, err := fs.ReadDir(tempRoot.FS(), shard) + if err != nil { + return 
fmt.Errorf("object/store/loose: %w", err) + } + + for _, entry := range entries { + if entry.IsDir() { + return fmt.Errorf("%w: quarantine shard %q contains unexpected directory %q", store.ErrInvalidObject, shard, entry.Name()) + } + + objectID, err := parent.objectFormat.FromString(shard + entry.Name()) + if err != nil { + return fmt.Errorf("%w: quarantine shard %q contains invalid object %q: %w", store.ErrInvalidObject, shard, entry.Name(), err) + } + + dst, err := parent.objectPath(objectID) + if err != nil { + return err + } + + err = parent.root.MkdirAll(shard, 0o755) + if err != nil { + return fmt.Errorf("object/store/loose: %w", err) + } + + err = promoteLooseQuarantineObject(parent.root, filepath.Join(tempName, shard, entry.Name()), dst) + if err != nil { + return err + } + } + + return nil +} + +func promoteLooseQuarantineObject(root *os.Root, src, dst string) error { + err := root.Link(src, dst) + if err == nil { + _ = root.Remove(src) + + return nil + } + + if errors.Is(err, fs.ErrExist) { + _ = root.Remove(src) + + return nil + } + + return fmt.Errorf("object/store/loose: promote quarantine %q -> %q: %w", src, dst, err) +} diff --git a/object/store/loose/quarantine_test.go b/object/store/loose/quarantine_test.go new file mode 100644 index 00000000..148d4221 --- /dev/null +++ b/object/store/loose/quarantine_test.go @@ -0,0 +1,116 @@ +package loose_test + +import ( + "bytes" + "errors" + "testing" + + "lindenii.org/go/furgit/internal/testgit" + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/furgit/object/store" + "lindenii.org/go/furgit/object/typ" +) + +func TestQuarantinePromote(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + repo, err := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: objectFormat}) + if err != nil { + t.Fatalf("NewRepo: %v", err) + } + + looseStore := openLooseStore(t, repo) + + quarantine, err := 
looseStore.BeginObjectQuarantine(store.ObjectQuarantineOptions{}) + if err != nil { + t.Fatalf("BeginObjectQuarantine: %v", err) + } + + content := []byte("quarantined object\n") + + objectID, err := quarantine.WriteBytesContent(typ.Blob, content) + if err != nil { + t.Fatalf("quarantine.WriteBytesContent: %v", err) + } + + ty, got, err := quarantine.ReadBytesContent(objectID) + if err != nil { + t.Fatalf("quarantine.ReadBytesContent: %v", err) + } + + if ty != typ.Blob { + t.Fatalf("quarantine type = %v, want %v", ty, typ.Blob) + } + + if !bytes.Equal(got, content) { + t.Fatalf("quarantine body mismatch") + } + + _, _, err = looseStore.ReadBytesContent(objectID) + if !errors.Is(err, store.ErrObjectNotFound) { + t.Fatalf("parent saw quarantined object before promote: %v", err) + } + + err = quarantine.Promote() + if err != nil { + t.Fatalf("Promote: %v", err) + } + + ty, got, err = looseStore.ReadBytesContent(objectID) + if err != nil { + t.Fatalf("parent ReadBytesContent after promote: %v", err) + } + + if ty != typ.Blob { + t.Fatalf("parent type = %v, want %v", ty, typ.Blob) + } + + if !bytes.Equal(got, content) { + t.Fatalf("parent body mismatch") + } + }) + } +} + +func TestQuarantineDiscard(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + repo, err := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: objectFormat}) + if err != nil { + t.Fatalf("NewRepo: %v", err) + } + + looseStore := openLooseStore(t, repo) + + quarantine, err := looseStore.BeginObjectQuarantine(store.ObjectQuarantineOptions{}) + if err != nil { + t.Fatalf("BeginObjectQuarantine: %v", err) + } + + content := []byte("discarded object\n") + + objectID, err := quarantine.WriteBytesContent(typ.Blob, content) + if err != nil { + t.Fatalf("quarantine.WriteBytesContent: %v", err) + } + + err = quarantine.Discard() + if err != nil { + t.Fatalf("Discard: %v", err) + } + + _, _, err = 
looseStore.ReadBytesContent(objectID) + if !errors.Is(err, store.ErrObjectNotFound) { + t.Fatalf("parent saw discarded object: %v", err) + } + }) + } +} diff --git a/object/store/loose/read_test.go b/object/store/loose/read_test.go new file mode 100644 index 00000000..d8fdad1a --- /dev/null +++ b/object/store/loose/read_test.go @@ -0,0 +1,264 @@ +package loose_test + +import ( + "bytes" + "errors" + "io" + "strings" + "testing" + + "lindenii.org/go/furgit/internal/testgit" + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/furgit/object/store" + "lindenii.org/go/furgit/object/typ" +) + +func TestRead(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + repo, err := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: objectFormat}) + if err != nil { + t.Fatalf("NewRepo: %v", err) + } + + objects := gitOracleObjects(t, repo) + looseStore := openLooseStore(t, repo) + + t.Run("BytesFull", func(t *testing.T) { + t.Parallel() + + for _, o := range objects { + got, err := looseStore.ReadBytesFull(o.id) + if err != nil { + t.Fatalf("%s: ReadBytesFull: %v", o.name, err) + } + + if !bytes.Equal(got, o.raw) { + t.Fatalf("%s: ReadBytesFull mismatch", o.name) + } + } + }) + + t.Run("BytesContent", func(t *testing.T) { + t.Parallel() + + for _, o := range objects { + gotType, gotBody, err := looseStore.ReadBytesContent(o.id) + if err != nil { + t.Fatalf("%s: ReadBytesContent: %v", o.name, err) + } + + if gotType != o.ty { + t.Fatalf("%s: ReadBytesContent type = %v, want %v", o.name, gotType, o.ty) + } + + if !bytes.Equal(gotBody, o.body) { + t.Fatalf("%s: ReadBytesContent body mismatch", o.name) + } + } + }) + + t.Run("Header", func(t *testing.T) { + t.Parallel() + + for _, o := range objects { + gotType, gotSize, err := looseStore.ReadHeader(o.id) + if err != nil { + t.Fatalf("%s: ReadHeader: %v", o.name, err) + } + + if gotType != o.ty { + t.Fatalf("%s: 
ReadHeader type = %v, want %v", o.name, gotType, o.ty) + } + + if gotSize != len(o.body) { + t.Fatalf("%s: ReadHeader size = %d, want %d", o.name, gotSize, len(o.body)) + } + } + }) + + t.Run("ReaderFull", func(t *testing.T) { + t.Parallel() + + for _, o := range objects { + reader, err := looseStore.ReadReaderFull(o.id) + if err != nil { + t.Fatalf("%s: ReadReaderFull: %v", o.name, err) + } + + got, err := io.ReadAll(reader) + if err != nil { + _ = reader.Close() + + t.Fatalf("%s: ReadReaderFull ReadAll: %v", o.name, err) + } + + err = reader.Close() + if err != nil { + t.Fatalf("%s: ReadReaderFull Close: %v", o.name, err) + } + + if !bytes.Equal(got, o.raw) { + t.Fatalf("%s: ReadReaderFull mismatch", o.name) + } + } + }) + + t.Run("ReaderContent", func(t *testing.T) { + t.Parallel() + + for _, o := range objects { + gotType, gotSize, reader, err := looseStore.ReadReaderContent(o.id) + if err != nil { + t.Fatalf("%s: ReadReaderContent: %v", o.name, err) + } + + got, err := io.ReadAll(reader) + if err != nil { + _ = reader.Close() + + t.Fatalf("%s: ReadReaderContent ReadAll: %v", o.name, err) + } + + err = reader.Close() + if err != nil { + t.Fatalf("%s: ReadReaderContent Close: %v", o.name, err) + } + + if gotType != o.ty { + t.Fatalf("%s: ReadReaderContent type = %v, want %v", o.name, gotType, o.ty) + } + + if gotSize != len(o.body) { + t.Fatalf("%s: ReadReaderContent size = %d, want %d", o.name, gotSize, len(o.body)) + } + + if !bytes.Equal(got, o.body) { + t.Fatalf("%s: ReadReaderContent mismatch", o.name) + } + } + }) + }) + } +} + +func TestReadNotFound(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + repo, err := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: objectFormat}) + if err != nil { + t.Fatalf("NewRepo: %v", err) + } + + looseStore := openLooseStore(t, repo) + + missingID, err := objectFormat.FromString(strings.Repeat("0", 
objectFormat.HexLen())) + if err != nil { + t.Fatalf("FromString(missing): %v", err) + } + + _, err = looseStore.ReadBytesFull(missingID) + if !errors.Is(err, store.ErrObjectNotFound) { + t.Fatalf("ReadBytesFull not-found = %v", err) + } + + _, _, err = looseStore.ReadBytesContent(missingID) + if !errors.Is(err, store.ErrObjectNotFound) { + t.Fatalf("ReadBytesContent not-found = %v", err) + } + + _, _, err = looseStore.ReadHeader(missingID) + if !errors.Is(err, store.ErrObjectNotFound) { + t.Fatalf("ReadHeader not-found = %v", err) + } + + _, err = looseStore.ReadReaderFull(missingID) + if !errors.Is(err, store.ErrObjectNotFound) { + t.Fatalf("ReadReaderFull not-found = %v", err) + } + + _, _, _, err = looseStore.ReadReaderContent(missingID) + if !errors.Is(err, store.ErrObjectNotFound) { + t.Fatalf("ReadReaderContent not-found = %v", err) + } + + otherFormat := objectFormat + + for _, candidate := range id.SupportedObjectFormats() { + if candidate != objectFormat { + otherFormat = candidate + + break + } + } + + if otherFormat == objectFormat { + return + } + + mismatchID, err := otherFormat.FromString(strings.Repeat("1", otherFormat.HexLen())) + if err != nil { + t.Fatalf("FromString(mismatch): %v", err) + } + + _, err = looseStore.ReadBytesFull(mismatchID) + if !errors.Is(err, id.ErrInvalidObjectFormat) { + t.Fatalf("ReadBytesFull format mismatch = %v, want ErrInvalidObjectFormat", err) + } + }) + } +} + +func TestReadCorruptTrailer(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + repo, err := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: objectFormat}) + if err != nil { + t.Fatalf("NewRepo: %v", err) + } + + looseStore := openLooseStore(t, repo) + + content := []byte("corrupt-trailer-check\n") + + objectID, err := looseStore.WriteBytesContent(typ.Blob, content) + if err != nil { + t.Fatalf("WriteBytesContent: %v", err) + } + + 
corruptLooseObjectTrailer(t, repo, objectID) + + // Stops before the trailer. + ty, size, err := looseStore.ReadHeader(objectID) + if err != nil { + t.Fatalf("ReadHeader: %v", err) + } + + if ty != typ.Blob { + t.Fatalf("ReadHeader type = %v, want %v", ty, typ.Blob) + } + + if size != len(content) { + t.Fatalf("ReadHeader size = %d, want %d", size, len(content)) + } + + // Consumes the whole stream. + _, err = looseStore.ReadBytesFull(objectID) + if err == nil { + t.Fatalf("ReadBytesFull on corrupt trailer succeeded") + } + }) + } +} diff --git a/object/store/loose/reader.go b/object/store/loose/reader.go new file mode 100644 index 00000000..2f26efe5 --- /dev/null +++ b/object/store/loose/reader.go @@ -0,0 +1,239 @@ +package loose + +import ( + "bufio" + "bytes" + "errors" + "fmt" + "io" + "io/fs" + "os" + + "lindenii.org/go/furgit/internal/compress/zlib" + "lindenii.org/go/furgit/internal/iolimit" + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/furgit/object/store" + "lindenii.org/go/furgit/object/typ" +) + +// ReadBytesFull reads a full serialized object as "type size\x00content". +// +// It inflates and parses the full loose object, +// including verifying the zlib Adler-32 trailer. +func (loose *Loose) ReadBytesFull(objectID id.ObjectID) ([]byte, error) { + raw, _, _, err := loose.readBytesParsed(objectID) + if err != nil { + return nil, err + } + + return raw, nil +} + +// ReadBytesContent reads an object's type and content bytes. +// +// Like ReadBytesFull, +// it inflates and parses the full loose object, +// including verifying the zlib Adler-32 trailer. +func (loose *Loose) ReadBytesContent(objectID id.ObjectID) (typ.Type, []byte, error) { + _, ty, content, err := loose.readBytesParsed(objectID) + if err != nil { + return typ.Unknown, nil, err + } + + return ty, content, nil +} + +// ReadHeader reads an object's type and declared content length. +// +// It parses only enough of the zlib-decoded object to recover the object header. 
+// It does not verify that the remaining object content is readable +// and does not verify the zlib Adler-32 trailer. +func (loose *Loose) ReadHeader(objectID id.ObjectID) (typ.Type, int, error) { + file, err := loose.openObject(objectID) + if err != nil { + return typ.Unknown, 0, err + } + + defer func() { _ = file.Close() }() + + zr, err := zlib.NewReader(file) + if err != nil { + return typ.Unknown, 0, fmt.Errorf("object/store/loose: %w", err) + } + + defer func() { _ = zr.Close() }() + + _, ty, size, err := readHeader(bufio.NewReader(zr)) + if err != nil { + return typ.Unknown, 0, err + } + + return ty, size, nil +} + +// ReadSize reads an object's declared content length. +// +// Like ReadHeader, +// it parses only enough of the zlib-decoded object to recover the header +// and does not verify the zlib Adler-32 trailer. +func (loose *Loose) ReadSize(objectID id.ObjectID) (int, error) { + _, size, err := loose.ReadHeader(objectID) + + return size, err +} + +// ReadReaderFull reads a full serialized object stream as "type size\x00content". +// +// Close releases resources only. +// It does not drain unread data for additional validation. +// In particular, +// malformed trailing compressed data, +// trailing bytes past the declared object size, +// and the zlib Adler-32 trailer +// may go unverified unless the caller reads to io.EOF. +func (loose *Loose) ReadReaderFull(objectID id.ObjectID) (io.ReadCloser, error) { + file, zr, err := loose.openInflated(objectID) + if err != nil { + return nil, err + } + + br := bufio.NewReader(zr) + + headerBytes, _, size, err := readHeader(br) + if err != nil { + _ = zr.Close() + _ = file.Close() + + return nil, err + } + + return &objectReader{ + reader: io.MultiReader( + bytes.NewReader(headerBytes), + iolimit.ExpectLengthReader(br, size), + ), + file: file, + zr: zr, + }, nil +} + +// ReadReaderContent reads an object's type, declared content length, +// and content stream. +// +// Close releases resources only. 
+// It does not drain unread data for additional validation. +// In particular, +// malformed trailing compressed data, +// trailing bytes past the declared object size, +// and the zlib Adler-32 trailer +// may go unverified unless the caller reads to io.EOF. +func (loose *Loose) ReadReaderContent(objectID id.ObjectID) (typ.Type, int, io.ReadCloser, error) { + file, zr, err := loose.openInflated(objectID) + if err != nil { + return typ.Unknown, 0, nil, err + } + + br := bufio.NewReader(zr) + + _, ty, size, err := readHeader(br) + if err != nil { + _ = zr.Close() + _ = file.Close() + + return typ.Unknown, 0, nil, err + } + + return ty, size, &objectReader{ + reader: iolimit.ExpectLengthReader(br, size), + file: file, + zr: zr, + }, nil +} + +// Refresh is a no-op for loose object stores. +func (loose *Loose) Refresh() error { + return nil +} + +// openObject opens the loose object file for objectID. +// Missing files cause store.ErrObjectNotFound. +func (loose *Loose) openObject(objectID id.ObjectID) (*os.File, error) { + relPath, err := loose.objectPath(objectID) + if err != nil { + return nil, err + } + + file, err := loose.root.Open(relPath) + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + return nil, store.ErrObjectNotFound + } + + return nil, fmt.Errorf("object/store/loose: %w", err) + } + + return file, nil +} + +// readBytesParsed reads, inflates, and parses a loose object in one pass. +// It returns the full raw payload and its parsed type and content. 
+func (loose *Loose) readBytesParsed(objectID id.ObjectID) ([]byte, typ.Type, []byte, error) { + file, err := loose.openObject(objectID) + if err != nil { + return nil, typ.Unknown, nil, err + } + + defer func() { _ = file.Close() }() + + raw, err := decodeAll(file) + if err != nil { + return nil, typ.Unknown, nil, err + } + + ty, content, err := parseRaw(raw) + if err != nil { + return nil, typ.Unknown, nil, err + } + + return raw, ty, content, nil +} + +// openInflated opens and zlib-decodes a loose object file. +// The caller owns both returned closers and must close them. +func (loose *Loose) openInflated(objectID id.ObjectID) (*os.File, io.ReadCloser, error) { + file, err := loose.openObject(objectID) + if err != nil { + return nil, nil, err + } + + zr, err := zlib.NewReader(file) + if err != nil { + _ = file.Close() + + return nil, nil, fmt.Errorf("object/store/loose: %w", err) + } + + return file, zr, nil +} + +// objectReader streams one inflated loose object +// and owns the underlying file and zlib decoder. +type objectReader struct { + // reader is the stream exposed by Read. + reader io.Reader + // file is the underlying loose object file and is closed by Close. + file *os.File + // zr is the zlib decoder and is closed by Close. 
+ zr io.ReadCloser +} + +func (reader *objectReader) Read(dst []byte) (int, error) { + return reader.reader.Read(dst) +} + +func (reader *objectReader) Close() error { + errZlib := reader.zr.Close() + errFile := reader.file.Close() + + return errors.Join(errZlib, errFile) +} diff --git a/object/store/loose/roundtrip_test.go b/object/store/loose/roundtrip_test.go new file mode 100644 index 00000000..cc989b5b --- /dev/null +++ b/object/store/loose/roundtrip_test.go @@ -0,0 +1,138 @@ +package loose_test + +import ( + "bytes" + "io" + "testing" + + "lindenii.org/go/furgit/internal/testgit" + "lindenii.org/go/furgit/object/header" + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/furgit/object/typ" +) + +func TestRoundTrip(t *testing.T) { + t.Parallel() + + cases := []struct { + name string + ty typ.Type + content []byte + }{ + {name: "blob", ty: typ.Blob, content: []byte("roundtrip blob\n")}, + {name: "empty blob", ty: typ.Blob, content: []byte{}}, + {name: "tree", ty: typ.Tree, content: []byte("roundtrip tree bytes")}, + {name: "commit", ty: typ.Commit, content: []byte("roundtrip commit bytes")}, + {name: "tag", ty: typ.Tag, content: []byte("roundtrip tag bytes")}, + } + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + repo, err := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: objectFormat}) + if err != nil { + t.Fatalf("NewRepo: %v", err) + } + + looseStore := openLooseStore(t, repo) + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + wantRaw := header.Append(nil, tc.ty, len(tc.content)) + wantRaw = append(wantRaw, tc.content...) 
+ + objectID, err := looseStore.WriteBytesContent(tc.ty, tc.content) + if err != nil { + t.Fatalf("WriteBytesContent: %v", err) + } + + gotRaw, err := looseStore.ReadBytesFull(objectID) + if err != nil { + t.Fatalf("ReadBytesFull: %v", err) + } + + if !bytes.Equal(gotRaw, wantRaw) { + t.Fatalf("ReadBytesFull mismatch") + } + + gotType, gotBody, err := looseStore.ReadBytesContent(objectID) + if err != nil { + t.Fatalf("ReadBytesContent: %v", err) + } + + if gotType != tc.ty { + t.Fatalf("ReadBytesContent type = %v, want %v", gotType, tc.ty) + } + + if !bytes.Equal(gotBody, tc.content) { + t.Fatalf("ReadBytesContent body mismatch") + } + + headType, headSize, err := looseStore.ReadHeader(objectID) + if err != nil { + t.Fatalf("ReadHeader: %v", err) + } + + if headType != tc.ty { + t.Fatalf("ReadHeader type = %v, want %v", headType, tc.ty) + } + + if headSize != len(tc.content) { + t.Fatalf("ReadHeader size = %d, want %d", headSize, len(tc.content)) + } + + fullReader, err := looseStore.ReadReaderFull(objectID) + if err != nil { + t.Fatalf("ReadReaderFull: %v", err) + } + + gotFull, err := io.ReadAll(fullReader) + if err != nil { + _ = fullReader.Close() + + t.Fatalf("ReadReaderFull ReadAll: %v", err) + } + + err = fullReader.Close() + if err != nil { + t.Fatalf("ReadReaderFull Close: %v", err) + } + + if !bytes.Equal(gotFull, wantRaw) { + t.Fatalf("ReadReaderFull mismatch") + } + + contentType, contentSize, contentReader, err := looseStore.ReadReaderContent(objectID) + if err != nil { + t.Fatalf("ReadReaderContent: %v", err) + } + + gotContent, err := io.ReadAll(contentReader) + if err != nil { + _ = contentReader.Close() + + t.Fatalf("ReadReaderContent ReadAll: %v", err) + } + + err = contentReader.Close() + if err != nil { + t.Fatalf("ReadReaderContent Close: %v", err) + } + + if contentType != tc.ty { + t.Fatalf("ReadReaderContent type = %v, want %v", contentType, tc.ty) + } + + if contentSize != len(tc.content) { + t.Fatalf("ReadReaderContent size = %d, want %d", 
// streamWriter incrementally hashes and deflates an object into a temp file.
//
// Write validates size accounting as bytes arrive.
// finalize checks that the declared size was met and then publishes the
// temp file under its object path by linking it into place without
// clobbering an existing object, removing the temp name afterwards.
type streamWriter struct {
	// loose owns path and root operations used by this write session.
	loose *Loose
	// file is the temporary destination file under objects/.
	// Close sets it to nil.
	file *os.File
	// zw compresses raw object bytes into file.
	zw *zlib.Writer
	// hash receives the same raw bytes used to compute the resulting object ID.
	hash hash.Hash

	// tmpRelPath is the relative path of file under the objects root.
	tmpRelPath string

	// fullMode selects full-object input ("type size\x00content")
	// as opposed to content-only input.
	fullMode bool

	// headerBuf accumulates header bytes while fullMode parses up to the first NUL.
	headerBuf []byte
	// headerDone reports whether the full-object header has been parsed.
	// WriteReaderContent also sets it directly, because it emits the header itself.
	headerDone bool
	// expectedContentLeft tracks remaining declared content bytes.
	// acceptContent decrements it, and finalize requires it to be zero.
	expectedContentLeft int

	// closed is set by Close; Write rejects input once it is set.
	closed bool
	// finalized is set on entry to finalize; Write rejects input once it is set.
	finalized bool
}
+func (writer *streamWriter) Write(src []byte) (int, error) { + if writer.finalized { + return 0, fmt.Errorf("%w: write after finalize", store.ErrInvalidObject) + } + + if writer.closed { + return 0, fmt.Errorf("%w: write after close", store.ErrInvalidObject) + } + + if writer.fullMode { + err := writer.acceptFull(src) + if err != nil { + return 0, err + } + } else { + err := writer.acceptContent(len(src)) + if err != nil { + return 0, err + } + } + + err := writer.writeRawChunk(src) + if err != nil { + return 0, err + } + + return len(src), nil +} + +// Close flushes and closes the underlying zlib stream and temp file. +func (writer *streamWriter) Close() error { + errZlib := writer.zw.Close() + errSync := writer.file.Sync() + errFile := writer.file.Close() + + writer.closed = true + writer.file = nil + + return errors.Join(errZlib, errSync, errFile) +} + +// acceptFull validates and accounts raw full-object input. +func (writer *streamWriter) acceptFull(src []byte) error { + if writer.headerDone { + return writer.acceptContent(len(src)) + } + + nul := bytes.IndexByte(src, 0) + if nul < 0 { + writer.headerBuf = append(writer.headerBuf, src...) + + return nil + } + + headerChunkLen := nul + 1 + writer.headerBuf = append(writer.headerBuf, src[:headerChunkLen]...) + + _, size, _, err := header.Parse(writer.headerBuf) + if err != nil { + return fmt.Errorf("%w: %w", store.ErrInvalidObject, err) + } + + writer.headerDone = true + writer.expectedContentLeft = size + + return writer.acceptContent(len(src) - headerChunkLen) +} + +// acceptContent validates and accounts content byte counts. +func (writer *streamWriter) acceptContent(n int) error { + if n > writer.expectedContentLeft { + return fmt.Errorf("%w: object content exceeds declared size", store.ErrInvalidObject) + } + + writer.expectedContentLeft -= n + + return nil +} + +// writeRawChunk forwards raw bytes to the hash and deflate pipeline. 
+func (writer *streamWriter) writeRawChunk(src []byte) error { + _, err := writer.hash.Write(src) + if err != nil { + return fmt.Errorf("object/store/loose: %w", err) + } + + _, err = writer.zw.Write(src) + if err != nil { + return fmt.Errorf("object/store/loose: %w", err) + } + + return nil +} + +// finalize validates write completeness and atomically publishes the object. +// Publication is no-clobber: it links tmpRelPath to the object path and treats +// existing destination objects as success. +func (writer *streamWriter) finalize() (id.ObjectID, error) { + writer.finalized = true + + var zero id.ObjectID + + if !writer.closed { + err := writer.Close() + if err != nil { + return zero, err + } + } + + if writer.fullMode && !writer.headerDone { + return zero, fmt.Errorf("%w: missing full object header", store.ErrInvalidObject) + } + + if writer.expectedContentLeft != 0 { + return zero, fmt.Errorf("%w: object content shorter than declared size", store.ErrInvalidObject) + } + + idBytes := writer.hash.Sum(nil) + + objectID, err := writer.loose.objectFormat.FromBytes(idBytes) + if err != nil { + return zero, fmt.Errorf("object/store/loose: %w", err) + } + + relPath, err := writer.loose.objectPath(objectID) + if err != nil { + return zero, err + } + + dir := filepath.Dir(relPath) + + err = writer.loose.root.MkdirAll(dir, 0o755) + if err != nil { + return zero, fmt.Errorf("object/store/loose: %w", err) + } + + cleanup := true + + defer func() { + if cleanup { + _ = writer.loose.root.Remove(writer.tmpRelPath) + } + }() + + err = writer.loose.root.Link(writer.tmpRelPath, relPath) + if err != nil { + if errors.Is(err, fs.ErrExist) { + cleanup = false + _ = writer.loose.root.Remove(writer.tmpRelPath) + + return objectID, nil + } + + return zero, fmt.Errorf("object/store/loose: %w", err) + } + + cleanup = false + _ = writer.loose.root.Remove(writer.tmpRelPath) + + return objectID, nil +} + +// newStreamWriter creates a stream writer with a temp file rooted in objects/. 
+func (loose *Loose) newStreamWriter(fullMode bool) (*streamWriter, error) { + hashFn, err := loose.objectFormat.New() + if err != nil { + return nil, fmt.Errorf("object/store/loose: %w", err) + } + + tmpRelPath, file, err := loose.createTempObjectFile(".") + if err != nil { + return nil, err + } + + return &streamWriter{ + loose: loose, + file: file, + zw: zlib.NewWriter(file), + hash: hashFn, + tmpRelPath: tmpRelPath, + fullMode: fullMode, + headerBuf: make([]byte, 0, 64), + }, nil +} + +// createTempObjectFile creates a unique temporary object file within dir. +// The returned path is relative to the objects root. +func (loose *Loose) createTempObjectFile(dir string) (string, *os.File, error) { + var lastErr error + + for range 16 { + relPath := filepath.Join(dir, tempObjectFilePrefix+rand.Text()) + + file, err := loose.root.OpenFile(relPath, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0o644) + if err == nil { + return relPath, file, nil + } + + lastErr = err + + if errors.Is(err, fs.ErrExist) { + continue + } + + return "", nil, fmt.Errorf("object/store/loose: %w", err) + } + + return "", nil, fmt.Errorf("object/store/loose: failed to create temporary object file: %w", lastErr) +} diff --git a/object/store/loose/write_test.go b/object/store/loose/write_test.go new file mode 100644 index 00000000..e3ea2d6c --- /dev/null +++ b/object/store/loose/write_test.go @@ -0,0 +1,158 @@ +package loose_test + +import ( + "bytes" + "errors" + "testing" + + "lindenii.org/go/furgit/internal/testgit" + "lindenii.org/go/furgit/object/header" + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/furgit/object/store" + "lindenii.org/go/furgit/object/store/loose" + "lindenii.org/go/furgit/object/typ" +) + +func TestWrite(t *testing.T) { + t.Parallel() + + writes := []struct { + name string + write func(looseStore *loose.Loose, content []byte) (id.ObjectID, error) + }{ + { + name: "BytesContent", + write: func(looseStore *loose.Loose, content []byte) (id.ObjectID, error) { + return 
looseStore.WriteBytesContent(typ.Blob, content) + }, + }, + { + name: "ReaderContent", + write: func(looseStore *loose.Loose, content []byte) (id.ObjectID, error) { + return looseStore.WriteReaderContent(typ.Blob, len(content), bytes.NewReader(content)) + }, + }, + { + name: "BytesFull", + write: func(looseStore *loose.Loose, content []byte) (id.ObjectID, error) { + raw := header.Append(nil, typ.Blob, len(content)) + raw = append(raw, content...) + + return looseStore.WriteBytesFull(raw) + }, + }, + { + name: "ReaderFull", + write: func(looseStore *loose.Loose, content []byte) (id.ObjectID, error) { + raw := header.Append(nil, typ.Blob, len(content)) + raw = append(raw, content...) + + return looseStore.WriteReaderFull(bytes.NewReader(raw)) + }, + }, + } + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + repo, err := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: objectFormat}) + if err != nil { + t.Fatalf("NewRepo: %v", err) + } + + looseStore := openLooseStore(t, repo) + + for _, w := range writes { + t.Run(w.name, func(t *testing.T) { + content := []byte("written via " + w.name + "\n") + + want, err := repo.HashObject(t, typ.Blob, bytes.NewReader(content)) + if err != nil { + t.Fatalf("HashObject: %v", err) + } + + got, err := w.write(looseStore, content) + if err != nil { + t.Fatalf("write: %v", err) + } + + if got != want { + t.Fatalf("id = %s, want %s", got, want) + } + + gotBody, err := repo.CatFile(t, typ.Blob, got) + if err != nil { + t.Fatalf("CatFile: %v", err) + } + + if !bytes.Equal(gotBody, content) { + t.Fatalf("git cat-file body mismatch") + } + + regot, err := w.write(looseStore, content) + if err != nil { + t.Fatalf("rewrite: %v", err) + } + + if regot != want { + t.Fatalf("rewrite id = %s, want %s", regot, want) + } + }) + } + }) + } +} + +func TestWriteRejects(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + 
t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + repo, err := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: objectFormat}) + if err != nil { + t.Fatalf("NewRepo: %v", err) + } + + looseStore := openLooseStore(t, repo) + + t.Run("ContentOverflow", func(t *testing.T) { + t.Parallel() + + _, err := looseStore.WriteReaderContent(typ.Blob, 1, bytes.NewReader([]byte("hello"))) + if !errors.Is(err, store.ErrInvalidObject) { + t.Fatalf("err = %v, want ErrInvalidObject", err) + } + }) + + t.Run("ContentShort", func(t *testing.T) { + t.Parallel() + + _, err := looseStore.WriteReaderContent(typ.Blob, 5, bytes.NewReader([]byte("x"))) + if !errors.Is(err, store.ErrInvalidObject) { + t.Fatalf("err = %v, want ErrInvalidObject", err) + } + }) + + t.Run("FullMalformedHeader", func(t *testing.T) { + t.Parallel() + + _, err := looseStore.WriteReaderFull(bytes.NewReader([]byte("not-a-header"))) + if !errors.Is(err, store.ErrInvalidObject) { + t.Fatalf("err = %v, want ErrInvalidObject", err) + } + }) + + t.Run("FullSizeMismatch", func(t *testing.T) { + t.Parallel() + + _, err := looseStore.WriteReaderFull(bytes.NewReader([]byte("blob 1\x00hello"))) + if !errors.Is(err, store.ErrInvalidObject) { + t.Fatalf("err = %v, want ErrInvalidObject", err) + } + }) + }) + } +} diff --git a/object/store/loose/writer.go b/object/store/loose/writer.go new file mode 100644 index 00000000..1133e8c8 --- /dev/null +++ b/object/store/loose/writer.go @@ -0,0 +1,84 @@ +package loose + +import ( + "bytes" + "fmt" + "io" + + "lindenii.org/go/furgit/object/header" + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/furgit/object/typ" +) + +// WriteBytesFull writes a full serialized object as "type size\x00content". +func (loose *Loose) WriteBytesFull(raw []byte) (id.ObjectID, error) { + return loose.WriteReaderFull(bytes.NewReader(raw)) +} + +// WriteBytesContent writes typed content bytes as a loose object. 
+func (loose *Loose) WriteBytesContent(ty typ.Type, content []byte) (id.ObjectID, error) { + return loose.WriteReaderContent(ty, len(content), bytes.NewReader(content)) +} + +// WriteReaderContent writes one loose object from typed content bytes read from src. +// src must provide exactly size bytes. +// size is required because loose object headers are "type size\x00content", +// so the header must be emitted before streaming content without buffering. +func (loose *Loose) WriteReaderContent(ty typ.Type, size int, src io.Reader) (id.ObjectID, error) { + headerBytes := header.Append(nil, ty, size) + + writer, err := loose.newStreamWriter(false) + if err != nil { + return id.ObjectID{}, err + } + + writer.headerDone = true + writer.expectedContentLeft = size + + err = writer.writeRawChunk(headerBytes) + if err != nil { + _ = writer.Close() + _ = loose.root.Remove(writer.tmpRelPath) + + return id.ObjectID{}, err + } + + return writeReaderIntoStreamWriter(writer, src) +} + +// WriteReaderFull writes one loose object from raw bytes "type size\x00content" read from src. +func (loose *Loose) WriteReaderFull(src io.Reader) (id.ObjectID, error) { + writer, err := loose.newStreamWriter(true) + if err != nil { + return id.ObjectID{}, err + } + + return writeReaderIntoStreamWriter(writer, src) +} + +// writeReaderIntoStreamWriter copies src into writer and publishes the object. 
+func writeReaderIntoStreamWriter(writer *streamWriter, src io.Reader) (id.ObjectID, error) { + _, err := io.Copy(writer, src) + if err != nil { + _ = writer.Close() + _ = writer.loose.root.Remove(writer.tmpRelPath) + + return id.ObjectID{}, fmt.Errorf("object/store/loose: %w", err) + } + + err = writer.Close() + if err != nil { + _ = writer.loose.root.Remove(writer.tmpRelPath) + + return id.ObjectID{}, err + } + + objectID, err := writer.finalize() + if err != nil { + _ = writer.loose.root.Remove(writer.tmpRelPath) + + return id.ObjectID{}, err + } + + return objectID, nil +} diff --git a/object/store/memory/reader.go b/object/store/memory/reader.go index 19723a6c..e04ad759 100644 --- a/object/store/memory/reader.go +++ b/object/store/memory/reader.go @@ -17,34 +17,36 @@ func (memory *Memory) ReadBytesFull(id id.ObjectID) ([]byte, error) { return nil, store.ErrObjectNotFound } - raw := header.Append(nil, obj.ty, uint64(len(obj.content))) + raw := header.Append(nil, obj.ty, len(obj.content)) raw = append(raw, obj.content...) return raw, nil } // ReadBytesContent reads one object body. +// +// The returned slice aliases the store's own copy of the object content. func (memory *Memory) ReadBytesContent(id id.ObjectID) (typ.Type, []byte, error) { obj, ok := memory.objects.Load(id) if !ok { - return typ.TypeUnknown, nil, store.ErrObjectNotFound + return typ.Unknown, nil, store.ErrObjectNotFound } - return obj.ty, append([]byte(nil), obj.content...), nil + return obj.ty, obj.content, nil } // ReadHeader reads one object header. -func (memory *Memory) ReadHeader(id id.ObjectID) (typ.Type, uint64, error) { +func (memory *Memory) ReadHeader(id id.ObjectID) (typ.Type, int, error) { obj, ok := memory.objects.Load(id) if !ok { - return typ.TypeUnknown, 0, store.ErrObjectNotFound + return typ.Unknown, 0, store.ErrObjectNotFound } - return obj.ty, uint64(len(obj.content)), nil + return obj.ty, len(obj.content), nil } // ReadSize reads one object size. 
-func (memory *Memory) ReadSize(id id.ObjectID) (uint64, error) { +func (memory *Memory) ReadSize(id id.ObjectID) (int, error) { _, size, err := memory.ReadHeader(id) if err != nil { return 0, err @@ -64,13 +66,13 @@ func (memory *Memory) ReadReaderFull(id id.ObjectID) (io.ReadCloser, error) { } // ReadReaderContent reads one object body through a reader. -func (memory *Memory) ReadReaderContent(id id.ObjectID) (typ.Type, uint64, io.ReadCloser, error) { +func (memory *Memory) ReadReaderContent(id id.ObjectID) (typ.Type, int, io.ReadCloser, error) { ty, content, err := memory.ReadBytesContent(id) if err != nil { - return typ.TypeUnknown, 0, nil, err + return typ.Unknown, 0, nil, err } - return ty, uint64(len(content)), io.NopCloser(bytes.NewReader(content)), nil + return ty, len(content), io.NopCloser(bytes.NewReader(content)), nil } // Refresh is a no-op for in-memory object stores. diff --git a/object/store/memory/writer.go b/object/store/memory/writer.go index 185b082b..d76a1f41 100644 --- a/object/store/memory/writer.go +++ b/object/store/memory/writer.go @@ -8,12 +8,11 @@ import ( "lindenii.org/go/furgit/object/id" "lindenii.org/go/furgit/object/store" "lindenii.org/go/furgit/object/typ" - "lindenii.org/go/lgo/intconv" ) // WriteBytesContent writes one typed object content byte slice. func (memory *Memory) WriteBytesContent(ty typ.Type, content []byte) (id.ObjectID, error) { - raw := header.Append(nil, ty, uint64(len(content))) + raw := header.Append(nil, ty, len(content)) raw = append(raw, content...) 
objectID := memory.objectFormat.Sum(raw) @@ -30,7 +29,7 @@ func (memory *Memory) WriteBytesFull(raw []byte) (id.ObjectID, error) { } content := raw[consumed:] - if uint64(len(content)) != size { + if len(content) != size { return id.ObjectID{}, fmt.Errorf("%w: header size/content mismatch", store.ErrInvalidObject) } @@ -38,21 +37,16 @@ func (memory *Memory) WriteBytesFull(raw []byte) (id.ObjectID, error) { } // WriteReaderContent writes one typed object content stream. -func (memory *Memory) WriteReaderContent(ty typ.Type, size uint64, src io.Reader) (id.ObjectID, error) { - limit, err := intconv.Uint64ToInt64(size) - if err != nil { - return id.ObjectID{}, fmt.Errorf("object/store/memory: content size: %w", err) - } - - content, err := io.ReadAll(io.LimitReader(src, limit+1)) +func (memory *Memory) WriteReaderContent(ty typ.Type, size int, src io.Reader) (id.ObjectID, error) { + content, err := io.ReadAll(io.LimitReader(src, int64(size)+1)) if err != nil { return id.ObjectID{}, fmt.Errorf("object/store/memory: read content: %w", err) } switch { - case uint64(len(content)) > size: + case len(content) > size: return id.ObjectID{}, fmt.Errorf("%w: content longer than declared size", store.ErrInvalidObject) - case uint64(len(content)) < size: + case len(content) < size: return id.ObjectID{}, fmt.Errorf("%w: content shorter than declared size", store.ErrInvalidObject) } diff --git a/object/store/memory/writer_test.go b/object/store/memory/writer_test.go index e68e1671..ad0d8722 100644 --- a/object/store/memory/writer_test.go +++ b/object/store/memory/writer_test.go @@ -19,9 +19,9 @@ func TestWriteReaderContent(t *testing.T) { store := memory.New(objectFormat) content := []byte("memory-content\n") - raw := append(header.Append(nil, typ.TypeBlob, uint64(len(content))), content...) + raw := append(header.Append(nil, typ.Blob, len(content)), content...) 
- gotID, err := store.WriteReaderContent(typ.TypeBlob, uint64(len(content)), bytes.NewReader(content)) + gotID, err := store.WriteReaderContent(typ.Blob, len(content), bytes.NewReader(content)) if err != nil { t.Fatalf("WriteReaderContent: %v", err) } @@ -36,8 +36,8 @@ func TestWriteReaderContent(t *testing.T) { t.Fatalf("ReadBytesContent: %v", err) } - if gotType != typ.TypeBlob { - t.Fatalf("ReadBytesContent type = %v, want %v", gotType, typ.TypeBlob) + if gotType != typ.Blob { + t.Fatalf("ReadBytesContent type = %v, want %v", gotType, typ.Blob) } if !bytes.Equal(gotContent, content) { @@ -56,7 +56,7 @@ func TestWriteReaderFull(t *testing.T) { store := memory.New(objectFormat) content := []byte("memory-full\n") - raw := append(header.Append(nil, typ.TypeBlob, uint64(len(content))), content...) + raw := append(header.Append(nil, typ.Blob, len(content)), content...) gotID, err := store.WriteReaderFull(bytes.NewReader(raw)) if err != nil { @@ -89,9 +89,9 @@ func TestWriteBytes(t *testing.T) { store := memory.New(objectFormat) content := []byte("memory-bytes\n") - raw := append(header.Append(nil, typ.TypeBlob, uint64(len(content))), content...) + raw := append(header.Append(nil, typ.Blob, len(content)), content...) 
- gotID, err := store.WriteBytesContent(typ.TypeBlob, content) + gotID, err := store.WriteBytesContent(typ.Blob, content) if err != nil { t.Fatalf("WriteBytesContent: %v", err) } @@ -123,7 +123,7 @@ func TestWriteValidationErrors(t *testing.T) { { name: "content overflow", run: func(store *memory.Memory) error { - _, err := store.WriteReaderContent(typ.TypeBlob, 1, bytes.NewReader([]byte("hello"))) + _, err := store.WriteReaderContent(typ.Blob, 1, bytes.NewReader([]byte("hello"))) return err //nolint:wrapcheck }, @@ -131,7 +131,7 @@ func TestWriteValidationErrors(t *testing.T) { { name: "content short", run: func(store *memory.Memory) error { - _, err := store.WriteReaderContent(typ.TypeBlob, 5, bytes.NewReader([]byte("x"))) + _, err := store.WriteReaderContent(typ.Blob, 5, bytes.NewReader([]byte("x"))) return err //nolint:wrapcheck }, diff --git a/object/store/mix/bytes.go b/object/store/mix/bytes.go index 2b4d3819..e76d16a9 100644 --- a/object/store/mix/bytes.go +++ b/object/store/mix/bytes.go @@ -45,8 +45,8 @@ func (mix *Mix) ReadBytesContent(id id.ObjectID) (typ.Type, []byte, error) { continue } - return typ.TypeUnknown, nil, fmt.Errorf("object/store/mix: read bytes content: %w", err) + return typ.Unknown, nil, fmt.Errorf("object/store/mix: read bytes content: %w", err) } - return typ.TypeUnknown, nil, store.ErrObjectNotFound + return typ.Unknown, nil, store.ErrObjectNotFound } diff --git a/object/store/mix/header.go b/object/store/mix/header.go index 13dda9af..aefa2907 100644 --- a/object/store/mix/header.go +++ b/object/store/mix/header.go @@ -11,7 +11,7 @@ import ( // ReadHeader reads object header data // from the most-recently-used backend that has it. 
-func (mix *Mix) ReadHeader(id id.ObjectID) (typ.Type, uint64, error) { +func (mix *Mix) ReadHeader(id id.ObjectID) (typ.Type, int, error) { for _, backend := range mix.order.Keys() { ty, size, err := backend.ReadHeader(id) if err == nil { @@ -24,15 +24,15 @@ func (mix *Mix) ReadHeader(id id.ObjectID) (typ.Type, uint64, error) { continue } - return typ.TypeUnknown, 0, fmt.Errorf("object/store/mix: read header: %w", err) + return typ.Unknown, 0, fmt.Errorf("object/store/mix: read header: %w", err) } - return typ.TypeUnknown, 0, store.ErrObjectNotFound + return typ.Unknown, 0, store.ErrObjectNotFound } // ReadSize reads object content length // from the most-recently-used backend that has it. -func (mix *Mix) ReadSize(id id.ObjectID) (uint64, error) { +func (mix *Mix) ReadSize(id id.ObjectID) (int, error) { for _, backend := range mix.order.Keys() { size, err := backend.ReadSize(id) if err == nil { diff --git a/object/store/mix/mix.go b/object/store/mix/mix.go index 2e8e926b..b048fe86 100644 --- a/object/store/mix/mix.go +++ b/object/store/mix/mix.go @@ -28,7 +28,7 @@ func New(backends ...store.ObjectReader) *Mix { present[backend] = struct{}{} } - order := mru.New[store.ObjectReader]() + order := mru.New[store.ObjectReader](mru.Options{Interval: 48}) order.Sync(present) return &Mix{ diff --git a/object/store/mix/reader.go b/object/store/mix/reader.go index f9edc1a4..46a3aedf 100644 --- a/object/store/mix/reader.go +++ b/object/store/mix/reader.go @@ -33,7 +33,7 @@ func (mix *Mix) ReadReaderFull(id id.ObjectID) (io.ReadCloser, error) { // ReadReaderContent reads an object's type, declared content length, // and content stream from the most-recently-used backend that has it. 
// cachedBase is a cached delta base, i.e.,
// its resolved object entry type and full content.
//
// content is shared with every user of the cache and must never be mutated.
// unpackEntry hands the slice out without copying on a direct cache hit,
// so a caller that needs a mutable buffer must copy it first.
//
// Labels: Mut-No.
type cachedBase struct {
	// entryType is the resolved, non-delta entry type of the base.
	entryType packfile.EntryType
	// content is the base's full content; it is shared and read-only.
	content []byte
}
// unpackEntry reconstructs the object stored at offset in p,
// following ref- and ofs-delta chains within the pack.
//
// It works in two phases. First it walks from offset towards the chain's
// base, recording one deltaNode per delta entry, until it reaches either a
// cached base or a non-delta entry, which it inflates. Then it applies the
// recorded deltas in reverse, from the one nearest the base back to the
// requested entry.
//
// It returns the resolved base entry type and the reconstructed content.
// Chains with delta.MaxChainDepth or more deltas, and inflate or
// delta-apply failures, are reported as ErrMalformedPackedStore; errors
// from entryHeaderAt and deltaBaseOffset are returned unchanged.
//
// A direct base-cache hit returns the shared cache buffer itself,
// so the result may alias cache storage and must not be mutated;
// delta-applied results are freshly allocated.
//
// Labels: Life-Parent, Mut-No.
func (packed *Packed) unpackEntry(p *pack, offset int) (packfile.EntryType, []byte, error) {
	var zero packfile.EntryType

	var (
		// chain holds the delta entries visited so far, outermost first.
		chain []deltaNode
		// baseType is the entry type of the base the chain resolves to.
		baseType packfile.EntryType
		// base is the current base content; it is replaced by each delta result.
		base []byte
		// fromCache reports whether the innermost base came from baseCache.
		fromCache bool
	)

	// Drill down to the innermost base,
	// stopping early at any cached base.
	cur := offset

	for {
		if cached, ok := packed.baseCache.Get(baseKey{pack: p, offset: cur}); ok {
			baseType = cached.entryType
			base = cached.content
			fromCache = true

			break
		}

		// The depth bound also ends the walk if the base offsets form a cycle.
		if len(chain) >= delta.MaxChainDepth {
			return zero, nil, fmt.Errorf("%w: pack %q: delta chain too deep", ErrMalformedPackedStore, p.name)
		}

		header, payload, err := p.entryHeaderAt(cur, packed.objectFormat)
		if err != nil {
			return zero, nil, err
		}

		if header.Type.IsBase() {
			// Reached a non-delta entry: inflate it as the chain's base.
			base, err = inflate(payload, header.Size)
			if err != nil {
				return zero, nil, fmt.Errorf("%w: pack %q: %w", ErrMalformedPackedStore, p.name, err)
			}

			baseType = header.Type

			break
		}

		baseOffset, err := packed.deltaBaseOffset(p, cur, header)
		if err != nil {
			return zero, nil, err
		}

		// Keep what is needed to apply this delta later, then step to its base.
		chain = append(chain, deltaNode{
			payload:    payload,
			size:       header.Size,
			baseOffset: baseOffset,
		})

		cur = baseOffset
	}

	// A direct cache hit with no deltas to apply
	// returns the shared cache buffer directly;
	// callers are contractually Mut-No.
	if len(chain) == 0 && fromCache {
		return baseType, base, nil
	}

	// Apply deltas back up the chain, caching each consumed base.
	// i keeps the node's original index, so i == len(chain)-1 is the first
	// iteration, i.e. the delta nearest the base.
	for i, node := range slices.Backward(chain) {
		deltaData, err := inflate(node.payload, node.size)
		if err != nil {
			return zero, nil, fmt.Errorf("%w: pack %q: %w", ErrMalformedPackedStore, p.name, err)
		}

		result, err := delta.Apply(base, deltaData)
		if err != nil {
			return zero, nil, fmt.Errorf("%w: pack %q: %w", ErrMalformedPackedStore, p.name, err)
		}

		// A base that was itself read from the cache is not added again;
		// every other base consumed here is cached under its own offset.
		consumedFromCache := fromCache && i == len(chain)-1
		if !consumedFromCache {
			packed.baseCache.Add(
				baseKey{pack: p, offset: node.baseOffset},
				cachedBase{entryType: baseType, content: base},
			)
		}

		base = result
	}

	// The final result, i.e. the object at offset, is returned without being cached.
	return baseType, base, nil
}
+func (packed *Packed) deltaBaseOffset(p *pack, offset int, header packfile.EntryHeader) (int, error) { + switch header.Type { + case packfile.EntryTypeOfsDelta: + dist, err := intconv.Uint64ToInt(header.OfsDistance) + if err != nil || dist == 0 || dist > offset { + return 0, fmt.Errorf("%w: pack %q: invalid ofs-delta distance", ErrMalformedPackedStore, p.name) + } + + return offset - dist, nil + case packfile.EntryTypeRefDelta: + refBase := header.RefBase[:packed.objectFormat.Size()] + + baseOffsetU, found, err := p.idx.Lookup(refBase) + if err != nil { + return 0, fmt.Errorf("%w: pack %q: %w", ErrMalformedPackedStore, p.name, err) + } + + if !found { + baseID, idErr := packed.objectFormat.FromBytes(refBase) + if idErr != nil { + return 0, fmt.Errorf("%w: pack %q: %w", ErrMalformedPackedStore, p.name, idErr) + } + + return 0, fmt.Errorf( + "%w: resolving ref-delta: %w", + ErrMalformedPackedStore, &errs.ObjectMissingError{OID: baseID}, + ) + } + + baseOffset, err := intconv.Uint64ToInt(baseOffsetU) + if err != nil { + return 0, fmt.Errorf("%w: pack %q: ref-delta base offset overflows int: %w", ErrMalformedPackedStore, p.name, err) + } + + return baseOffset, nil + case packfile.EntryTypeInvalid, + packfile.EntryTypeCommit, + packfile.EntryTypeTree, + packfile.EntryTypeBlob, + packfile.EntryTypeTag, + packfile.EntryTypeFuture: + } + + panic("object/store/packed: deltaBaseOffset on non-delta entry") +} + +// resolveType walks one delta chain +// to find the chained base object entry type, +// without inflating any content. 
+func (packed *Packed) resolveType(p *pack, offset int, entryHeader packfile.EntryHeader) (packfile.EntryType, error) { + var zero packfile.EntryType + + depth := 0 + + for entryHeader.Type.IsDelta() { + if cached, ok := packed.baseCache.Peek(baseKey{pack: p, offset: offset}); ok { + return cached.entryType, nil + } + + depth++ + if depth > delta.MaxChainDepth { + return zero, fmt.Errorf("%w: pack %q: delta chain too deep", ErrMalformedPackedStore, p.name) + } + + baseOffset, err := packed.deltaBaseOffset(p, offset, entryHeader) + if err != nil { + return zero, err + } + + offset = baseOffset + + entryHeader, _, err = p.entryHeaderAt(offset, packed.objectFormat) + if err != nil { + return zero, err + } + } + + return entryHeader.Type, nil +} + +// deltaResultSize reads the declared result size +// from one compressed delta payload prefix. +func deltaResultSize(payload []byte, deltaSize uint64) (int, error) { + zr, err := zlib.NewReaderBytes(payload) + if err != nil { + return 0, fmt.Errorf("reading delta header: %w", err) + } + + defer func() { _ = zr.Close() }() + + buf := deltaHeaderPool.Get() + defer deltaHeaderPool.Put(buf) + + prefixLen := min(uint64(delta.MaxHeaderSizesLen), deltaSize) + + prefix := buf[:prefixLen] + + _, err = io.ReadFull(zr, prefix) + if err != nil { + return 0, fmt.Errorf("reading delta header: %w", err) + } + + _, resultSize, _, err := delta.ParseHeaderSizes(prefix) + if err != nil { + return 0, fmt.Errorf("reading delta header: %w", err) + } + + size, err := intconv.Uint64ToInt(resultSize) + if err != nil { + return 0, fmt.Errorf("reading delta header: result size overflows int: %w", err) + } + + return size, nil +} diff --git a/object/store/packed/doc.go b/object/store/packed/doc.go new file mode 100644 index 00000000..8c2a5bdc --- /dev/null +++ b/object/store/packed/doc.go @@ -0,0 +1,4 @@ +// Package packed provides Git object reading from, +// and pack writing to, +// an objects/pack directory. 
+package packed diff --git a/object/store/packed/entry.go b/object/store/packed/entry.go new file mode 100644 index 00000000..908afad0 --- /dev/null +++ b/object/store/packed/entry.go @@ -0,0 +1,73 @@ +package packed + +import ( + "errors" + "fmt" + "io" + + "lindenii.org/go/furgit/internal/compress/zlib" + "lindenii.org/go/furgit/internal/format/packfile" + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/lgo/intconv" +) + +var errPayloadOverlong = errors.New("entry payload longer than declared") + +// entryHeaderAt parses the entry header at offset, +// returning it together with the entry's compressed payload view. +// +// The entry header only contains the inflated length, +// so payload slice extends to the end of the pack; +// the compressed data length is determined by the zlib stream end, +// not the slice length. +// +// Labels: Life-Parent, Mut-No. +func (pack *pack) entryHeaderAt(offset int, objectFormat id.ObjectFormat) (packfile.EntryHeader, []byte, error) { + var zero packfile.EntryHeader + + pos := offset + if pos < 0 || pos >= len(pack.data) { + return zero, nil, fmt.Errorf("%w: pack %q: entry offset out of bounds", ErrMalformedPackedStore, pack.name) + } + + header, err := packfile.ParseEntryHeader(pack.data[pos:], objectFormat.Size()) + if err != nil { + return zero, nil, fmt.Errorf("%w: pack %q: %w", ErrMalformedPackedStore, pack.name, err) + } + + return header, pack.data[pos+header.HeaderLen:], nil +} + +// inflate decompresses one entry payload of expectedSize bytes, +// rejecting payloads whose inflated size differs. +// +// Labels: Life-Independent. 
+func inflate(payload []byte, expectedSize uint64) ([]byte, error) { + size, err := intconv.Uint64ToInt(expectedSize) + if err != nil { + return nil, fmt.Errorf("declared size: %w", err) + } + + zr, err := zlib.NewReaderBytes(payload) + if err != nil { + return nil, fmt.Errorf("inflating entry payload: %w", err) + } + + defer func() { _ = zr.Close() }() + + out := make([]byte, size) + + _, err = io.ReadFull(zr, out) + if err != nil { + return nil, fmt.Errorf("inflating entry payload: %w", err) + } + + var probe [1]byte + + n, err := zr.Read(probe[:]) + if n != 0 || !errors.Is(err, io.EOF) { + return nil, errPayloadOverlong + } + + return out, nil +} diff --git a/object/store/packed/helpers_test.go b/object/store/packed/helpers_test.go new file mode 100644 index 00000000..9cd95ab8 --- /dev/null +++ b/object/store/packed/helpers_test.go @@ -0,0 +1,82 @@ +package packed_test + +import ( + "os" + "path/filepath" + "strings" + "testing" + + "lindenii.org/go/furgit/internal/testgit" + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/furgit/object/store/packed" +) + +// makeGitPack seeds a repository, +// packs the seeded objects with git pack-objects, +// and returns the repository, the artifact path prefix, +// and the seeded objects. +func makeGitPack(t *testing.T, objectFormat id.ObjectFormat) (*testgit.Repo, string, testgit.Seeded) { + t.Helper() + + repo, err := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: objectFormat}) + if err != nil { + t.Fatalf("NewRepo: %v", err) + } + + seeded, err := repo.SeedHistory(t) + if err != nil { + t.Fatalf("SeedHistory: %v", err) + } + + prefix, err := repo.PackObjects(t, seeded.All(), testgit.PackObjectsOptions{}) + if err != nil { + t.Fatalf("PackObjects: %v", err) + } + + return repo, prefix, seeded +} + +// requireDeltas asserts that the pack at prefix +// contains at least one deltified entry, +// so that tests really do exercise delta resolution. 
+func requireDeltas(t *testing.T, repo *testgit.Repo, prefix string, objectFormat id.ObjectFormat) { + t.Helper() + + out, err := repo.VerifyPack(t, prefix+".idx") + if err != nil { + t.Fatalf("VerifyPack: %v", err) + } + + hexLen := objectFormat.HexLen() + + for line := range strings.Lines(string(out)) { + fields := strings.Fields(line) + if len(fields) >= 7 && len(fields[0]) == hexLen { + return + } + } + + t.Fatalf("fixture pack contains no deltified entries") +} + +// openPackedStore opens a packed store +// over the directory containing prefix's pack artifacts. +func openPackedStore(t *testing.T, prefix string, objectFormat id.ObjectFormat) *packed.Packed { + t.Helper() + + root, err := os.OpenRoot(filepath.Dir(prefix)) + if err != nil { + t.Fatalf("OpenRoot: %v", err) + } + + t.Cleanup(func() { _ = root.Close() }) + + packedStore, err := packed.New(root, objectFormat) + if err != nil { + t.Fatalf("New: %v", err) + } + + t.Cleanup(func() { _ = packedStore.Close() }) + + return packedStore +} diff --git a/object/store/packed/internal/ingest/doc.go b/object/store/packed/internal/ingest/doc.go new file mode 100644 index 00000000..67be037b --- /dev/null +++ b/object/store/packed/internal/ingest/doc.go @@ -0,0 +1,11 @@ +// Package ingest writes one incoming pack stream +// into an objects/pack directory +// as a finalized pack, index, and reverse index. +// +// WritePack streams the pack to a temporary file +// while scanning its entries, +// resolves every delta against in-pack bases, +// optionally completes thin packs from an external base reader, +// and publishes the artifacts under content-addressed names +// derived from the pack trailer hash. 
+package ingest diff --git a/object/store/packed/internal/ingest/errors.go b/object/store/packed/internal/ingest/errors.go new file mode 100644 index 00000000..e268182e --- /dev/null +++ b/object/store/packed/internal/ingest/errors.go @@ -0,0 +1,36 @@ +package ingest + +import ( + "errors" + "fmt" + + "lindenii.org/go/furgit/object/id" +) + +// ErrMalformedPack reports that +// the incoming pack stream is truncated, +// inconsistent, or otherwise unparseable. +var ErrMalformedPack = errors.New("object/store/packed/internal/ingest: malformed pack") + +// ErrThinPackNotPermitted reports that +// the incoming pack is thin, +// referencing bases not contained within it, +// but no external base reader was supplied to complete it. +var ErrThinPackNotPermitted = errors.New("object/store/packed/internal/ingest: thin pack not permitted: no thin base supplied") + +// ThinBasesMissingError reports that +// an incoming thin pack references base objects +// that the supplied thin base reader does not contain, +// so the pack cannot be completed. +type ThinBasesMissingError struct { + // OIDs holds the missing base object IDs, sorted. + OIDs []id.ObjectID +} + +// Error implements error. 
+func (e *ThinBasesMissingError) Error() string { + return fmt.Sprintf( + "object/store/packed/internal/ingest: thin pack references %d missing base objects", + len(e.OIDs), + ) +} diff --git a/object/store/packed/internal/ingest/finalize.go b/object/store/packed/internal/ingest/finalize.go new file mode 100644 index 00000000..afed996c --- /dev/null +++ b/object/store/packed/internal/ingest/finalize.go @@ -0,0 +1,198 @@ +package ingest + +import ( + "errors" + "fmt" + "io" + "io/fs" + "slices" + + "lindenii.org/go/furgit/internal/format/packidx" + "lindenii.org/go/furgit/internal/format/packidx/bloom" + "lindenii.org/go/furgit/internal/format/packrev" + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/lgo/intconv" +) + +// finalize writes the index and reverse index, +// then links the pack, reverse index, and index +// to their content-addressed names. +func (ingestion *ingestion) finalize() (Result, error) { + entries, positions, err := ingestion.indexEntries() + if err != nil { + return Result{}, err + } + + packHash := ingestion.packHash.Bytes() + + idxTmp, err := ingestion.writeTemp("tmp_idx_", func(w io.Writer) error { + return packidx.Write(w, ingestion.objectFormat, entries, packHash) + }) + if err != nil { + return Result{}, err + } + + revTmp, err := ingestion.writeTemp("tmp_rev_", func(w io.Writer) error { + return packrev.Write(w, ingestion.objectFormat, positions, packHash) + }) + if err != nil { + return Result{}, err + } + + bloomBuilder, err := ingestion.buildBloom(entries, packHash) + if err != nil { + return Result{}, err + } + + bloomTmp, err := ingestion.writeTemp("tmp_bloom_", func(w io.Writer) error { + _, err := w.Write(bloomBuilder.Bytes()) + + return err + }) + if err != nil { + return Result{}, err + } + + base := "pack-" + ingestion.packHash.String() + packFinal := base + ".pack" + idxFinal := base + ".idx" + revFinal := base + ".rev" + bloomFinal := base + ".bloom" + + // Link the pack, reverse index, and Bloom filter before the 
index, + // since the index is what publishes the pack to readers. + err = ingestion.link(ingestion.packTmp, packFinal) + if err != nil { + return Result{}, err + } + + err = ingestion.link(revTmp, revFinal) + if err != nil { + return Result{}, err + } + + err = ingestion.link(bloomTmp, bloomFinal) + if err != nil { + return Result{}, err + } + + err = ingestion.link(idxTmp, idxFinal) + if err != nil { + return Result{}, err + } + + objectCount, err := intconv.IntToUint32(len(ingestion.records)) + if err != nil { + return Result{}, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + return Result{ + PackName: packFinal, + IdxName: idxFinal, + RevName: revFinal, + BloomName: bloomFinal, + PackHash: ingestion.packHash, + ObjectCount: objectCount, + ThinFixed: ingestion.thinFixed, + }, nil +} + +// buildBloom builds a Bloom filter over the index entries' object IDs, +// bound to packHash. +func (ingestion *ingestion) buildBloom(entries []packidx.Entry, packHash []byte) (*bloom.Builder, error) { + bucketCount, k, err := bloom.RecommendParams(ingestion.objectFormat, len(entries)) + if err != nil { + return nil, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + builder, err := bloom.NewBuilder(ingestion.objectFormat, bucketCount, k, packHash) + if err != nil { + return nil, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + size := ingestion.objectFormat.Size() + for i := range entries { + builder.Add(entries[i].OID[:size]) + } + + return builder, nil +} + +// indexEntries returns the index entries in object-ID order +// and, for each record in pack order, its position in that index order. 
+func (ingestion *ingestion) indexEntries() ([]packidx.Entry, []uint32, error) { + order := make([]int, len(ingestion.records)) + for i := range order { + order[i] = i + } + + slices.SortFunc(order, func(left, right int) int { + return ingestion.records[left].oid.Compare(ingestion.records[right].oid) + }) + + entries := make([]packidx.Entry, len(order)) + positions := make([]uint32, len(ingestion.records)) + + for indexPosition, recordIndex := range order { + rec := &ingestion.records[recordIndex] + + var oidBytes [id.MaxObjectIDSize]byte + copy(oidBytes[:], rec.oid.RawBytes()) + + offset, err := intconv.IntToUint64(rec.offset) + if err != nil { + return nil, nil, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + entries[indexPosition] = packidx.Entry{ + OID: oidBytes, + Offset: offset, + CRC32: rec.crc32, + } + + position, err := intconv.IntToUint32(indexPosition) + if err != nil { + return nil, nil, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + positions[recordIndex] = position + } + + return entries, positions, nil +} + +// writeTemp creates a temporary file, +// writes it via write, syncs it, and returns its name. +func (ingestion *ingestion) writeTemp(prefix string, write func(io.Writer) error) (string, error) { + name, file, err := ingestion.createTemp(prefix) + if err != nil { + return "", err + } + + defer func() { _ = file.Close() }() + + err = write(file) + if err != nil { + return "", err + } + + err = file.Sync() + if err != nil { + return "", fmt.Errorf("object/store/packed/internal/ingest: syncing %q: %w", name, err) + } + + return name, nil +} + +// link hard-links tmp to final, +// treating an already-present destination as success. 
+func (ingestion *ingestion) link(tmp, final string) error { + err := ingestion.root.Link(tmp, final) + if err != nil && !errors.Is(err, fs.ErrExist) { + return fmt.Errorf("object/store/packed/internal/ingest: linking %q: %w", final, err) + } + + _ = ingestion.root.Remove(tmp) + + return nil +} diff --git a/object/store/packed/internal/ingest/ingest.go b/object/store/packed/internal/ingest/ingest.go new file mode 100644 index 00000000..5422b4af --- /dev/null +++ b/object/store/packed/internal/ingest/ingest.go @@ -0,0 +1,250 @@ +package ingest + +import ( + "bytes" + "crypto/rand" + "errors" + "fmt" + "io" + "io/fs" + "os" + + "lindenii.org/go/furgit/internal/format/packfile" + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/furgit/object/store" +) + +var errTempNamesExhausted = errors.New("object/store/packed/internal/ingest: exhausted temporary file names") + +// ingestion holds the state for one WritePack call. +type ingestion struct { + // root is the destination objects/pack directory. + root *os.Root + + // objectFormat is the pack's object format. + objectFormat id.ObjectFormat + + // opts carries the thin base reader and progress sink. + opts store.PackWriteOptions + + // src is the pack stream, positioned just past the header. + src io.Reader + + // packFile is the temporary pack file being written, + // and packTmp is its destination-relative name. + packFile *os.File + packTmp string + + // temps lists every temporary file to remove on failure. + temps []string + + // scanner streams src into packFile while scanning entries. + scanner *scanner + + // records holds one entry per object, in pack-offset order. + records []record + + // byOffset maps an entry offset to its record index, + // and byOID maps a resolved object ID to its record index. + byOffset map[int]int + byOID map[id.ObjectID]int + + // headerCount is the object count declared by the pack header. + headerCount int + + // deltaCount counts delta records, accumulated during scanning. 
+ deltaCount int + + // deltasResolved counts resolved delta records, for progress. + deltasResolved int + + // packHash is the final pack trailer hash. + packHash id.ObjectID + + // thinFixed reports whether thin completion appended local bases. + thinFixed bool + + // committed suppresses temporary file removal once artifacts are published. + committed bool +} + +// WritePack ingests one pack stream into root, +// publishing a pack, index, and reverse index +// under content-addressed names derived from the pack trailer hash. +// +// WritePack consumes the pack stream through its trailer and stops there. +// It does not require src to reach EOF afterward, +// so it is safe on a still-open transport connection, +// such as receive-pack, +// whose peer keeps the connection open to read the response. +// +// The pack must be the last thing the peer sends before that response: +// any bytes arriving immediately after the trailer +// are rejected as a malformed pack. +func WritePack(root *os.Root, objectFormat id.ObjectFormat, src io.Reader, opts store.PackWriteOptions) (Result, error) { + if objectFormat.Size() == 0 { + return Result{}, id.ErrInvalidObjectFormat + } + + headerRaw, count, err := readPackHeader(src) + if err != nil { + return Result{}, err + } + + ingestion := &ingestion{ + root: root, + objectFormat: objectFormat, + opts: opts, + src: src, + packFile: nil, + packTmp: "", + temps: nil, + scanner: nil, + records: nil, + byOffset: make(map[int]int), + byOID: make(map[id.ObjectID]int), + headerCount: count, + deltaCount: 0, + deltasResolved: 0, + packHash: id.ObjectID{}, + thinFixed: false, + committed: false, + } + + defer ingestion.cleanup() + + if count == 0 { + return ingestion.finishEmpty(headerRaw) + } + + err = ingestion.openPackTemp(headerRaw) + if err != nil { + return Result{}, err + } + + err = ingestion.streamAndScan() + if err != nil { + return Result{}, err + } + + err = ingestion.resolveDeltas() + if err != nil { + return Result{}, err + } 
+ + err = ingestion.packFile.Sync() + if err != nil { + return Result{}, fmt.Errorf("object/store/packed/internal/ingest: syncing pack: %w", err) + } + + result, err := ingestion.finalize() + if err != nil { + return Result{}, err + } + + ingestion.committed = true + + return result, nil +} + +// finishEmpty verifies the trailer of a zero-object pack +// and returns success without writing any artifacts. +func (ingestion *ingestion) finishEmpty(headerRaw [packfile.HeaderLen]byte) (Result, error) { + hashImpl, err := ingestion.objectFormat.New() + if err != nil { + return Result{}, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + _, _ = hashImpl.Write(headerRaw[:]) + + trailer := make([]byte, ingestion.objectFormat.Size()) + + _, err = io.ReadFull(ingestion.src, trailer) + if err != nil { + return Result{}, fmt.Errorf("%w: reading trailer: %w", ErrMalformedPack, err) + } + + if !bytes.Equal(hashImpl.Sum(nil), trailer) { + return Result{}, fmt.Errorf("%w: trailer hash mismatch", ErrMalformedPack) + } + + packHash, err := ingestion.objectFormat.FromBytes(trailer) + if err != nil { + return Result{}, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + return Result{ + PackName: "", + IdxName: "", + RevName: "", + PackHash: packHash, + ObjectCount: 0, + ThinFixed: false, + }, nil +} + +// openPackTemp creates the temporary pack file, +// writes the validated header to it, +// and builds the stream scanner seeded with that header. 
+func (ingestion *ingestion) openPackTemp(headerRaw [packfile.HeaderLen]byte) error { + name, file, err := ingestion.createTemp("tmp_pack_") + if err != nil { + return err + } + + ingestion.packTmp = name + ingestion.packFile = file + + _, err = file.Write(headerRaw[:]) + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: writing pack header: %w", err) + } + + hashImpl, err := ingestion.objectFormat.New() + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + _, _ = hashImpl.Write(headerRaw[:]) + + ingestion.scanner = newScanner(ingestion.src, ingestion.packFile, hashImpl) + + return nil +} + +// createTemp creates one temporary file under root, +// recording its name for cleanup on failure. +func (ingestion *ingestion) createTemp(prefix string) (string, *os.File, error) { + for range 32 { + name := prefix + rand.Text() + + file, err := ingestion.root.OpenFile(name, os.O_CREATE|os.O_EXCL|os.O_RDWR, 0o644) + if err == nil { + ingestion.temps = append(ingestion.temps, name) + + return name, file, nil + } + + if !errors.Is(err, fs.ErrExist) { + return "", nil, fmt.Errorf("object/store/packed/internal/ingest: creating temp file: %w", err) + } + } + + return "", nil, errTempNamesExhausted +} + +// cleanup closes the pack file +// and removes temporary files unless the write committed. 
+func (ingestion *ingestion) cleanup() { + if ingestion.packFile != nil { + _ = ingestion.packFile.Close() + } + + if ingestion.committed { + return + } + + for _, name := range ingestion.temps { + _ = ingestion.root.Remove(name) + } +} diff --git a/object/store/packed/internal/ingest/record.go b/object/store/packed/internal/ingest/record.go new file mode 100644 index 00000000..69101293 --- /dev/null +++ b/object/store/packed/internal/ingest/record.go @@ -0,0 +1,55 @@ +package ingest + +import ( + "lindenii.org/go/furgit/internal/format/packfile" + "lindenii.org/go/furgit/object/id" +) + +// record is the scanned metadata for one packed entry, +// completed in place as deltas are resolved. +// +// Records are appended in pack-offset order, +// so a record's index in the slice is also its pack order. +type record struct { + // offset is the entry's start offset in the pack. + offset int + + // headerLen is the entry header length in bytes, + // so the zlib payload begins at offset+headerLen. + headerLen int + + // packedLen is the total on-disk entry length in bytes, + // covering the header and the compressed payload. + packedLen int + + // crc32 is the CRC32 of the entry's packed bytes. + crc32 uint32 + + // packedType is the entry type as encoded in the pack. + packedType packfile.EntryType + + // declaredSize is the declared inflated payload size. + declaredSize int + + // baseOffset is the base entry offset for an ofs-delta. + baseOffset int + + // baseOID is the base object ID for a ref-delta. + baseOID id.ObjectID + + // objectType is the resolved object type, + // meaningful once resolved is true. + objectType packfile.EntryType + + // oid is the resolved object ID, + // meaningful once resolved is true. + oid id.ObjectID + + // resolved reports whether oid and objectType are final. + resolved bool +} + +// dataOffset returns the entry's compressed payload start offset. 
+func (record *record) dataOffset() int { + return record.offset + record.headerLen +} diff --git a/object/store/packed/internal/ingest/resolve.go b/object/store/packed/internal/ingest/resolve.go new file mode 100644 index 00000000..77b0fa0f --- /dev/null +++ b/object/store/packed/internal/ingest/resolve.go @@ -0,0 +1,294 @@ +package ingest + +import ( + "fmt" + "io" + + "lindenii.org/go/furgit/internal/compress/zlib" + "lindenii.org/go/furgit/internal/format/packfile" + "lindenii.org/go/furgit/internal/format/packfile/delta" + "lindenii.org/go/furgit/internal/progress" + "lindenii.org/go/furgit/object/header" + "lindenii.org/go/furgit/object/id" +) + +// adjacency maps each resolvable base to its delta children: +// ofs-deltas keyed by base offset, ref-deltas keyed by base object ID. +type adjacency struct { + byOffset map[int][]int + byOID map[id.ObjectID][]int +} + +// resolveDeltas resolves every delta record into a final object ID and type, +// completing thin packs from the external base reader when required. +func (ingestion *ingestion) resolveDeltas() error { + meter := progress.New(progress.Options{ + Writer: ingestion.opts.Progress, + Title: "resolving deltas", + Total: ingestion.countDeltas(), + Delay: 0, + Sparse: false, + Throughput: false, + }) + + adjacency := ingestion.buildAdjacency() + + err := ingestion.resolveFrom(ingestion.resolvedRoots(), adjacency, meter) + if err != nil { + return err + } + + external := ingestion.unresolvedExternalBases() + + switch { + case len(external) == 0 && ingestion.countUnresolved() > 0: + return fmt.Errorf("%w: unresolvable delta entries", ErrMalformedPack) + case len(external) > 0: + err = ingestion.fixThin(external, adjacency, meter) + if err != nil { + return err + } + } + + meter.Stop("done") + + return nil +} + +// buildAdjacency indexes every delta record by its base, +// so a resolved base can find the children that delta against it. 
+func (ingestion *ingestion) buildAdjacency() adjacency { + out := adjacency{ + byOffset: make(map[int][]int), + byOID: make(map[id.ObjectID][]int), + } + + for index := range ingestion.records { + rec := &ingestion.records[index] + + switch rec.packedType { + case packfile.EntryTypeOfsDelta: + out.byOffset[rec.baseOffset] = append(out.byOffset[rec.baseOffset], index) + case packfile.EntryTypeRefDelta: + out.byOID[rec.baseOID] = append(out.byOID[rec.baseOID], index) + case packfile.EntryTypeInvalid, + packfile.EntryTypeCommit, + packfile.EntryTypeTree, + packfile.EntryTypeBlob, + packfile.EntryTypeTag, + packfile.EntryTypeFuture: + } + } + + return out +} + +// resolveFrom resolves the delta subtree rooted at each resolved record. +func (ingestion *ingestion) resolveFrom(roots []int, adjacency adjacency, meter *progress.Meter) error { + for _, root := range roots { + content, err := ingestion.inflateRecord(root) + if err != nil { + return err + } + + err = ingestion.resolveSubtree(root, content, ingestion.records[root].objectType, 0, adjacency, meter) + if err != nil { + return err + } + } + + return nil +} + +// resolveSubtree resolves every delta child of one resolved record at depth, +// holding the record's content as the base for its children. 
+func (ingestion *ingestion) resolveSubtree( + index int, + content []byte, + objectType packfile.EntryType, + depth int, + adjacency adjacency, + meter *progress.Meter, +) error { + rec := &ingestion.records[index] + + for _, child := range adjacency.byOffset[rec.offset] { + err := ingestion.resolveChild(child, content, objectType, depth+1, adjacency, meter) + if err != nil { + return err + } + } + + for _, child := range adjacency.byOID[rec.oid] { + err := ingestion.resolveChild(child, content, objectType, depth+1, adjacency, meter) + if err != nil { + return err + } + } + + return nil +} + +// resolveChild applies one delta record at depth against its base content, +// finalizes the record, and recurses into its own children. +func (ingestion *ingestion) resolveChild( + index int, + baseContent []byte, + baseType packfile.EntryType, + depth int, + adjacency adjacency, + meter *progress.Meter, +) error { + rec := &ingestion.records[index] + if rec.resolved { + return nil + } + + if depth > delta.MaxChainDepth { + return fmt.Errorf("%w: entry at %d: delta chain too deep", ErrMalformedPack, rec.offset) + } + + deltaPayload, err := ingestion.inflateRecord(index) + if err != nil { + return err + } + + baseSize, resultSize, _, err := delta.ParseHeaderSizes(deltaPayload) + if err != nil { + return fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err) + } + + if baseSize != uint64(len(baseContent)) { + return fmt.Errorf("%w: entry at %d: delta base size mismatch", ErrMalformedPack, rec.offset) + } + + content, err := delta.Apply(baseContent, deltaPayload) + if err != nil { + return fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err) + } + + if uint64(len(content)) != resultSize { + return fmt.Errorf("%w: entry at %d: delta result size mismatch", ErrMalformedPack, rec.offset) + } + + oid, err := ingestion.hashObject(baseType, content) + if err != nil { + return err + } + + rec.objectType = baseType + rec.oid = oid + rec.resolved = true + 
ingestion.byOID[oid] = index + + ingestion.deltasResolved++ + meter.Set(ingestion.deltasResolved, 0) + + return ingestion.resolveSubtree(index, content, baseType, depth, adjacency, meter) +} + +// inflateRecord inflates one record's payload from the temporary pack file. +func (ingestion *ingestion) inflateRecord(index int) ([]byte, error) { + rec := &ingestion.records[index] + + offset := int64(rec.dataOffset()) + compressedLen := int64(rec.packedLen - rec.headerLen) + size := rec.declaredSize + + zr, err := zlib.NewReader(io.NewSectionReader(ingestion.packFile, offset, compressedLen)) + if err != nil { + return nil, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err) + } + + defer func() { _ = zr.Close() }() + + out := make([]byte, size) + + _, err = io.ReadFull(zr, out) + if err != nil { + return nil, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err) + } + + return out, nil +} + +// hashObject computes the object ID of one resolved object. +func (ingestion *ingestion) hashObject(objectType packfile.EntryType, content []byte) (id.ObjectID, error) { + var zero id.ObjectID + + ty, err := objectType.ObjectType() + if err != nil { + return zero, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + hashImpl, err := ingestion.objectFormat.New() + if err != nil { + return zero, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + _, _ = hashImpl.Write(header.Append(nil, ty, len(content))) + _, _ = hashImpl.Write(content) + + oid, err := ingestion.objectFormat.FromBytes(hashImpl.Sum(nil)) + if err != nil { + return zero, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + return oid, nil +} + +// resolvedRoots returns the indices of every currently resolved record. 
+func (ingestion *ingestion) resolvedRoots() []int { + var roots []int + + for index := range ingestion.records { + if ingestion.records[index].resolved { + roots = append(roots, index) + } + } + + return roots +} + +// countDeltas returns the number of delta records. +func (ingestion *ingestion) countDeltas() int { + return ingestion.deltaCount +} + +// countUnresolved returns the number of records that remain unresolved. +// +// Every base is resolved during scanning or thin completion, +// so the unresolved records are exactly the unresolved deltas: +// the delta records minus those already resolved. +func (ingestion *ingestion) countUnresolved() int { + return ingestion.deltaCount - ingestion.deltasResolved +} + +// unresolvedExternalBases returns the unique base object IDs +// of unresolved ref-deltas whose base is not present in the pack, +// in first-reference order. +func (ingestion *ingestion) unresolvedExternalBases() []id.ObjectID { + seen := make(map[id.ObjectID]struct{}) + + out := make([]id.ObjectID, 0, ingestion.deltaCount-ingestion.deltasResolved) + + for index := range ingestion.records { + rec := &ingestion.records[index] + if rec.resolved || rec.packedType != packfile.EntryTypeRefDelta { + continue + } + + if _, ok := ingestion.byOID[rec.baseOID]; ok { + continue + } + + if _, ok := seen[rec.baseOID]; ok { + continue + } + + seen[rec.baseOID] = struct{}{} + out = append(out, rec.baseOID) + } + + return out +} diff --git a/object/store/packed/internal/ingest/result.go b/object/store/packed/internal/ingest/result.go new file mode 100644 index 00000000..9cd6ef1d --- /dev/null +++ b/object/store/packed/internal/ingest/result.go @@ -0,0 +1,29 @@ +package ingest + +import "lindenii.org/go/furgit/object/id" + +// Result describes one finalized pack write. +type Result struct { + // PackName is the destination-relative name of the written pack. + PackName string + + // IdxName is the destination-relative name of the written index. 
+ IdxName string + + // RevName is the destination-relative name of the written reverse index. + RevName string + + // BloomName is the destination-relative name of the written Bloom filter. + BloomName string + + // PackHash is the pack trailer hash + // shared by the pack, index, and reverse index. + PackHash id.ObjectID + + // ObjectCount is the number of objects in the finalized pack, + // including any bases appended during thin completion. + ObjectCount uint32 + + // ThinFixed reports whether thin completion appended local bases. + ThinFixed bool +} diff --git a/object/store/packed/internal/ingest/scan.go b/object/store/packed/internal/ingest/scan.go new file mode 100644 index 00000000..6b3b73b7 --- /dev/null +++ b/object/store/packed/internal/ingest/scan.go @@ -0,0 +1,463 @@ +package ingest + +import ( + "bytes" + "errors" + "fmt" + "hash" + "hash/crc32" + "io" + + "lindenii.org/go/furgit/internal/compress/zlib" + "lindenii.org/go/furgit/internal/format/packfile" + "lindenii.org/go/furgit/internal/progress" + "lindenii.org/go/furgit/object/header" + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/lgo/intconv" +) + +// scanBufferSize is the stream scanner's fixed input window size. +const scanBufferSize = 64 << 10 + +// scanner reads one pack stream, +// mirroring consumed bytes into the destination pack file +// while maintaining the running pack hash and a per-entry CRC. +// +// It implements [io.Reader] and [io.ByteReader] +// so a zlib reader can consume an entry payload through it +// without reading past the end of that compressed stream. +type scanner struct { + src io.Reader + dst io.Writer + + // buf[off:n] is the unread window. + buf []byte + off int + n int + + // consumed counts stream bytes consumed so far. + consumed int + + // hash accumulates the pack hash over consumed bytes + // while hashing is true. + hash hash.Hash + hashing bool + + // crc accumulates the CRC of the current entry + // while crcing is true. 
+ crc uint32 + crcing bool +} + +// newScanner constructs one scanner mirroring src into dst, +// seeding the running hash from the already-consumed pack header. +func newScanner(src io.Reader, dst io.Writer, packHash hash.Hash) *scanner { + return &scanner{ + src: src, + dst: dst, + buf: make([]byte, scanBufferSize), + consumed: packfile.HeaderLen, + hash: packHash, + hashing: true, + crc: 0, + crcing: false, + } +} + +// readPackHeader reads and validates the pack header from src, +// returning the raw header and its declared object count. +func readPackHeader(src io.Reader) ([packfile.HeaderLen]byte, int, error) { + var raw [packfile.HeaderLen]byte + + _, err := io.ReadFull(src, raw[:]) + if err != nil { + return raw, 0, fmt.Errorf("%w: reading header: %w", ErrMalformedPack, err) + } + + packHeader, err := packfile.ParseHeader(raw[:]) + if err != nil { + return raw, 0, fmt.Errorf("%w: %w", ErrMalformedPack, err) + } + + count, err := intconv.Uint32ToInt(packHeader.ObjectCount) + if err != nil { + return raw, 0, fmt.Errorf("%w: object count: %w", ErrMalformedPack, err) + } + + return raw, count, nil +} + +// Read implements [io.Reader]. +func (scanner *scanner) Read(dst []byte) (int, error) { + if len(dst) == 0 { + return 0, nil + } + + err := scanner.ensureAvailable() + if err != nil { + return 0, err + } + + read := min(len(dst), scanner.n-scanner.off) + + copy(dst, scanner.buf[scanner.off:scanner.off+read]) + + err = scanner.use(read) + if err != nil { + return 0, err + } + + return read, nil +} + +// ReadByte implements [io.ByteReader] without allocation. +func (scanner *scanner) ReadByte() (byte, error) { + err := scanner.ensureAvailable() + if err != nil { + return 0, err + } + + b := scanner.buf[scanner.off] + + err = scanner.use(1) + if err != nil { + return 0, err + } + + return b, nil +} + +// ensureAvailable makes at least one unread byte available, +// returning [io.EOF] once the source is exhausted. 
+func (scanner *scanner) ensureAvailable() error { + for scanner.n-scanner.off == 0 { + err := scanner.flushPrefix() + if err != nil { + return err + } + + read, err := scanner.src.Read(scanner.buf[scanner.n:]) + scanner.n += read + + if err != nil { + if errors.Is(err, io.EOF) { + if scanner.n-scanner.off == 0 { + return io.EOF + } + + return nil + } + + return fmt.Errorf("object/store/packed/internal/ingest: reading pack: %w", err) + } + + if read == 0 && scanner.n-scanner.off == 0 { + return io.ErrNoProgress + } + } + + return nil +} + +// peekHeader returns the unread window grown to at most maxLen bytes +// without consuming, tolerating an early end of stream. +func (scanner *scanner) peekHeader(maxLen int) ([]byte, error) { + maxLen = min(maxLen, len(scanner.buf)) + + for scanner.n-scanner.off < maxLen { + err := scanner.flushPrefix() + if err != nil { + return nil, err + } + + read, err := scanner.src.Read(scanner.buf[scanner.n:]) + scanner.n += read + + if err != nil { + if errors.Is(err, io.EOF) { + break + } + + return nil, fmt.Errorf("object/store/packed/internal/ingest: reading pack: %w", err) + } + + if read == 0 { + break + } + } + + if scanner.n-scanner.off == 0 { + return nil, fmt.Errorf("%w: unexpected end of stream", ErrMalformedPack) + } + + return scanner.buf[scanner.off:scanner.n], nil +} + +// use consumes n unread bytes, +// folding them into the running hash and entry CRC as enabled. +func (scanner *scanner) use(n int) error { + chunk := scanner.buf[scanner.off : scanner.off+n] + + if scanner.hashing { + _, err := scanner.hash.Write(chunk) + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: hashing pack: %w", err) + } + } + + if scanner.crcing { + scanner.crc = crc32.Update(scanner.crc, crc32.IEEETable, chunk) + } + + scanner.off += n + scanner.consumed += n + + return nil +} + +// flushPrefix writes the consumed buffer prefix to the destination +// and compacts the unread window to the start of the buffer. 
+func (scanner *scanner) flushPrefix() error { + if scanner.off == 0 { + return nil + } + + _, err := scanner.dst.Write(scanner.buf[:scanner.off]) + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: writing pack: %w", err) + } + + unread := scanner.n - scanner.off + + copy(scanner.buf, scanner.buf[scanner.off:scanner.n]) + + scanner.off = 0 + scanner.n = unread + + return nil +} + +// beginCRC starts CRC accumulation for one entry. +func (scanner *scanner) beginCRC() { + scanner.crc = 0 + scanner.crcing = true +} + +// endCRC ends CRC accumulation and returns the entry CRC. +func (scanner *scanner) endCRC() uint32 { + crc := scanner.crc + scanner.crc = 0 + scanner.crcing = false + + return crc +} + +// finishTrailer reads and verifies the pack trailer hash, +// flushing the remaining buffered pack bytes to the destination. +// +// The trailer is mirrored to the destination but excluded from the pack hash. +func (scanner *scanner) finishTrailer(hashSize int) ([]byte, error) { + trailer := make([]byte, hashSize) + + scanner.hashing = false + + _, err := io.ReadFull(scanner, trailer) + if err != nil { + return nil, fmt.Errorf("%w: reading trailer: %w", ErrMalformedPack, err) + } + + if scanner.n-scanner.off > 0 { + return nil, fmt.Errorf("%w: trailing data after pack", ErrMalformedPack) + } + + if !bytes.Equal(scanner.hash.Sum(nil), trailer) { + return nil, fmt.Errorf("%w: trailer hash mismatch", ErrMalformedPack) + } + + err = scanner.flushPrefix() + if err != nil { + return nil, err + } + + return trailer, nil +} + +// streamAndScan streams the pack body to the temporary pack file, +// scanning one record per declared object and verifying the trailer. 
+func (ingestion *ingestion) streamAndScan() error { + meter := progress.New(progress.Options{ + Writer: ingestion.opts.Progress, + Title: "receiving objects", + Total: ingestion.headerCount, + Delay: 0, + Sparse: false, + Throughput: true, + }) + + for done := range ingestion.headerCount { + err := ingestion.scanEntry(ingestion.scanner.consumed) + if err != nil { + return err + } + + meter.Set(done+1, ingestion.scanner.consumed) + } + + meter.Stop("done") + + trailer, err := ingestion.scanner.finishTrailer(ingestion.objectFormat.Size()) + if err != nil { + return err + } + + packHash, err := ingestion.objectFormat.FromBytes(trailer) + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + ingestion.packHash = packHash + + return nil +} + +// scanEntry scans the entry beginning at start into one record. +func (ingestion *ingestion) scanEntry(start int) error { + ingestion.scanner.beginCRC() + + rec, err := ingestion.scanHeader(start) + if err != nil { + return err + } + + inflated, oid, err := ingestion.drainPayload(&rec) + if err != nil { + return err + } + + if inflated != int64(rec.declaredSize) { + return fmt.Errorf( + "%w: entry at %d: inflated size %d differs from declared %d", + ErrMalformedPack, start, inflated, rec.declaredSize, + ) + } + + rec.packedLen = ingestion.scanner.consumed - start + rec.crc32 = ingestion.scanner.endCRC() + + if rec.packedType.IsBase() { + rec.objectType = rec.packedType + rec.oid = oid + rec.resolved = true + } else { + ingestion.deltaCount++ + } + + index := len(ingestion.records) + ingestion.records = append(ingestion.records, rec) + ingestion.byOffset[rec.offset] = index + + if rec.resolved { + ingestion.byOID[rec.oid] = index + } + + return nil +} + +// scanHeader parses and consumes the entry header at start. 
+func (ingestion *ingestion) scanHeader(start int) (record, error) { + var rec record + + rec.offset = start + + window, err := ingestion.scanner.peekHeader(packfile.MaxEntryHeaderLen(ingestion.objectFormat.Size())) + if err != nil { + return rec, err + } + + entryHeader, err := packfile.ParseEntryHeader(window, ingestion.objectFormat.Size()) + if err != nil { + return rec, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, start, err) + } + + declaredSize, err := intconv.Uint64ToInt(entryHeader.Size) + if err != nil { + return rec, fmt.Errorf("%w: entry at %d: declared size overflows int: %w", ErrMalformedPack, start, err) + } + + rec.packedType = entryHeader.Type + rec.declaredSize = declaredSize + rec.headerLen = entryHeader.HeaderLen + + switch entryHeader.Type { + case packfile.EntryTypeCommit, packfile.EntryTypeTree, packfile.EntryTypeBlob, packfile.EntryTypeTag: + case packfile.EntryTypeOfsDelta: + dist, err := intconv.Uint64ToInt(entryHeader.OfsDistance) + if err != nil || dist == 0 || dist > start { + return rec, fmt.Errorf("%w: entry at %d: ofs-delta base out of bounds", ErrMalformedPack, start) + } + + rec.baseOffset = start - dist + case packfile.EntryTypeRefDelta: + baseID, err := ingestion.objectFormat.FromBytes(entryHeader.RefBase[:ingestion.objectFormat.Size()]) + if err != nil { + return rec, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + rec.baseOID = baseID + case packfile.EntryTypeInvalid, packfile.EntryTypeFuture: + return rec, fmt.Errorf("%w: entry at %d: unsupported entry type", ErrMalformedPack, start) + } + + err = ingestion.scanner.use(entryHeader.HeaderLen) + if err != nil { + return rec, err + } + + return rec, nil +} + +// drainPayload consumes one entry's compressed payload from the stream, +// returning its inflated length and, for base entries, its object ID. 
+func (ingestion *ingestion) drainPayload(rec *record) (int64, id.ObjectID, error) { + var zero id.ObjectID + + zr, err := zlib.NewReader(ingestion.scanner) + if err != nil { + return 0, zero, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err) + } + + defer func() { _ = zr.Close() }() + + if !rec.packedType.IsBase() { + read, err := io.Copy(io.Discard, zr) + if err != nil { + return 0, zero, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err) + } + + return read, zero, nil + } + + objectType, err := rec.packedType.ObjectType() + if err != nil { + return 0, zero, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + hashImpl, err := ingestion.objectFormat.New() + if err != nil { + return 0, zero, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + _, _ = hashImpl.Write(header.Append(nil, objectType, rec.declaredSize)) + + read, err := io.Copy(hashImpl, zr) + if err != nil { + return 0, zero, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err) + } + + oid, err := ingestion.objectFormat.FromBytes(hashImpl.Sum(nil)) + if err != nil { + return 0, zero, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + return read, oid, nil +} diff --git a/object/store/packed/internal/ingest/thin.go b/object/store/packed/internal/ingest/thin.go new file mode 100644 index 00000000..fa125f2f --- /dev/null +++ b/object/store/packed/internal/ingest/thin.go @@ -0,0 +1,213 @@ +package ingest + +import ( + "errors" + "fmt" + "hash/crc32" + "io" + "os" + + "lindenii.org/go/furgit/internal/compress/zlib" + "lindenii.org/go/furgit/internal/format/packfile" + "lindenii.org/go/furgit/internal/progress" + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/furgit/object/store" + "lindenii.org/go/furgit/object/typ" + "lindenii.org/go/lgo/intconv" +) + +// fixThin completes a thin pack +// by appending the external bases it references, +// rewriting the pack header and trailer, +// and resolving the 
deltas reached from the appended bases. +func (ingestion *ingestion) fixThin(external []id.ObjectID, adjacency adjacency, meter *progress.Meter) error { + if ingestion.opts.ThinBase == nil { + return ErrThinPackNotPermitted + } + + hashSize := ingestion.objectFormat.Size() + if ingestion.scanner.consumed < packfile.HeaderLen+hashSize { + return fmt.Errorf("%w: pack shorter than trailer", ErrMalformedPack) + } + + // Drop the trailer from the write cursor. + ingestion.scanner.consumed -= hashSize + + appended := make([]int, 0, len(external)) + + for _, baseOID := range external { + ty, content, err := ingestion.opts.ThinBase.ReadBytesContent(baseOID) + if errors.Is(err, store.ErrObjectNotFound) { + continue + } + + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: reading thin base %s: %w", baseOID, err) + } + + index, err := ingestion.appendBaseObject(baseOID, ty, content) + if err != nil { + return err + } + + appended = append(appended, index) + } + + err := ingestion.rewriteHeaderTrailer() + if err != nil { + return err + } + + err = ingestion.resolveFrom(appended, adjacency, meter) + if err != nil { + return err + } + + missing := ingestion.unresolvedExternalBases() + if len(missing) > 0 { + return &ThinBasesMissingError{OIDs: missing} + } + + if ingestion.countUnresolved() > 0 { + return fmt.Errorf("%w: unresolvable delta entries after thin completion", ErrMalformedPack) + } + + ingestion.thinFixed = len(appended) > 0 + + return nil +} + +// appendBaseObject appends one external thin base +// as a non-delta pack entry at the current write cursor, +// verifying that its content hashes to the requested object ID. 
+func (ingestion *ingestion) appendBaseObject(objectID id.ObjectID, objectType typ.Type, content []byte) (int, error) { + entryType, err := packfile.EntryTypeFromObjectType(objectType) + if err != nil { + return 0, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + computed, err := ingestion.hashObject(entryType, content) + if err != nil { + return 0, err + } + + if computed != objectID { + return 0, fmt.Errorf("%w: thin base %s content hashes to %s", ErrMalformedPack, objectID, computed) + } + + start := ingestion.scanner.consumed + startOffset := int64(start) + + headerBytes := packfile.AppendTypeSize(nil, entryType, uint64(len(content))) + + _, err = ingestion.packFile.WriteAt(headerBytes, startOffset) + if err != nil { + return 0, fmt.Errorf("object/store/packed/internal/ingest: writing thin base header: %w", err) + } + + crc := crc32.NewIEEE() + _, _ = crc.Write(headerBytes) + + dataOffset := startOffset + int64(len(headerBytes)) + writer := &offsetWriter{file: ingestion.packFile, offset: dataOffset} + + zw := zlib.NewWriter(io.MultiWriter(writer, crc)) + + _, err = zw.Write(content) + if err != nil { + _ = zw.Close() + + return 0, fmt.Errorf("object/store/packed/internal/ingest: compressing thin base: %w", err) + } + + err = zw.Close() + if err != nil { + return 0, fmt.Errorf("object/store/packed/internal/ingest: compressing thin base: %w", err) + } + + headerLen := len(headerBytes) + packedLen := headerLen + writer.written + ingestion.scanner.consumed = start + packedLen + + rec := record{ + offset: start, + headerLen: headerLen, + packedLen: packedLen, + crc32: crc.Sum32(), + packedType: entryType, + declaredSize: len(content), + baseOffset: 0, + baseOID: id.ObjectID{}, + objectType: entryType, + oid: objectID, + resolved: true, + } + + index := len(ingestion.records) + ingestion.records = append(ingestion.records, rec) + ingestion.byOffset[start] = index + ingestion.byOID[objectID] = index + + return index, nil +} + +// rewriteHeaderTrailer 
updates the pack object count +// and recomputes the pack trailer hash +// over the entries left after thin completion. +func (ingestion *ingestion) rewriteHeaderTrailer() error { + count, err := intconv.IntToUint32(len(ingestion.records)) + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + _, err = ingestion.packFile.WriteAt(packfile.AppendHeader(nil, count), 0) + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: rewriting header: %w", err) + } + + bodyEnd := int64(ingestion.scanner.consumed) + + hashImpl, err := ingestion.objectFormat.New() + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + _, err = io.Copy(hashImpl, io.NewSectionReader(ingestion.packFile, 0, bodyEnd)) + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: rehashing pack: %w", err) + } + + sum := hashImpl.Sum(nil) + + _, err = ingestion.packFile.WriteAt(sum, bodyEnd) + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: writing trailer: %w", err) + } + + packHash, err := ingestion.objectFormat.FromBytes(sum) + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + ingestion.packHash = packHash + + return nil +} + +// offsetWriter writes to a file via WriteAt, +// advancing sequentially from a base offset +// and counting the bytes written. +type offsetWriter struct { + file *os.File + offset int64 + written int +} + +// Write implements [io.Writer]. 
+func (writer *offsetWriter) Write(p []byte) (int, error) { + n, err := writer.file.WriteAt(p, writer.offset) + writer.offset += int64(n) + writer.written += n + + return n, err //nolint:wrapcheck +} diff --git a/object/store/packed/internal/ingest/writepack_test.go b/object/store/packed/internal/ingest/writepack_test.go new file mode 100644 index 00000000..adc0ba35 --- /dev/null +++ b/object/store/packed/internal/ingest/writepack_test.go @@ -0,0 +1,494 @@ +package ingest_test + +import ( + "bytes" + "errors" + "io" + "os" + "path/filepath" + "testing" + + "lindenii.org/go/furgit/internal/format/packidx" + "lindenii.org/go/furgit/internal/format/packidx/bloom" + "lindenii.org/go/furgit/internal/testgit" + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/furgit/object/store" + "lindenii.org/go/furgit/object/store/packed" + "lindenii.org/go/furgit/object/store/packed/internal/ingest" +) + +// TestWritePackMatchesGit verifies that ingesting a normal pack +// matches git's own pack, index, and reverse index. +// +// The pack is streamed through verbatim, +// and the index and reverse index are regenerated deterministically, +// so a successful match also confirms that scanning and delta resolution +// recovered every object ID, offset, and CRC that git recorded. 
+func TestWritePackMatchesGit(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + repo, err := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: objectFormat}) + if err != nil { + t.Fatalf("NewRepo: %v", err) + } + + seeded, err := repo.SeedHistory(t) + if err != nil { + t.Fatalf("SeedHistory: %v", err) + } + + gitPrefix, err := repo.PackObjects(t, seeded.All(), testgit.PackObjectsOptions{ + RevIndex: true, + Revs: false, + Exclude: nil, + }) + if err != nil { + t.Fatalf("PackObjects: %v", err) + } + + stream, err := os.ReadFile(gitPrefix + ".pack") //nolint:gosec + if err != nil { + t.Fatalf("ReadFile pack: %v", err) + } + + dir, result := writePack(t, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{ + ThinBase: nil, + Progress: nil, + }) + + if want := filepath.Base(gitPrefix) + ".pack"; result.PackName != want { + t.Fatalf("PackName = %q, want %q", result.PackName, want) + } + + for _, artifact := range []struct { + kind string + ours string + want string + }{ + {"pack", result.PackName, gitPrefix + ".pack"}, + {"idx", result.IdxName, gitPrefix + ".idx"}, + {"rev", result.RevName, gitPrefix + ".rev"}, + } { + ours, err := os.ReadFile(filepath.Join(dir, artifact.ours)) //nolint:gosec + if err != nil { + t.Fatalf("ReadFile %s: %v", artifact.kind, err) + } + + want, err := os.ReadFile(artifact.want) + if err != nil { + t.Fatalf("ReadFile git %s: %v", artifact.kind, err) + } + + if !bytes.Equal(ours, want) { + t.Errorf("%s differs from git: %d bytes vs %d", artifact.kind, len(ours), len(want)) + } + } + }) + } +} + +// TestWritePackBloom verifies that ingesting a pack writes a Bloom filter +// that reports every object in the pack as present. 
+func TestWritePackBloom(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + repo, err := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: objectFormat}) + if err != nil { + t.Fatalf("NewRepo: %v", err) + } + + seeded, err := repo.SeedHistory(t) + if err != nil { + t.Fatalf("SeedHistory: %v", err) + } + + gitPrefix, err := repo.PackObjects(t, seeded.All(), testgit.PackObjectsOptions{ + RevIndex: true, + Revs: false, + Exclude: nil, + }) + if err != nil { + t.Fatalf("PackObjects: %v", err) + } + + stream, err := os.ReadFile(gitPrefix + ".pack") //nolint:gosec + if err != nil { + t.Fatalf("ReadFile pack: %v", err) + } + + dir, result := writePack(t, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{ + ThinBase: nil, + Progress: nil, + }) + + if result.BloomName == "" { + t.Fatal("BloomName is empty") + } + + bloomBytes, err := os.ReadFile(filepath.Join(dir, result.BloomName)) //nolint:gosec + if err != nil { + t.Fatalf("ReadFile bloom: %v", err) + } + + filter, err := bloom.Parse(bloomBytes, objectFormat) + if err != nil { + t.Fatalf("bloom.Parse: %v", err) + } + + idxBytes, err := os.ReadFile(filepath.Join(dir, result.IdxName)) //nolint:gosec + if err != nil { + t.Fatalf("ReadFile idx: %v", err) + } + + index, err := packidx.Parse(idxBytes, objectFormat.Size()) + if err != nil { + t.Fatalf("packidx.Parse: %v", err) + } + + if !bytes.Equal(filter.PackHash(), index.PackHash()) { + t.Fatalf("filter pack hash %x, want %x", filter.PackHash(), index.PackHash()) + } + + for pos := range index.NumObjects() { + if !filter.MayContain(index.OIDAt(pos)) { + t.Fatalf("filter rejects object at index position %d", pos) + } + } + }) + } +} + +// TestWritePackEmpty verifies that a zero-object pack +// succeeds without writing any artifacts. 
+func TestWritePackEmpty(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + repo, err := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: objectFormat}) + if err != nil { + t.Fatalf("NewRepo: %v", err) + } + + stream, err := repo.PackObjectsStdout(t, nil, testgit.PackObjectsStdoutOptions{ + Revs: false, + Thin: false, + Exclude: nil, + }) + if err != nil { + t.Fatalf("PackObjectsStdout: %v", err) + } + + dir, result := writePack(t, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{ + ThinBase: nil, + Progress: nil, + }) + + if result.ObjectCount != 0 { + t.Fatalf("ObjectCount = %d, want 0", result.ObjectCount) + } + + if result.PackName != "" { + t.Fatalf("PackName = %q, want empty", result.PackName) + } + + entries, err := os.ReadDir(dir) + if err != nil { + t.Fatalf("ReadDir: %v", err) + } + + if len(entries) != 0 { + t.Fatalf("empty pack wrote %d files, want 0", len(entries)) + } + }) + } +} + +// TestWritePackIdempotent verifies that ingesting the same pack twice +// into one store succeeds and leaves the artifacts in place. 
+func TestWritePackIdempotent(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + repo, err := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: objectFormat}) + if err != nil { + t.Fatalf("NewRepo: %v", err) + } + + seeded, err := repo.SeedHistory(t) + if err != nil { + t.Fatalf("SeedHistory: %v", err) + } + + gitPrefix, err := repo.PackObjects(t, seeded.All(), testgit.PackObjectsOptions{ + RevIndex: true, + Revs: false, + Exclude: nil, + }) + if err != nil { + t.Fatalf("PackObjects: %v", err) + } + + stream, err := os.ReadFile(gitPrefix + ".pack") //nolint:gosec + if err != nil { + t.Fatalf("ReadFile pack: %v", err) + } + + dir := t.TempDir() + + root, err := os.OpenRoot(dir) + if err != nil { + t.Fatalf("OpenRoot: %v", err) + } + + t.Cleanup(func() { _ = root.Close() }) + + first, err := ingest.WritePack(root, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{ + ThinBase: nil, + Progress: nil, + }) + if err != nil { + t.Fatalf("first WritePack: %v", err) + } + + second, err := ingest.WritePack(root, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{ + ThinBase: nil, + Progress: nil, + }) + if err != nil { + t.Fatalf("second WritePack: %v", err) + } + + if second.PackName != first.PackName { + t.Fatalf("second PackName = %q, want %q", second.PackName, first.PackName) + } + + for _, name := range []string{first.PackName, first.IdxName, first.RevName} { + _, err := os.Stat(filepath.Join(dir, name)) + if err != nil { + t.Fatalf("missing %q after re-write: %v", name, err) + } + } + }) + } +} + +// writePack ingests src into a fresh store directory, +// returning the directory and the ingest result. 
+func writePack( + t *testing.T, + objectFormat id.ObjectFormat, + src io.Reader, + opts store.PackWriteOptions, +) (string, ingest.Result) { + t.Helper() + + dir := t.TempDir() + + root, err := os.OpenRoot(dir) + if err != nil { + t.Fatalf("OpenRoot: %v", err) + } + + t.Cleanup(func() { _ = root.Close() }) + + result, err := ingest.WritePack(root, objectFormat, src, opts) + if err != nil { + t.Fatalf("WritePack: %v", err) + } + + return dir, result +} + +// TestWritePackThin verifies that a thin pack is completed from the thin base +// and that git accepts the resulting self-contained pack. +func TestWritePackThin(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + repo, seeded := seedHistory(t, objectFormat) + thinBase := fullStore(t, repo, objectFormat, seeded) + stream := thinStream(t, repo, seeded) + + dir, result := writePack(t, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{ + ThinBase: thinBase, + Progress: nil, + }) + + if !result.ThinFixed { + t.Fatalf("ThinFixed = false, want true (pack was not thin)") + } + + _, err := repo.VerifyPack(t, filepath.Join(dir, result.IdxName)) + if err != nil { + t.Fatalf("VerifyPack on completed pack: %v", err) + } + }) + } +} + +// TestWritePackThinWithoutBase verifies that a thin pack is rejected +// when no thin base is supplied. 
+func TestWritePackThinWithoutBase(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + repo, seeded := seedHistory(t, objectFormat) + stream := thinStream(t, repo, seeded) + + _, err := ingest.WritePack(freshRoot(t), objectFormat, bytes.NewReader(stream), store.PackWriteOptions{ + ThinBase: nil, + Progress: nil, + }) + + if !errors.Is(err, ingest.ErrThinPackNotPermitted) { + t.Fatalf("err = %v, want ErrThinPackNotPermitted", err) + } + }) + } +} + +// TestWritePackThinMissingBase verifies that a thin pack +// whose bases are absent from the thin base +// reports the missing object IDs. +func TestWritePackThinMissingBase(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + repo, seeded := seedHistory(t, objectFormat) + emptyBase := emptyStore(t, objectFormat) + stream := thinStream(t, repo, seeded) + + _, err := ingest.WritePack(freshRoot(t), objectFormat, bytes.NewReader(stream), store.PackWriteOptions{ + ThinBase: emptyBase, + Progress: nil, + }) + + missing, ok := errors.AsType[*ingest.ThinBasesMissingError](err) + if !ok { + t.Fatalf("err = %v, want *ThinBasesMissingError", err) + } + + if len(missing.OIDs) == 0 { + t.Fatalf("ThinBasesMissingError reported no object IDs") + } + }) + } +} + +// seedHistory creates one repository with a seeded history. 
+func seedHistory(t *testing.T, objectFormat id.ObjectFormat) (*testgit.Repo, testgit.Seeded) { + t.Helper() + + repo, err := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: objectFormat}) + if err != nil { + t.Fatalf("NewRepo: %v", err) + } + + seeded, err := repo.SeedHistory(t) + if err != nil { + t.Fatalf("SeedHistory: %v", err) + } + + return repo, seeded +} + +// thinStream produces a thin pack of the tip commit excluding its parent, +// so its deltas reference the omitted parent objects. +func thinStream(t *testing.T, repo *testgit.Repo, seeded testgit.Seeded) []byte { + t.Helper() + + tip := seeded.Commits[len(seeded.Commits)-1] + parent := seeded.Commits[len(seeded.Commits)-2] + + stream, err := repo.PackObjectsStdout(t, []id.ObjectID{tip}, testgit.PackObjectsStdoutOptions{ + Revs: true, + Thin: true, + Exclude: []id.ObjectID{parent}, + }) + if err != nil { + t.Fatalf("PackObjectsStdout: %v", err) + } + + return stream +} + +// fullStore opens a packed store over a pack of every seeded object. +func fullStore(t *testing.T, repo *testgit.Repo, objectFormat id.ObjectFormat, seeded testgit.Seeded) *packed.Packed { + t.Helper() + + prefix, err := repo.PackObjects(t, seeded.All(), testgit.PackObjectsOptions{ + RevIndex: false, + Revs: false, + Exclude: nil, + }) + if err != nil { + t.Fatalf("PackObjects: %v", err) + } + + return openStore(t, filepath.Dir(prefix), objectFormat) +} + +// emptyStore opens a packed store over an empty directory. +func emptyStore(t *testing.T, objectFormat id.ObjectFormat) *packed.Packed { + t.Helper() + + return openStore(t, t.TempDir(), objectFormat) +} + +// openStore opens a packed store over dir. 
+func openStore(t *testing.T, dir string, objectFormat id.ObjectFormat) *packed.Packed { + t.Helper() + + root, err := os.OpenRoot(dir) + if err != nil { + t.Fatalf("OpenRoot: %v", err) + } + + t.Cleanup(func() { _ = root.Close() }) + + packedStore, err := packed.New(root, objectFormat) + if err != nil { + t.Fatalf("New: %v", err) + } + + t.Cleanup(func() { _ = packedStore.Close() }) + + return packedStore +} + +// freshRoot opens a writable root over a fresh temporary directory. +func freshRoot(t *testing.T) *os.Root { + t.Helper() + + root, err := os.OpenRoot(t.TempDir()) + if err != nil { + t.Fatalf("OpenRoot: %v", err) + } + + t.Cleanup(func() { _ = root.Close() }) + + return root +} diff --git a/object/store/packed/lookup.go b/object/store/packed/lookup.go new file mode 100644 index 00000000..e06870a9 --- /dev/null +++ b/object/store/packed/lookup.go @@ -0,0 +1,51 @@ +package packed + +import ( + "fmt" + + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/furgit/object/store" + "lindenii.org/go/lgo/intconv" +) + +// lookup finds the pack containing objectID +// and the entry offset within it, +// probing packs in most-recently-used-ish order. +// +// Labels: Life-Parent. 
+func (packed *Packed) lookup(objectID id.ObjectID) (*pack, int, error) { + if objectID.ObjectFormat() != packed.objectFormat { + return nil, 0, fmt.Errorf( + "%w: got %s want %s", + id.ErrInvalidObjectFormat, objectID.ObjectFormat(), packed.objectFormat, + ) + } + + oid := objectID.RawBytes() + + for _, p := range packed.order.Keys() { + if p.filter != nil && !p.filter.MayContain(oid) { + continue + } + + offsetU, found, err := p.idx.Lookup(oid) + if err != nil { + return nil, 0, fmt.Errorf("%w: pack %q: %w", ErrMalformedPackedStore, p.name, err) + } + + if !found { + continue + } + + offset, err := intconv.Uint64ToInt(offsetU) + if err != nil { + return nil, 0, fmt.Errorf("%w: pack %q: entry offset overflows int: %w", ErrMalformedPackedStore, p.name, err) + } + + packed.order.Touch(p) + + return p, offset, nil + } + + return nil, 0, store.ErrObjectNotFound +} diff --git a/object/store/packed/lookup_test.go b/object/store/packed/lookup_test.go new file mode 100644 index 00000000..d9f3dff9 --- /dev/null +++ b/object/store/packed/lookup_test.go @@ -0,0 +1,35 @@ +package packed_test + +import ( + "errors" + "testing" + + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/furgit/object/store" +) + +func TestLookupMissing(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + _, prefix, seeded := makeGitPack(t, objectFormat) + packedStore := openPackedStore(t, prefix, objectFormat) + + raw := seeded.Blobs[0].Bytes() + raw[len(raw)-1] ^= 0xff + + missing, err := objectFormat.FromBytes(raw) + if err != nil { + t.Fatalf("FromBytes: %v", err) + } + + _, _, err = packedStore.ReadBytesContent(missing) + if !errors.Is(err, store.ErrObjectNotFound) { + t.Fatalf("ReadBytesContent error = %v, want ErrObjectNotFound", err) + } + }) + } +} diff --git a/object/store/packed/pack.go b/object/store/packed/pack.go new file mode 100644 index 00000000..9cd6162b --- /dev/null 
+++ b/object/store/packed/pack.go @@ -0,0 +1,170 @@ +package packed + +import ( + "bytes" + "errors" + "fmt" + "os" + + "lindenii.org/go/furgit/internal/format/packfile" + "lindenii.org/go/furgit/internal/format/packidx" + "lindenii.org/go/furgit/internal/format/packidx/bloom" + "lindenii.org/go/furgit/internal/mmap" + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/lgo/intconv" +) + +var ( + errPackTruncated = errors.New("truncated") + errPackMalformedHeader = errors.New("malformed header") + errPackCountMismatch = errors.New("object count differs from index") + errPackTrailerMismatch = errors.New("trailer hash differs from index") +) + +// pack is one discovered pack: +// its base name, its parsed index, and its mapped data. +// All fields are immutable after openPack. +type pack struct { + // name is the pack base name, like "pack-<hash>". + name string + + // idxMapping owns the mapped pack index bytes, + // and idx is the parsed index view over them. + idxMapping *mmap.Mmap + idx packidx.Packidx + + // dataMapping owns the mapped pack data bytes, + // and data aliases them. + dataMapping *mmap.Mmap + data []byte + + bloomMapping *mmap.Mmap + filter *bloom.Bloom +} + +// openPack opens, maps, and validates +// one pack index and its pack data +// by pack base name. 
+func openPack(root *os.Root, name string, objectFormat id.ObjectFormat) (*pack, error) { + idxMapping, err := mapFile(root, name+".idx") + if err != nil { + return nil, err + } + + idx, err := packidx.Parse(idxMapping.Data(), objectFormat.Size()) + if err != nil { + _ = idxMapping.Close() + + return nil, fmt.Errorf("%w: index %q: %w", ErrMalformedPackedStore, name, err) + } + + dataMapping, err := mapFile(root, name+".pack") + if err != nil { + _ = idxMapping.Close() + + return nil, err + } + + err = validatePackData(dataMapping.Data(), &idx, objectFormat.Size()) + if err != nil { + _ = idxMapping.Close() + _ = dataMapping.Close() + + return nil, fmt.Errorf("%w: pack %q: %w", ErrMalformedPackedStore, name, err) + } + + bloomMapping, filter := openBloom(root, name, objectFormat, idx.PackHash()) + + return &pack{ + name: name, + idxMapping: idxMapping, + idx: idx, + dataMapping: dataMapping, + data: dataMapping.Data(), + bloomMapping: bloomMapping, + filter: filter, + }, nil +} + +func openBloom(root *os.Root, name string, objectFormat id.ObjectFormat, packHash []byte) (*mmap.Mmap, *bloom.Bloom) { + mapping, err := mapFile(root, name+".bloom") + if err != nil { + return nil, nil + } + + filter, err := bloom.Parse(mapping.Data(), objectFormat) + if err != nil { + _ = mapping.Close() + + return nil, nil + } + + if !bytes.Equal(filter.PackHash(), packHash) { + _ = mapping.Close() + + return nil, nil + } + + return mapping, &filter +} + +// mapFile opens and maps one file under root. +func mapFile(root *os.Root, name string) (*mmap.Mmap, error) { + file, err := root.Open(name) + if err != nil { + return nil, fmt.Errorf("object/store/packed: %w", err) + } + + defer func() { _ = file.Close() }() + + mapping, err := mmap.Open(file) + if err != nil { + return nil, fmt.Errorf("object/store/packed: %q: %w", name, err) + } + + return mapping, nil +} + +// validatePackData checks one mapped pack +// against the pack format and its index. 
+func validatePackData(data []byte, idx *packidx.Packidx, hashSize int) error { + if len(data) < packfile.HeaderLen+hashSize { + return errPackTruncated + } + + header, err := packfile.ParseHeader(data) + if err != nil { + return fmt.Errorf("%w: %w", errPackMalformedHeader, err) + } + + count := uint64(header.ObjectCount) + + numObjects, err := intconv.IntToUint64(idx.NumObjects()) + if err != nil { + return fmt.Errorf("object count: %w", err) + } + + if count != numObjects { + return errPackCountMismatch + } + + if !bytes.Equal(data[len(data)-hashSize:], idx.PackHash()) { + return errPackTrailerMismatch + } + + return nil +} + +// close releases the pack data, index, and filter mappings. +func (pack *pack) close() error { + errs := []error{ + pack.dataMapping.Close(), + pack.idxMapping.Close(), + } + + if pack.bloomMapping != nil { + errs = append(errs, pack.bloomMapping.Close()) + } + + return errors.Join(errs...) +} diff --git a/object/store/packed/packed.go b/object/store/packed/packed.go new file mode 100644 index 00000000..897b3b98 --- /dev/null +++ b/object/store/packed/packed.go @@ -0,0 +1,101 @@ +package packed + +import ( + "errors" + "os" + "sync" + + "lindenii.org/go/furgit/internal/cache/clock" + "lindenii.org/go/furgit/internal/mru" + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/furgit/object/store" +) + +// ErrMalformedPackedStore reports that +// a pack or pack index in the store is +// truncated, inconsistent, or otherwise corrupt. +var ErrMalformedPackedStore = errors.New("object/store/packed: malformed packed store") + +// Packed reads Git objects from pack/index files +// under an objects/pack root. +// +// Packs appearing after construction are only visible +// after an explicit [Packed.Refresh]. +// +// Labels: Close-Caller. +type Packed struct { + // root is the objects/pack directory + // used for all pack and index file access. + root *os.Root + + // objectFormat is the expected object format for lookups. 
+ objectFormat id.ObjectFormat + + // order contains the packs to probe, MRU-first. + order *mru.Order[*pack] + + // baseCache caches delta bases consumed during resolution. + baseCache *clock.Clock[baseKey, cachedBase] + + // refreshMu serializes Refresh. + // Readers uses none of these. + refreshMu sync.Mutex + + // byName supports reusing surviving packs across Refresh, + // and retired holds dropped packs until Close, + // since concurrent readers may still use them. + byName map[string]*pack + + retired []*pack +} + +var _ store.ObjectReader = (*Packed)(nil) + +// New creates a packed-object store rooted at an objects/pack directory, +// performing an initial Refresh. +// +// Labels: Deps-Borrowed, Life-Parent. +func New(root *os.Root, objectFormat id.ObjectFormat) (*Packed, error) { + if objectFormat.Size() == 0 { + return nil, id.ErrInvalidObjectFormat + } + + packed := &Packed{ + root: root, + objectFormat: objectFormat, + order: mru.New[*pack](mru.Options{Interval: 48}), + baseCache: newBaseCache(), + refreshMu: sync.Mutex{}, + byName: nil, + retired: nil, + } + + err := packed.Refresh() + if err != nil { + return nil, err + } + + return packed, nil +} + +// Close releases mapped pack/index resources associated with the store. +// +// Labels: MT-Unsafe. +func (packed *Packed) Close() error { + errs := make([]error, 0, len(packed.byName)+len(packed.retired)) + + for _, p := range packed.byName { + errs = append(errs, p.close()) + } + + for _, p := range packed.retired { + errs = append(errs, p.close()) + } + + packed.byName = nil + packed.retired = nil + + packed.baseCache.Clear() + + return errors.Join(errs...) 
+} diff --git a/object/store/packed/quarantine.go b/object/store/packed/quarantine.go new file mode 100644 index 00000000..977a9543 --- /dev/null +++ b/object/store/packed/quarantine.go @@ -0,0 +1,166 @@ +package packed + +import ( + "crypto/rand" + "errors" + "fmt" + "io/fs" + "os" + "slices" + "strings" + + "lindenii.org/go/furgit/object/store" +) + +var ( + _ store.PackQuarantiner = (*Packed)(nil) + _ store.PackQuarantine = (*packQuarantine)(nil) +) + +var errQuarantineNamesExhausted = errors.New("object/store/packed: exhausted quarantine directory names") + +// packQuarantine is one quarantined packed store +// rooted privately beneath a destination pack root. +type packQuarantine struct { + *Packed + + parent *Packed + + tempName string + tempRoot *os.Root +} + +// BeginPackQuarantine creates one quarantined packed store +// rooted privately beneath the destination pack root. +// +// Labels: Deps-Borrowed, Life-Parent. +func (packed *Packed) BeginPackQuarantine(_ store.PackQuarantineOptions) (store.PackQuarantine, error) { //nolint:ireturn + tempName, tempRoot, err := createPackQuarantineRoot(packed.root) + if err != nil { + return nil, err + } + + quarantineStore, err := New(tempRoot, packed.objectFormat) + if err != nil { + _ = tempRoot.Close() + _ = packed.root.RemoveAll(tempName) + + return nil, err + } + + return &packQuarantine{ + Packed: quarantineStore, + parent: packed, + tempName: tempName, + tempRoot: tempRoot, + }, nil +} + +// Promote publishes the quarantined pack artifacts into the parent store, +// refreshes the parent so the objects become available, +// and invalidates the receiver. 
+func (quarantine *packQuarantine) Promote() error { + closeErr := quarantine.Close() + promoteErr := quarantine.promoteAll() + + var refreshErr error + if promoteErr == nil { + refreshErr = quarantine.parent.Refresh() + } + + tempRootErr := quarantine.tempRoot.Close() + removeErr := quarantine.parent.root.RemoveAll(quarantine.tempName) + + return errors.Join(closeErr, promoteErr, refreshErr, tempRootErr, removeErr) +} + +// Discard removes the quarantine and invalidates the receiver. +func (quarantine *packQuarantine) Discard() error { + closeErr := quarantine.Close() + tempRootErr := quarantine.tempRoot.Close() + removeErr := quarantine.parent.root.RemoveAll(quarantine.tempName) + + return errors.Join(closeErr, tempRootErr, removeErr) +} + +// promoteAll links every pack artifact in the quarantine into the parent store, +// in pack/rev/idx dependency order. +func (quarantine *packQuarantine) promoteAll() error { + entries, err := fs.ReadDir(quarantine.tempRoot.FS(), ".") + if err != nil { + return fmt.Errorf("object/store/packed: %w", err) + } + + slices.SortFunc(entries, func(left, right fs.DirEntry) int { + return packPromotionPriority(left.Name()) - packPromotionPriority(right.Name()) + }) + + for _, entry := range entries { + err := quarantine.promoteFile(entry.Name()) + if err != nil { + return err + } + } + + return nil +} + +// promoteFile links one quarantined artifact into the parent store, +// treating an already-present destination as success. +func (quarantine *packQuarantine) promoteFile(name string) error { + src := quarantine.tempName + "/" + name + + err := quarantine.parent.root.Link(src, name) + if err != nil && !errors.Is(err, fs.ErrExist) { + return fmt.Errorf("object/store/packed: promoting %q: %w", name, err) + } + + _ = quarantine.parent.root.Remove(src) + + return nil +} + +// createPackQuarantineRoot creates a private quarantine directory beneath parent +// and returns its name and an os.Root over it. 
+func createPackQuarantineRoot(parent *os.Root) (string, *os.Root, error) { + for range 32 { + name := "tmp_packq_" + rand.Text() + + err := parent.Mkdir(name, 0o700) + if err != nil { + if errors.Is(err, fs.ErrExist) { + continue + } + + return "", nil, fmt.Errorf("object/store/packed: %w", err) + } + + root, err := parent.OpenRoot(name) + if err != nil { + _ = parent.RemoveAll(name) + + return "", nil, fmt.Errorf("object/store/packed: %w", err) + } + + return name, root, nil + } + + return "", nil, errQuarantineNamesExhausted +} + +// packPromotionPriority orders pack artifacts +// so that data files are linked before the index that publishes them. +func packPromotionPriority(name string) int { + switch { + case strings.HasPrefix(name, "pack-") && strings.HasSuffix(name, ".pack"): + return 1 + case strings.HasPrefix(name, "pack-") && strings.HasSuffix(name, ".rev"): + return 2 + case strings.HasPrefix(name, "pack-") && strings.HasSuffix(name, ".bloom"): + return 2 + case strings.HasPrefix(name, "pack-") && strings.HasSuffix(name, ".idx"): + return 3 + default: + return 0 + } +} diff --git a/object/store/packed/read_test.go b/object/store/packed/read_test.go new file mode 100644 index 00000000..64faaf5b --- /dev/null +++ b/object/store/packed/read_test.go @@ -0,0 +1,140 @@ +package packed_test + +import ( + "bytes" + "io" + "testing" + + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/furgit/object/store/packed" + "lindenii.org/go/furgit/object/typ" +) + +func TestReadGitPack(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + repo, prefix, seeded := makeGitPack(t, objectFormat) + requireDeltas(t, repo, prefix, objectFormat) + + packedStore := openPackedStore(t, prefix, objectFormat) + + groups := []struct { + ty typ.Type + oids []id.ObjectID + }{ + {ty: typ.Blob, oids: seeded.Blobs}, + {ty: typ.Tree, oids: seeded.Trees}, + {ty: typ.Commit, oids: 
seeded.Commits}, + {ty: typ.Tag, oids: seeded.Tags}, + } + + for _, group := range groups { + for _, oid := range group.oids { + wantContent, err := repo.CatFile(t, group.ty, oid) + if err != nil { + t.Fatalf("CatFile(%s): %v", oid, err) + } + + ty, content, err := packedStore.ReadBytesContent(oid) + if err != nil { + t.Fatalf("ReadBytesContent(%s): %v", oid, err) + } + + if ty != group.ty { + t.Fatalf("ReadBytesContent(%s) type = %v, want %v", oid, ty, group.ty) + } + + if !bytes.Equal(content, wantContent) { + t.Fatalf("ReadBytesContent(%s) content mismatch", oid) + } + + raw, err := packedStore.ReadBytesFull(oid) + if err != nil { + t.Fatalf("ReadBytesFull(%s): %v", oid, err) + } + + if got := objectFormat.Sum(raw); got != oid { + t.Fatalf("ReadBytesFull(%s) hashes to %s", oid, got) + } + + ty, size, err := packedStore.ReadHeader(oid) + if err != nil { + t.Fatalf("ReadHeader(%s): %v", oid, err) + } + + if ty != group.ty { + t.Fatalf("ReadHeader(%s) type = %v, want %v", oid, ty, group.ty) + } + + if size != len(wantContent) { + t.Fatalf("ReadHeader(%s) size = %d, want %d", oid, size, len(wantContent)) + } + + size, err = packedStore.ReadSize(oid) + if err != nil { + t.Fatalf("ReadSize(%s): %v", oid, err) + } + + if size != len(wantContent) { + t.Fatalf("ReadSize(%s) = %d, want %d", oid, size, len(wantContent)) + } + + checkReaderContent(t, packedStore, oid, group.ty, wantContent) + checkReaderFull(t, packedStore, oid, objectFormat) + } + } + }) + } +} + +func checkReaderContent(t *testing.T, packedStore *packed.Packed, oid id.ObjectID, wantType typ.Type, wantContent []byte) { + t.Helper() + + ty, size, reader, err := packedStore.ReadReaderContent(oid) + if err != nil { + t.Fatalf("ReadReaderContent(%s): %v", oid, err) + } + + defer func() { _ = reader.Close() }() + + if ty != wantType { + t.Fatalf("ReadReaderContent(%s) type = %v, want %v", oid, ty, wantType) + } + + if size != len(wantContent) { + t.Fatalf("ReadReaderContent(%s) size = %d, want %d", oid, size, 
len(wantContent)) + } + + content, err := io.ReadAll(reader) + if err != nil { + t.Fatalf("ReadReaderContent(%s) read: %v", oid, err) + } + + if !bytes.Equal(content, wantContent) { + t.Fatalf("ReadReaderContent(%s) content mismatch", oid) + } +} + +func checkReaderFull(t *testing.T, packedStore *packed.Packed, oid id.ObjectID, objectFormat id.ObjectFormat) { + t.Helper() + + reader, err := packedStore.ReadReaderFull(oid) + if err != nil { + t.Fatalf("ReadReaderFull(%s): %v", oid, err) + } + + defer func() { _ = reader.Close() }() + + raw, err := io.ReadAll(reader) + if err != nil { + t.Fatalf("ReadReaderFull(%s) read: %v", oid, err) + } + + if got := objectFormat.Sum(raw); got != oid { + t.Fatalf("ReadReaderFull(%s) hashes to %s", oid, got) + } +} diff --git a/object/store/packed/reader.go b/object/store/packed/reader.go new file mode 100644 index 00000000..cf433cfc --- /dev/null +++ b/object/store/packed/reader.go @@ -0,0 +1,226 @@ +package packed + +import ( + "bytes" + "fmt" + "io" + + "lindenii.org/go/furgit/internal/compress/zlib" + "lindenii.org/go/furgit/internal/format/packfile" + "lindenii.org/go/furgit/internal/iolimit" + "lindenii.org/go/furgit/object/header" + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/furgit/object/typ" + "lindenii.org/go/lgo/intconv" +) + +// ReadBytesContent reads an object's type and content bytes, +// fully resolving delta chains. +func (packed *Packed) ReadBytesContent(objectID id.ObjectID) (typ.Type, []byte, error) { + p, offset, err := packed.lookup(objectID) + if err != nil { + return typ.Unknown, nil, err + } + + entryType, content, err := packed.unpackEntry(p, offset) + if err != nil { + return typ.Unknown, nil, err + } + + ty, err := objectTypeOf(entryType) + if err != nil { + return typ.Unknown, nil, err + } + + return ty, content, nil +} + +// ReadBytesFull reads a full serialized object as "type size\x00content", +// fully resolving delta chains. 
+func (packed *Packed) ReadBytesFull(objectID id.ObjectID) ([]byte, error) { + ty, content, err := packed.ReadBytesContent(objectID) + if err != nil { + return nil, err + } + + raw := header.Append(make([]byte, 0, len(content)+32), ty, len(content)) + + return append(raw, content...), nil +} + +// ReadHeader reads an object's type and declared content length. +// +// For delta entries this resolves the chained base type +// and the declared delta result size, +// without reconstructing content. +func (packed *Packed) ReadHeader(objectID id.ObjectID) (typ.Type, int, error) { + p, offset, err := packed.lookup(objectID) + if err != nil { + return typ.Unknown, 0, err + } + + entryHeader, payload, err := p.entryHeaderAt(offset, packed.objectFormat) + if err != nil { + return typ.Unknown, 0, err + } + + var size int + + if entryHeader.Type.IsDelta() { + size, err = deltaResultSize(payload, entryHeader.Size) + if err != nil { + return typ.Unknown, 0, fmt.Errorf("%w: pack %q: %w", ErrMalformedPackedStore, p.name, err) + } + } else { + size, err = intconv.Uint64ToInt(entryHeader.Size) + if err != nil { + return typ.Unknown, 0, fmt.Errorf("%w: pack %q: object size overflows int: %w", ErrMalformedPackedStore, p.name, err) + } + } + + entryType, err := packed.resolveType(p, offset, entryHeader) + if err != nil { + return typ.Unknown, 0, err + } + + ty, err := objectTypeOf(entryType) + if err != nil { + return typ.Unknown, 0, err + } + + return ty, size, nil +} + +// ReadSize reads an object's declared content length. +// +// Unlike ReadHeader, +// this never walks delta chains. 
+func (packed *Packed) ReadSize(objectID id.ObjectID) (int, error) { + p, offset, err := packed.lookup(objectID) + if err != nil { + return 0, err + } + + entryHeader, payload, err := p.entryHeaderAt(offset, packed.objectFormat) + if err != nil { + return 0, err + } + + if !entryHeader.Type.IsDelta() { + size, err := intconv.Uint64ToInt(entryHeader.Size) + if err != nil { + return 0, fmt.Errorf("%w: pack %q: object size overflows int: %w", ErrMalformedPackedStore, p.name, err) + } + + return size, nil + } + + size, err := deltaResultSize(payload, entryHeader.Size) + if err != nil { + return 0, fmt.Errorf("%w: pack %q: %w", ErrMalformedPackedStore, p.name, err) + } + + return size, nil +} + +// ReadReaderContent reads an object's type, +// declared content length, and content stream. +// +// Non-delta entries stream directly from the pack; +// delta entries are fully resolved in memory first. +// Close releases resources only +// and does not drain unread data for additional validation. +func (packed *Packed) ReadReaderContent(objectID id.ObjectID) (typ.Type, int, io.ReadCloser, error) { + p, offset, err := packed.lookup(objectID) + if err != nil { + return typ.Unknown, 0, nil, err + } + + entryHeader, payload, err := p.entryHeaderAt(offset, packed.objectFormat) + if err != nil { + return typ.Unknown, 0, nil, err + } + + if !entryHeader.Type.IsBase() { + entryType, content, err := packed.unpackEntry(p, offset) + if err != nil { + return typ.Unknown, 0, nil, err + } + + ty, err := objectTypeOf(entryType) + if err != nil { + return typ.Unknown, 0, nil, err + } + + return ty, len(content), io.NopCloser(bytes.NewReader(content)), nil + } + + ty, err := objectTypeOf(entryHeader.Type) + if err != nil { + return typ.Unknown, 0, nil, err + } + + size, err := intconv.Uint64ToInt(entryHeader.Size) + if err != nil { + return typ.Unknown, 0, nil, fmt.Errorf("%w: pack %q: object size overflows int: %w", ErrMalformedPackedStore, p.name, err) + } + + zr, err := 
zlib.NewReaderBytes(payload) + if err != nil { + return typ.Unknown, 0, nil, fmt.Errorf("%w: pack %q: %w", ErrMalformedPackedStore, p.name, err) + } + + return ty, size, &objectReader{ + reader: iolimit.ExpectLengthReader(zr, size), + zr: zr, + }, nil +} + +// ReadReaderFull reads a full serialized object stream +// as "type size\x00content". +// +// Non-delta entries stream directly from the pack; +// delta entries are fully resolved in memory first. +// Close releases resources only +// and does not drain unread data for additional validation. +func (packed *Packed) ReadReaderFull(objectID id.ObjectID) (io.ReadCloser, error) { + ty, size, reader, err := packed.ReadReaderContent(objectID) + if err != nil { + return nil, err + } + + headerBytes := header.Append(nil, ty, size) + + return &objectReader{ + reader: io.MultiReader(bytes.NewReader(headerBytes), reader), + zr: reader, + }, nil +} + +// objectTypeOf converts one packfile entry type +// into an ordinary object type. +func objectTypeOf(entryType packfile.EntryType) (typ.Type, error) { + ty, err := entryType.ObjectType() + if err != nil { + return typ.Unknown, fmt.Errorf("%w: %w", ErrMalformedPackedStore, err) + } + + return ty, nil +} + +// objectReader streams one packed object payload +// and owns the underlying decompressor. +type objectReader struct { + // reader is the stream exposed by Read. + reader io.Reader + // zr is closed by Close. + zr io.Closer +} + +func (reader *objectReader) Read(dst []byte) (int, error) { + return reader.reader.Read(dst) +} + +func (reader *objectReader) Close() error { + return reader.zr.Close() +} diff --git a/object/store/packed/refresh.go b/object/store/packed/refresh.go new file mode 100644 index 00000000..f06e9859 --- /dev/null +++ b/object/store/packed/refresh.go @@ -0,0 +1,69 @@ +package packed + +import ( + "fmt" + "io/fs" + "strings" +) + +// Refresh rescans the pack directory +// and replaces the store's view of available packs. 
+// +// Every index found must parse +// and have its pack data present and consistent; +// otherwise Refresh fails without changing the view. +func (packed *Packed) Refresh() error { + packed.refreshMu.Lock() + defer packed.refreshMu.Unlock() + + dirEntries, err := fs.ReadDir(packed.root.FS(), ".") + if err != nil { + return fmt.Errorf("object/store/packed: %w", err) + } + + next := make(map[string]*pack, len(packed.byName)) + + opened := make([]*pack, 0, len(dirEntries)) + + for _, dirEntry := range dirEntries { + name, ok := strings.CutSuffix(dirEntry.Name(), ".idx") + if !ok || dirEntry.IsDir() { + continue + } + + if existing, ok := packed.byName[name]; ok { + next[name] = existing + + continue + } + + p, err := openPack(packed.root, name, packed.objectFormat) + if err != nil { + for _, p := range opened { + _ = p.close() + } + + return err + } + + opened = append(opened, p) + next[name] = p + } + + for name, p := range packed.byName { + if _, ok := next[name]; !ok { + packed.retired = append(packed.retired, p) + } + } + + packed.byName = next + + present := make(map[*pack]struct{}, len(next)) + for _, p := range next { + present[p] = struct{}{} + } + + packed.order.Sync(present) + + return nil +} diff --git a/object/store/packed/refresh_test.go b/object/store/packed/refresh_test.go new file mode 100644 index 00000000..e54dc97d --- /dev/null +++ b/object/store/packed/refresh_test.go @@ -0,0 +1,108 @@ +package packed_test + +import ( + "errors" + "os" + "path/filepath" + "testing" + + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/furgit/object/store" + "lindenii.org/go/furgit/object/store/packed" +) + +func TestRefreshIsExplicit(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + _, prefix, seeded := makeGitPack(t, objectFormat) + oids := seeded.All() + + dir := t.TempDir() + + root, err := os.OpenRoot(dir) + if err != nil { + 
t.Fatalf("OpenRoot: %v", err) + } + + t.Cleanup(func() { _ = root.Close() }) + + packedStore, err := packed.New(root, objectFormat) + if err != nil { + t.Fatalf("New: %v", err) + } + + t.Cleanup(func() { _ = packedStore.Close() }) + + _, _, err = packedStore.ReadBytesContent(oids[0]) + if !errors.Is(err, store.ErrObjectNotFound) { + t.Fatalf("empty store read error = %v, want ErrObjectNotFound", err) + } + + t.Helper() + + base := filepath.Base(prefix) + cp(t, prefix+".pack", filepath.Join(dir, base+".pack")) + cp(t, prefix+".idx", filepath.Join(dir, base+".idx")) + + // New packs must stay invisible until an explicit Refresh. + _, _, err = packedStore.ReadBytesContent(oids[0]) + if !errors.Is(err, store.ErrObjectNotFound) { + t.Fatalf("pre-Refresh read error = %v, want ErrObjectNotFound", err) + } + + err = packedStore.Refresh() + if err != nil { + t.Fatalf("Refresh: %v", err) + } + + for _, oid := range oids { + _, _, err := packedStore.ReadBytesContent(oid) + if err != nil { + t.Fatalf("post-Refresh ReadBytesContent(%s): %v", oid, err) + } + } + }) + } +} + +func TestRefreshRejectsIndexWithoutPack(t *testing.T) { + t.Parallel() + + objectFormat := id.ObjectFormatSHA256 + + _, prefix, _ := makeGitPack(t, objectFormat) + + dir := t.TempDir() + cp(t, prefix+".idx", filepath.Join(dir, filepath.Base(prefix)+".idx")) + + root, err := os.OpenRoot(dir) + if err != nil { + t.Fatalf("OpenRoot: %v", err) + } + + t.Cleanup(func() { _ = root.Close() }) + + _, err = packed.New(root, objectFormat) + if err == nil { + t.Fatalf("New with orphan index: expected error") + } +} + +// cp copies one file from src to dst. 
+func cp(t *testing.T, src, dst string) { + t.Helper() + + data, err := os.ReadFile(src) //nolint:gosec + if err != nil { + t.Fatalf("ReadFile: %v", err) + } + + err = os.WriteFile(dst, data, 0o600) //#nosec G703 + if err != nil { + t.Fatalf("WriteFile: %v", err) + } +} diff --git a/object/store/packed/writer.go b/object/store/packed/writer.go new file mode 100644 index 00000000..59309c24 --- /dev/null +++ b/object/store/packed/writer.go @@ -0,0 +1,38 @@ +package packed + +import ( + "fmt" + "io" + + "lindenii.org/go/furgit/object/store" + "lindenii.org/go/furgit/object/store/packed/internal/ingest" +) + +var _ store.PackWriter = (*Packed)(nil) + +// WritePack ingests one pack stream into the pack store, +// publishing a pack, index, and reverse index +// under content-addressed names derived from the pack trailer hash. +// +// WritePack consumes the pack stream through its trailer and stops there. +// It does not require src to reach EOF afterward, +// so it is safe on a still-open transport connection, +// such as receive-pack, +// whose peer keeps the connection open to read the response. +// +// The pack must be the last thing the peer sends before that response: +// any bytes arriving immediately after the trailer +// are rejected as a malformed pack. 
+func (packed *Packed) WritePack(src io.Reader, opts store.PackWriteOptions) error {
+	// Only the error from ingestion is inspected; its other result is discarded.
+	_, err := ingest.WritePack(packed.root, packed.objectFormat, src, opts)
+	if err != nil {
+		return err //nolint:wrapcheck
+	}
+
+	// Refresh the store's view so objects from the pack just written
+	// are readable through this store without a separate Refresh call.
+	err = packed.Refresh()
+	if err != nil {
+		return fmt.Errorf("object/store/packed: refresh after pack write: %w", err)
+	}
+
+	return nil
+}
diff --git a/object/store/packed/writer_test.go b/object/store/packed/writer_test.go
new file mode 100644
index 00000000..8227caa7
--- /dev/null
+++ b/object/store/packed/writer_test.go
@@ -0,0 +1,105 @@
+package packed_test
+
+import (
+	"bytes"
+	"os"
+	"testing"
+
+	"lindenii.org/go/furgit/internal/testgit"
+	"lindenii.org/go/furgit/object/id"
+	"lindenii.org/go/furgit/object/store"
+	"lindenii.org/go/furgit/object/store/packed"
+	"lindenii.org/go/furgit/object/typ"
+)
+
+// TestWritePack verifies that writing a pack through the store
+// makes its objects readable without a manual refresh.
+func TestWritePack(t *testing.T) {
+	t.Parallel()
+
+	// The same scenario runs once per supported object format, as a parallel subtest.
+	for _, objectFormat := range id.SupportedObjectFormats() {
+		t.Run(objectFormat.String(), func(t *testing.T) {
+			t.Parallel()
+
+			repo, err := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: objectFormat})
+			if err != nil {
+				t.Fatalf("NewRepo: %v", err)
+			}
+
+			seeded, err := repo.SeedHistory(t)
+			if err != nil {
+				t.Fatalf("SeedHistory: %v", err)
+			}
+
+			// Build the pack stream from the objects returned by seeded.All().
+			stream, err := repo.PackObjectsStdout(t, seeded.All(), testgit.PackObjectsStdoutOptions{
+				Revs:    false,
+				Thin:    false,
+				Exclude: nil,
+			})
+			if err != nil {
+				t.Fatalf("PackObjectsStdout: %v", err)
+			}
+
+			// The destination store is opened over a fresh temporary directory.
+			packedStore := openEmptyStore(t, objectFormat)
+
+			err = packedStore.WritePack(bytes.NewReader(stream), store.PackWriteOptions{
+				ThinBase: nil,
+				Progress: nil,
+			})
+			if err != nil {
+				t.Fatalf("WritePack: %v", err)
+			}
+
+			// Probe one object of each type: the first blob, tree, and tag,
+			// and the last entry of seeded.Commits.
+			probes := []struct {
+				ty  typ.Type
+				oid id.ObjectID
+			}{
+				{typ.Blob, seeded.Blobs[0]},
+				{typ.Tree, seeded.Trees[0]},
+				{typ.Commit, seeded.Commits[len(seeded.Commits)-1]},
+				{typ.Tag, seeded.Tags[0]},
+			}
+
+			for _, probe := range probes {
+				// The test repository's CatFile output is the expected content.
+				want, err := repo.CatFile(t, probe.ty, probe.oid)
+				if err != nil {
+					t.Fatalf("CatFile(%s): %v", probe.oid, err)
+				}
+
+				// No Refresh is called between WritePack and this read.
+				ty, content, err := packedStore.ReadBytesContent(probe.oid)
+				if err != nil {
+					t.Fatalf("ReadBytesContent(%s): %v", probe.oid, err)
+				}
+
+				if ty != probe.ty {
+					t.Fatalf("ReadBytesContent(%s) type = %v, want %v", probe.oid, ty, probe.ty)
+				}
+
+				if !bytes.Equal(content, want) {
+					t.Fatalf("ReadBytesContent(%s) content mismatch", probe.oid)
+				}
+			}
+		})
+	}
+}
+
+// openEmptyStore opens a packed store over a fresh empty directory.
+func openEmptyStore(t *testing.T, objectFormat id.ObjectFormat) *packed.Packed {
+	t.Helper()
+
+	root, err := os.OpenRoot(t.TempDir())
+	if err != nil {
+		t.Fatalf("OpenRoot: %v", err)
+	}
+
+	// Cleanups run in last-registered-first order,
+	// so the store registered below is closed before this root.
+	// Close errors are deliberately ignored during teardown.
+	t.Cleanup(func() { _ = root.Close() })
+
+	packedStore, err := packed.New(root, objectFormat)
+	if err != nil {
+		t.Fatalf("New: %v", err)
+	}
+
+	t.Cleanup(func() { _ = packedStore.Close() })
+
+	return packedStore
+}
diff --git a/object/store/reader.go b/object/store/reader.go
index e8829b87..bbfe1fe8 100644
--- a/object/store/reader.go
+++ b/object/store/reader.go
@@ -23,12 +23,21 @@ type ObjectReader interface {
 	// Users should treat this as an invariant;
 	// implementations should not re-verify it on every read.
 	//
-	// Labels: Life-Parent.
+	// The returned slice may alias storage owned by the backend,
+	// such as a memory-mapped pack or a shared cache buffer.
+	// Callers must not mutate it
+	// and must not retain it past the backend's lifetime.
+	//
+	// Labels: Life-Parent, Mut-No.
 	ReadBytesFull(id id.ObjectID) ([]byte, error)
 
 	// ReadBytesContent reads an object's type and content bytes.
 	//
-	// Labels: Life-Parent.
+	// The returned slice may alias backend-owned storage.
+	// Callers must not mutate it
+	// and must not retain it past the backend's lifetime.
+	//
+	// Labels: Life-Parent, Mut-No.
ReadBytesContent(id id.ObjectID) (typ.Type, []byte, error) // ReadReaderFull reads a full serialized object stream @@ -41,17 +50,17 @@ type ObjectReader interface { // declared content length, and content stream. // // Labels: Life-Parent, Close-Caller. - ReadReaderContent(id id.ObjectID) (typ.Type, uint64, io.ReadCloser, error) + ReadReaderContent(id id.ObjectID) (typ.Type, int, io.ReadCloser, error) // ReadSize reads an object's declared content length. // // This returns the same size as the second result of [ObjectReader.ReadHeader]; // for some implementations, this may be cheaper than ReadHeader // when callers do not need the object type. - ReadSize(id id.ObjectID) (uint64, error) + ReadSize(id id.ObjectID) (int, error) // ReadHeader reads an object's type and declared content length. - ReadHeader(id id.ObjectID) (typ.Type, uint64, error) + ReadHeader(id id.ObjectID) (typ.Type, int, error) // Refresh updates any backend-local discovery/cache view of on-disk objects. // diff --git a/object/store/writer.go b/object/store/writer.go index ce3284d2..d83eec6a 100644 --- a/object/store/writer.go +++ b/object/store/writer.go @@ -24,7 +24,7 @@ type ObjectWriter interface { WriteReaderFull(src io.Reader) (id.ObjectID, error) // WriteReaderContent writes one typed object content stream. - WriteReaderContent(ty typ.Type, size uint64, src io.Reader) (id.ObjectID, error) + WriteReaderContent(ty typ.Type, size int, src io.Reader) (id.ObjectID, error) } // PackWriter writes Git pack streams. |
