diff options
Diffstat (limited to 'object/store/packed')
26 files changed, 3695 insertions, 0 deletions
diff --git a/object/store/packed/basecache.go b/object/store/packed/basecache.go new file mode 100644 index 00000000..88597404 --- /dev/null +++ b/object/store/packed/basecache.go @@ -0,0 +1,38 @@ +package packed + +import ( + "lindenii.org/go/furgit/internal/cache/clock" + "lindenii.org/go/furgit/internal/format/packfile" +) + +// baseCacheMaxWeight bounds the delta base cache weight. +const baseCacheMaxWeight = 96 << 20 + +// baseKey addresses an entry in one pack +// as a delta base cache key. +type baseKey struct { + pack *pack + offset int +} + +// cachedBase is a cached delta base, i.e., +// its resolved object entry type and full content. +// +// content is shared with concurrent users; +// it may only be returned to callers as a copy. +// +// Labels: Mut-No. +type cachedBase struct { + entryType packfile.EntryType + content []byte +} + +// newBaseCache creates the delta base cache. +func newBaseCache() *clock.Clock[baseKey, cachedBase] { + return clock.New(baseCacheMaxWeight, baseWeight) +} + +// baseWeight weighs one cached delta base. +func baseWeight(_ baseKey, base cachedBase) uint64 { + return uint64(len(base.content)) + 32 +} diff --git a/object/store/packed/delta.go b/object/store/packed/delta.go new file mode 100644 index 00000000..5b538221 --- /dev/null +++ b/object/store/packed/delta.go @@ -0,0 +1,246 @@ +package packed + +import ( + "fmt" + "io" + "slices" + + "lindenii.org/go/furgit/errs" + "lindenii.org/go/furgit/internal/compress/zlib" + "lindenii.org/go/furgit/internal/format/packfile" + "lindenii.org/go/furgit/internal/format/packfile/delta" + "lindenii.org/go/lgo/intconv" + "lindenii.org/go/lgo/sync" +) + +//nolint:gochecknoglobals +var deltaHeaderPool = sync.NewPool(func() *[delta.MaxHeaderSizesLen]byte { + return new([delta.MaxHeaderSizesLen]byte) +}) + +// deltaNode is a delta entry on a resolution chain. +type deltaNode struct { + // payload is the entry's compressed delta payload view. 
+ payload []byte + + // size is the entry's declared inflated delta size. + size uint64 + + // baseOffset is the entry's base entry offset. + baseOffset int +} + +// unpackEntry reconstructs the object stored at offset in p, +// following ref- and ofs-delta chains within the pack. +// +// A direct base-cache hit returns the shared cache buffer itself, +// so the result may alias cache storage and must not be mutated; +// delta-applied results are freshly allocated. +// +// Labels: Life-Parent, Mut-No. +func (packed *Packed) unpackEntry(p *pack, offset int) (packfile.EntryType, []byte, error) { + var zero packfile.EntryType + + var ( + chain []deltaNode + baseType packfile.EntryType + base []byte + fromCache bool + ) + + // Drill down to the innermost base, + // stopping early at any cached base. + cur := offset + + for { + if cached, ok := packed.baseCache.Get(baseKey{pack: p, offset: cur}); ok { + baseType = cached.entryType + base = cached.content + fromCache = true + + break + } + + if len(chain) >= delta.MaxChainDepth { + return zero, nil, fmt.Errorf("%w: pack %q: delta chain too deep", ErrMalformedPackedStore, p.name) + } + + header, payload, err := p.entryHeaderAt(cur, packed.objectFormat) + if err != nil { + return zero, nil, err + } + + if header.Type.IsBase() { + base, err = inflate(payload, header.Size) + if err != nil { + return zero, nil, fmt.Errorf("%w: pack %q: %w", ErrMalformedPackedStore, p.name, err) + } + + baseType = header.Type + + break + } + + baseOffset, err := packed.deltaBaseOffset(p, cur, header) + if err != nil { + return zero, nil, err + } + + chain = append(chain, deltaNode{ + payload: payload, + size: header.Size, + baseOffset: baseOffset, + }) + + cur = baseOffset + } + + // A direct cache hit with no deltas to apply + // returns the shared cache buffer directly; + // callers are contractually Mut-No. + if len(chain) == 0 && fromCache { + return baseType, base, nil + } + + // Apply deltas back up the chain, caching each consumed base. 
+ for i, node := range slices.Backward(chain) { + deltaData, err := inflate(node.payload, node.size) + if err != nil { + return zero, nil, fmt.Errorf("%w: pack %q: %w", ErrMalformedPackedStore, p.name, err) + } + + result, err := delta.Apply(base, deltaData) + if err != nil { + return zero, nil, fmt.Errorf("%w: pack %q: %w", ErrMalformedPackedStore, p.name, err) + } + + consumedFromCache := fromCache && i == len(chain)-1 + if !consumedFromCache { + packed.baseCache.Add( + baseKey{pack: p, offset: node.baseOffset}, + cachedBase{entryType: baseType, content: base}, + ) + } + + base = result + } + + return baseType, base, nil +} + +// deltaBaseOffset resolves a delta entry's base entry offset +// within the same pack. +func (packed *Packed) deltaBaseOffset(p *pack, offset int, header packfile.EntryHeader) (int, error) { + switch header.Type { + case packfile.EntryTypeOfsDelta: + dist, err := intconv.Uint64ToInt(header.OfsDistance) + if err != nil || dist == 0 || dist > offset { + return 0, fmt.Errorf("%w: pack %q: invalid ofs-delta distance", ErrMalformedPackedStore, p.name) + } + + return offset - dist, nil + case packfile.EntryTypeRefDelta: + refBase := header.RefBase[:packed.objectFormat.Size()] + + baseOffsetU, found, err := p.idx.Lookup(refBase) + if err != nil { + return 0, fmt.Errorf("%w: pack %q: %w", ErrMalformedPackedStore, p.name, err) + } + + if !found { + baseID, idErr := packed.objectFormat.FromBytes(refBase) + if idErr != nil { + return 0, fmt.Errorf("%w: pack %q: %w", ErrMalformedPackedStore, p.name, idErr) + } + + return 0, fmt.Errorf( + "%w: resolving ref-delta: %w", + ErrMalformedPackedStore, &errs.ObjectMissingError{OID: baseID}, + ) + } + + baseOffset, err := intconv.Uint64ToInt(baseOffsetU) + if err != nil { + return 0, fmt.Errorf("%w: pack %q: ref-delta base offset overflows int: %w", ErrMalformedPackedStore, p.name, err) + } + + return baseOffset, nil + case packfile.EntryTypeInvalid, + packfile.EntryTypeCommit, + packfile.EntryTypeTree, + 
packfile.EntryTypeBlob, + packfile.EntryTypeTag, + packfile.EntryTypeFuture: + } + + panic("object/store/packed: deltaBaseOffset on non-delta entry") +} + +// resolveType walks one delta chain +// to find the chained base object entry type, +// without inflating any content. +func (packed *Packed) resolveType(p *pack, offset int, entryHeader packfile.EntryHeader) (packfile.EntryType, error) { + var zero packfile.EntryType + + depth := 0 + + for entryHeader.Type.IsDelta() { + if cached, ok := packed.baseCache.Peek(baseKey{pack: p, offset: offset}); ok { + return cached.entryType, nil + } + + depth++ + if depth > delta.MaxChainDepth { + return zero, fmt.Errorf("%w: pack %q: delta chain too deep", ErrMalformedPackedStore, p.name) + } + + baseOffset, err := packed.deltaBaseOffset(p, offset, entryHeader) + if err != nil { + return zero, err + } + + offset = baseOffset + + entryHeader, _, err = p.entryHeaderAt(offset, packed.objectFormat) + if err != nil { + return zero, err + } + } + + return entryHeader.Type, nil +} + +// deltaResultSize reads the declared result size +// from one compressed delta payload prefix. 
+func deltaResultSize(payload []byte, deltaSize uint64) (int, error) { + zr, err := zlib.NewReaderBytes(payload) + if err != nil { + return 0, fmt.Errorf("reading delta header: %w", err) + } + + defer func() { _ = zr.Close() }() + + buf := deltaHeaderPool.Get() + defer deltaHeaderPool.Put(buf) + + prefixLen := min(uint64(delta.MaxHeaderSizesLen), deltaSize) + + prefix := buf[:prefixLen] + + _, err = io.ReadFull(zr, prefix) + if err != nil { + return 0, fmt.Errorf("reading delta header: %w", err) + } + + _, resultSize, _, err := delta.ParseHeaderSizes(prefix) + if err != nil { + return 0, fmt.Errorf("reading delta header: %w", err) + } + + size, err := intconv.Uint64ToInt(resultSize) + if err != nil { + return 0, fmt.Errorf("reading delta header: result size overflows int: %w", err) + } + + return size, nil +} diff --git a/object/store/packed/doc.go b/object/store/packed/doc.go new file mode 100644 index 00000000..8c2a5bdc --- /dev/null +++ b/object/store/packed/doc.go @@ -0,0 +1,4 @@ +// Package packed provides Git object reading from, +// and pack writing to, +// an objects/pack directory. +package packed diff --git a/object/store/packed/entry.go b/object/store/packed/entry.go new file mode 100644 index 00000000..908afad0 --- /dev/null +++ b/object/store/packed/entry.go @@ -0,0 +1,73 @@ +package packed + +import ( + "errors" + "fmt" + "io" + + "lindenii.org/go/furgit/internal/compress/zlib" + "lindenii.org/go/furgit/internal/format/packfile" + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/lgo/intconv" +) + +var errPayloadOverlong = errors.New("entry payload longer than declared") + +// entryHeaderAt parses the entry header at offset, +// returning it together with the entry's compressed payload view. +// +// The entry header only contains the inflated length, +// so payload slice extends to the end of the pack; +// the compressed data length is determined by the zlib stream end, +// not the slice length. +// +// Labels: Life-Parent, Mut-No. 
+func (pack *pack) entryHeaderAt(offset int, objectFormat id.ObjectFormat) (packfile.EntryHeader, []byte, error) { + var zero packfile.EntryHeader + + pos := offset + if pos < 0 || pos >= len(pack.data) { + return zero, nil, fmt.Errorf("%w: pack %q: entry offset out of bounds", ErrMalformedPackedStore, pack.name) + } + + header, err := packfile.ParseEntryHeader(pack.data[pos:], objectFormat.Size()) + if err != nil { + return zero, nil, fmt.Errorf("%w: pack %q: %w", ErrMalformedPackedStore, pack.name, err) + } + + return header, pack.data[pos+header.HeaderLen:], nil +} + +// inflate decompresses one entry payload of expectedSize bytes, +// rejecting payloads whose inflated size differs. +// +// Labels: Life-Independent. +func inflate(payload []byte, expectedSize uint64) ([]byte, error) { + size, err := intconv.Uint64ToInt(expectedSize) + if err != nil { + return nil, fmt.Errorf("declared size: %w", err) + } + + zr, err := zlib.NewReaderBytes(payload) + if err != nil { + return nil, fmt.Errorf("inflating entry payload: %w", err) + } + + defer func() { _ = zr.Close() }() + + out := make([]byte, size) + + _, err = io.ReadFull(zr, out) + if err != nil { + return nil, fmt.Errorf("inflating entry payload: %w", err) + } + + var probe [1]byte + + n, err := zr.Read(probe[:]) + if n != 0 || !errors.Is(err, io.EOF) { + return nil, errPayloadOverlong + } + + return out, nil +} diff --git a/object/store/packed/helpers_test.go b/object/store/packed/helpers_test.go new file mode 100644 index 00000000..9cd95ab8 --- /dev/null +++ b/object/store/packed/helpers_test.go @@ -0,0 +1,82 @@ +package packed_test + +import ( + "os" + "path/filepath" + "strings" + "testing" + + "lindenii.org/go/furgit/internal/testgit" + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/furgit/object/store/packed" +) + +// makeGitPack seeds a repository, +// packs the seeded objects with git pack-objects, +// and returns the repository, the artifact path prefix, +// and the seeded objects. 
+func makeGitPack(t *testing.T, objectFormat id.ObjectFormat) (*testgit.Repo, string, testgit.Seeded) { + t.Helper() + + repo, err := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: objectFormat}) + if err != nil { + t.Fatalf("NewRepo: %v", err) + } + + seeded, err := repo.SeedHistory(t) + if err != nil { + t.Fatalf("SeedHistory: %v", err) + } + + prefix, err := repo.PackObjects(t, seeded.All(), testgit.PackObjectsOptions{}) + if err != nil { + t.Fatalf("PackObjects: %v", err) + } + + return repo, prefix, seeded +} + +// requireDeltas asserts that the pack at prefix +// contains at least one deltified entry, +// so that tests really do exercise delta resolution. +func requireDeltas(t *testing.T, repo *testgit.Repo, prefix string, objectFormat id.ObjectFormat) { + t.Helper() + + out, err := repo.VerifyPack(t, prefix+".idx") + if err != nil { + t.Fatalf("VerifyPack: %v", err) + } + + hexLen := objectFormat.HexLen() + + for line := range strings.Lines(string(out)) { + fields := strings.Fields(line) + if len(fields) >= 7 && len(fields[0]) == hexLen { + return + } + } + + t.Fatalf("fixture pack contains no deltified entries") +} + +// openPackedStore opens a packed store +// over the directory containing prefix's pack artifacts. 
+func openPackedStore(t *testing.T, prefix string, objectFormat id.ObjectFormat) *packed.Packed { + t.Helper() + + root, err := os.OpenRoot(filepath.Dir(prefix)) + if err != nil { + t.Fatalf("OpenRoot: %v", err) + } + + t.Cleanup(func() { _ = root.Close() }) + + packedStore, err := packed.New(root, objectFormat) + if err != nil { + t.Fatalf("New: %v", err) + } + + t.Cleanup(func() { _ = packedStore.Close() }) + + return packedStore +} diff --git a/object/store/packed/internal/ingest/doc.go b/object/store/packed/internal/ingest/doc.go new file mode 100644 index 00000000..67be037b --- /dev/null +++ b/object/store/packed/internal/ingest/doc.go @@ -0,0 +1,11 @@ +// Package ingest writes one incoming pack stream +// into an objects/pack directory +// as a finalized pack, index, and reverse index. +// +// WritePack streams the pack to a temporary file +// while scanning its entries, +// resolves every delta against in-pack bases, +// optionally completes thin packs from an external base reader, +// and publishes the artifacts under content-addressed names +// derived from the pack trailer hash. +package ingest diff --git a/object/store/packed/internal/ingest/errors.go b/object/store/packed/internal/ingest/errors.go new file mode 100644 index 00000000..e268182e --- /dev/null +++ b/object/store/packed/internal/ingest/errors.go @@ -0,0 +1,36 @@ +package ingest + +import ( + "errors" + "fmt" + + "lindenii.org/go/furgit/object/id" +) + +// ErrMalformedPack reports that +// the incoming pack stream is truncated, +// inconsistent, or otherwise unparseable. +var ErrMalformedPack = errors.New("object/store/packed/internal/ingest: malformed pack") + +// ErrThinPackNotPermitted reports that +// the incoming pack is thin, +// referencing bases not contained within it, +// but no external base reader was supplied to complete it. 
+var ErrThinPackNotPermitted = errors.New("object/store/packed/internal/ingest: thin pack not permitted: no thin base supplied") + +// ThinBasesMissingError reports that +// an incoming thin pack references base objects +// that the supplied thin base reader does not contain, +// so the pack cannot be completed. +type ThinBasesMissingError struct { + // OIDs holds the missing base object IDs, sorted. + OIDs []id.ObjectID +} + +// Error implements error. +func (e *ThinBasesMissingError) Error() string { + return fmt.Sprintf( + "object/store/packed/internal/ingest: thin pack references %d missing base objects", + len(e.OIDs), + ) +} diff --git a/object/store/packed/internal/ingest/finalize.go b/object/store/packed/internal/ingest/finalize.go new file mode 100644 index 00000000..afed996c --- /dev/null +++ b/object/store/packed/internal/ingest/finalize.go @@ -0,0 +1,198 @@ +package ingest + +import ( + "errors" + "fmt" + "io" + "io/fs" + "slices" + + "lindenii.org/go/furgit/internal/format/packidx" + "lindenii.org/go/furgit/internal/format/packidx/bloom" + "lindenii.org/go/furgit/internal/format/packrev" + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/lgo/intconv" +) + +// finalize writes the index and reverse index, +// then links the pack, reverse index, and index +// to their content-addressed names. 
+func (ingestion *ingestion) finalize() (Result, error) { + entries, positions, err := ingestion.indexEntries() + if err != nil { + return Result{}, err + } + + packHash := ingestion.packHash.Bytes() + + idxTmp, err := ingestion.writeTemp("tmp_idx_", func(w io.Writer) error { + return packidx.Write(w, ingestion.objectFormat, entries, packHash) + }) + if err != nil { + return Result{}, err + } + + revTmp, err := ingestion.writeTemp("tmp_rev_", func(w io.Writer) error { + return packrev.Write(w, ingestion.objectFormat, positions, packHash) + }) + if err != nil { + return Result{}, err + } + + bloomBuilder, err := ingestion.buildBloom(entries, packHash) + if err != nil { + return Result{}, err + } + + bloomTmp, err := ingestion.writeTemp("tmp_bloom_", func(w io.Writer) error { + _, err := w.Write(bloomBuilder.Bytes()) + + return err + }) + if err != nil { + return Result{}, err + } + + base := "pack-" + ingestion.packHash.String() + packFinal := base + ".pack" + idxFinal := base + ".idx" + revFinal := base + ".rev" + bloomFinal := base + ".bloom" + + // Link the pack, reverse index, and Bloom filter before the index, + // since the index is what publishes the pack to readers. 
+ err = ingestion.link(ingestion.packTmp, packFinal) + if err != nil { + return Result{}, err + } + + err = ingestion.link(revTmp, revFinal) + if err != nil { + return Result{}, err + } + + err = ingestion.link(bloomTmp, bloomFinal) + if err != nil { + return Result{}, err + } + + err = ingestion.link(idxTmp, idxFinal) + if err != nil { + return Result{}, err + } + + objectCount, err := intconv.IntToUint32(len(ingestion.records)) + if err != nil { + return Result{}, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + return Result{ + PackName: packFinal, + IdxName: idxFinal, + RevName: revFinal, + BloomName: bloomFinal, + PackHash: ingestion.packHash, + ObjectCount: objectCount, + ThinFixed: ingestion.thinFixed, + }, nil +} + +// buildBloom builds a Bloom filter over the index entries' object IDs, +// bound to packHash. +func (ingestion *ingestion) buildBloom(entries []packidx.Entry, packHash []byte) (*bloom.Builder, error) { + bucketCount, k, err := bloom.RecommendParams(ingestion.objectFormat, len(entries)) + if err != nil { + return nil, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + builder, err := bloom.NewBuilder(ingestion.objectFormat, bucketCount, k, packHash) + if err != nil { + return nil, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + size := ingestion.objectFormat.Size() + for i := range entries { + builder.Add(entries[i].OID[:size]) + } + + return builder, nil +} + +// indexEntries returns the index entries in object-ID order +// and, for each record in pack order, its position in that index order. 
+func (ingestion *ingestion) indexEntries() ([]packidx.Entry, []uint32, error) { + order := make([]int, len(ingestion.records)) + for i := range order { + order[i] = i + } + + slices.SortFunc(order, func(left, right int) int { + return ingestion.records[left].oid.Compare(ingestion.records[right].oid) + }) + + entries := make([]packidx.Entry, len(order)) + positions := make([]uint32, len(ingestion.records)) + + for indexPosition, recordIndex := range order { + rec := &ingestion.records[recordIndex] + + var oidBytes [id.MaxObjectIDSize]byte + copy(oidBytes[:], rec.oid.RawBytes()) + + offset, err := intconv.IntToUint64(rec.offset) + if err != nil { + return nil, nil, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + entries[indexPosition] = packidx.Entry{ + OID: oidBytes, + Offset: offset, + CRC32: rec.crc32, + } + + position, err := intconv.IntToUint32(indexPosition) + if err != nil { + return nil, nil, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + positions[recordIndex] = position + } + + return entries, positions, nil +} + +// writeTemp creates a temporary file, +// writes it via write, syncs it, and returns its name. +func (ingestion *ingestion) writeTemp(prefix string, write func(io.Writer) error) (string, error) { + name, file, err := ingestion.createTemp(prefix) + if err != nil { + return "", err + } + + defer func() { _ = file.Close() }() + + err = write(file) + if err != nil { + return "", err + } + + err = file.Sync() + if err != nil { + return "", fmt.Errorf("object/store/packed/internal/ingest: syncing %q: %w", name, err) + } + + return name, nil +} + +// link hard-links tmp to final, +// treating an already-present destination as success. 
+func (ingestion *ingestion) link(tmp, final string) error { + err := ingestion.root.Link(tmp, final) + if err != nil && !errors.Is(err, fs.ErrExist) { + return fmt.Errorf("object/store/packed/internal/ingest: linking %q: %w", final, err) + } + + _ = ingestion.root.Remove(tmp) + + return nil +} diff --git a/object/store/packed/internal/ingest/ingest.go b/object/store/packed/internal/ingest/ingest.go new file mode 100644 index 00000000..5422b4af --- /dev/null +++ b/object/store/packed/internal/ingest/ingest.go @@ -0,0 +1,250 @@ +package ingest + +import ( + "bytes" + "crypto/rand" + "errors" + "fmt" + "io" + "io/fs" + "os" + + "lindenii.org/go/furgit/internal/format/packfile" + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/furgit/object/store" +) + +var errTempNamesExhausted = errors.New("object/store/packed/internal/ingest: exhausted temporary file names") + +// ingestion holds the state for one WritePack call. +type ingestion struct { + // root is the destination objects/pack directory. + root *os.Root + + // objectFormat is the pack's object format. + objectFormat id.ObjectFormat + + // opts carries the thin base reader and progress sink. + opts store.PackWriteOptions + + // src is the pack stream, positioned just past the header. + src io.Reader + + // packFile is the temporary pack file being written, + // and packTmp is its destination-relative name. + packFile *os.File + packTmp string + + // temps lists every temporary file to remove on failure. + temps []string + + // scanner streams src into packFile while scanning entries. + scanner *scanner + + // records holds one entry per object, in pack-offset order. + records []record + + // byOffset maps an entry offset to its record index, + // and byOID maps a resolved object ID to its record index. + byOffset map[int]int + byOID map[id.ObjectID]int + + // headerCount is the object count declared by the pack header. + headerCount int + + // deltaCount counts delta records, accumulated during scanning. 
+ deltaCount int + + // deltasResolved counts resolved delta records, for progress. + deltasResolved int + + // packHash is the final pack trailer hash. + packHash id.ObjectID + + // thinFixed reports whether thin completion appended local bases. + thinFixed bool + + // committed suppresses temporary file removal once artifacts are published. + committed bool +} + +// WritePack ingests one pack stream into root, +// publishing a pack, index, and reverse index +// under content-addressed names derived from the pack trailer hash. +// +// WritePack consumes the pack stream through its trailer and stops there. +// It does not require src to reach EOF afterward, +// so it is safe on a still-open transport connection, +// such as receive-pack, +// whose peer keeps the connection open to read the response. +// +// The pack must be the last thing the peer sends before that response: +// any bytes arriving immediately after the trailer +// are rejected as a malformed pack. +func WritePack(root *os.Root, objectFormat id.ObjectFormat, src io.Reader, opts store.PackWriteOptions) (Result, error) { + if objectFormat.Size() == 0 { + return Result{}, id.ErrInvalidObjectFormat + } + + headerRaw, count, err := readPackHeader(src) + if err != nil { + return Result{}, err + } + + ingestion := &ingestion{ + root: root, + objectFormat: objectFormat, + opts: opts, + src: src, + packFile: nil, + packTmp: "", + temps: nil, + scanner: nil, + records: nil, + byOffset: make(map[int]int), + byOID: make(map[id.ObjectID]int), + headerCount: count, + deltaCount: 0, + deltasResolved: 0, + packHash: id.ObjectID{}, + thinFixed: false, + committed: false, + } + + defer ingestion.cleanup() + + if count == 0 { + return ingestion.finishEmpty(headerRaw) + } + + err = ingestion.openPackTemp(headerRaw) + if err != nil { + return Result{}, err + } + + err = ingestion.streamAndScan() + if err != nil { + return Result{}, err + } + + err = ingestion.resolveDeltas() + if err != nil { + return Result{}, err + } 
+ + err = ingestion.packFile.Sync() + if err != nil { + return Result{}, fmt.Errorf("object/store/packed/internal/ingest: syncing pack: %w", err) + } + + result, err := ingestion.finalize() + if err != nil { + return Result{}, err + } + + ingestion.committed = true + + return result, nil +} + +// finishEmpty verifies the trailer of a zero-object pack +// and returns success without writing any artifacts. +func (ingestion *ingestion) finishEmpty(headerRaw [packfile.HeaderLen]byte) (Result, error) { + hashImpl, err := ingestion.objectFormat.New() + if err != nil { + return Result{}, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + _, _ = hashImpl.Write(headerRaw[:]) + + trailer := make([]byte, ingestion.objectFormat.Size()) + + _, err = io.ReadFull(ingestion.src, trailer) + if err != nil { + return Result{}, fmt.Errorf("%w: reading trailer: %w", ErrMalformedPack, err) + } + + if !bytes.Equal(hashImpl.Sum(nil), trailer) { + return Result{}, fmt.Errorf("%w: trailer hash mismatch", ErrMalformedPack) + } + + packHash, err := ingestion.objectFormat.FromBytes(trailer) + if err != nil { + return Result{}, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + return Result{ + PackName: "", + IdxName: "", + RevName: "", + PackHash: packHash, + ObjectCount: 0, + ThinFixed: false, + }, nil +} + +// openPackTemp creates the temporary pack file, +// writes the validated header to it, +// and builds the stream scanner seeded with that header. 
+func (ingestion *ingestion) openPackTemp(headerRaw [packfile.HeaderLen]byte) error { + name, file, err := ingestion.createTemp("tmp_pack_") + if err != nil { + return err + } + + ingestion.packTmp = name + ingestion.packFile = file + + _, err = file.Write(headerRaw[:]) + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: writing pack header: %w", err) + } + + hashImpl, err := ingestion.objectFormat.New() + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + _, _ = hashImpl.Write(headerRaw[:]) + + ingestion.scanner = newScanner(ingestion.src, ingestion.packFile, hashImpl) + + return nil +} + +// createTemp creates one temporary file under root, +// recording its name for cleanup on failure. +func (ingestion *ingestion) createTemp(prefix string) (string, *os.File, error) { + for range 32 { + name := prefix + rand.Text() + + file, err := ingestion.root.OpenFile(name, os.O_CREATE|os.O_EXCL|os.O_RDWR, 0o644) + if err == nil { + ingestion.temps = append(ingestion.temps, name) + + return name, file, nil + } + + if !errors.Is(err, fs.ErrExist) { + return "", nil, fmt.Errorf("object/store/packed/internal/ingest: creating temp file: %w", err) + } + } + + return "", nil, errTempNamesExhausted +} + +// cleanup closes the pack file +// and removes temporary files unless the write committed. 
+func (ingestion *ingestion) cleanup() { + if ingestion.packFile != nil { + _ = ingestion.packFile.Close() + } + + if ingestion.committed { + return + } + + for _, name := range ingestion.temps { + _ = ingestion.root.Remove(name) + } +} diff --git a/object/store/packed/internal/ingest/record.go b/object/store/packed/internal/ingest/record.go new file mode 100644 index 00000000..69101293 --- /dev/null +++ b/object/store/packed/internal/ingest/record.go @@ -0,0 +1,55 @@ +package ingest + +import ( + "lindenii.org/go/furgit/internal/format/packfile" + "lindenii.org/go/furgit/object/id" +) + +// record is the scanned metadata for one packed entry, +// completed in place as deltas are resolved. +// +// Records are appended in pack-offset order, +// so a record's index in the slice is also its pack order. +type record struct { + // offset is the entry's start offset in the pack. + offset int + + // headerLen is the entry header length in bytes, + // so the zlib payload begins at offset+headerLen. + headerLen int + + // packedLen is the total on-disk entry length in bytes, + // covering the header and the compressed payload. + packedLen int + + // crc32 is the CRC32 of the entry's packed bytes. + crc32 uint32 + + // packedType is the entry type as encoded in the pack. + packedType packfile.EntryType + + // declaredSize is the declared inflated payload size. + declaredSize int + + // baseOffset is the base entry offset for an ofs-delta. + baseOffset int + + // baseOID is the base object ID for a ref-delta. + baseOID id.ObjectID + + // objectType is the resolved object type, + // meaningful once resolved is true. + objectType packfile.EntryType + + // oid is the resolved object ID, + // meaningful once resolved is true. + oid id.ObjectID + + // resolved reports whether oid and objectType are final. + resolved bool +} + +// dataOffset returns the entry's compressed payload start offset. 
+func (record *record) dataOffset() int { + return record.offset + record.headerLen +} diff --git a/object/store/packed/internal/ingest/resolve.go b/object/store/packed/internal/ingest/resolve.go new file mode 100644 index 00000000..77b0fa0f --- /dev/null +++ b/object/store/packed/internal/ingest/resolve.go @@ -0,0 +1,294 @@ +package ingest + +import ( + "fmt" + "io" + + "lindenii.org/go/furgit/internal/compress/zlib" + "lindenii.org/go/furgit/internal/format/packfile" + "lindenii.org/go/furgit/internal/format/packfile/delta" + "lindenii.org/go/furgit/internal/progress" + "lindenii.org/go/furgit/object/header" + "lindenii.org/go/furgit/object/id" +) + +// adjacency maps each resolvable base to its delta children: +// ofs-deltas keyed by base offset, ref-deltas keyed by base object ID. +type adjacency struct { + byOffset map[int][]int + byOID map[id.ObjectID][]int +} + +// resolveDeltas resolves every delta record into a final object ID and type, +// completing thin packs from the external base reader when required. +func (ingestion *ingestion) resolveDeltas() error { + meter := progress.New(progress.Options{ + Writer: ingestion.opts.Progress, + Title: "resolving deltas", + Total: ingestion.countDeltas(), + Delay: 0, + Sparse: false, + Throughput: false, + }) + + adjacency := ingestion.buildAdjacency() + + err := ingestion.resolveFrom(ingestion.resolvedRoots(), adjacency, meter) + if err != nil { + return err + } + + external := ingestion.unresolvedExternalBases() + + switch { + case len(external) == 0 && ingestion.countUnresolved() > 0: + return fmt.Errorf("%w: unresolvable delta entries", ErrMalformedPack) + case len(external) > 0: + err = ingestion.fixThin(external, adjacency, meter) + if err != nil { + return err + } + } + + meter.Stop("done") + + return nil +} + +// buildAdjacency indexes every delta record by its base, +// so a resolved base can find the children that delta against it. 
+func (ingestion *ingestion) buildAdjacency() adjacency { + out := adjacency{ + byOffset: make(map[int][]int), + byOID: make(map[id.ObjectID][]int), + } + + for index := range ingestion.records { + rec := &ingestion.records[index] + + switch rec.packedType { + case packfile.EntryTypeOfsDelta: + out.byOffset[rec.baseOffset] = append(out.byOffset[rec.baseOffset], index) + case packfile.EntryTypeRefDelta: + out.byOID[rec.baseOID] = append(out.byOID[rec.baseOID], index) + case packfile.EntryTypeInvalid, + packfile.EntryTypeCommit, + packfile.EntryTypeTree, + packfile.EntryTypeBlob, + packfile.EntryTypeTag, + packfile.EntryTypeFuture: + } + } + + return out +} + +// resolveFrom resolves the delta subtree rooted at each resolved record. +func (ingestion *ingestion) resolveFrom(roots []int, adjacency adjacency, meter *progress.Meter) error { + for _, root := range roots { + content, err := ingestion.inflateRecord(root) + if err != nil { + return err + } + + err = ingestion.resolveSubtree(root, content, ingestion.records[root].objectType, 0, adjacency, meter) + if err != nil { + return err + } + } + + return nil +} + +// resolveSubtree resolves every delta child of one resolved record at depth, +// holding the record's content as the base for its children. 
+func (ingestion *ingestion) resolveSubtree( + index int, + content []byte, + objectType packfile.EntryType, + depth int, + adjacency adjacency, + meter *progress.Meter, +) error { + rec := &ingestion.records[index] + + for _, child := range adjacency.byOffset[rec.offset] { + err := ingestion.resolveChild(child, content, objectType, depth+1, adjacency, meter) + if err != nil { + return err + } + } + + for _, child := range adjacency.byOID[rec.oid] { + err := ingestion.resolveChild(child, content, objectType, depth+1, adjacency, meter) + if err != nil { + return err + } + } + + return nil +} + +// resolveChild applies one delta record at depth against its base content, +// finalizes the record, and recurses into its own children. +func (ingestion *ingestion) resolveChild( + index int, + baseContent []byte, + baseType packfile.EntryType, + depth int, + adjacency adjacency, + meter *progress.Meter, +) error { + rec := &ingestion.records[index] + if rec.resolved { + return nil + } + + if depth > delta.MaxChainDepth { + return fmt.Errorf("%w: entry at %d: delta chain too deep", ErrMalformedPack, rec.offset) + } + + deltaPayload, err := ingestion.inflateRecord(index) + if err != nil { + return err + } + + baseSize, resultSize, _, err := delta.ParseHeaderSizes(deltaPayload) + if err != nil { + return fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err) + } + + if baseSize != uint64(len(baseContent)) { + return fmt.Errorf("%w: entry at %d: delta base size mismatch", ErrMalformedPack, rec.offset) + } + + content, err := delta.Apply(baseContent, deltaPayload) + if err != nil { + return fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err) + } + + if uint64(len(content)) != resultSize { + return fmt.Errorf("%w: entry at %d: delta result size mismatch", ErrMalformedPack, rec.offset) + } + + oid, err := ingestion.hashObject(baseType, content) + if err != nil { + return err + } + + rec.objectType = baseType + rec.oid = oid + rec.resolved = true + 
ingestion.byOID[oid] = index + + ingestion.deltasResolved++ + meter.Set(ingestion.deltasResolved, 0) + + return ingestion.resolveSubtree(index, content, baseType, depth, adjacency, meter) +} + +// inflateRecord inflates one record's payload from the temporary pack file. +func (ingestion *ingestion) inflateRecord(index int) ([]byte, error) { + rec := &ingestion.records[index] + + offset := int64(rec.dataOffset()) + compressedLen := int64(rec.packedLen - rec.headerLen) + size := rec.declaredSize + + zr, err := zlib.NewReader(io.NewSectionReader(ingestion.packFile, offset, compressedLen)) + if err != nil { + return nil, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err) + } + + defer func() { _ = zr.Close() }() + + out := make([]byte, size) + + _, err = io.ReadFull(zr, out) + if err != nil { + return nil, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err) + } + + return out, nil +} + +// hashObject computes the object ID of one resolved object. +func (ingestion *ingestion) hashObject(objectType packfile.EntryType, content []byte) (id.ObjectID, error) { + var zero id.ObjectID + + ty, err := objectType.ObjectType() + if err != nil { + return zero, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + hashImpl, err := ingestion.objectFormat.New() + if err != nil { + return zero, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + _, _ = hashImpl.Write(header.Append(nil, ty, len(content))) + _, _ = hashImpl.Write(content) + + oid, err := ingestion.objectFormat.FromBytes(hashImpl.Sum(nil)) + if err != nil { + return zero, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + return oid, nil +} + +// resolvedRoots returns the indices of every currently resolved record. 
+func (ingestion *ingestion) resolvedRoots() []int { + var roots []int + + for index := range ingestion.records { + if ingestion.records[index].resolved { + roots = append(roots, index) + } + } + + return roots +} + +// countDeltas returns the number of delta records. +func (ingestion *ingestion) countDeltas() int { + return ingestion.deltaCount +} + +// countUnresolved returns the number of records that remain unresolved. +// +// Every base is resolved during scanning or thin completion, +// so the unresolved records are exactly the unresolved deltas: +// the delta records minus those already resolved. +func (ingestion *ingestion) countUnresolved() int { + return ingestion.deltaCount - ingestion.deltasResolved +} + +// unresolvedExternalBases returns the unique base object IDs +// of unresolved ref-deltas whose base is not present in the pack, +// in first-reference order. +func (ingestion *ingestion) unresolvedExternalBases() []id.ObjectID { + seen := make(map[id.ObjectID]struct{}) + + out := make([]id.ObjectID, 0, ingestion.deltaCount-ingestion.deltasResolved) + + for index := range ingestion.records { + rec := &ingestion.records[index] + if rec.resolved || rec.packedType != packfile.EntryTypeRefDelta { + continue + } + + if _, ok := ingestion.byOID[rec.baseOID]; ok { + continue + } + + if _, ok := seen[rec.baseOID]; ok { + continue + } + + seen[rec.baseOID] = struct{}{} + out = append(out, rec.baseOID) + } + + return out +} diff --git a/object/store/packed/internal/ingest/result.go b/object/store/packed/internal/ingest/result.go new file mode 100644 index 00000000..9cd6ef1d --- /dev/null +++ b/object/store/packed/internal/ingest/result.go @@ -0,0 +1,29 @@ +package ingest + +import "lindenii.org/go/furgit/object/id" + +// Result describes one finalized pack write. +type Result struct { + // PackName is the destination-relative name of the written pack. + PackName string + + // IdxName is the destination-relative name of the written index. 
+ IdxName string + + // RevName is the destination-relative name of the written reverse index. + RevName string + + // BloomName is the destination-relative name of the written Bloom filter. + BloomName string + + // PackHash is the pack trailer hash + // shared by the pack, index, and reverse index. + PackHash id.ObjectID + + // ObjectCount is the number of objects in the finalized pack, + // including any bases appended during thin completion. + ObjectCount uint32 + + // ThinFixed reports whether thin completion appended local bases. + ThinFixed bool +} diff --git a/object/store/packed/internal/ingest/scan.go b/object/store/packed/internal/ingest/scan.go new file mode 100644 index 00000000..6b3b73b7 --- /dev/null +++ b/object/store/packed/internal/ingest/scan.go @@ -0,0 +1,463 @@ +package ingest + +import ( + "bytes" + "errors" + "fmt" + "hash" + "hash/crc32" + "io" + + "lindenii.org/go/furgit/internal/compress/zlib" + "lindenii.org/go/furgit/internal/format/packfile" + "lindenii.org/go/furgit/internal/progress" + "lindenii.org/go/furgit/object/header" + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/lgo/intconv" +) + +// scanBufferSize is the stream scanner's fixed input window size. +const scanBufferSize = 64 << 10 + +// scanner reads one pack stream, +// mirroring consumed bytes into the destination pack file +// while maintaining the running pack hash and a per-entry CRC. +// +// It implements [io.Reader] and [io.ByteReader] +// so a zlib reader can consume an entry payload through it +// without reading past the end of that compressed stream. +type scanner struct { + src io.Reader + dst io.Writer + + // buf[off:n] is the unread window. + buf []byte + off int + n int + + // consumed counts stream bytes consumed so far. + consumed int + + // hash accumulates the pack hash over consumed bytes + // while hashing is true. + hash hash.Hash + hashing bool + + // crc accumulates the CRC of the current entry + // while crcing is true. 
+ crc uint32 + crcing bool +} + +// newScanner constructs one scanner mirroring src into dst, +// seeding the running hash from the already-consumed pack header. +func newScanner(src io.Reader, dst io.Writer, packHash hash.Hash) *scanner { + return &scanner{ + src: src, + dst: dst, + buf: make([]byte, scanBufferSize), + consumed: packfile.HeaderLen, + hash: packHash, + hashing: true, + crc: 0, + crcing: false, + } +} + +// readPackHeader reads and validates the pack header from src, +// returning the raw header and its declared object count. +func readPackHeader(src io.Reader) ([packfile.HeaderLen]byte, int, error) { + var raw [packfile.HeaderLen]byte + + _, err := io.ReadFull(src, raw[:]) + if err != nil { + return raw, 0, fmt.Errorf("%w: reading header: %w", ErrMalformedPack, err) + } + + packHeader, err := packfile.ParseHeader(raw[:]) + if err != nil { + return raw, 0, fmt.Errorf("%w: %w", ErrMalformedPack, err) + } + + count, err := intconv.Uint32ToInt(packHeader.ObjectCount) + if err != nil { + return raw, 0, fmt.Errorf("%w: object count: %w", ErrMalformedPack, err) + } + + return raw, count, nil +} + +// Read implements [io.Reader]. +func (scanner *scanner) Read(dst []byte) (int, error) { + if len(dst) == 0 { + return 0, nil + } + + err := scanner.ensureAvailable() + if err != nil { + return 0, err + } + + read := min(len(dst), scanner.n-scanner.off) + + copy(dst, scanner.buf[scanner.off:scanner.off+read]) + + err = scanner.use(read) + if err != nil { + return 0, err + } + + return read, nil +} + +// ReadByte implements [io.ByteReader] without allocation. +func (scanner *scanner) ReadByte() (byte, error) { + err := scanner.ensureAvailable() + if err != nil { + return 0, err + } + + b := scanner.buf[scanner.off] + + err = scanner.use(1) + if err != nil { + return 0, err + } + + return b, nil +} + +// ensureAvailable makes at least one unread byte available, +// returning [io.EOF] once the source is exhausted. 
+func (scanner *scanner) ensureAvailable() error { + for scanner.n-scanner.off == 0 { + err := scanner.flushPrefix() + if err != nil { + return err + } + + read, err := scanner.src.Read(scanner.buf[scanner.n:]) + scanner.n += read + + if err != nil { + if errors.Is(err, io.EOF) { + if scanner.n-scanner.off == 0 { + return io.EOF + } + + return nil + } + + return fmt.Errorf("object/store/packed/internal/ingest: reading pack: %w", err) + } + + if read == 0 && scanner.n-scanner.off == 0 { + return io.ErrNoProgress + } + } + + return nil +} + +// peekHeader returns the unread window grown to at most maxLen bytes +// without consuming, tolerating an early end of stream. +func (scanner *scanner) peekHeader(maxLen int) ([]byte, error) { + maxLen = min(maxLen, len(scanner.buf)) + + for scanner.n-scanner.off < maxLen { + err := scanner.flushPrefix() + if err != nil { + return nil, err + } + + read, err := scanner.src.Read(scanner.buf[scanner.n:]) + scanner.n += read + + if err != nil { + if errors.Is(err, io.EOF) { + break + } + + return nil, fmt.Errorf("object/store/packed/internal/ingest: reading pack: %w", err) + } + + if read == 0 { + break + } + } + + if scanner.n-scanner.off == 0 { + return nil, fmt.Errorf("%w: unexpected end of stream", ErrMalformedPack) + } + + return scanner.buf[scanner.off:scanner.n], nil +} + +// use consumes n unread bytes, +// folding them into the running hash and entry CRC as enabled. +func (scanner *scanner) use(n int) error { + chunk := scanner.buf[scanner.off : scanner.off+n] + + if scanner.hashing { + _, err := scanner.hash.Write(chunk) + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: hashing pack: %w", err) + } + } + + if scanner.crcing { + scanner.crc = crc32.Update(scanner.crc, crc32.IEEETable, chunk) + } + + scanner.off += n + scanner.consumed += n + + return nil +} + +// flushPrefix writes the consumed buffer prefix to the destination +// and compacts the unread window to the start of the buffer. 
+func (scanner *scanner) flushPrefix() error { + if scanner.off == 0 { + return nil + } + + _, err := scanner.dst.Write(scanner.buf[:scanner.off]) + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: writing pack: %w", err) + } + + unread := scanner.n - scanner.off + + copy(scanner.buf, scanner.buf[scanner.off:scanner.n]) + + scanner.off = 0 + scanner.n = unread + + return nil +} + +// beginCRC starts CRC accumulation for one entry. +func (scanner *scanner) beginCRC() { + scanner.crc = 0 + scanner.crcing = true +} + +// endCRC ends CRC accumulation and returns the entry CRC. +func (scanner *scanner) endCRC() uint32 { + crc := scanner.crc + scanner.crc = 0 + scanner.crcing = false + + return crc +} + +// finishTrailer reads and verifies the pack trailer hash, +// flushing the remaining buffered pack bytes to the destination. +// +// The trailer is mirrored to the destination but excluded from the pack hash. +func (scanner *scanner) finishTrailer(hashSize int) ([]byte, error) { + trailer := make([]byte, hashSize) + + scanner.hashing = false + + _, err := io.ReadFull(scanner, trailer) + if err != nil { + return nil, fmt.Errorf("%w: reading trailer: %w", ErrMalformedPack, err) + } + + if scanner.n-scanner.off > 0 { + return nil, fmt.Errorf("%w: trailing data after pack", ErrMalformedPack) + } + + if !bytes.Equal(scanner.hash.Sum(nil), trailer) { + return nil, fmt.Errorf("%w: trailer hash mismatch", ErrMalformedPack) + } + + err = scanner.flushPrefix() + if err != nil { + return nil, err + } + + return trailer, nil +} + +// streamAndScan streams the pack body to the temporary pack file, +// scanning one record per declared object and verifying the trailer. 
+func (ingestion *ingestion) streamAndScan() error { + meter := progress.New(progress.Options{ + Writer: ingestion.opts.Progress, + Title: "receiving objects", + Total: ingestion.headerCount, + Delay: 0, + Sparse: false, + Throughput: true, + }) + + for done := range ingestion.headerCount { + err := ingestion.scanEntry(ingestion.scanner.consumed) + if err != nil { + return err + } + + meter.Set(done+1, ingestion.scanner.consumed) + } + + meter.Stop("done") + + trailer, err := ingestion.scanner.finishTrailer(ingestion.objectFormat.Size()) + if err != nil { + return err + } + + packHash, err := ingestion.objectFormat.FromBytes(trailer) + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + ingestion.packHash = packHash + + return nil +} + +// scanEntry scans the entry beginning at start into one record. +func (ingestion *ingestion) scanEntry(start int) error { + ingestion.scanner.beginCRC() + + rec, err := ingestion.scanHeader(start) + if err != nil { + return err + } + + inflated, oid, err := ingestion.drainPayload(&rec) + if err != nil { + return err + } + + if inflated != int64(rec.declaredSize) { + return fmt.Errorf( + "%w: entry at %d: inflated size %d differs from declared %d", + ErrMalformedPack, start, inflated, rec.declaredSize, + ) + } + + rec.packedLen = ingestion.scanner.consumed - start + rec.crc32 = ingestion.scanner.endCRC() + + if rec.packedType.IsBase() { + rec.objectType = rec.packedType + rec.oid = oid + rec.resolved = true + } else { + ingestion.deltaCount++ + } + + index := len(ingestion.records) + ingestion.records = append(ingestion.records, rec) + ingestion.byOffset[rec.offset] = index + + if rec.resolved { + ingestion.byOID[rec.oid] = index + } + + return nil +} + +// scanHeader parses and consumes the entry header at start. 
+func (ingestion *ingestion) scanHeader(start int) (record, error) { + var rec record + + rec.offset = start + + window, err := ingestion.scanner.peekHeader(packfile.MaxEntryHeaderLen(ingestion.objectFormat.Size())) + if err != nil { + return rec, err + } + + entryHeader, err := packfile.ParseEntryHeader(window, ingestion.objectFormat.Size()) + if err != nil { + return rec, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, start, err) + } + + declaredSize, err := intconv.Uint64ToInt(entryHeader.Size) + if err != nil { + return rec, fmt.Errorf("%w: entry at %d: declared size overflows int: %w", ErrMalformedPack, start, err) + } + + rec.packedType = entryHeader.Type + rec.declaredSize = declaredSize + rec.headerLen = entryHeader.HeaderLen + + switch entryHeader.Type { + case packfile.EntryTypeCommit, packfile.EntryTypeTree, packfile.EntryTypeBlob, packfile.EntryTypeTag: + case packfile.EntryTypeOfsDelta: + dist, err := intconv.Uint64ToInt(entryHeader.OfsDistance) + if err != nil || dist == 0 || dist > start { + return rec, fmt.Errorf("%w: entry at %d: ofs-delta base out of bounds", ErrMalformedPack, start) + } + + rec.baseOffset = start - dist + case packfile.EntryTypeRefDelta: + baseID, err := ingestion.objectFormat.FromBytes(entryHeader.RefBase[:ingestion.objectFormat.Size()]) + if err != nil { + return rec, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + rec.baseOID = baseID + case packfile.EntryTypeInvalid, packfile.EntryTypeFuture: + return rec, fmt.Errorf("%w: entry at %d: unsupported entry type", ErrMalformedPack, start) + } + + err = ingestion.scanner.use(entryHeader.HeaderLen) + if err != nil { + return rec, err + } + + return rec, nil +} + +// drainPayload consumes one entry's compressed payload from the stream, +// returning its inflated length and, for base entries, its object ID. 
+func (ingestion *ingestion) drainPayload(rec *record) (int64, id.ObjectID, error) { + var zero id.ObjectID + + zr, err := zlib.NewReader(ingestion.scanner) + if err != nil { + return 0, zero, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err) + } + + defer func() { _ = zr.Close() }() + + if !rec.packedType.IsBase() { + read, err := io.Copy(io.Discard, zr) + if err != nil { + return 0, zero, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err) + } + + return read, zero, nil + } + + objectType, err := rec.packedType.ObjectType() + if err != nil { + return 0, zero, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + hashImpl, err := ingestion.objectFormat.New() + if err != nil { + return 0, zero, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + _, _ = hashImpl.Write(header.Append(nil, objectType, rec.declaredSize)) + + read, err := io.Copy(hashImpl, zr) + if err != nil { + return 0, zero, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err) + } + + oid, err := ingestion.objectFormat.FromBytes(hashImpl.Sum(nil)) + if err != nil { + return 0, zero, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + return read, oid, nil +} diff --git a/object/store/packed/internal/ingest/thin.go b/object/store/packed/internal/ingest/thin.go new file mode 100644 index 00000000..fa125f2f --- /dev/null +++ b/object/store/packed/internal/ingest/thin.go @@ -0,0 +1,213 @@ +package ingest + +import ( + "errors" + "fmt" + "hash/crc32" + "io" + "os" + + "lindenii.org/go/furgit/internal/compress/zlib" + "lindenii.org/go/furgit/internal/format/packfile" + "lindenii.org/go/furgit/internal/progress" + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/furgit/object/store" + "lindenii.org/go/furgit/object/typ" + "lindenii.org/go/lgo/intconv" +) + +// fixThin completes a thin pack +// by appending the external bases it references, +// rewriting the pack header and trailer, +// and resolving the 
deltas reached from the appended bases. +func (ingestion *ingestion) fixThin(external []id.ObjectID, adjacency adjacency, meter *progress.Meter) error { + if ingestion.opts.ThinBase == nil { + return ErrThinPackNotPermitted + } + + hashSize := ingestion.objectFormat.Size() + if ingestion.scanner.consumed < packfile.HeaderLen+hashSize { + return fmt.Errorf("%w: pack shorter than trailer", ErrMalformedPack) + } + + // Drop the trailer from the write cursor. + ingestion.scanner.consumed -= hashSize + + appended := make([]int, 0, len(external)) + + for _, baseOID := range external { + ty, content, err := ingestion.opts.ThinBase.ReadBytesContent(baseOID) + if errors.Is(err, store.ErrObjectNotFound) { + continue + } + + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: reading thin base %s: %w", baseOID, err) + } + + index, err := ingestion.appendBaseObject(baseOID, ty, content) + if err != nil { + return err + } + + appended = append(appended, index) + } + + err := ingestion.rewriteHeaderTrailer() + if err != nil { + return err + } + + err = ingestion.resolveFrom(appended, adjacency, meter) + if err != nil { + return err + } + + missing := ingestion.unresolvedExternalBases() + if len(missing) > 0 { + return &ThinBasesMissingError{OIDs: missing} + } + + if ingestion.countUnresolved() > 0 { + return fmt.Errorf("%w: unresolvable delta entries after thin completion", ErrMalformedPack) + } + + ingestion.thinFixed = len(appended) > 0 + + return nil +} + +// appendBaseObject appends one external thin base +// as a non-delta pack entry at the current write cursor, +// verifying that its content hashes to the requested object ID. 
+func (ingestion *ingestion) appendBaseObject(objectID id.ObjectID, objectType typ.Type, content []byte) (int, error) { + entryType, err := packfile.EntryTypeFromObjectType(objectType) + if err != nil { + return 0, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + computed, err := ingestion.hashObject(entryType, content) + if err != nil { + return 0, err + } + + if computed != objectID { + return 0, fmt.Errorf("%w: thin base %s content hashes to %s", ErrMalformedPack, objectID, computed) + } + + start := ingestion.scanner.consumed + startOffset := int64(start) + + headerBytes := packfile.AppendTypeSize(nil, entryType, uint64(len(content))) + + _, err = ingestion.packFile.WriteAt(headerBytes, startOffset) + if err != nil { + return 0, fmt.Errorf("object/store/packed/internal/ingest: writing thin base header: %w", err) + } + + crc := crc32.NewIEEE() + _, _ = crc.Write(headerBytes) + + dataOffset := startOffset + int64(len(headerBytes)) + writer := &offsetWriter{file: ingestion.packFile, offset: dataOffset} + + zw := zlib.NewWriter(io.MultiWriter(writer, crc)) + + _, err = zw.Write(content) + if err != nil { + _ = zw.Close() + + return 0, fmt.Errorf("object/store/packed/internal/ingest: compressing thin base: %w", err) + } + + err = zw.Close() + if err != nil { + return 0, fmt.Errorf("object/store/packed/internal/ingest: compressing thin base: %w", err) + } + + headerLen := len(headerBytes) + packedLen := headerLen + writer.written + ingestion.scanner.consumed = start + packedLen + + rec := record{ + offset: start, + headerLen: headerLen, + packedLen: packedLen, + crc32: crc.Sum32(), + packedType: entryType, + declaredSize: len(content), + baseOffset: 0, + baseOID: id.ObjectID{}, + objectType: entryType, + oid: objectID, + resolved: true, + } + + index := len(ingestion.records) + ingestion.records = append(ingestion.records, rec) + ingestion.byOffset[start] = index + ingestion.byOID[objectID] = index + + return index, nil +} + +// rewriteHeaderTrailer 
updates the pack object count +// and recomputes the pack trailer hash +// over the entries left after thin completion. +func (ingestion *ingestion) rewriteHeaderTrailer() error { + count, err := intconv.IntToUint32(len(ingestion.records)) + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + _, err = ingestion.packFile.WriteAt(packfile.AppendHeader(nil, count), 0) + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: rewriting header: %w", err) + } + + bodyEnd := int64(ingestion.scanner.consumed) + + hashImpl, err := ingestion.objectFormat.New() + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + _, err = io.Copy(hashImpl, io.NewSectionReader(ingestion.packFile, 0, bodyEnd)) + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: rehashing pack: %w", err) + } + + sum := hashImpl.Sum(nil) + + _, err = ingestion.packFile.WriteAt(sum, bodyEnd) + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: writing trailer: %w", err) + } + + packHash, err := ingestion.objectFormat.FromBytes(sum) + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + ingestion.packHash = packHash + + return nil +} + +// offsetWriter writes to a file via WriteAt, +// advancing sequentially from a base offset +// and counting the bytes written. +type offsetWriter struct { + file *os.File + offset int64 + written int +} + +// Write implements [io.Writer]. 
+func (writer *offsetWriter) Write(p []byte) (int, error) { + n, err := writer.file.WriteAt(p, writer.offset) + writer.offset += int64(n) + writer.written += n + + return n, err //nolint:wrapcheck +} diff --git a/object/store/packed/internal/ingest/writepack_test.go b/object/store/packed/internal/ingest/writepack_test.go new file mode 100644 index 00000000..adc0ba35 --- /dev/null +++ b/object/store/packed/internal/ingest/writepack_test.go @@ -0,0 +1,494 @@ +package ingest_test + +import ( + "bytes" + "errors" + "io" + "os" + "path/filepath" + "testing" + + "lindenii.org/go/furgit/internal/format/packidx" + "lindenii.org/go/furgit/internal/format/packidx/bloom" + "lindenii.org/go/furgit/internal/testgit" + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/furgit/object/store" + "lindenii.org/go/furgit/object/store/packed" + "lindenii.org/go/furgit/object/store/packed/internal/ingest" +) + +// TestWritePackMatchesGit verifies that ingesting a normal pack +// matches git's own pack, index, and reverse index. +// +// The pack is streamed through verbatim, +// and the index and reverse index are regenerated deterministically, +// so a successful match also confirms that scanning and delta resolution +// recovered every object ID, offset, and CRC that git recorded. 
+func TestWritePackMatchesGit(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + repo, err := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: objectFormat}) + if err != nil { + t.Fatalf("NewRepo: %v", err) + } + + seeded, err := repo.SeedHistory(t) + if err != nil { + t.Fatalf("SeedHistory: %v", err) + } + + gitPrefix, err := repo.PackObjects(t, seeded.All(), testgit.PackObjectsOptions{ + RevIndex: true, + Revs: false, + Exclude: nil, + }) + if err != nil { + t.Fatalf("PackObjects: %v", err) + } + + stream, err := os.ReadFile(gitPrefix + ".pack") //nolint:gosec + if err != nil { + t.Fatalf("ReadFile pack: %v", err) + } + + dir, result := writePack(t, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{ + ThinBase: nil, + Progress: nil, + }) + + if want := filepath.Base(gitPrefix) + ".pack"; result.PackName != want { + t.Fatalf("PackName = %q, want %q", result.PackName, want) + } + + for _, artifact := range []struct { + kind string + ours string + want string + }{ + {"pack", result.PackName, gitPrefix + ".pack"}, + {"idx", result.IdxName, gitPrefix + ".idx"}, + {"rev", result.RevName, gitPrefix + ".rev"}, + } { + ours, err := os.ReadFile(filepath.Join(dir, artifact.ours)) //nolint:gosec + if err != nil { + t.Fatalf("ReadFile %s: %v", artifact.kind, err) + } + + want, err := os.ReadFile(artifact.want) + if err != nil { + t.Fatalf("ReadFile git %s: %v", artifact.kind, err) + } + + if !bytes.Equal(ours, want) { + t.Errorf("%s differs from git: %d bytes vs %d", artifact.kind, len(ours), len(want)) + } + } + }) + } +} + +// TestWritePackBloom verifies that ingesting a pack writes a Bloom filter +// that reports every object in the pack as present. 
+func TestWritePackBloom(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + repo, err := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: objectFormat}) + if err != nil { + t.Fatalf("NewRepo: %v", err) + } + + seeded, err := repo.SeedHistory(t) + if err != nil { + t.Fatalf("SeedHistory: %v", err) + } + + gitPrefix, err := repo.PackObjects(t, seeded.All(), testgit.PackObjectsOptions{ + RevIndex: true, + Revs: false, + Exclude: nil, + }) + if err != nil { + t.Fatalf("PackObjects: %v", err) + } + + stream, err := os.ReadFile(gitPrefix + ".pack") //nolint:gosec + if err != nil { + t.Fatalf("ReadFile pack: %v", err) + } + + dir, result := writePack(t, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{ + ThinBase: nil, + Progress: nil, + }) + + if result.BloomName == "" { + t.Fatal("BloomName is empty") + } + + bloomBytes, err := os.ReadFile(filepath.Join(dir, result.BloomName)) //nolint:gosec + if err != nil { + t.Fatalf("ReadFile bloom: %v", err) + } + + filter, err := bloom.Parse(bloomBytes, objectFormat) + if err != nil { + t.Fatalf("bloom.Parse: %v", err) + } + + idxBytes, err := os.ReadFile(filepath.Join(dir, result.IdxName)) //nolint:gosec + if err != nil { + t.Fatalf("ReadFile idx: %v", err) + } + + index, err := packidx.Parse(idxBytes, objectFormat.Size()) + if err != nil { + t.Fatalf("packidx.Parse: %v", err) + } + + if !bytes.Equal(filter.PackHash(), index.PackHash()) { + t.Fatalf("filter pack hash %x, want %x", filter.PackHash(), index.PackHash()) + } + + for pos := range index.NumObjects() { + if !filter.MayContain(index.OIDAt(pos)) { + t.Fatalf("filter rejects object at index position %d", pos) + } + } + }) + } +} + +// TestWritePackEmpty verifies that a zero-object pack +// succeeds without writing any artifacts. 
+func TestWritePackEmpty(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + repo, err := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: objectFormat}) + if err != nil { + t.Fatalf("NewRepo: %v", err) + } + + stream, err := repo.PackObjectsStdout(t, nil, testgit.PackObjectsStdoutOptions{ + Revs: false, + Thin: false, + Exclude: nil, + }) + if err != nil { + t.Fatalf("PackObjectsStdout: %v", err) + } + + dir, result := writePack(t, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{ + ThinBase: nil, + Progress: nil, + }) + + if result.ObjectCount != 0 { + t.Fatalf("ObjectCount = %d, want 0", result.ObjectCount) + } + + if result.PackName != "" { + t.Fatalf("PackName = %q, want empty", result.PackName) + } + + entries, err := os.ReadDir(dir) + if err != nil { + t.Fatalf("ReadDir: %v", err) + } + + if len(entries) != 0 { + t.Fatalf("empty pack wrote %d files, want 0", len(entries)) + } + }) + } +} + +// TestWritePackIdempotent verifies that ingesting the same pack twice +// into one store succeeds and leaves the artifacts in place. 
+func TestWritePackIdempotent(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + repo, err := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: objectFormat}) + if err != nil { + t.Fatalf("NewRepo: %v", err) + } + + seeded, err := repo.SeedHistory(t) + if err != nil { + t.Fatalf("SeedHistory: %v", err) + } + + gitPrefix, err := repo.PackObjects(t, seeded.All(), testgit.PackObjectsOptions{ + RevIndex: true, + Revs: false, + Exclude: nil, + }) + if err != nil { + t.Fatalf("PackObjects: %v", err) + } + + stream, err := os.ReadFile(gitPrefix + ".pack") //nolint:gosec + if err != nil { + t.Fatalf("ReadFile pack: %v", err) + } + + dir := t.TempDir() + + root, err := os.OpenRoot(dir) + if err != nil { + t.Fatalf("OpenRoot: %v", err) + } + + t.Cleanup(func() { _ = root.Close() }) + + first, err := ingest.WritePack(root, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{ + ThinBase: nil, + Progress: nil, + }) + if err != nil { + t.Fatalf("first WritePack: %v", err) + } + + second, err := ingest.WritePack(root, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{ + ThinBase: nil, + Progress: nil, + }) + if err != nil { + t.Fatalf("second WritePack: %v", err) + } + + if second.PackName != first.PackName { + t.Fatalf("second PackName = %q, want %q", second.PackName, first.PackName) + } + + for _, name := range []string{first.PackName, first.IdxName, first.RevName} { + _, err := os.Stat(filepath.Join(dir, name)) + if err != nil { + t.Fatalf("missing %q after re-write: %v", name, err) + } + } + }) + } +} + +// writePack ingests src into a fresh store directory, +// returning the directory and the ingest result. 
+func writePack( + t *testing.T, + objectFormat id.ObjectFormat, + src io.Reader, + opts store.PackWriteOptions, +) (string, ingest.Result) { + t.Helper() + + dir := t.TempDir() + + root, err := os.OpenRoot(dir) + if err != nil { + t.Fatalf("OpenRoot: %v", err) + } + + t.Cleanup(func() { _ = root.Close() }) + + result, err := ingest.WritePack(root, objectFormat, src, opts) + if err != nil { + t.Fatalf("WritePack: %v", err) + } + + return dir, result +} + +// TestWritePackThin verifies that a thin pack is completed from the thin base +// and that git accepts the resulting self-contained pack. +func TestWritePackThin(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + repo, seeded := seedHistory(t, objectFormat) + thinBase := fullStore(t, repo, objectFormat, seeded) + stream := thinStream(t, repo, seeded) + + dir, result := writePack(t, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{ + ThinBase: thinBase, + Progress: nil, + }) + + if !result.ThinFixed { + t.Fatalf("ThinFixed = false, want true (pack was not thin)") + } + + _, err := repo.VerifyPack(t, filepath.Join(dir, result.IdxName)) + if err != nil { + t.Fatalf("VerifyPack on completed pack: %v", err) + } + }) + } +} + +// TestWritePackThinWithoutBase verifies that a thin pack is rejected +// when no thin base is supplied. 
+func TestWritePackThinWithoutBase(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + repo, seeded := seedHistory(t, objectFormat) + stream := thinStream(t, repo, seeded) + + _, err := ingest.WritePack(freshRoot(t), objectFormat, bytes.NewReader(stream), store.PackWriteOptions{ + ThinBase: nil, + Progress: nil, + }) + + if !errors.Is(err, ingest.ErrThinPackNotPermitted) { + t.Fatalf("err = %v, want ErrThinPackNotPermitted", err) + } + }) + } +} + +// TestWritePackThinMissingBase verifies that a thin pack +// whose bases are absent from the thin base +// reports the missing object IDs. +func TestWritePackThinMissingBase(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + repo, seeded := seedHistory(t, objectFormat) + emptyBase := emptyStore(t, objectFormat) + stream := thinStream(t, repo, seeded) + + _, err := ingest.WritePack(freshRoot(t), objectFormat, bytes.NewReader(stream), store.PackWriteOptions{ + ThinBase: emptyBase, + Progress: nil, + }) + + missing, ok := errors.AsType[*ingest.ThinBasesMissingError](err) + if !ok { + t.Fatalf("err = %v, want *ThinBasesMissingError", err) + } + + if len(missing.OIDs) == 0 { + t.Fatalf("ThinBasesMissingError reported no object IDs") + } + }) + } +} + +// seedHistory creates one repository with a seeded history. 
+func seedHistory(t *testing.T, objectFormat id.ObjectFormat) (*testgit.Repo, testgit.Seeded) { + t.Helper() + + repo, err := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: objectFormat}) + if err != nil { + t.Fatalf("NewRepo: %v", err) + } + + seeded, err := repo.SeedHistory(t) + if err != nil { + t.Fatalf("SeedHistory: %v", err) + } + + return repo, seeded +} + +// thinStream produces a thin pack of the tip commit excluding its parent, +// so its deltas reference the omitted parent objects. +func thinStream(t *testing.T, repo *testgit.Repo, seeded testgit.Seeded) []byte { + t.Helper() + + tip := seeded.Commits[len(seeded.Commits)-1] + parent := seeded.Commits[len(seeded.Commits)-2] + + stream, err := repo.PackObjectsStdout(t, []id.ObjectID{tip}, testgit.PackObjectsStdoutOptions{ + Revs: true, + Thin: true, + Exclude: []id.ObjectID{parent}, + }) + if err != nil { + t.Fatalf("PackObjectsStdout: %v", err) + } + + return stream +} + +// fullStore opens a packed store over a pack of every seeded object. +func fullStore(t *testing.T, repo *testgit.Repo, objectFormat id.ObjectFormat, seeded testgit.Seeded) *packed.Packed { + t.Helper() + + prefix, err := repo.PackObjects(t, seeded.All(), testgit.PackObjectsOptions{ + RevIndex: false, + Revs: false, + Exclude: nil, + }) + if err != nil { + t.Fatalf("PackObjects: %v", err) + } + + return openStore(t, filepath.Dir(prefix), objectFormat) +} + +// emptyStore opens a packed store over an empty directory. +func emptyStore(t *testing.T, objectFormat id.ObjectFormat) *packed.Packed { + t.Helper() + + return openStore(t, t.TempDir(), objectFormat) +} + +// openStore opens a packed store over dir. 
+func openStore(t *testing.T, dir string, objectFormat id.ObjectFormat) *packed.Packed { + t.Helper() + + root, err := os.OpenRoot(dir) + if err != nil { + t.Fatalf("OpenRoot: %v", err) + } + + t.Cleanup(func() { _ = root.Close() }) + + packedStore, err := packed.New(root, objectFormat) + if err != nil { + t.Fatalf("New: %v", err) + } + + t.Cleanup(func() { _ = packedStore.Close() }) + + return packedStore +} + +// freshRoot opens a writable root over a fresh temporary directory. +func freshRoot(t *testing.T) *os.Root { + t.Helper() + + root, err := os.OpenRoot(t.TempDir()) + if err != nil { + t.Fatalf("OpenRoot: %v", err) + } + + t.Cleanup(func() { _ = root.Close() }) + + return root +} diff --git a/object/store/packed/lookup.go b/object/store/packed/lookup.go new file mode 100644 index 00000000..e06870a9 --- /dev/null +++ b/object/store/packed/lookup.go @@ -0,0 +1,51 @@ +package packed + +import ( + "fmt" + + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/furgit/object/store" + "lindenii.org/go/lgo/intconv" +) + +// lookup finds the pack containing objectID +// and the entry offset within it, +// probing packs in most-recently-used-ish order. +// +// Labels: Life-Parent. 
+func (packed *Packed) lookup(objectID id.ObjectID) (*pack, int, error) { + if objectID.ObjectFormat() != packed.objectFormat { + return nil, 0, fmt.Errorf( + "%w: got %s want %s", + id.ErrInvalidObjectFormat, objectID.ObjectFormat(), packed.objectFormat, + ) + } + + oid := objectID.RawBytes() + + for _, p := range packed.order.Keys() { + if p.filter != nil && !p.filter.MayContain(oid) { + continue + } + + offsetU, found, err := p.idx.Lookup(oid) + if err != nil { + return nil, 0, fmt.Errorf("%w: pack %q: %w", ErrMalformedPackedStore, p.name, err) + } + + if !found { + continue + } + + offset, err := intconv.Uint64ToInt(offsetU) + if err != nil { + return nil, 0, fmt.Errorf("%w: pack %q: entry offset overflows int: %w", ErrMalformedPackedStore, p.name, err) + } + + packed.order.Touch(p) + + return p, offset, nil + } + + return nil, 0, store.ErrObjectNotFound +} diff --git a/object/store/packed/lookup_test.go b/object/store/packed/lookup_test.go new file mode 100644 index 00000000..d9f3dff9 --- /dev/null +++ b/object/store/packed/lookup_test.go @@ -0,0 +1,35 @@ +package packed_test + +import ( + "errors" + "testing" + + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/furgit/object/store" +) + +func TestLookupMissing(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + _, prefix, seeded := makeGitPack(t, objectFormat) + packedStore := openPackedStore(t, prefix, objectFormat) + + raw := seeded.Blobs[0].Bytes() + raw[len(raw)-1] ^= 0xff + + missing, err := objectFormat.FromBytes(raw) + if err != nil { + t.Fatalf("FromBytes: %v", err) + } + + _, _, err = packedStore.ReadBytesContent(missing) + if !errors.Is(err, store.ErrObjectNotFound) { + t.Fatalf("ReadBytesContent error = %v, want ErrObjectNotFound", err) + } + }) + } +} diff --git a/object/store/packed/pack.go b/object/store/packed/pack.go new file mode 100644 index 00000000..9cd6162b --- /dev/null 
+++ b/object/store/packed/pack.go
@@ -0,0 +1,170 @@
package packed

import (
	"bytes"
	"errors"
	"fmt"
	"os"

	"lindenii.org/go/furgit/internal/format/packfile"
	"lindenii.org/go/furgit/internal/format/packidx"
	"lindenii.org/go/furgit/internal/format/packidx/bloom"
	"lindenii.org/go/furgit/internal/mmap"
	"lindenii.org/go/furgit/object/id"
	"lindenii.org/go/lgo/intconv"
)

// Validation failures reported by validatePackData.
// openPack wraps them with ErrMalformedPackedStore and the pack name.
var (
	// errPackTruncated: the data is shorter than a header plus a trailer hash.
	errPackTruncated = errors.New("truncated")
	// errPackMalformedHeader: the pack header failed to parse.
	errPackMalformedHeader = errors.New("malformed header")
	// errPackCountMismatch: the header object count differs from the index.
	errPackCountMismatch = errors.New("object count differs from index")
	// errPackTrailerMismatch: the trailing hash differs from the index's pack hash.
	errPackTrailerMismatch = errors.New("trailer hash differs from index")
)

// pack is one discovered pack:
// its base name, its parsed index, its mapped data,
// and an optional bloom filter.
// All fields are immutable after openPack.
type pack struct {
	// name is the pack base name, like "pack-<hash>".
	name string

	// idxMapping owns the mapped pack index bytes,
	// and idx is the parsed index view over them.
	idxMapping *mmap.Mmap
	idx        packidx.Packidx

	// dataMapping owns the mapped pack data bytes,
	// and data aliases them.
	dataMapping *mmap.Mmap
	data        []byte

	// bloomMapping owns the mapped "<name>.bloom" bytes,
	// and filter is the parsed filter over them.
	// Both are nil when the file is missing, fails to parse,
	// or records a different pack hash than the index.
	bloomMapping *mmap.Mmap
	filter       *bloom.Bloom
}

// openPack opens, maps, and validates
// one pack index and its pack data
// by pack base name.
+func openPack(root *os.Root, name string, objectFormat id.ObjectFormat) (*pack, error) { + idxMapping, err := mapFile(root, name+".idx") + if err != nil { + return nil, err + } + + idx, err := packidx.Parse(idxMapping.Data(), objectFormat.Size()) + if err != nil { + _ = idxMapping.Close() + + return nil, fmt.Errorf("%w: index %q: %w", ErrMalformedPackedStore, name, err) + } + + dataMapping, err := mapFile(root, name+".pack") + if err != nil { + _ = idxMapping.Close() + + return nil, err + } + + err = validatePackData(dataMapping.Data(), &idx, objectFormat.Size()) + if err != nil { + _ = idxMapping.Close() + _ = dataMapping.Close() + + return nil, fmt.Errorf("%w: pack %q: %w", ErrMalformedPackedStore, name, err) + } + + bloomMapping, filter := openBloom(root, name, objectFormat, idx.PackHash()) + + return &pack{ + name: name, + idxMapping: idxMapping, + idx: idx, + dataMapping: dataMapping, + data: dataMapping.Data(), + bloomMapping: bloomMapping, + filter: filter, + }, nil +} + +func openBloom(root *os.Root, name string, objectFormat id.ObjectFormat, packHash []byte) (*mmap.Mmap, *bloom.Bloom) { + mapping, err := mapFile(root, name+".bloom") + if err != nil { + return nil, nil + } + + filter, err := bloom.Parse(mapping.Data(), objectFormat) + if err != nil { + _ = mapping.Close() + + return nil, nil + } + + if !bytes.Equal(filter.PackHash(), packHash) { + _ = mapping.Close() + + return nil, nil + } + + return mapping, &filter +} + +// mapFile opens and maps one file under root. +func mapFile(root *os.Root, name string) (*mmap.Mmap, error) { + file, err := root.Open(name) + if err != nil { + return nil, fmt.Errorf("object/store/packed: %w", err) + } + + defer func() { _ = file.Close() }() + + mapping, err := mmap.Open(file) + if err != nil { + return nil, fmt.Errorf("object/store/packed: %q: %w", name, err) + } + + return mapping, nil +} + +// validatePackData checks one mapped pack +// against the pack format and its index. 
+func validatePackData(data []byte, idx *packidx.Packidx, hashSize int) error { + if len(data) < packfile.HeaderLen+hashSize { + return errPackTruncated + } + + header, err := packfile.ParseHeader(data) + if err != nil { + return fmt.Errorf("%w: %w", errPackMalformedHeader, err) + } + + count := uint64(header.ObjectCount) + + numObjects, err := intconv.IntToUint64(idx.NumObjects()) + if err != nil { + return fmt.Errorf("object count: %w", err) + } + + if count != numObjects { + return errPackCountMismatch + } + + if !bytes.Equal(data[len(data)-hashSize:], idx.PackHash()) { + return errPackTrailerMismatch + } + + return nil +} + +// close releases the pack data, index, and filter mappings. +func (pack *pack) close() error { + errs := []error{ + pack.dataMapping.Close(), + pack.idxMapping.Close(), + } + + if pack.bloomMapping != nil { + errs = append(errs, pack.bloomMapping.Close()) + } + + return errors.Join(errs...) +} diff --git a/object/store/packed/packed.go b/object/store/packed/packed.go new file mode 100644 index 00000000..897b3b98 --- /dev/null +++ b/object/store/packed/packed.go @@ -0,0 +1,101 @@ +package packed + +import ( + "errors" + "os" + "sync" + + "lindenii.org/go/furgit/internal/cache/clock" + "lindenii.org/go/furgit/internal/mru" + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/furgit/object/store" +) + +// ErrMalformedPackedStore reports that +// a pack or pack index in the store is +// truncated, inconsistent, or otherwise corrupt. +var ErrMalformedPackedStore = errors.New("object/store/packed: malformed packed store") + +// Packed reads Git objects from pack/index files +// under an objects/pack root. +// +// Packs appearing after construction are only visible +// after an explicit [Packed.Refresh]. +// +// Labels: Close-Caller. +type Packed struct { + // root is the objects/pack directory + // used for all pack and index file access. + root *os.Root + + // objectFormat is the expected object format for lookups. 
+ objectFormat id.ObjectFormat + + // order contains the packs to probe, MRU-first. + order *mru.Order[*pack] + + // baseCache caches delta bases consumed during resolution. + baseCache *clock.Clock[baseKey, cachedBase] + + // refreshMu serializes Refresh. + // Readers uses none of these. + refreshMu sync.Mutex + + // byName supports reusing surviving packs across Refresh, + // and retired holds dropped packs until Close, + // since concurrent readers may still use them. + byName map[string]*pack + + retired []*pack +} + +var _ store.ObjectReader = (*Packed)(nil) + +// New creates a packed-object store rooted at an objects/pack directory, +// performing an initial Refresh. +// +// Labels: Deps-Borrowed, Life-Parent. +func New(root *os.Root, objectFormat id.ObjectFormat) (*Packed, error) { + if objectFormat.Size() == 0 { + return nil, id.ErrInvalidObjectFormat + } + + packed := &Packed{ + root: root, + objectFormat: objectFormat, + order: mru.New[*pack](mru.Options{Interval: 48}), + baseCache: newBaseCache(), + refreshMu: sync.Mutex{}, + byName: nil, + retired: nil, + } + + err := packed.Refresh() + if err != nil { + return nil, err + } + + return packed, nil +} + +// Close releases mapped pack/index resources associated with the store. +// +// Labels: MT-Unsafe. +func (packed *Packed) Close() error { + errs := make([]error, 0, len(packed.byName)+len(packed.retired)) + + for _, p := range packed.byName { + errs = append(errs, p.close()) + } + + for _, p := range packed.retired { + errs = append(errs, p.close()) + } + + packed.byName = nil + packed.retired = nil + + packed.baseCache.Clear() + + return errors.Join(errs...) 
+} diff --git a/object/store/packed/quarantine.go b/object/store/packed/quarantine.go new file mode 100644 index 00000000..977a9543 --- /dev/null +++ b/object/store/packed/quarantine.go @@ -0,0 +1,166 @@ +package packed + +import ( + "crypto/rand" + "errors" + "fmt" + "io/fs" + "os" + "slices" + "strings" + + "lindenii.org/go/furgit/object/store" +) + +var ( + _ store.PackQuarantiner = (*Packed)(nil) + _ store.PackQuarantine = (*packQuarantine)(nil) +) + +var errQuarantineNamesExhausted = errors.New("object/store/packed: exhausted quarantine directory names") + +// packQuarantine is one quarantined packed store +// rooted privately beneath a destination pack root. +type packQuarantine struct { + *Packed + + parent *Packed + + tempName string + tempRoot *os.Root +} + +// BeginPackQuarantine creates one quarantined packed store +// rooted privately beneath the destination pack root. +// +// Labels: Deps-Borrowed, Life-Parent. +func (packed *Packed) BeginPackQuarantine(_ store.PackQuarantineOptions) (store.PackQuarantine, error) { //nolint:ireturn + tempName, tempRoot, err := createPackQuarantineRoot(packed.root) + if err != nil { + return nil, err + } + + quarantineStore, err := New(tempRoot, packed.objectFormat) + if err != nil { + _ = tempRoot.Close() + _ = packed.root.RemoveAll(tempName) + + return nil, err + } + + return &packQuarantine{ + Packed: quarantineStore, + parent: packed, + tempName: tempName, + tempRoot: tempRoot, + }, nil +} + +// Promote publishes the quarantined pack artifacts into the parent store, +// refreshes the parent so the objects become available, +// and invalidates the receiver. 
+func (quarantine *packQuarantine) Promote() error { + closeErr := quarantine.Close() + promoteErr := quarantine.promoteAll() + + var refreshErr error + if promoteErr == nil { + refreshErr = quarantine.parent.Refresh() + } + + tempRootErr := quarantine.tempRoot.Close() + removeErr := quarantine.parent.root.RemoveAll(quarantine.tempName) + + return errors.Join(closeErr, promoteErr, refreshErr, tempRootErr, removeErr) +} + +// Discard removes the quarantine and invalidates the receiver. +func (quarantine *packQuarantine) Discard() error { + closeErr := quarantine.Close() + tempRootErr := quarantine.tempRoot.Close() + removeErr := quarantine.parent.root.RemoveAll(quarantine.tempName) + + return errors.Join(closeErr, tempRootErr, removeErr) +} + +// promoteAll links every pack artifact in the quarantine into the parent store, +// in pack/rev/idx dependency order. +func (quarantine *packQuarantine) promoteAll() error { + entries, err := fs.ReadDir(quarantine.tempRoot.FS(), ".") + if err != nil { + return fmt.Errorf("object/store/packed: %w", err) + } + + slices.SortFunc(entries, func(left, right fs.DirEntry) int { + return packPromotionPriority(left.Name()) - packPromotionPriority(right.Name()) + }) + + for _, entry := range entries { + err := quarantine.promoteFile(entry.Name()) + if err != nil { + return err + } + } + + return nil +} + +// promoteFile links one quarantined artifact into the parent store, +// treating an already-present destination as success. +func (quarantine *packQuarantine) promoteFile(name string) error { + src := quarantine.tempName + "/" + name + + err := quarantine.parent.root.Link(src, name) + if err != nil && !errors.Is(err, fs.ErrExist) { + return fmt.Errorf("object/store/packed: promoting %q: %w", name, err) + } + + _ = quarantine.parent.root.Remove(src) + + return nil +} + +// createPackQuarantineRoot creates a private quarantine directory beneath parent +// and returns its name and an os.Root over it. 
+func createPackQuarantineRoot(parent *os.Root) (string, *os.Root, error) { + for range 32 { + name := "tmp_packq_" + rand.Text() + + err := parent.Mkdir(name, 0o700) + if err != nil { + if errors.Is(err, fs.ErrExist) { + continue + } + + return "", nil, fmt.Errorf("object/store/packed: %w", err) + } + + root, err := parent.OpenRoot(name) + if err != nil { + _ = parent.RemoveAll(name) + + return "", nil, fmt.Errorf("object/store/packed: %w", err) + } + + return name, root, nil + } + + return "", nil, errQuarantineNamesExhausted +} + +// packPromotionPriority orders pack artifacts +// so that data files are linked before the index that publishes them. +func packPromotionPriority(name string) int { + switch { + case strings.HasPrefix(name, "pack-") && strings.HasSuffix(name, ".pack"): + return 1 + case strings.HasPrefix(name, "pack-") && strings.HasSuffix(name, ".rev"): + return 2 + case strings.HasPrefix(name, "pack-") && strings.HasSuffix(name, ".bloom"): + return 2 + case strings.HasPrefix(name, "pack-") && strings.HasSuffix(name, ".idx"): + return 3 + default: + return 0 + } +} diff --git a/object/store/packed/read_test.go b/object/store/packed/read_test.go new file mode 100644 index 00000000..64faaf5b --- /dev/null +++ b/object/store/packed/read_test.go @@ -0,0 +1,140 @@ +package packed_test + +import ( + "bytes" + "io" + "testing" + + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/furgit/object/store/packed" + "lindenii.org/go/furgit/object/typ" +) + +func TestReadGitPack(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + repo, prefix, seeded := makeGitPack(t, objectFormat) + requireDeltas(t, repo, prefix, objectFormat) + + packedStore := openPackedStore(t, prefix, objectFormat) + + groups := []struct { + ty typ.Type + oids []id.ObjectID + }{ + {ty: typ.Blob, oids: seeded.Blobs}, + {ty: typ.Tree, oids: seeded.Trees}, + {ty: typ.Commit, oids: 
seeded.Commits}, + {ty: typ.Tag, oids: seeded.Tags}, + } + + for _, group := range groups { + for _, oid := range group.oids { + wantContent, err := repo.CatFile(t, group.ty, oid) + if err != nil { + t.Fatalf("CatFile(%s): %v", oid, err) + } + + ty, content, err := packedStore.ReadBytesContent(oid) + if err != nil { + t.Fatalf("ReadBytesContent(%s): %v", oid, err) + } + + if ty != group.ty { + t.Fatalf("ReadBytesContent(%s) type = %v, want %v", oid, ty, group.ty) + } + + if !bytes.Equal(content, wantContent) { + t.Fatalf("ReadBytesContent(%s) content mismatch", oid) + } + + raw, err := packedStore.ReadBytesFull(oid) + if err != nil { + t.Fatalf("ReadBytesFull(%s): %v", oid, err) + } + + if got := objectFormat.Sum(raw); got != oid { + t.Fatalf("ReadBytesFull(%s) hashes to %s", oid, got) + } + + ty, size, err := packedStore.ReadHeader(oid) + if err != nil { + t.Fatalf("ReadHeader(%s): %v", oid, err) + } + + if ty != group.ty { + t.Fatalf("ReadHeader(%s) type = %v, want %v", oid, ty, group.ty) + } + + if size != len(wantContent) { + t.Fatalf("ReadHeader(%s) size = %d, want %d", oid, size, len(wantContent)) + } + + size, err = packedStore.ReadSize(oid) + if err != nil { + t.Fatalf("ReadSize(%s): %v", oid, err) + } + + if size != len(wantContent) { + t.Fatalf("ReadSize(%s) = %d, want %d", oid, size, len(wantContent)) + } + + checkReaderContent(t, packedStore, oid, group.ty, wantContent) + checkReaderFull(t, packedStore, oid, objectFormat) + } + } + }) + } +} + +func checkReaderContent(t *testing.T, packedStore *packed.Packed, oid id.ObjectID, wantType typ.Type, wantContent []byte) { + t.Helper() + + ty, size, reader, err := packedStore.ReadReaderContent(oid) + if err != nil { + t.Fatalf("ReadReaderContent(%s): %v", oid, err) + } + + defer func() { _ = reader.Close() }() + + if ty != wantType { + t.Fatalf("ReadReaderContent(%s) type = %v, want %v", oid, ty, wantType) + } + + if size != len(wantContent) { + t.Fatalf("ReadReaderContent(%s) size = %d, want %d", oid, size, 
len(wantContent)) + } + + content, err := io.ReadAll(reader) + if err != nil { + t.Fatalf("ReadReaderContent(%s) read: %v", oid, err) + } + + if !bytes.Equal(content, wantContent) { + t.Fatalf("ReadReaderContent(%s) content mismatch", oid) + } +} + +func checkReaderFull(t *testing.T, packedStore *packed.Packed, oid id.ObjectID, objectFormat id.ObjectFormat) { + t.Helper() + + reader, err := packedStore.ReadReaderFull(oid) + if err != nil { + t.Fatalf("ReadReaderFull(%s): %v", oid, err) + } + + defer func() { _ = reader.Close() }() + + raw, err := io.ReadAll(reader) + if err != nil { + t.Fatalf("ReadReaderFull(%s) read: %v", oid, err) + } + + if got := objectFormat.Sum(raw); got != oid { + t.Fatalf("ReadReaderFull(%s) hashes to %s", oid, got) + } +} diff --git a/object/store/packed/reader.go b/object/store/packed/reader.go new file mode 100644 index 00000000..cf433cfc --- /dev/null +++ b/object/store/packed/reader.go @@ -0,0 +1,226 @@ +package packed + +import ( + "bytes" + "fmt" + "io" + + "lindenii.org/go/furgit/internal/compress/zlib" + "lindenii.org/go/furgit/internal/format/packfile" + "lindenii.org/go/furgit/internal/iolimit" + "lindenii.org/go/furgit/object/header" + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/furgit/object/typ" + "lindenii.org/go/lgo/intconv" +) + +// ReadBytesContent reads an object's type and content bytes, +// fully resolving delta chains. +func (packed *Packed) ReadBytesContent(objectID id.ObjectID) (typ.Type, []byte, error) { + p, offset, err := packed.lookup(objectID) + if err != nil { + return typ.Unknown, nil, err + } + + entryType, content, err := packed.unpackEntry(p, offset) + if err != nil { + return typ.Unknown, nil, err + } + + ty, err := objectTypeOf(entryType) + if err != nil { + return typ.Unknown, nil, err + } + + return ty, content, nil +} + +// ReadBytesFull reads a full serialized object as "type size\x00content", +// fully resolving delta chains. 
+func (packed *Packed) ReadBytesFull(objectID id.ObjectID) ([]byte, error) { + ty, content, err := packed.ReadBytesContent(objectID) + if err != nil { + return nil, err + } + + raw := header.Append(make([]byte, 0, len(content)+32), ty, len(content)) + + return append(raw, content...), nil +} + +// ReadHeader reads an object's type and declared content length. +// +// For delta entries this resolves the chained base type +// and the declared delta result size, +// without reconstructing content. +func (packed *Packed) ReadHeader(objectID id.ObjectID) (typ.Type, int, error) { + p, offset, err := packed.lookup(objectID) + if err != nil { + return typ.Unknown, 0, err + } + + entryHeader, payload, err := p.entryHeaderAt(offset, packed.objectFormat) + if err != nil { + return typ.Unknown, 0, err + } + + var size int + + if entryHeader.Type.IsDelta() { + size, err = deltaResultSize(payload, entryHeader.Size) + if err != nil { + return typ.Unknown, 0, fmt.Errorf("%w: pack %q: %w", ErrMalformedPackedStore, p.name, err) + } + } else { + size, err = intconv.Uint64ToInt(entryHeader.Size) + if err != nil { + return typ.Unknown, 0, fmt.Errorf("%w: pack %q: object size overflows int: %w", ErrMalformedPackedStore, p.name, err) + } + } + + entryType, err := packed.resolveType(p, offset, entryHeader) + if err != nil { + return typ.Unknown, 0, err + } + + ty, err := objectTypeOf(entryType) + if err != nil { + return typ.Unknown, 0, err + } + + return ty, size, nil +} + +// ReadSize reads an object's declared content length. +// +// Unlike ReadHeader, +// this never walks delta chains. 
+func (packed *Packed) ReadSize(objectID id.ObjectID) (int, error) { + p, offset, err := packed.lookup(objectID) + if err != nil { + return 0, err + } + + entryHeader, payload, err := p.entryHeaderAt(offset, packed.objectFormat) + if err != nil { + return 0, err + } + + if !entryHeader.Type.IsDelta() { + size, err := intconv.Uint64ToInt(entryHeader.Size) + if err != nil { + return 0, fmt.Errorf("%w: pack %q: object size overflows int: %w", ErrMalformedPackedStore, p.name, err) + } + + return size, nil + } + + size, err := deltaResultSize(payload, entryHeader.Size) + if err != nil { + return 0, fmt.Errorf("%w: pack %q: %w", ErrMalformedPackedStore, p.name, err) + } + + return size, nil +} + +// ReadReaderContent reads an object's type, +// declared content length, and content stream. +// +// Non-delta entries stream directly from the pack; +// delta entries are fully resolved in memory first. +// Close releases resources only +// and does not drain unread data for additional validation. +func (packed *Packed) ReadReaderContent(objectID id.ObjectID) (typ.Type, int, io.ReadCloser, error) { + p, offset, err := packed.lookup(objectID) + if err != nil { + return typ.Unknown, 0, nil, err + } + + entryHeader, payload, err := p.entryHeaderAt(offset, packed.objectFormat) + if err != nil { + return typ.Unknown, 0, nil, err + } + + if !entryHeader.Type.IsBase() { + entryType, content, err := packed.unpackEntry(p, offset) + if err != nil { + return typ.Unknown, 0, nil, err + } + + ty, err := objectTypeOf(entryType) + if err != nil { + return typ.Unknown, 0, nil, err + } + + return ty, len(content), io.NopCloser(bytes.NewReader(content)), nil + } + + ty, err := objectTypeOf(entryHeader.Type) + if err != nil { + return typ.Unknown, 0, nil, err + } + + size, err := intconv.Uint64ToInt(entryHeader.Size) + if err != nil { + return typ.Unknown, 0, nil, fmt.Errorf("%w: pack %q: object size overflows int: %w", ErrMalformedPackedStore, p.name, err) + } + + zr, err := 
zlib.NewReaderBytes(payload) + if err != nil { + return typ.Unknown, 0, nil, fmt.Errorf("%w: pack %q: %w", ErrMalformedPackedStore, p.name, err) + } + + return ty, size, &objectReader{ + reader: iolimit.ExpectLengthReader(zr, size), + zr: zr, + }, nil +} + +// ReadReaderFull reads a full serialized object stream +// as "type size\x00content". +// +// Non-delta entries stream directly from the pack; +// delta entries are fully resolved in memory first. +// Close releases resources only +// and does not drain unread data for additional validation. +func (packed *Packed) ReadReaderFull(objectID id.ObjectID) (io.ReadCloser, error) { + ty, size, reader, err := packed.ReadReaderContent(objectID) + if err != nil { + return nil, err + } + + headerBytes := header.Append(nil, ty, size) + + return &objectReader{ + reader: io.MultiReader(bytes.NewReader(headerBytes), reader), + zr: reader, + }, nil +} + +// objectTypeOf converts one packfile entry type +// into an ordinary object type. +func objectTypeOf(entryType packfile.EntryType) (typ.Type, error) { + ty, err := entryType.ObjectType() + if err != nil { + return typ.Unknown, fmt.Errorf("%w: %w", ErrMalformedPackedStore, err) + } + + return ty, nil +} + +// objectReader streams one packed object payload +// and owns the underlying decompressor. +type objectReader struct { + // reader is the stream exposed by Read. + reader io.Reader + // zr is closed by Close. + zr io.Closer +} + +func (reader *objectReader) Read(dst []byte) (int, error) { + return reader.reader.Read(dst) +} + +func (reader *objectReader) Close() error { + return reader.zr.Close() +} diff --git a/object/store/packed/refresh.go b/object/store/packed/refresh.go new file mode 100644 index 00000000..f06e9859 --- /dev/null +++ b/object/store/packed/refresh.go @@ -0,0 +1,69 @@ +package packed + +import ( + "fmt" + "io/fs" + "strings" +) + +// Refresh rescans the pack directory +// and replaces the store's view of available packs. 
+// +// Every index found must parse +// and have its pack data present and consistent; +// otherwise Refresh fails without changing the view. +func (packed *Packed) Refresh() error { + packed.refreshMu.Lock() + defer packed.refreshMu.Unlock() + + dirEntries, err := fs.ReadDir(packed.root.FS(), ".") + if err != nil { + return fmt.Errorf("object/store/packed: %w", err) + } + + next := make(map[string]*pack, len(packed.byName)) + + opened := make([]*pack, 0, len(dirEntries)) + + for _, dirEntry := range dirEntries { + name, ok := strings.CutSuffix(dirEntry.Name(), ".idx") + if !ok || dirEntry.IsDir() { + continue + } + + if existing, ok := packed.byName[name]; ok { + next[name] = existing + + continue + } + + p, err := openPack(packed.root, name, packed.objectFormat) + if err != nil { + for _, p := range opened { + _ = p.close() + } + + return err + } + + opened = append(opened, p) + next[name] = p + } + + for name, p := range packed.byName { + if _, ok := next[name]; !ok { + packed.retired = append(packed.retired, p) + } + } + + packed.byName = next + + present := make(map[*pack]struct{}, len(next)) + for _, p := range next { + present[p] = struct{}{} + } + + packed.order.Sync(present) + + return nil +} diff --git a/object/store/packed/refresh_test.go b/object/store/packed/refresh_test.go new file mode 100644 index 00000000..e54dc97d --- /dev/null +++ b/object/store/packed/refresh_test.go @@ -0,0 +1,108 @@ +package packed_test + +import ( + "errors" + "os" + "path/filepath" + "testing" + + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/furgit/object/store" + "lindenii.org/go/furgit/object/store/packed" +) + +func TestRefreshIsExplicit(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + _, prefix, seeded := makeGitPack(t, objectFormat) + oids := seeded.All() + + dir := t.TempDir() + + root, err := os.OpenRoot(dir) + if err != nil { + 
t.Fatalf("OpenRoot: %v", err) + } + + t.Cleanup(func() { _ = root.Close() }) + + packedStore, err := packed.New(root, objectFormat) + if err != nil { + t.Fatalf("New: %v", err) + } + + t.Cleanup(func() { _ = packedStore.Close() }) + + _, _, err = packedStore.ReadBytesContent(oids[0]) + if !errors.Is(err, store.ErrObjectNotFound) { + t.Fatalf("empty store read error = %v, want ErrObjectNotFound", err) + } + + t.Helper() + + base := filepath.Base(prefix) + cp(t, prefix+".pack", filepath.Join(dir, base+".pack")) + cp(t, prefix+".idx", filepath.Join(dir, base+".idx")) + + // New packs must stay invisible until an explicit Refresh. + _, _, err = packedStore.ReadBytesContent(oids[0]) + if !errors.Is(err, store.ErrObjectNotFound) { + t.Fatalf("pre-Refresh read error = %v, want ErrObjectNotFound", err) + } + + err = packedStore.Refresh() + if err != nil { + t.Fatalf("Refresh: %v", err) + } + + for _, oid := range oids { + _, _, err := packedStore.ReadBytesContent(oid) + if err != nil { + t.Fatalf("post-Refresh ReadBytesContent(%s): %v", oid, err) + } + } + }) + } +} + +func TestRefreshRejectsIndexWithoutPack(t *testing.T) { + t.Parallel() + + objectFormat := id.ObjectFormatSHA256 + + _, prefix, _ := makeGitPack(t, objectFormat) + + dir := t.TempDir() + cp(t, prefix+".idx", filepath.Join(dir, filepath.Base(prefix)+".idx")) + + root, err := os.OpenRoot(dir) + if err != nil { + t.Fatalf("OpenRoot: %v", err) + } + + t.Cleanup(func() { _ = root.Close() }) + + _, err = packed.New(root, objectFormat) + if err == nil { + t.Fatalf("New with orphan index: expected error") + } +} + +// cp copies one file from src to dst. 
+func cp(t *testing.T, src, dst string) {
+	t.Helper()
+
+	contents, readErr := os.ReadFile(src) //nolint:gosec
+	if readErr != nil {
+		t.Fatalf("ReadFile: %v", readErr)
+	}
+
+	writeErr := os.WriteFile(dst, contents, 0o600) //#nosec G703
+	if writeErr != nil {
+		t.Fatalf("WriteFile: %v", writeErr)
+	}
+}
diff --git a/object/store/packed/writer.go b/object/store/packed/writer.go
new file mode 100644
index 00000000..59309c24
--- /dev/null
+++ b/object/store/packed/writer.go
@@ -0,0 +1,38 @@
+package packed
+
+import (
+	"fmt"
+	"io"
+
+	"lindenii.org/go/furgit/object/store"
+	"lindenii.org/go/furgit/object/store/packed/internal/ingest"
+)
+
+var _ store.PackWriter = (*Packed)(nil)
+
+// WritePack ingests one pack stream into the pack store,
+// publishing a pack, index, and reverse index
+// under content-addressed names derived from the pack trailer hash.
+//
+// WritePack consumes the pack stream through its trailer and stops there.
+// It does not require src to reach EOF afterward,
+// so it is safe on a still-open transport connection,
+// such as receive-pack,
+// whose peer keeps the connection open to read the response.
+//
+// The pack must be the last thing the peer sends before that response:
+// any bytes arriving immediately after the trailer
+// are rejected as a malformed pack.
+func (packed *Packed) WritePack(src io.Reader, opts store.PackWriteOptions) error {
+	// Ingest errors are handed back to the caller unwrapped.
+	if _, err := ingest.WritePack(packed.root, packed.objectFormat, src, opts); err != nil {
+		return err //nolint:wrapcheck
+	}
+
+	// Refresh so the pack just written becomes visible through this store.
+	if err := packed.Refresh(); err != nil {
+		return fmt.Errorf("object/store/packed: refresh after pack write: %w", err)
+	}
+
+	return nil
+}
diff --git a/object/store/packed/writer_test.go b/object/store/packed/writer_test.go
new file mode 100644
index 00000000..8227caa7
--- /dev/null
+++ b/object/store/packed/writer_test.go
@@ -0,0 +1,110 @@
+package packed_test
+
+import (
+	"bytes"
+	"os"
+	"testing"
+
+	"lindenii.org/go/furgit/internal/testgit"
+	"lindenii.org/go/furgit/object/id"
+	"lindenii.org/go/furgit/object/store"
+	"lindenii.org/go/furgit/object/store/packed"
+	"lindenii.org/go/furgit/object/typ"
+)
+
+// TestWritePack checks that objects from a pack ingested with WritePack
+// can be read back immediately, with no separate Refresh call.
+func TestWritePack(t *testing.T) {
+	t.Parallel()
+
+	for _, format := range id.SupportedObjectFormats() {
+		t.Run(format.String(), func(t *testing.T) {
+			t.Parallel()
+
+			repo, err := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: format})
+			if err != nil {
+				t.Fatalf("NewRepo: %v", err)
+			}
+
+			history, err := repo.SeedHistory(t)
+			if err != nil {
+				t.Fatalf("SeedHistory: %v", err)
+			}
+
+			packOptions := testgit.PackObjectsStdoutOptions{
+				Revs:    false,
+				Thin:    false,
+				Exclude: nil,
+			}
+
+			packBytes, err := repo.PackObjectsStdout(t, history.All(), packOptions)
+			if err != nil {
+				t.Fatalf("PackObjectsStdout: %v", err)
+			}
+
+			target := openEmptyStore(t, format)
+
+			writeOptions := store.PackWriteOptions{
+				ThinBase: nil,
+				Progress: nil,
+			}
+
+			err = target.WritePack(bytes.NewReader(packBytes), writeOptions)
+			if err != nil {
+				t.Fatalf("WritePack: %v", err)
+			}
+
+			// Probe one blob, one tree, the last commit, and one tag.
+			cases := []struct {
+				wantType typ.Type
+				oid      id.ObjectID
+			}{
+				{wantType: typ.Blob, oid: history.Blobs[0]},
+				{wantType: typ.Tree, oid: history.Trees[0]},
+				{wantType: typ.Commit, oid: history.Commits[len(history.Commits)-1]},
+				{wantType: typ.Tag, oid: history.Tags[0]},
+			}
+
+			for _, tc := range cases {
+				wantContent, err := repo.CatFile(t, tc.wantType, tc.oid)
+				if err != nil {
+					t.Fatalf("CatFile(%s): %v", tc.oid, err)
+				}
+
+				gotType, gotContent, err := target.ReadBytesContent(tc.oid)
+				if err != nil {
+					t.Fatalf("ReadBytesContent(%s): %v", tc.oid, err)
+				}
+
+				if gotType != tc.wantType {
+					t.Fatalf("ReadBytesContent(%s) type = %v, want %v", tc.oid, gotType, tc.wantType)
+				}
+
+				if !bytes.Equal(gotContent, wantContent) {
+					t.Fatalf("ReadBytesContent(%s) content mismatch", tc.oid)
+				}
+			}
+		})
+	}
+}
+
+// openEmptyStore returns a packed store rooted at a new, empty temporary directory.
+func openEmptyStore(t *testing.T, objectFormat id.ObjectFormat) *packed.Packed {
+	t.Helper()
+
+	dirRoot, err := os.OpenRoot(t.TempDir())
+	if err != nil {
+		t.Fatalf("OpenRoot: %v", err)
+	}
+
+	t.Cleanup(func() { _ = dirRoot.Close() })
+
+	emptyStore, err := packed.New(dirRoot, objectFormat)
+	if err != nil {
+		t.Fatalf("New: %v", err)
+	}
+
+	t.Cleanup(func() { _ = emptyStore.Close() })
+
+	return emptyStore
+}
|
