diff options
| author | 2026-06-12 18:41:58 +0000 | |
|---|---|---|
| committer | 2026-06-12 18:41:58 +0000 | |
| commit | 7faa841b581dbbacf563a6ca3167efbfd697d37c (patch) | |
| tree | ab54845bcf708b1099f88a339d18bdf1cdb6f23f /object | |
| parent | object/store/packed: Add missing t.Helper (diff) | |
object/store/packed: Add basic ingestion
Diffstat (limited to 'object')
| -rw-r--r-- | object/store/packed/internal/ingest/finalize.go | 150 | ||||
| -rw-r--r-- | object/store/packed/internal/ingest/ingest.go | 250 | ||||
| -rw-r--r-- | object/store/packed/internal/ingest/resolve.go | 306 | ||||
| -rw-r--r-- | object/store/packed/internal/ingest/scan.go | 467 | ||||
| -rw-r--r-- | object/store/packed/internal/ingest/thin.go | 226 | ||||
| -rw-r--r-- | object/store/packed/internal/ingest/writepack_test.go | 417 | ||||
| -rw-r--r-- | object/store/packed/quarantine.go | 215 | ||||
| -rw-r--r-- | object/store/packed/writer.go | 40 | ||||
| -rw-r--r-- | object/store/packed/writer_test.go | 105 |
9 files changed, 2116 insertions, 60 deletions
diff --git a/object/store/packed/internal/ingest/finalize.go b/object/store/packed/internal/ingest/finalize.go new file mode 100644 index 00000000..7dca131a --- /dev/null +++ b/object/store/packed/internal/ingest/finalize.go @@ -0,0 +1,150 @@ +package ingest + +import ( + "errors" + "fmt" + "io" + "io/fs" + "slices" + + "lindenii.org/go/furgit/internal/format/packidx" + "lindenii.org/go/furgit/internal/format/packrev" + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/lgo/intconv" +) + +// finalize writes the index and reverse index, +// then links the pack, reverse index, and index +// to their content-addressed names. +func (ingestion *ingestion) finalize() (Result, error) { + entries, positions, err := ingestion.indexEntries() + if err != nil { + return Result{}, err + } + + packHash := ingestion.packHash.Bytes() + + idxTmp, err := ingestion.writeTemp("tmp_idx_", func(w io.Writer) error { + return packidx.Write(w, ingestion.objectFormat, entries, packHash) + }) + if err != nil { + return Result{}, err + } + + revTmp, err := ingestion.writeTemp("tmp_rev_", func(w io.Writer) error { + return packrev.Write(w, ingestion.objectFormat, positions, packHash) + }) + if err != nil { + return Result{}, err + } + + base := "pack-" + ingestion.packHash.String() + packFinal := base + ".pack" + idxFinal := base + ".idx" + revFinal := base + ".rev" + + // Link the pack and reverse index before the index, + // since the index is what publishes the pack to readers. + err = ingestion.link(ingestion.packTmp, packFinal) + if err != nil { + return Result{}, err + } + + err = ingestion.link(revTmp, revFinal) + if err != nil { + return Result{}, err + } + + err = ingestion.link(idxTmp, idxFinal) + if err != nil { + return Result{}, err + } + + objectCount, err := intconv.IntToUint32(len(ingestion.records)) + if err != nil { + return Result{}, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + return Result{ + PackName: packFinal, + IdxName: idxFinal, + RevName: revFinal, + PackHash: ingestion.packHash, + ObjectCount: objectCount, + ThinFixed: ingestion.thinFixed, + }, nil +} + +// indexEntries returns the index entries in object-ID order +// and, for each record in pack order, its position in that index order. +func (ingestion *ingestion) indexEntries() ([]packidx.Entry, []uint32, error) { + order := make([]int, len(ingestion.records)) + for i := range order { + order[i] = i + } + + slices.SortFunc(order, func(left, right int) int { + return ingestion.records[left].oid.Compare(ingestion.records[right].oid) + }) + + entries := make([]packidx.Entry, len(order)) + positions := make([]uint32, len(ingestion.records)) + + for indexPosition, recordIndex := range order { + rec := &ingestion.records[recordIndex] + + var oidBytes [id.MaxObjectIDSize]byte + copy(oidBytes[:], rec.oid.RawBytes()) + + entries[indexPosition] = packidx.Entry{ + OID: oidBytes, + Offset: rec.offset, + CRC32: rec.crc32, + } + + position, err := intconv.IntToUint32(indexPosition) + if err != nil { + return nil, nil, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + positions[recordIndex] = position + } + + return entries, positions, nil +} + +// writeTemp creates a temporary file, +// writes it via write, syncs it, and returns its name. +func (ingestion *ingestion) writeTemp(prefix string, write func(io.Writer) error) (string, error) { + name, file, err := ingestion.createTemp(prefix) + if err != nil { + return "", err + } + + defer func() { _ = file.Close() }() + + err = write(file) + if err != nil { + return "", err + } + + err = file.Sync() + if err != nil { + return "", fmt.Errorf("object/store/packed/internal/ingest: syncing %q: %w", name, err) + } + + return name, nil +} + +// link hard-links tmp to final, +// treating an already-present destination as success. +func (ingestion *ingestion) link(tmp, final string) error { + err := ingestion.root.Link(tmp, final) + if err != nil && !errors.Is(err, fs.ErrExist) { + return fmt.Errorf("object/store/packed/internal/ingest: linking %q: %w", final, err) + } + + _ = ingestion.root.Remove(tmp) + + return nil +} diff --git a/object/store/packed/internal/ingest/ingest.go b/object/store/packed/internal/ingest/ingest.go new file mode 100644 index 00000000..324ed8ce --- /dev/null +++ b/object/store/packed/internal/ingest/ingest.go @@ -0,0 +1,250 @@ +package ingest + +import ( + "bytes" + "crypto/rand" + "errors" + "fmt" + "io" + "io/fs" + "os" + + "lindenii.org/go/furgit/internal/format/packfile" + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/furgit/object/store" +) + +var errTempNamesExhausted = errors.New("object/store/packed/internal/ingest: exhausted temporary file names") + +// ingestion holds the state for one WritePack call. +type ingestion struct { + // root is the destination objects/pack directory. + root *os.Root + + // objectFormat is the pack's object format. + objectFormat id.ObjectFormat + + // opts carries the thin base reader and progress sink. + opts store.PackWriteOptions + + // src is the pack stream, positioned just past the header. + src io.Reader + + // packFile is the temporary pack file being written, + // and packTmp is its destination-relative name. + packFile *os.File + packTmp string + + // temps lists every temporary file to remove on failure. + temps []string + + // scanner streams src into packFile while scanning entries. + scanner *scanner + + // records holds one entry per object, in pack-offset order. + records []record + + // byOffset maps an entry offset to its record index, + // and byOID maps a resolved object ID to its record index. + byOffset map[uint64]int + byOID map[id.ObjectID]int + + // headerCount is the object count declared by the pack header. + headerCount uint32 + + // deltaCount counts delta records, accumulated during scanning. + deltaCount uint64 + + // deltasResolved counts resolved delta records, for progress. + deltasResolved uint64 + + // packHash is the final pack trailer hash. + packHash id.ObjectID + + // thinFixed reports whether thin completion appended local bases. + thinFixed bool + + // committed suppresses temporary file removal once artifacts are published. + committed bool +} + +// WritePack ingests one pack stream into root, +// publishing a pack, index, and reverse index +// under content-addressed names derived from the pack trailer hash. +// +// WritePack consumes the pack stream through its trailer and stops there. +// It does not require src to reach EOF afterward, +// so it is safe on a still-open transport connection, +// such as receive-pack, +// whose peer keeps the connection open to read the response. +// +// The pack must be the last thing the peer sends before that response: +// any bytes arriving immediately after the trailer +// are rejected as a malformed pack. +func WritePack(root *os.Root, objectFormat id.ObjectFormat, src io.Reader, opts store.PackWriteOptions) (Result, error) { + if objectFormat.Size() == 0 { + return Result{}, id.ErrInvalidObjectFormat + } + + headerRaw, count, err := readPackHeader(src) + if err != nil { + return Result{}, err + } + + ingestion := &ingestion{ + root: root, + objectFormat: objectFormat, + opts: opts, + src: src, + packFile: nil, + packTmp: "", + temps: nil, + scanner: nil, + records: nil, + byOffset: make(map[uint64]int), + byOID: make(map[id.ObjectID]int), + headerCount: count, + deltaCount: 0, + deltasResolved: 0, + packHash: id.ObjectID{}, + thinFixed: false, + committed: false, + } + + defer ingestion.cleanup() + + if count == 0 { + return ingestion.finishEmpty(headerRaw) + } + + err = ingestion.openPackTemp(headerRaw) + if err != nil { + return Result{}, err + } + + err = ingestion.streamAndScan() + if err != nil { + return Result{}, err + } + + err = ingestion.resolveDeltas() + if err != nil { + return Result{}, err + } + + err = ingestion.packFile.Sync() + if err != nil { + return Result{}, fmt.Errorf("object/store/packed/internal/ingest: syncing pack: %w", err) + } + + result, err := ingestion.finalize() + if err != nil { + return Result{}, err + } + + ingestion.committed = true + + return result, nil +} + +// finishEmpty verifies the trailer of a zero-object pack +// and returns success without writing any artifacts. +func (ingestion *ingestion) finishEmpty(headerRaw [packfile.HeaderLen]byte) (Result, error) { + hashImpl, err := ingestion.objectFormat.New() + if err != nil { + return Result{}, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + _, _ = hashImpl.Write(headerRaw[:]) + + trailer := make([]byte, ingestion.objectFormat.Size()) + + _, err = io.ReadFull(ingestion.src, trailer) + if err != nil { + return Result{}, fmt.Errorf("%w: reading trailer: %w", ErrMalformedPack, err) + } + + if !bytes.Equal(hashImpl.Sum(nil), trailer) { + return Result{}, fmt.Errorf("%w: trailer hash mismatch", ErrMalformedPack) + } + + packHash, err := ingestion.objectFormat.FromBytes(trailer) + if err != nil { + return Result{}, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + return Result{ + PackName: "", + IdxName: "", + RevName: "", + PackHash: packHash, + ObjectCount: 0, + ThinFixed: false, + }, nil +} + +// openPackTemp creates the temporary pack file, +// writes the validated header to it, +// and builds the stream scanner seeded with that header. +func (ingestion *ingestion) openPackTemp(headerRaw [packfile.HeaderLen]byte) error { + name, file, err := ingestion.createTemp("tmp_pack_") + if err != nil { + return err + } + + ingestion.packTmp = name + ingestion.packFile = file + + _, err = file.Write(headerRaw[:]) + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: writing pack header: %w", err) + } + + hashImpl, err := ingestion.objectFormat.New() + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + _, _ = hashImpl.Write(headerRaw[:]) + + ingestion.scanner = newScanner(ingestion.src, ingestion.packFile, hashImpl) + + return nil +} + +// createTemp creates one temporary file under root, +// recording its name for cleanup on failure. +func (ingestion *ingestion) createTemp(prefix string) (string, *os.File, error) { + for range 32 { + name := prefix + rand.Text() + + file, err := ingestion.root.OpenFile(name, os.O_CREATE|os.O_EXCL|os.O_RDWR, 0o644) + if err == nil { + ingestion.temps = append(ingestion.temps, name) + + return name, file, nil + } + + if !errors.Is(err, fs.ErrExist) { + return "", nil, fmt.Errorf("object/store/packed/internal/ingest: creating temp file: %w", err) + } + } + + return "", nil, errTempNamesExhausted +} + +// cleanup closes the pack file +// and removes temporary files unless the write committed. +func (ingestion *ingestion) cleanup() { + if ingestion.packFile != nil { + _ = ingestion.packFile.Close() + } + + if ingestion.committed { + return + } + + for _, name := range ingestion.temps { + _ = ingestion.root.Remove(name) + } +} diff --git a/object/store/packed/internal/ingest/resolve.go b/object/store/packed/internal/ingest/resolve.go new file mode 100644 index 00000000..7e163f2d --- /dev/null +++ b/object/store/packed/internal/ingest/resolve.go @@ -0,0 +1,306 @@ +package ingest + +import ( + "fmt" + "io" + + "lindenii.org/go/furgit/internal/compress/zlib" + "lindenii.org/go/furgit/internal/format/packfile" + "lindenii.org/go/furgit/internal/format/packfile/delta" + "lindenii.org/go/furgit/internal/progress" + "lindenii.org/go/furgit/object/header" + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/lgo/intconv" +) + +// adjacency maps each resolvable base to its delta children: +// ofs-deltas keyed by base offset, ref-deltas keyed by base object ID. +type adjacency struct { + byOffset map[uint64][]int + byOID map[id.ObjectID][]int +} + +// resolveDeltas resolves every delta record into a final object ID and type, +// completing thin packs from the external base reader when required. +func (ingestion *ingestion) resolveDeltas() error { + meter := progress.New(progress.Options{ + Writer: ingestion.opts.Progress, + Title: "resolving deltas", + Total: ingestion.countDeltas(), + Delay: 0, + Sparse: false, + Throughput: false, + }) + + adjacency := ingestion.buildAdjacency() + + err := ingestion.resolveFrom(ingestion.resolvedRoots(), adjacency, meter) + if err != nil { + return err + } + + external := ingestion.unresolvedExternalBases() + + switch { + case len(external) == 0 && ingestion.countUnresolved() > 0: + return fmt.Errorf("%w: unresolvable delta entries", ErrMalformedPack) + case len(external) > 0: + err = ingestion.fixThin(external, adjacency, meter) + if err != nil { + return err + } + } + + meter.Stop("done") + + return nil +} + +// buildAdjacency indexes every delta record by its base, +// so a resolved base can find the children that delta against it. +func (ingestion *ingestion) buildAdjacency() adjacency { + out := adjacency{ + byOffset: make(map[uint64][]int), + byOID: make(map[id.ObjectID][]int), + } + + for index := range ingestion.records { + rec := &ingestion.records[index] + + switch rec.packedType { + case packfile.EntryTypeOfsDelta: + out.byOffset[rec.baseOffset] = append(out.byOffset[rec.baseOffset], index) + case packfile.EntryTypeRefDelta: + out.byOID[rec.baseOID] = append(out.byOID[rec.baseOID], index) + case packfile.EntryTypeInvalid, + packfile.EntryTypeCommit, + packfile.EntryTypeTree, + packfile.EntryTypeBlob, + packfile.EntryTypeTag, + packfile.EntryTypeFuture: + } + } + + return out +} + +// resolveFrom resolves the delta subtree rooted at each resolved record. +func (ingestion *ingestion) resolveFrom(roots []int, adjacency adjacency, meter *progress.Meter) error { + for _, root := range roots { + content, err := ingestion.inflateRecord(root) + if err != nil { + return err + } + + err = ingestion.resolveSubtree(root, content, ingestion.records[root].objectType, 0, adjacency, meter) + if err != nil { + return err + } + } + + return nil +} + +// resolveSubtree resolves every delta child of one resolved record at depth, +// holding the record's content as the base for its children. +func (ingestion *ingestion) resolveSubtree( + index int, + content []byte, + objectType packfile.EntryType, + depth int, + adjacency adjacency, + meter *progress.Meter, +) error { + rec := &ingestion.records[index] + + for _, child := range adjacency.byOffset[rec.offset] { + err := ingestion.resolveChild(child, content, objectType, depth+1, adjacency, meter) + if err != nil { + return err + } + } + + for _, child := range adjacency.byOID[rec.oid] { + err := ingestion.resolveChild(child, content, objectType, depth+1, adjacency, meter) + if err != nil { + return err + } + } + + return nil +} + +// resolveChild applies one delta record at depth against its base content, +// finalizes the record, and recurses into its own children. +func (ingestion *ingestion) resolveChild( + index int, + baseContent []byte, + baseType packfile.EntryType, + depth int, + adjacency adjacency, + meter *progress.Meter, +) error { + rec := &ingestion.records[index] + if rec.resolved { + return nil + } + + if depth > delta.MaxChainDepth { + return fmt.Errorf("%w: entry at %d: delta chain too deep", ErrMalformedPack, rec.offset) + } + + deltaPayload, err := ingestion.inflateRecord(index) + if err != nil { + return err + } + + baseSize, resultSize, _, err := delta.ParseHeaderSizes(deltaPayload) + if err != nil { + return fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err) + } + + if baseSize != uint64(len(baseContent)) { + return fmt.Errorf("%w: entry at %d: delta base size mismatch", ErrMalformedPack, rec.offset) + } + + content, err := delta.Apply(baseContent, deltaPayload) + if err != nil { + return fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err) + } + + if uint64(len(content)) != resultSize { + return fmt.Errorf("%w: entry at %d: delta result size mismatch", ErrMalformedPack, rec.offset) + } + + oid, err := ingestion.hashObject(baseType, content) + if err != nil { + return err + } + + rec.objectType = baseType + rec.oid = oid + rec.resolved = true + ingestion.byOID[oid] = index + + ingestion.deltasResolved++ + meter.Set(ingestion.deltasResolved, 0) + + return ingestion.resolveSubtree(index, content, baseType, depth, adjacency, meter) +} + +// inflateRecord inflates one record's payload from the temporary pack file. +func (ingestion *ingestion) inflateRecord(index int) ([]byte, error) { + rec := &ingestion.records[index] + + offset, err := intconv.Uint64ToInt64(rec.dataOffset()) + if err != nil { + return nil, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + compressedLen, err := intconv.Uint64ToInt64(rec.packedLen - rec.headerLen) + if err != nil { + return nil, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + size, err := intconv.Uint64ToInt(rec.declaredSize) + if err != nil { + return nil, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + zr, err := zlib.NewReader(io.NewSectionReader(ingestion.packFile, offset, compressedLen)) + if err != nil { + return nil, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err) + } + + defer func() { _ = zr.Close() }() + + out := make([]byte, size) + + _, err = io.ReadFull(zr, out) + if err != nil { + return nil, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err) + } + + return out, nil +} + +// hashObject computes the object ID of one resolved object. +func (ingestion *ingestion) hashObject(objectType packfile.EntryType, content []byte) (id.ObjectID, error) { + var zero id.ObjectID + + ty, err := objectType.ObjectType() + if err != nil { + return zero, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + hashImpl, err := ingestion.objectFormat.New() + if err != nil { + return zero, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + _, _ = hashImpl.Write(header.Append(nil, ty, uint64(len(content)))) + _, _ = hashImpl.Write(content) + + oid, err := ingestion.objectFormat.FromBytes(hashImpl.Sum(nil)) + if err != nil { + return zero, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + return oid, nil +} + +// resolvedRoots returns the indices of every currently resolved record. +func (ingestion *ingestion) resolvedRoots() []int { + var roots []int + + for index := range ingestion.records { + if ingestion.records[index].resolved { + roots = append(roots, index) + } + } + + return roots +} + +// countDeltas returns the number of delta records. +func (ingestion *ingestion) countDeltas() uint64 { + return ingestion.deltaCount +} + +// countUnresolved returns the number of records that remain unresolved. +// +// Every base is resolved during scanning or thin completion, +// so the unresolved records are exactly the unresolved deltas: +// the delta records minus those already resolved. +func (ingestion *ingestion) countUnresolved() uint64 { + return ingestion.deltaCount - ingestion.deltasResolved +} + +// unresolvedExternalBases returns the unique base object IDs +// of unresolved ref-deltas whose base is not present in the pack, +// in first-reference order. +func (ingestion *ingestion) unresolvedExternalBases() []id.ObjectID { + seen := make(map[id.ObjectID]struct{}) + + var out []id.ObjectID + + for index := range ingestion.records { + rec := &ingestion.records[index] + if rec.resolved || rec.packedType != packfile.EntryTypeRefDelta { + continue + } + + if _, ok := ingestion.byOID[rec.baseOID]; ok { + continue + } + + if _, ok := seen[rec.baseOID]; ok { + continue + } + + seen[rec.baseOID] = struct{}{} + out = append(out, rec.baseOID) + } + + return out +} diff --git a/object/store/packed/internal/ingest/scan.go b/object/store/packed/internal/ingest/scan.go new file mode 100644 index 00000000..56e42cea --- /dev/null +++ b/object/store/packed/internal/ingest/scan.go @@ -0,0 +1,467 @@ +package ingest + +import ( + "bytes" + "errors" + "fmt" + "hash" + "hash/crc32" + "io" + + "lindenii.org/go/furgit/internal/compress/zlib" + "lindenii.org/go/furgit/internal/format/packfile" + "lindenii.org/go/furgit/internal/progress" + "lindenii.org/go/furgit/object/header" + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/lgo/intconv" +) + +// scanBufferSize is the stream scanner's fixed input window size. +const scanBufferSize = 64 << 10 + +// scanner reads one pack stream, +// mirroring consumed bytes into the destination pack file +// while maintaining the running pack hash and a per-entry CRC. +// +// It implements [io.Reader] and [io.ByteReader] +// so a zlib reader can consume an entry payload through it +// without reading past the end of that compressed stream. +type scanner struct { + src io.Reader + dst io.Writer + + // buf[off:n] is the unread window. + buf []byte + off int + n int + + // consumed counts stream bytes consumed so far. + consumed uint64 + + // hash accumulates the pack hash over consumed bytes + // while hashing is true. + hash hash.Hash + hashing bool + + // crc accumulates the CRC of the current entry + // while crcing is true. + crc uint32 + crcing bool +} + +// newScanner constructs one scanner mirroring src into dst, +// seeding the running hash from the already-consumed pack header. +func newScanner(src io.Reader, dst io.Writer, packHash hash.Hash) *scanner { + return &scanner{ + src: src, + dst: dst, + buf: make([]byte, scanBufferSize), + consumed: packfile.HeaderLen, + hash: packHash, + hashing: true, + crc: 0, + crcing: false, + } +} + +// readPackHeader reads and validates the pack header from src, +// returning the raw header and its declared object count. +func readPackHeader(src io.Reader) ([packfile.HeaderLen]byte, uint32, error) { + var raw [packfile.HeaderLen]byte + + _, err := io.ReadFull(src, raw[:]) + if err != nil { + return raw, 0, fmt.Errorf("%w: reading header: %w", ErrMalformedPack, err) + } + + packHeader, err := packfile.ParseHeader(raw[:]) + if err != nil { + return raw, 0, fmt.Errorf("%w: %w", ErrMalformedPack, err) + } + + return raw, packHeader.ObjectCount, nil +} + +// Read implements [io.Reader]. +func (scanner *scanner) Read(dst []byte) (int, error) { + if len(dst) == 0 { + return 0, nil + } + + err := scanner.ensureAvailable() + if err != nil { + return 0, err + } + + read := min(len(dst), scanner.n-scanner.off) + + copy(dst, scanner.buf[scanner.off:scanner.off+read]) + + err = scanner.use(read) + if err != nil { + return 0, err + } + + return read, nil +} + +// ReadByte implements [io.ByteReader] without allocation. +func (scanner *scanner) ReadByte() (byte, error) { + err := scanner.ensureAvailable() + if err != nil { + return 0, err + } + + b := scanner.buf[scanner.off] + + err = scanner.use(1) + if err != nil { + return 0, err + } + + return b, nil +} + +// ensureAvailable makes at least one unread byte available, +// returning [io.EOF] once the source is exhausted. +func (scanner *scanner) ensureAvailable() error { + for scanner.n-scanner.off == 0 { + err := scanner.flushPrefix() + if err != nil { + return err + } + + read, err := scanner.src.Read(scanner.buf[scanner.n:]) + scanner.n += read + + if err != nil { + if errors.Is(err, io.EOF) { + if scanner.n-scanner.off == 0 { + return io.EOF + } + + return nil + } + + return fmt.Errorf("object/store/packed/internal/ingest: reading pack: %w", err) + } + + if read == 0 && scanner.n-scanner.off == 0 { + return io.ErrNoProgress + } + } + + return nil +} + +// peekHeader returns the unread window grown to at most maxLen bytes +// without consuming, tolerating an early end of stream. +func (scanner *scanner) peekHeader(maxLen int) ([]byte, error) { + maxLen = min(maxLen, len(scanner.buf)) + + for scanner.n-scanner.off < maxLen { + err := scanner.flushPrefix() + if err != nil { + return nil, err + } + + read, err := scanner.src.Read(scanner.buf[scanner.n:]) + scanner.n += read + + if err != nil { + if errors.Is(err, io.EOF) { + break + } + + return nil, fmt.Errorf("object/store/packed/internal/ingest: reading pack: %w", err) + } + + if read == 0 { + break + } + } + + if scanner.n-scanner.off == 0 { + return nil, fmt.Errorf("%w: unexpected end of stream", ErrMalformedPack) + } + + return scanner.buf[scanner.off:scanner.n], nil +} + +// use consumes n unread bytes, +// folding them into the running hash and entry CRC as enabled. +func (scanner *scanner) use(n int) error { + chunk := scanner.buf[scanner.off : scanner.off+n] + + if scanner.hashing { + _, err := scanner.hash.Write(chunk) + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: hashing pack: %w", err) + } + } + + if scanner.crcing { + scanner.crc = crc32.Update(scanner.crc, crc32.IEEETable, chunk) + } + + scanner.off += n + scanner.consumed += uint64(n) //nolint:gosec + + return nil +} + +// flushPrefix writes the consumed buffer prefix to the destination +// and compacts the unread window to the start of the buffer. +func (scanner *scanner) flushPrefix() error { + if scanner.off == 0 { + return nil + } + + _, err := scanner.dst.Write(scanner.buf[:scanner.off]) + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: writing pack: %w", err) + } + + unread := scanner.n - scanner.off + + copy(scanner.buf, scanner.buf[scanner.off:scanner.n]) + + scanner.off = 0 + scanner.n = unread + + return nil +} + +// beginCRC starts CRC accumulation for one entry. +func (scanner *scanner) beginCRC() { + scanner.crc = 0 + scanner.crcing = true +} + +// endCRC ends CRC accumulation and returns the entry CRC. +func (scanner *scanner) endCRC() uint32 { + crc := scanner.crc + scanner.crc = 0 + scanner.crcing = false + + return crc +} + +// finishTrailer reads and verifies the pack trailer hash, +// flushing the remaining buffered pack bytes to the destination. +// +// The trailer is mirrored to the destination but excluded from the pack hash. +func (scanner *scanner) finishTrailer(hashSize int) ([]byte, error) { + trailer := make([]byte, hashSize) + + scanner.hashing = false + + _, err := io.ReadFull(scanner, trailer) + if err != nil { + return nil, fmt.Errorf("%w: reading trailer: %w", ErrMalformedPack, err) + } + + if scanner.n-scanner.off > 0 { + return nil, fmt.Errorf("%w: trailing data after pack", ErrMalformedPack) + } + + if !bytes.Equal(scanner.hash.Sum(nil), trailer) { + return nil, fmt.Errorf("%w: trailer hash mismatch", ErrMalformedPack) + } + + err = scanner.flushPrefix() + if err != nil { + return nil, err + } + + return trailer, nil +} + +// streamAndScan streams the pack body to the temporary pack file, +// scanning one record per declared object and verifying the trailer. +func (ingestion *ingestion) streamAndScan() error { + meter := progress.New(progress.Options{ + Writer: ingestion.opts.Progress, + Title: "receiving objects", + Total: uint64(ingestion.headerCount), + Delay: 0, + Sparse: false, + Throughput: true, + }) + + for done := range ingestion.headerCount { + err := ingestion.scanEntry(ingestion.scanner.consumed) + if err != nil { + return err + } + + meter.Set(uint64(done)+1, ingestion.scanner.consumed) + } + + meter.Stop("done") + + trailer, err := ingestion.scanner.finishTrailer(ingestion.objectFormat.Size()) + if err != nil { + return err + } + + packHash, err := ingestion.objectFormat.FromBytes(trailer) + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + ingestion.packHash = packHash + + return nil +} + +// scanEntry scans the entry beginning at start into one record. +func (ingestion *ingestion) scanEntry(start uint64) error { + ingestion.scanner.beginCRC() + + rec, err := ingestion.scanHeader(start) + if err != nil { + return err + } + + inflated, oid, err := ingestion.drainPayload(&rec) + if err != nil { + return err + } + + if inflated != rec.declaredSize { + return fmt.Errorf( + "%w: entry at %d: inflated size %d differs from declared %d", + ErrMalformedPack, start, inflated, rec.declaredSize, + ) + } + + rec.packedLen = ingestion.scanner.consumed - start + rec.crc32 = ingestion.scanner.endCRC() + + if rec.packedType.IsBase() { + rec.objectType = rec.packedType + rec.oid = oid + rec.resolved = true + } else { + ingestion.deltaCount++ + } + + index := len(ingestion.records) + ingestion.records = append(ingestion.records, rec) + ingestion.byOffset[rec.offset] = index + + if rec.resolved { + ingestion.byOID[rec.oid] = index + } + + return nil +} + +// scanHeader parses and consumes the entry header at start. +func (ingestion *ingestion) scanHeader(start uint64) (record, error) { + var rec record + + rec.offset = start + + window, err := ingestion.scanner.peekHeader(packfile.MaxEntryHeaderLen(ingestion.objectFormat.Size())) + if err != nil { + return rec, err + } + + entryHeader, err := packfile.ParseEntryHeader(window, ingestion.objectFormat.Size()) + if err != nil { + return rec, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, start, err) + } + + headerLen, err := intconv.IntToUint64(entryHeader.HeaderLen) + if err != nil { + return rec, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + rec.packedType = entryHeader.Type + rec.declaredSize = entryHeader.Size + rec.headerLen = headerLen + + switch entryHeader.Type { + case packfile.EntryTypeCommit, packfile.EntryTypeTree, packfile.EntryTypeBlob, packfile.EntryTypeTag: + case packfile.EntryTypeOfsDelta: + if entryHeader.OfsDistance == 0 || entryHeader.OfsDistance > start { + return rec, fmt.Errorf("%w: entry at %d: ofs-delta base out of bounds", ErrMalformedPack, start) + } + + rec.baseOffset = start - entryHeader.OfsDistance + case packfile.EntryTypeRefDelta: + baseID, err := ingestion.objectFormat.FromBytes(entryHeader.RefBase[:ingestion.objectFormat.Size()]) + if err != nil { + return rec, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + rec.baseOID = baseID + case packfile.EntryTypeInvalid, packfile.EntryTypeFuture: + return rec, fmt.Errorf("%w: entry at %d: unsupported entry type", ErrMalformedPack, start) + } + + err = ingestion.scanner.use(entryHeader.HeaderLen) + if err != nil { + return rec, err + } + + return rec, nil +} + +// drainPayload consumes one entry's compressed payload from the stream, +// returning its inflated length and, for base entries, its object ID. +func (ingestion *ingestion) drainPayload(rec *record) (uint64, id.ObjectID, error) { + var zero id.ObjectID + + zr, err := zlib.NewReader(ingestion.scanner) + if err != nil { + return 0, zero, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err) + } + + defer func() { _ = zr.Close() }() + + if !rec.packedType.IsBase() { + read, err := io.Copy(io.Discard, zr) + if err != nil { + return 0, zero, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err) + } + + inflated, err := intconv.Int64ToUint64(read) + if err != nil { + return 0, zero, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + return inflated, zero, nil + } + + objectType, err := rec.packedType.ObjectType() + if err != nil { + return 0, zero, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + hashImpl, err := ingestion.objectFormat.New() + if err != nil { + return 0, zero, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + _, _ = hashImpl.Write(header.Append(nil, objectType, rec.declaredSize)) + + read, err := io.Copy(hashImpl, zr) + if err != nil { + return 0, zero, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err) + } + + oid, err := ingestion.objectFormat.FromBytes(hashImpl.Sum(nil)) + if err != nil { + return 0, zero, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + inflated, err := intconv.Int64ToUint64(read) + if err != nil { + return 0, zero, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + return inflated, oid, nil +} diff --git a/object/store/packed/internal/ingest/thin.go b/object/store/packed/internal/ingest/thin.go new file mode 100644 index 00000000..e96846cb --- /dev/null +++ b/object/store/packed/internal/ingest/thin.go @@ -0,0 +1,226 @@ +package ingest + +import ( + "errors" + "fmt" + "hash/crc32" + "io" + "os" + + "lindenii.org/go/furgit/internal/compress/zlib" + "lindenii.org/go/furgit/internal/format/packfile" + "lindenii.org/go/furgit/internal/progress" + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/furgit/object/store" + "lindenii.org/go/furgit/object/typ" + "lindenii.org/go/lgo/intconv" +) + +// fixThin completes a thin pack +// by appending the external bases it references, +// rewriting the pack header and trailer, +// and resolving the deltas reached from the appended bases. +func (ingestion *ingestion) fixThin(external []id.ObjectID, adjacency adjacency, meter *progress.Meter) error { + if ingestion.opts.ThinBase == nil { + return ErrThinPackNotPermitted + } + + hashSize := uint64(ingestion.objectFormat.Size()) //nolint:gosec + if ingestion.scanner.consumed < uint64(packfile.HeaderLen)+hashSize { + return fmt.Errorf("%w: pack shorter than trailer", ErrMalformedPack) + } + + // Drop the trailer from the write cursor. + ingestion.scanner.consumed -= hashSize + + var appended []int + + for _, baseOID := range external { + ty, content, err := ingestion.opts.ThinBase.ReadBytesContent(baseOID) + if errors.Is(err, store.ErrObjectNotFound) { + continue + } + + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: reading thin base %s: %w", baseOID, err) + } + + index, err := ingestion.appendBaseObject(baseOID, ty, content) + if err != nil { + return err + } + + appended = append(appended, index) + } + + err := ingestion.rewriteHeaderTrailer() + if err != nil { + return err + } + + err = ingestion.resolveFrom(appended, adjacency, meter) + if err != nil { + return err + } + + missing := ingestion.unresolvedExternalBases() + if len(missing) > 0 { + return &ThinBasesMissingError{OIDs: missing} + } + + if ingestion.countUnresolved() > 0 { + return fmt.Errorf("%w: unresolvable delta entries after thin completion", ErrMalformedPack) + } + + ingestion.thinFixed = len(appended) > 0 + + return nil +} + +// appendBaseObject appends one external thin base +// as a non-delta pack entry at the current write cursor, +// verifying that its content hashes to the requested object ID. +func (ingestion *ingestion) appendBaseObject(objectID id.ObjectID, objectType typ.Type, content []byte) (int, error) { + entryType, err := packfile.EntryTypeFromObjectType(objectType) + if err != nil { + return 0, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + computed, err := ingestion.hashObject(entryType, content) + if err != nil { + return 0, err + } + + if computed != objectID { + return 0, fmt.Errorf("%w: thin base %s content hashes to %s", ErrMalformedPack, objectID, computed) + } + + start := ingestion.scanner.consumed + + startOffset, err := intconv.Uint64ToInt64(start) + if err != nil { + return 0, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + headerBytes := packfile.AppendTypeSize(nil, entryType, uint64(len(content))) + + _, err = ingestion.packFile.WriteAt(headerBytes, startOffset) + if err != nil { + return 0, fmt.Errorf("object/store/packed/internal/ingest: writing thin base header: %w", err) + } + + crc := crc32.NewIEEE() + _, _ = crc.Write(headerBytes) + + dataOffset := startOffset + int64(len(headerBytes)) + writer := &offsetWriter{file: ingestion.packFile, offset: dataOffset} + + zw := zlib.NewWriter(io.MultiWriter(writer, crc)) + + _, err = zw.Write(content) + if err != nil { + _ = zw.Close() + + return 0, fmt.Errorf("object/store/packed/internal/ingest: compressing thin base: %w", err) + } + + err = zw.Close() + if err != nil { + return 0, fmt.Errorf("object/store/packed/internal/ingest: compressing thin base: %w", err) + } + + compressedLen, err := intconv.Int64ToUint64(writer.offset - dataOffset) + if err != nil { + return 0, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + headerLen, err := intconv.IntToUint64(len(headerBytes)) + if err != nil { + return 0, fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + packedLen := headerLen + compressedLen + ingestion.scanner.consumed = start + packedLen + + rec := record{ + offset: start, + headerLen: headerLen, + packedLen: packedLen, + crc32: crc.Sum32(), + packedType: entryType, + declaredSize: uint64(len(content)), + baseOffset: 0, + baseOID: id.ObjectID{}, + objectType: entryType, + oid: objectID, + resolved: true, + } + + index := len(ingestion.records) + ingestion.records = append(ingestion.records, rec) + ingestion.byOffset[start] = index + ingestion.byOID[objectID] = index + + return index, nil +} + +// rewriteHeaderTrailer updates the pack object count +// and recomputes the pack trailer hash +// over the entries left after thin completion. +func (ingestion *ingestion) rewriteHeaderTrailer() error { + count, err := intconv.IntToUint32(len(ingestion.records)) + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + _, err = ingestion.packFile.WriteAt(packfile.AppendHeader(nil, count), 0) + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: rewriting header: %w", err) + } + + bodyEnd, err := intconv.Uint64ToInt64(ingestion.scanner.consumed) + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + hashImpl, err := ingestion.objectFormat.New() + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + _, err = io.Copy(hashImpl, io.NewSectionReader(ingestion.packFile, 0, bodyEnd)) + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: rehashing pack: %w", err) + } + + sum := hashImpl.Sum(nil) + + _, err = ingestion.packFile.WriteAt(sum, bodyEnd) + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: writing trailer: %w", err) + } + + packHash, err := ingestion.objectFormat.FromBytes(sum) + if err != nil { + return fmt.Errorf("object/store/packed/internal/ingest: %w", err) + } + + ingestion.packHash = packHash + + return nil +} + +// offsetWriter writes to a file via WriteAt, +// advancing sequentially from a base offset. +type offsetWriter struct { + file *os.File + offset int64 +} + +// Write implements [io.Writer]. +func (writer *offsetWriter) Write(p []byte) (int, error) { + n, err := writer.file.WriteAt(p, writer.offset) + writer.offset += int64(n) + + return n, err //nolint:wrapcheck +} diff --git a/object/store/packed/internal/ingest/writepack_test.go b/object/store/packed/internal/ingest/writepack_test.go new file mode 100644 index 00000000..394d8f6e --- /dev/null +++ b/object/store/packed/internal/ingest/writepack_test.go @@ -0,0 +1,417 @@ +package ingest_test + +import ( + "bytes" + "errors" + "io" + "os" + "path/filepath" + "testing" + + "lindenii.org/go/furgit/internal/testgit" + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/furgit/object/store" + "lindenii.org/go/furgit/object/store/packed" + "lindenii.org/go/furgit/object/store/packed/internal/ingest" +) + +// TestWritePackMatchesGit verifies that ingesting a normal pack +// matches git's own pack, index, and reverse index. +// +// The pack is streamed through verbatim, +// and the index and reverse index are regenerated deterministically, +// so a successful match also confirms that scanning and delta resolution +// recovered every object ID, offset, and CRC that git recorded. +func TestWritePackMatchesGit(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + repo, err := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: objectFormat}) + if err != nil { + t.Fatalf("NewRepo: %v", err) + } + + seeded, err := repo.SeedHistory(t) + if err != nil { + t.Fatalf("SeedHistory: %v", err) + } + + gitPrefix, err := repo.PackObjects(t, seeded.All(), testgit.PackObjectsOptions{ + RevIndex: true, + Revs: false, + Exclude: nil, + }) + if err != nil { + t.Fatalf("PackObjects: %v", err) + } + + stream, err := os.ReadFile(gitPrefix + ".pack") //nolint:gosec + if err != nil { + t.Fatalf("ReadFile pack: %v", err) + } + + dir, result := writePack(t, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{ + ThinBase: nil, + Progress: nil, + }) + + if want := filepath.Base(gitPrefix) + ".pack"; result.PackName != want { + t.Fatalf("PackName = %q, want %q", result.PackName, want) + } + + for _, artifact := range []struct { + kind string + ours string + want string + }{ + {"pack", result.PackName, gitPrefix + ".pack"}, + {"idx", result.IdxName, gitPrefix + ".idx"}, + {"rev", result.RevName, gitPrefix + ".rev"}, + } { + ours, err := os.ReadFile(filepath.Join(dir, artifact.ours)) //nolint:gosec + if err != nil { + t.Fatalf("ReadFile %s: %v", artifact.kind, err) + } + + want, err := os.ReadFile(artifact.want) + if err != nil { + t.Fatalf("ReadFile git %s: %v", artifact.kind, err) + } + + if !bytes.Equal(ours, want) { + t.Errorf("%s differs from git: %d bytes vs %d", artifact.kind, len(ours), len(want)) + } + } + }) + } +} + +// TestWritePackEmpty verifies that a zero-object pack +// succeeds without writing any artifacts. +func TestWritePackEmpty(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + repo, err := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: objectFormat}) + if err != nil { + t.Fatalf("NewRepo: %v", err) + } + + stream, err := repo.PackObjectsStdout(t, nil, testgit.PackObjectsStdoutOptions{ + Revs: false, + Thin: false, + Exclude: nil, + }) + if err != nil { + t.Fatalf("PackObjectsStdout: %v", err) + } + + dir, result := writePack(t, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{ + ThinBase: nil, + Progress: nil, + }) + + if result.ObjectCount != 0 { + t.Fatalf("ObjectCount = %d, want 0", result.ObjectCount) + } + + if result.PackName != "" { + t.Fatalf("PackName = %q, want empty", result.PackName) + } + + entries, err := os.ReadDir(dir) + if err != nil { + t.Fatalf("ReadDir: %v", err) + } + + if len(entries) != 0 { + t.Fatalf("empty pack wrote %d files, want 0", len(entries)) + } + }) + } +} + +// TestWritePackIdempotent verifies that ingesting the same pack twice +// into one store succeeds and leaves the artifacts in place. +func TestWritePackIdempotent(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + repo, err := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: objectFormat}) + if err != nil { + t.Fatalf("NewRepo: %v", err) + } + + seeded, err := repo.SeedHistory(t) + if err != nil { + t.Fatalf("SeedHistory: %v", err) + } + + gitPrefix, err := repo.PackObjects(t, seeded.All(), testgit.PackObjectsOptions{ + RevIndex: true, + Revs: false, + Exclude: nil, + }) + if err != nil { + t.Fatalf("PackObjects: %v", err) + } + + stream, err := os.ReadFile(gitPrefix + ".pack") //nolint:gosec + if err != nil { + t.Fatalf("ReadFile pack: %v", err) + } + + dir := t.TempDir() + + root, err := os.OpenRoot(dir) + if err != nil { + t.Fatalf("OpenRoot: %v", err) + } + + t.Cleanup(func() { _ = root.Close() }) + + first, err := ingest.WritePack(root, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{ + ThinBase: nil, + Progress: nil, + }) + if err != nil { + t.Fatalf("first WritePack: %v", err) + } + + second, err := ingest.WritePack(root, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{ + ThinBase: nil, + Progress: nil, + }) + if err != nil { + t.Fatalf("second WritePack: %v", err) + } + + if second.PackName != first.PackName { + t.Fatalf("second PackName = %q, want %q", second.PackName, first.PackName) + } + + for _, name := range []string{first.PackName, first.IdxName, first.RevName} { + _, err := os.Stat(filepath.Join(dir, name)) + if err != nil { + t.Fatalf("missing %q after re-write: %v", name, err) + } + } + }) + } +} + +// writePack ingests src into a fresh store directory, +// returning the directory and the ingest result. +func writePack( + t *testing.T, + objectFormat id.ObjectFormat, + src io.Reader, + opts store.PackWriteOptions, +) (string, ingest.Result) { + t.Helper() + + dir := t.TempDir() + + root, err := os.OpenRoot(dir) + if err != nil { + t.Fatalf("OpenRoot: %v", err) + } + + t.Cleanup(func() { _ = root.Close() }) + + result, err := ingest.WritePack(root, objectFormat, src, opts) + if err != nil { + t.Fatalf("WritePack: %v", err) + } + + return dir, result +} + +// TestWritePackThin verifies that a thin pack is completed from the thin base +// and that git accepts the resulting self-contained pack. +func TestWritePackThin(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + repo, seeded := seedHistory(t, objectFormat) + thinBase := fullStore(t, repo, objectFormat, seeded) + stream := thinStream(t, repo, seeded) + + dir, result := writePack(t, objectFormat, bytes.NewReader(stream), store.PackWriteOptions{ + ThinBase: thinBase, + Progress: nil, + }) + + if !result.ThinFixed { + t.Fatalf("ThinFixed = false, want true (pack was not thin)") + } + + _, err := repo.VerifyPack(t, filepath.Join(dir, result.IdxName)) + if err != nil { + t.Fatalf("VerifyPack on completed pack: %v", err) + } + }) + } +} + +// TestWritePackThinWithoutBase verifies that a thin pack is rejected +// when no thin base is supplied. +func TestWritePackThinWithoutBase(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + repo, seeded := seedHistory(t, objectFormat) + stream := thinStream(t, repo, seeded) + + _, err := ingest.WritePack(freshRoot(t), objectFormat, bytes.NewReader(stream), store.PackWriteOptions{ + ThinBase: nil, + Progress: nil, + }) + + if !errors.Is(err, ingest.ErrThinPackNotPermitted) { + t.Fatalf("err = %v, want ErrThinPackNotPermitted", err) + } + }) + } +} + +// TestWritePackThinMissingBase verifies that a thin pack +// whose bases are absent from the thin base +// reports the missing object IDs. +func TestWritePackThinMissingBase(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + repo, seeded := seedHistory(t, objectFormat) + emptyBase := emptyStore(t, objectFormat) + stream := thinStream(t, repo, seeded) + + _, err := ingest.WritePack(freshRoot(t), objectFormat, bytes.NewReader(stream), store.PackWriteOptions{ + ThinBase: emptyBase, + Progress: nil, + }) + + missing, ok := errors.AsType[*ingest.ThinBasesMissingError](err) + if !ok { + t.Fatalf("err = %v, want *ThinBasesMissingError", err) + } + + if len(missing.OIDs) == 0 { + t.Fatalf("ThinBasesMissingError reported no object IDs") + } + }) + } +} + +// seedHistory creates one repository with a seeded history. +func seedHistory(t *testing.T, objectFormat id.ObjectFormat) (*testgit.Repo, testgit.Seeded) { + t.Helper() + + repo, err := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: objectFormat}) + if err != nil { + t.Fatalf("NewRepo: %v", err) + } + + seeded, err := repo.SeedHistory(t) + if err != nil { + t.Fatalf("SeedHistory: %v", err) + } + + return repo, seeded +} + +// thinStream produces a thin pack of the tip commit excluding its parent, +// so its deltas reference the omitted parent objects. +func thinStream(t *testing.T, repo *testgit.Repo, seeded testgit.Seeded) []byte { + t.Helper() + + tip := seeded.Commits[len(seeded.Commits)-1] + parent := seeded.Commits[len(seeded.Commits)-2] + + stream, err := repo.PackObjectsStdout(t, []id.ObjectID{tip}, testgit.PackObjectsStdoutOptions{ + Revs: true, + Thin: true, + Exclude: []id.ObjectID{parent}, + }) + if err != nil { + t.Fatalf("PackObjectsStdout: %v", err) + } + + return stream +} + +// fullStore opens a packed store over a pack of every seeded object. +func fullStore(t *testing.T, repo *testgit.Repo, objectFormat id.ObjectFormat, seeded testgit.Seeded) *packed.Packed { + t.Helper() + + prefix, err := repo.PackObjects(t, seeded.All(), testgit.PackObjectsOptions{ + RevIndex: false, + Revs: false, + Exclude: nil, + }) + if err != nil { + t.Fatalf("PackObjects: %v", err) + } + + return openStore(t, filepath.Dir(prefix), objectFormat) +} + +// emptyStore opens a packed store over an empty directory. +func emptyStore(t *testing.T, objectFormat id.ObjectFormat) *packed.Packed { + t.Helper() + + return openStore(t, t.TempDir(), objectFormat) +} + +// openStore opens a packed store over dir. +func openStore(t *testing.T, dir string, objectFormat id.ObjectFormat) *packed.Packed { + t.Helper() + + root, err := os.OpenRoot(dir) + if err != nil { + t.Fatalf("OpenRoot: %v", err) + } + + t.Cleanup(func() { _ = root.Close() }) + + packedStore, err := packed.New(root, objectFormat) + if err != nil { + t.Fatalf("New: %v", err) + } + + t.Cleanup(func() { _ = packedStore.Close() }) + + return packedStore +} + +// freshRoot opens a writable root over a fresh temporary directory. +func freshRoot(t *testing.T) *os.Root { + t.Helper() + + root, err := os.OpenRoot(t.TempDir()) + if err != nil { + t.Fatalf("OpenRoot: %v", err) + } + + t.Cleanup(func() { _ = root.Close() }) + + return root +} diff --git a/object/store/packed/quarantine.go b/object/store/packed/quarantine.go index fac64402..5e0b85cb 100644 --- a/object/store/packed/quarantine.go +++ b/object/store/packed/quarantine.go @@ -1,57 +1,164 @@ package packed -// import ( -// "os" -// -// "lindenii.org/go/furgit/object/store" -// ) -// -// var ( -// _ store.PackQuarantiner = (*Packed)(nil) -// _ store.PackQuarantine = (*packQuarantine)(nil) -// ) -// -// // packQuarantine is one quarantined packed store -// // rooted privately beneath a destination pack root. -// type packQuarantine struct { -// *Packed -// -// parent *Packed -// tempName string -// tempRoot *os.Root -// } -// -// // BeginPackQuarantine creates one quarantined packed store -// // rooted privately beneath the destination pack root. -// // -// // Labels: Deps-Borrowed, Life-Parent, Close-No. -// func (packed *Packed) BeginPackQuarantine(opts store.PackQuarantineOptions) (store.PackQuarantine, error) -// -// // Discard removes the quarantine -// // and invalidates the receiver. -// func (quarantine *packQuarantine) Discard() error -// -// // Promote publishes all finalized pack artifacts in the quarantine -// // into the parent packed store, -// // and invalidates the receiver. -// func (quarantine *packQuarantine) Promote() error -// -// // promoteAll links every pack artifact in the quarantine -// // into the parent packed store, -// // in pack/rev/idx dependency order. -// func (quarantine *packQuarantine) promoteAll() error -// -// // promoteFile links one quarantined pack artifact -// // into the parent packed store, -// // treating an already-present destination as success. -// func (quarantine *packQuarantine) promoteFile(name string) error -// -// // createPackQuarantineRoot creates a private quarantine directory -// // beneath parent, -// // and returns its name and an os.Root over it. -// func createPackQuarantineRoot(parent *os.Root) (string, *os.Root, error) +import ( + "crypto/rand" + "errors" + "fmt" + "io/fs" + "os" + "slices" + "strings" + + "lindenii.org/go/furgit/object/store" +) + +var ( + _ store.PackQuarantiner = (*Packed)(nil) + _ store.PackQuarantine = (*packQuarantine)(nil) +) + +var errQuarantineNamesExhausted = errors.New("object/store/packed: exhausted quarantine directory names") + +// packQuarantine is one quarantined packed store +// rooted privately beneath a destination pack root. +type packQuarantine struct { + *Packed + + parent *Packed + + tempName string + tempRoot *os.Root +} + +// BeginPackQuarantine creates one quarantined packed store +// rooted privately beneath the destination pack root. // -// // packPromotionPriority orders pack artifacts -// // so that data files are linked -// // before the index that publishes them. -// func packPromotionPriority(name string) int +// Labels: Deps-Borrowed, Life-Parent. +func (packed *Packed) BeginPackQuarantine(_ store.PackQuarantineOptions) (store.PackQuarantine, error) { //nolint:ireturn + tempName, tempRoot, err := createPackQuarantineRoot(packed.root) + if err != nil { + return nil, err + } + + quarantineStore, err := New(tempRoot, packed.objectFormat) + if err != nil { + _ = tempRoot.Close() + _ = packed.root.RemoveAll(tempName) + + return nil, err + } + + return &packQuarantine{ + Packed: quarantineStore, + parent: packed, + tempName: tempName, + tempRoot: tempRoot, + }, nil +} + +// Promote publishes the quarantined pack artifacts into the parent store, +// refreshes the parent so the objects become available, +// and invalidates the receiver. +func (quarantine *packQuarantine) Promote() error { + closeErr := quarantine.Close() + promoteErr := quarantine.promoteAll() + + var refreshErr error + if promoteErr == nil { + refreshErr = quarantine.parent.Refresh() + } + + tempRootErr := quarantine.tempRoot.Close() + removeErr := quarantine.parent.root.RemoveAll(quarantine.tempName) + + return errors.Join(closeErr, promoteErr, refreshErr, tempRootErr, removeErr) +} + +// Discard removes the quarantine and invalidates the receiver. +func (quarantine *packQuarantine) Discard() error { + closeErr := quarantine.Close() + tempRootErr := quarantine.tempRoot.Close() + removeErr := quarantine.parent.root.RemoveAll(quarantine.tempName) + + return errors.Join(closeErr, tempRootErr, removeErr) +} + +// promoteAll links every pack artifact in the quarantine into the parent store, +// in pack/rev/idx dependency order. +func (quarantine *packQuarantine) promoteAll() error { + entries, err := fs.ReadDir(quarantine.tempRoot.FS(), ".") + if err != nil { + return fmt.Errorf("object/store/packed: %w", err) + } + + slices.SortFunc(entries, func(left, right fs.DirEntry) int { + return packPromotionPriority(left.Name()) - packPromotionPriority(right.Name()) + }) + + for _, entry := range entries { + err := quarantine.promoteFile(entry.Name()) + if err != nil { + return err + } + } + + return nil +} + +// promoteFile links one quarantined artifact into the parent store, +// treating an already-present destination as success. +func (quarantine *packQuarantine) promoteFile(name string) error { + src := quarantine.tempName + "/" + name + + err := quarantine.parent.root.Link(src, name) + if err != nil && !errors.Is(err, fs.ErrExist) { + return fmt.Errorf("object/store/packed: promoting %q: %w", name, err) + } + + _ = quarantine.parent.root.Remove(src) + + return nil +} + +// createPackQuarantineRoot creates a private quarantine directory beneath parent +// and returns its name and an os.Root over it. +func createPackQuarantineRoot(parent *os.Root) (string, *os.Root, error) { + for range 32 { + name := "tmp_packq_" + rand.Text() + + err := parent.Mkdir(name, 0o700) + if err != nil { + if errors.Is(err, fs.ErrExist) { + continue + } + + return "", nil, fmt.Errorf("object/store/packed: %w", err) + } + + root, err := parent.OpenRoot(name) + if err != nil { + _ = parent.RemoveAll(name) + + return "", nil, fmt.Errorf("object/store/packed: %w", err) + } + + return name, root, nil + } + + return "", nil, errQuarantineNamesExhausted +} + +// packPromotionPriority orders pack artifacts +// so that data files are linked before the index that publishes them. +func packPromotionPriority(name string) int { + switch { + case strings.HasPrefix(name, "pack-") && strings.HasSuffix(name, ".pack"): + return 1 + case strings.HasPrefix(name, "pack-") && strings.HasSuffix(name, ".rev"): + return 2 + case strings.HasPrefix(name, "pack-") && strings.HasSuffix(name, ".idx"): + return 3 + default: + return 0 + } +} diff --git a/object/store/packed/writer.go b/object/store/packed/writer.go index a01bc7dd..59309c24 100644 --- a/object/store/packed/writer.go +++ b/object/store/packed/writer.go @@ -1,10 +1,38 @@ package packed -// import ( -// "io" +import ( + "fmt" + "io" + + "lindenii.org/go/furgit/object/store" + "lindenii.org/go/furgit/object/store/packed/internal/ingest" +) + +var _ store.PackWriter = (*Packed)(nil) + +// WritePack ingests one pack stream into the pack store, +// publishing a pack, index, and reverse index +// under content-addressed names derived from the pack trailer hash. // -// "lindenii.org/go/furgit/object/store" -// ) +// WritePack consumes the pack stream through its trailer and stops there. +// It does not require src to reach EOF afterward, +// so it is safe on a still-open transport connection, +// such as receive-pack, +// whose peer keeps the connection open to read the response. // -// // WritePack ingests one pack stream into the packed store. -// func (packed *Packed) WritePack(src io.Reader, opts store.PackWriteOptions) error +// The pack must be the last thing the peer sends before that response: +// any bytes arriving immediately after the trailer +// are rejected as a malformed pack. +func (packed *Packed) WritePack(src io.Reader, opts store.PackWriteOptions) error { + _, err := ingest.WritePack(packed.root, packed.objectFormat, src, opts) + if err != nil { + return err //nolint:wrapcheck + } + + err = packed.Refresh() + if err != nil { + return fmt.Errorf("object/store/packed: refresh after pack write: %w", err) + } + + return nil +} diff --git a/object/store/packed/writer_test.go b/object/store/packed/writer_test.go new file mode 100644 index 00000000..8227caa7 --- /dev/null +++ b/object/store/packed/writer_test.go @@ -0,0 +1,105 @@ +package packed_test + +import ( + "bytes" + "os" + "testing" + + "lindenii.org/go/furgit/internal/testgit" + "lindenii.org/go/furgit/object/id" + "lindenii.org/go/furgit/object/store" + "lindenii.org/go/furgit/object/store/packed" + "lindenii.org/go/furgit/object/typ" +) + +// TestWritePack verifies that writing a pack through the store +// makes its objects readable without a manual refresh. +func TestWritePack(t *testing.T) { + t.Parallel() + + for _, objectFormat := range id.SupportedObjectFormats() { + t.Run(objectFormat.String(), func(t *testing.T) { + t.Parallel() + + repo, err := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: objectFormat}) + if err != nil { + t.Fatalf("NewRepo: %v", err) + } + + seeded, err := repo.SeedHistory(t) + if err != nil { + t.Fatalf("SeedHistory: %v", err) + } + + stream, err := repo.PackObjectsStdout(t, seeded.All(), testgit.PackObjectsStdoutOptions{ + Revs: false, + Thin: false, + Exclude: nil, + }) + if err != nil { + t.Fatalf("PackObjectsStdout: %v", err) + } + + packedStore := openEmptyStore(t, objectFormat) + + err = packedStore.WritePack(bytes.NewReader(stream), store.PackWriteOptions{ + ThinBase: nil, + Progress: nil, + }) + if err != nil { + t.Fatalf("WritePack: %v", err) + } + + probes := []struct { + ty typ.Type + oid id.ObjectID + }{ + {typ.Blob, seeded.Blobs[0]}, + {typ.Tree, seeded.Trees[0]}, + {typ.Commit, seeded.Commits[len(seeded.Commits)-1]}, + {typ.Tag, seeded.Tags[0]}, + } + + for _, probe := range probes { + want, err := repo.CatFile(t, probe.ty, probe.oid) + if err != nil { + t.Fatalf("CatFile(%s): %v", probe.oid, err) + } + + ty, content, err := packedStore.ReadBytesContent(probe.oid) + if err != nil { + t.Fatalf("ReadBytesContent(%s): %v", probe.oid, err) + } + + if ty != probe.ty { + t.Fatalf("ReadBytesContent(%s) type = %v, want %v", probe.oid, ty, probe.ty) + } + + if !bytes.Equal(content, want) { + t.Fatalf("ReadBytesContent(%s) content mismatch", probe.oid) + } + } + }) + } +} + +// openEmptyStore opens a packed store over a fresh empty directory. +func openEmptyStore(t *testing.T, objectFormat id.ObjectFormat) *packed.Packed { + t.Helper() + + root, err := os.OpenRoot(t.TempDir()) + if err != nil { + t.Fatalf("OpenRoot: %v", err) + } + + t.Cleanup(func() { _ = root.Close() }) + + packedStore, err := packed.New(root, objectFormat) + if err != nil { + t.Fatalf("New: %v", err) + } + + t.Cleanup(func() { _ = packedStore.Close() }) + + return packedStore +} |
