diff options
Diffstat (limited to 'format/pack')
| -rw-r--r-- | format/pack/ingest/api.go | 52 | ||||
| -rw-r--r-- | format/pack/ingest/cache.go | 53 | ||||
| -rw-r--r-- | format/pack/ingest/doc.go | 3 | ||||
| -rw-r--r-- | format/pack/ingest/errors.go | 63 | ||||
| -rw-r--r-- | format/pack/ingest/finalize.go | 77 | ||||
| -rw-r--r-- | format/pack/ingest/idx_write.go | 138 | ||||
| -rw-r--r-- | format/pack/ingest/ingest.go | 49 | ||||
| -rw-r--r-- | format/pack/ingest/ingest_test.go | 337 | ||||
| -rw-r--r-- | format/pack/ingest/records.go | 46 | ||||
| -rw-r--r-- | format/pack/ingest/resolve.go | 279 | ||||
| -rw-r--r-- | format/pack/ingest/rev_write.go | 97 | ||||
| -rw-r--r-- | format/pack/ingest/state.go | 71 | ||||
| -rw-r--r-- | format/pack/ingest/stream.go | 97 | ||||
| -rw-r--r-- | format/pack/ingest/stream_scan.go | 335 | ||||
| -rw-r--r-- | format/pack/ingest/temp.go | 80 | ||||
| -rw-r--r-- | format/pack/ingest/thin_fix.go | 211 | ||||
| -rw-r--r-- | format/pack/ingest/trailer.go | 65 |
17 files changed, 2053 insertions, 0 deletions
diff --git a/format/pack/ingest/api.go b/format/pack/ingest/api.go new file mode 100644 index 00000000..9e222c1d --- /dev/null +++ b/format/pack/ingest/api.go @@ -0,0 +1,52 @@ +package ingest + +import ( + "io" + "os" + + "codeberg.org/lindenii/furgit/objectid" + "codeberg.org/lindenii/furgit/objectstore" +) + +// Result describes one successful ingest transaction. +type Result struct { + // PackName is the destination-relative filename of the written .pack. + PackName string + // IdxName is the destination-relative filename of the written .idx. + IdxName string + // RevName is the destination-relative filename of the written .rev. + // + // RevName is empty when writeRev is false. + RevName string + // PackHash is the final pack hash (same hash embedded in .idx/.rev trailers). + PackHash objectid.ObjectID + // ObjectCount is the final object count in the resulting pack. + // + // If thin fixup appends objects, this includes appended base objects. + ObjectCount uint32 + // ThinFixed reports whether thin fixup appended local bases. + ThinFixed bool +} + +// Ingest ingests one pack stream from src into destination. +// +// Ingest performs streaming pack read/write/verification, delta resolution, +// optional thin fixup, then writes .idx and optionally .rev. +// +// destination ownership and lifecycle are managed by the caller. +// Ingest does not perform quarantine promotion/migration. +func Ingest( + src io.Reader, + destination *os.Root, + algo objectid.Algorithm, + fixThin bool, + writeRev bool, + base objectstore.Store, +) (Result, error) { + state, err := newIngestState(src, destination, algo, fixThin, writeRev, base) + if err != nil { + return Result{}, err + } + + return ingest(state) +} diff --git a/format/pack/ingest/cache.go b/format/pack/ingest/cache.go new file mode 100644 index 00000000..9c0b7a4f --- /dev/null +++ b/format/pack/ingest/cache.go @@ -0,0 +1,53 @@ +package ingest + +import ( + "codeberg.org/lindenii/furgit/internal/lru" + "codeberg.org/lindenii/furgit/objecttype" +) + +// deltaBaseCacheKey identifies one resolved base by record index. +type deltaBaseCacheKey struct { + recordIdx int +} + +// deltaBaseCacheValue stores one resolved base object payload. +type deltaBaseCacheValue struct { + realType objecttype.Type + content []byte +} + +// deltaBaseCache is a bounded LRU for resolved base payloads. +type deltaBaseCache struct { + lru *lru.Cache[deltaBaseCacheKey, deltaBaseCacheValue] +} + +// newDeltaBaseCache creates one bounded base cache. +func newDeltaBaseCache(maxBytes int64) *deltaBaseCache { + return &deltaBaseCache{ + lru: lru.New( + maxBytes, + func(_ deltaBaseCacheKey, value deltaBaseCacheValue) int64 { + return int64(len(value.content)) + }, + nil, + ), + } +} + +// get returns a cloned cache entry for recordIdx. +func (cache *deltaBaseCache) get(recordIdx int) (objecttype.Type, []byte, bool) { + value, ok := cache.lru.Get(deltaBaseCacheKey{recordIdx: recordIdx}) + if !ok { + return objecttype.TypeInvalid, nil, false + } + + return value.realType, append([]byte(nil), value.content...), true +} + +// add stores a cloned cache entry for recordIdx. +func (cache *deltaBaseCache) add(recordIdx int, realType objecttype.Type, content []byte) { + cache.lru.Add(deltaBaseCacheKey{recordIdx: recordIdx}, deltaBaseCacheValue{ + realType: realType, + content: append([]byte(nil), content...), + }) +} diff --git a/format/pack/ingest/doc.go b/format/pack/ingest/doc.go new file mode 100644 index 00000000..2095068a --- /dev/null +++ b/format/pack/ingest/doc.go @@ -0,0 +1,3 @@ +// Package ingest implements streaming ingestion of one Git pack stream into a +// destination root, producing .pack/.idx and optionally .rev. +package ingest diff --git a/format/pack/ingest/errors.go b/format/pack/ingest/errors.go new file mode 100644 index 00000000..1fc321e7 --- /dev/null +++ b/format/pack/ingest/errors.go @@ -0,0 +1,63 @@ +package ingest + +import "fmt" + +// ErrInvalidPackHeader reports an invalid or unsupported pack header. +type ErrInvalidPackHeader struct { + Reason string +} + +// Error implements error. +func (err *ErrInvalidPackHeader) Error() string { + return fmt.Sprintf("format/pack/ingest: invalid pack header: %s", err.Reason) +} + +// ErrPackTrailerMismatch reports a mismatch between computed and trailer pack hash. +type ErrPackTrailerMismatch struct{} + +// Error implements error. +func (err *ErrPackTrailerMismatch) Error() string { + return "format/pack/ingest: pack trailer hash mismatch" +} + +// ErrThinPackUnresolved reports unresolved REF deltas when fixThin is disabled +// or when required bases cannot be found in base. +type ErrThinPackUnresolved struct { + Count int +} + +// Error implements error. +func (err *ErrThinPackUnresolved) Error() string { + return fmt.Sprintf("format/pack/ingest: unresolved thin deltas: %d", err.Count) +} + +// ErrMalformedPackEntry reports malformed entry encoding at one pack offset. +type ErrMalformedPackEntry struct { + Offset uint64 + Reason string +} + +// Error implements error. +func (err *ErrMalformedPackEntry) Error() string { + return fmt.Sprintf("format/pack/ingest: malformed pack entry at offset %d: %s", err.Offset, err.Reason) +} + +// ErrDeltaCycle reports a detected cycle in delta dependency resolution. +type ErrDeltaCycle struct { + Offset uint64 +} + +// Error implements error. +func (err *ErrDeltaCycle) Error() string { + return fmt.Sprintf("format/pack/ingest: delta cycle detected at offset %d", err.Offset) +} + +// ErrDestinationWrite reports destination I/O failures. +type ErrDestinationWrite struct { + Op string +} + +// Error implements error. +func (err *ErrDestinationWrite) Error() string { + return fmt.Sprintf("format/pack/ingest: destination write failure: %s", err.Op) +} diff --git a/format/pack/ingest/finalize.go b/format/pack/ingest/finalize.go new file mode 100644 index 00000000..06b30102 --- /dev/null +++ b/format/pack/ingest/finalize.go @@ -0,0 +1,77 @@ +package ingest + +import ( + "errors" + "fmt" + "io/fs" + "strings" +) + +// finalizeArtifacts links temporary files to final names and returns Result. +func finalizeArtifacts(state *ingestState) (Result, error) { + base := "pack-" + state.packHash.String() + packFinal := base + ".pack" + idxFinal := base + ".idx" + revFinal := "" + if state.writeRev { + revFinal = base + ".rev" + } + + if err := linkTempToFinal(state, state.packTmpName, packFinal); err != nil { + return Result{}, err + } + if err := linkTempToFinal(state, state.idxTmpName, idxFinal); err != nil { + return Result{}, err + } + if state.writeRev { + if err := linkTempToFinal(state, state.revTmpName, revFinal); err != nil { + return Result{}, err + } + } + + return Result{ + PackName: packFinal, + IdxName: idxFinal, + RevName: revFinal, + PackHash: state.packHash, + ObjectCount: uint32(len(state.records)), + ThinFixed: state.thinFixed, + }, nil +} + +// rollbackTemporaryArtifacts removes temporary files after failure. +func rollbackTemporaryArtifacts(state *ingestState) { + if state.packTmpName != "" { + _ = state.destination.Remove(state.packTmpName) + } + if state.idxTmpName != "" { + _ = state.destination.Remove(state.idxTmpName) + } + if state.revTmpName != "" { + _ = state.destination.Remove(state.revTmpName) + } +} + +// linkTempToFinal hard-links tmp to final, tolerating existing final paths. +func linkTempToFinal(state *ingestState, tmp, final string) error { + if tmp == "" || final == "" { + return fmt.Errorf("format/pack/ingest: invalid finalize names tmp=%q final=%q", tmp, final) + } + if strings.Contains(final, "/") { + return fmt.Errorf("format/pack/ingest: final name must be leaf: %q", final) + } + + err := state.destination.Link(tmp, final) + if err == nil { + _ = state.destination.Remove(tmp) + + return nil + } + if errors.Is(err, fs.ErrExist) { + _ = state.destination.Remove(tmp) + + return nil + } + + return err +} diff --git a/format/pack/ingest/idx_write.go b/format/pack/ingest/idx_write.go new file mode 100644 index 00000000..1e5f20c4 --- /dev/null +++ b/format/pack/ingest/idx_write.go @@ -0,0 +1,138 @@ +package ingest + +import ( + "bytes" + "encoding/binary" + "fmt" + "hash" + "io" + "slices" +) + +const ( + idxMagicV2 = 0xff744f63 + idxVersionV2 = 2 +) + +// writeIdx writes idx v2 for resolved records. +func writeIdx(state *ingestState) error { + order := buildIdxOrder(state) + hashImpl, err := state.algo.New() + if err != nil { + return err + } + + write := func(src []byte) error { + if _, err := state.idxFile.Write(src); err != nil { + return err + } + if _, err := hashImpl.Write(src); err != nil { + return err + } + + return nil + } + + var scratch [8]byte + binary.BigEndian.PutUint32(scratch[:4], idxMagicV2) + binary.BigEndian.PutUint32(scratch[4:8], idxVersionV2) + if err := write(scratch[:8]); err != nil { + return err + } + + var fanout [256]uint32 + for _, recordIdx := range order { + idRaw := state.records[recordIdx].objectID.Bytes() + fanout[idRaw[0]]++ + } + var cumulative uint32 + for i := range fanout { + cumulative += fanout[i] + binary.BigEndian.PutUint32(scratch[:4], cumulative) + if err := write(scratch[:4]); err != nil { + return err + } + } + + for _, recordIdx := range order { + idRaw := state.records[recordIdx].objectID.Bytes() + if err := write(idRaw); err != nil { + return err + } + } + + for _, recordIdx := range order { + binary.BigEndian.PutUint32(scratch[:4], state.records[recordIdx].crc32) + if err := write(scratch[:4]); err != nil { + return err + } + } + + largeOffsets := make([]uint64, 0) + for _, recordIdx := range order { + offset := state.records[recordIdx].offset + if offset >= 0x80000000 { + word := 0x80000000 | uint32(len(largeOffsets)) + largeOffsets = append(largeOffsets, offset) + binary.BigEndian.PutUint32(scratch[:4], word) + } else { + binary.BigEndian.PutUint32(scratch[:4], uint32(offset)) + } + if err := write(scratch[:4]); err != nil { + return err + } + } + for _, off := range largeOffsets { + binary.BigEndian.PutUint64(scratch[:8], off) + if err := write(scratch[:8]); err != nil { + return err + } + } + + if err := write(state.packHash.Bytes()); err != nil { + return err + } + + idxHash := hashImpl.Sum(nil) + if _, err := state.idxFile.Write(idxHash); err != nil { + return err + } + + return state.idxFile.Sync() +} + +// buildIdxOrder returns record indexes sorted by ObjectID. +func buildIdxOrder(state *ingestState) []int { + out := make([]int, 0, len(state.records)) + for idx := range state.records { + out = append(out, idx) + } + slices.SortFunc(out, func(a, b int) int { + return bytes.Compare(state.records[a].objectID.Bytes(), state.records[b].objectID.Bytes()) + }) + + return out +} + +// verifyResolvedRecords checks that all records are fully resolved before index writing. +func verifyResolvedRecords(state *ingestState) error { + for idx, record := range state.records { + if !record.resolved { + return fmt.Errorf("format/pack/ingest: unresolved record %d at offset %d", idx, record.offset) + } + } + + return nil +} + +// writeAndHash writes src to dst and updates hash. +func writeAndHash(dst io.Writer, hashImpl hash.Hash, src []byte) error { + if _, err := dst.Write(src); err != nil { + return err + } + if _, err := hashImpl.Write(src); err != nil { + return err + } + + return nil +} diff --git a/format/pack/ingest/ingest.go b/format/pack/ingest/ingest.go new file mode 100644 index 00000000..5d6bbfce --- /dev/null +++ b/format/pack/ingest/ingest.go @@ -0,0 +1,49 @@ +package ingest + +import "fmt" + +// ingest initializes transaction state and executes the ingest pipeline. +func ingest(state *ingestState) (out Result, err error) { + if err := openTemporaryArtifacts(state); err != nil { + return Result{}, err + } + defer func() { + _ = closeTemporaryArtifacts(state) + if err != nil { + rollbackTemporaryArtifacts(state) + } + }() + + if err := streamPackAndScan(state); err != nil { + return Result{}, err + } + if err := resolveAll(state); err != nil { + return Result{}, err + } + if err := maybeFixThin(state); err != nil { + return Result{}, err + } + if state.thinFixed { + if err := resolveAll(state); err != nil { + return Result{}, err + } + } + if len(state.unresolvedRefDeltas) > 0 { + return Result{}, &ErrThinPackUnresolved{Count: len(state.unresolvedRefDeltas)} + } + if err := verifyResolvedRecords(state); err != nil { + return Result{}, err + } + + if err := state.packFile.Sync(); err != nil { + return Result{}, &ErrDestinationWrite{Op: fmt.Sprintf("sync pack: %v", err)} + } + if err := writeIdx(state); err != nil { + return Result{}, err + } + if err := writeRev(state); err != nil { + return Result{}, err + } + + return finalizeArtifacts(state) +} diff --git a/format/pack/ingest/ingest_test.go b/format/pack/ingest/ingest_test.go new file mode 100644 index 00000000..d1b68cba --- /dev/null +++ b/format/pack/ingest/ingest_test.go @@ -0,0 +1,337 @@ +package ingest_test + +import ( + "bytes" + "errors" + "fmt" + "io" + "os" + "path/filepath" + "strings" + "testing" + + "codeberg.org/lindenii/furgit/format/pack/ingest" + "codeberg.org/lindenii/furgit/internal/testgit" + "codeberg.org/lindenii/furgit/objectid" + "codeberg.org/lindenii/furgit/repository" +) + +// thinBaseDistances enumerates candidate main-history distances for thin-pack +// probing. +var thinBaseDistances = [...]int{16, 24, 32, 48, 64, 96, 128, 160, 192, 224, 256, 320} + +// pickThinBase selects one main-history base where git emits a truly thin pack +// for `head ^base`. +func pickThinBase(t *testing.T, sender *testgit.TestRepo, head objectid.ObjectID) objectid.ObjectID { + t.Helper() + + for _, distance := range thinBaseDistances { + base := sender.RevParse(t, fmt.Sprintf("refs/heads/main~%d", distance)) + revs := []string{head.String(), "^" + base.String()} + if sender.PackObjectsIsThin(t, revs) { + return base + } + } + + t.Fatalf("failed to find thin base for head %s", head.String()) + + return objectid.ObjectID{} +} + +// verifyReindexOracle regenerates idx/rev with upstream git index-pack and +// compares bytes with files produced by ingest. +func verifyReindexOracle(t *testing.T, repo *testgit.TestRepo, packPath, idxPath, revPath string) { + t.Helper() + + oracleDir := t.TempDir() + oracleIdxPath := filepath.Join(oracleDir, "oracle.idx") + _ = repo.Run(t, "index-pack", "--rev-index", "-o", oracleIdxPath, packPath) + oracleRevPath := strings.TrimSuffix(oracleIdxPath, ".idx") + ".rev" + + gotIdx, err := os.ReadFile(idxPath) + if err != nil { + t.Fatalf("read idx: %v", err) + } + wantIdx, err := os.ReadFile(oracleIdxPath) + if err != nil { + t.Fatalf("read oracle idx: %v", err) + } + if !bytes.Equal(gotIdx, wantIdx) { + t.Fatal("idx bytes differ from git index-pack output") + } + + gotRev, err := os.ReadFile(revPath) + if err != nil { + t.Fatalf("read rev: %v", err) + } + wantRev, err := os.ReadFile(oracleRevPath) + if err != nil { + t.Fatalf("read oracle rev: %v", err) + } + if !bytes.Equal(gotRev, wantRev) { + t.Fatal("rev bytes differ from git index-pack output") + } +} + +func TestIngestNonThinPackWritesPackIdxRev(t *testing.T) { + t.Parallel() + + testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { //nolint:thelper + sender := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true}) + sender.MakeManyObjectsHistory(t) + head := sender.RevParse(t, "refs/heads/main") + + reader := sender.PackObjectsReader(t, []string{head.String()}, false) + defer func() { + err := reader.Close() + if err != nil { + t.Fatalf("close pack reader: %v", err) + } + }() + + receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true}) + packRoot, err := os.OpenRoot(filepath.Join(receiver.Dir(), "objects", "pack")) + if err != nil { + t.Fatalf("open pack root: %v", err) + } + defer func() { + err = packRoot.Close() + if err != nil { + t.Fatalf("close pack root: %v", err) + } + }() + + result, err := ingest.Ingest(reader, packRoot, algo, false, true, nil) + if err != nil { + t.Fatalf("Ingest: %v", err) + } + if result.ThinFixed { + t.Fatalf("ThinFixed = true, want false") + } + if result.RevName == "" { + t.Fatal("RevName is empty") + } + + _, err = packRoot.Stat(result.PackName) + if err != nil { + t.Fatalf("stat pack: %v", err) + } + _, err = packRoot.Stat(result.IdxName) + if err != nil { + t.Fatalf("stat idx: %v", err) + } + _, err = packRoot.Stat(result.RevName) + if err != nil { + t.Fatalf("stat rev: %v", err) + } + + idxPath := filepath.Join(receiver.Dir(), "objects", "pack", result.IdxName) + packPath := filepath.Join(receiver.Dir(), "objects", "pack", result.PackName) + revPath := filepath.Join(receiver.Dir(), "objects", "pack", result.RevName) + _ = receiver.Run(t, "verify-pack", "-v", idxPath) + verifyReindexOracle(t, receiver, packPath, idxPath, revPath) + + receiver.UpdateRef(t, "refs/heads/main", head) + wantRaw := sender.Run(t, "rev-list", "--objects", "refs/heads/main") + for line := range strings.SplitSeq(strings.TrimSpace(wantRaw), "\n") { + line = strings.TrimSpace(line) + if line == "" { + continue + } + + fields := strings.Fields(line) + if len(fields) == 0 { + continue + } + _ = receiver.Run(t, "cat-file", "-e", fields[0]) + } + }) +} + +func TestIngestThinPackWithoutFixReturnsUnresolved(t *testing.T) { + t.Parallel() + + testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { //nolint:thelper + sender := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true}) + sender.MakeManyObjectsHistory(t) + sender.Repack(t, "-a", "-d", "-f", "--window=128", "--depth=128") + + head := sender.RevParse(t, "refs/heads/main") + base := pickThinBase(t, sender, head) + reader := sender.PackObjectsReader(t, []string{head.String(), "^" + base.String()}, true) + defer func() { + err := reader.Close() + if err != nil { + t.Fatalf("close pack reader: %v", err) + } + }() + + receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true}) + packDir := filepath.Join(receiver.Dir(), "objects", "pack") + packRoot, err := os.OpenRoot(packDir) + if err != nil { + t.Fatalf("open pack root: %v", err) + } + defer func() { + err = packRoot.Close() + if err != nil { + t.Fatalf("close pack root: %v", err) + } + }() + + _, err = ingest.Ingest(reader, packRoot, algo, false, true, nil) + if err == nil { + t.Fatal("Ingest error = nil, want error") + } + + var unresolved *ingest.ErrThinPackUnresolved + if !errors.As(err, &unresolved) { + t.Fatalf("Ingest error type = %T (%v), want *ErrThinPackUnresolved", err, err) + } + + matches, err := filepath.Glob(filepath.Join(packDir, "pack-*.pack")) + if err != nil { + t.Fatalf("glob pack files: %v", err) + } + if len(matches) != 0 { + t.Fatalf("found finalized pack files after failure: %v", matches) + } + }) +} + +func TestIngestThinPackWithFixThin(t *testing.T) { + t.Parallel() + + testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { //nolint:thelper + sender := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true}) + sender.MakeManyObjectsHistory(t) + sender.Repack(t, "-a", "-d", "-f", "--window=128", "--depth=128") + + head := sender.RevParse(t, "refs/heads/main") + base := pickThinBase(t, sender, head) + receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true}) + + packRoot, err := os.OpenRoot(filepath.Join(receiver.Dir(), "objects", "pack")) + if err != nil { + t.Fatalf("open pack root: %v", err) + } + defer func() { + err = packRoot.Close() + if err != nil { + t.Fatalf("close pack root: %v", err) + } + }() + + baseReader := sender.PackObjectsReader(t, []string{base.String()}, false) + _, err = ingest.Ingest(baseReader, packRoot, algo, false, false, nil) + if err != nil { + _ = baseReader.Close() + t.Fatalf("ingest base pack: %v", err) + } + err = baseReader.Close() + if err != nil { + t.Fatalf("close base reader: %v", err) + } + + receiverRoot, err := os.OpenRoot(receiver.Dir()) + if err != nil { + t.Fatalf("open receiver root: %v", err) + } + defer func() { + err = receiverRoot.Close() + if err != nil { + t.Fatalf("close receiver root: %v", err) + } + }() + + receiverRepo, err := repository.Open(receiverRoot) + if err != nil { + t.Fatalf("repository.Open(receiver): %v", err) + } + defer func() { + err = receiverRepo.Close() + if err != nil { + t.Fatalf("close receiver repo: %v", err) + } + }() + + thinReader := sender.PackObjectsReader(t, []string{head.String(), "^" + base.String()}, true) + result, err := ingest.Ingest(thinReader, packRoot, algo, true, true, receiverRepo.Objects()) + if err != nil { + _ = thinReader.Close() + t.Fatalf("Ingest(thin): %v", err) + } + err = thinReader.Close() + if err != nil { + t.Fatalf("close thin reader: %v", err) + } + if !result.ThinFixed { + t.Fatal("ThinFixed = false, want true") + } + + idxPath := filepath.Join(receiver.Dir(), "objects", "pack", result.IdxName) + packPath := filepath.Join(receiver.Dir(), "objects", "pack", result.PackName) + revPath := filepath.Join(receiver.Dir(), "objects", "pack", result.RevName) + _ = receiver.Run(t, "verify-pack", "-v", idxPath) + verifyReindexOracle(t, receiver, packPath, idxPath, revPath) + receiver.UpdateRef(t, "refs/heads/main", head) + _ = receiver.Run(t, "fsck", "--full", "--strict", "--no-progress", "--no-dangling") + }) +} + +func TestIngestPackTrailerMismatch(t *testing.T) { + t.Parallel() + + testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { //nolint:thelper + sender := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true}) + sender.MakeManyObjectsHistory(t) + head := sender.RevParse(t, "refs/heads/main") + + stream := sender.PackObjectsReader(t, []string{head.String()}, false) + packBytes, err := io.ReadAll(stream) + if err != nil { + _ = stream.Close() + t.Fatalf("read pack stream: %v", err) + } + err = stream.Close() + if err != nil { + t.Fatalf("close stream: %v", err) + } + if len(packBytes) == 0 { + t.Fatal("empty pack stream") + } + + packBytes[len(packBytes)-1] ^= 0xff + + receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true}) + packDir := filepath.Join(receiver.Dir(), "objects", "pack") + packRoot, err := os.OpenRoot(packDir) + if err != nil { + t.Fatalf("open pack root: %v", err) + } + defer func() { + err = packRoot.Close() + if err != nil { + t.Fatalf("close pack root: %v", err) + } + }() + + _, err = ingest.Ingest(bytes.NewReader(packBytes), packRoot, algo, false, true, nil) + if err == nil { + t.Fatal("Ingest error = nil, want error") + } + + var mismatch *ingest.ErrPackTrailerMismatch + if !errors.As(err, &mismatch) { + t.Fatalf("Ingest error type = %T (%v), want *ErrPackTrailerMismatch", err, err) + } + + matches, err := filepath.Glob(filepath.Join(packDir, "pack-*.pack")) + if err != nil { + t.Fatalf("glob pack files: %v", err) + } + if len(matches) != 0 { + t.Fatalf("found finalized pack files after failure: %v", matches) + } + }) +} diff --git a/format/pack/ingest/records.go b/format/pack/ingest/records.go new file mode 100644 index 00000000..06b7f708 --- /dev/null +++ b/format/pack/ingest/records.go @@ -0,0 +1,46 @@ +package ingest + +import ( + "codeberg.org/lindenii/furgit/objectid" + "codeberg.org/lindenii/furgit/objecttype" +) + +// objectRecord stores metadata for one packed object entry. +type objectRecord struct { + // offset is the entry start offset in the pack file. + offset uint64 + // headerLen is packed entry header length in bytes. + headerLen uint32 + // packedLen is total packed entry length in bytes. + packedLen uint64 + // crc32 is the CRC over the full packed entry. + crc32 uint32 + // packedType is the entry type tag from the pack stream. + packedType objecttype.Type + // realType is canonical object type after delta resolution. + realType objecttype.Type + // declaredSize is the declared output object size for this entry. + declaredSize int64 + // dataOffset is compressed payload start offset for this entry. + dataOffset uint64 + // baseOffset is OFS base offset when packedType is OFS delta. + baseOffset uint64 + // baseObject is REF base object ID when packedType is REF delta. + baseObject objectid.ObjectID + // objectID is final resolved object ID. + objectID objectid.ObjectID + // resolved reports whether objectID/realType are finalized. + resolved bool +} + +// ofsDeltaRef maps one OFS delta record to its base offset. +type ofsDeltaRef struct { + baseOffset uint64 + recordIdx int +} + +// refDeltaRef maps one REF delta record to its base object ID. +type refDeltaRef struct { + baseObject objectid.ObjectID + recordIdx int +} diff --git a/format/pack/ingest/resolve.go b/format/pack/ingest/resolve.go new file mode 100644 index 00000000..c6336d18 --- /dev/null +++ b/format/pack/ingest/resolve.go @@ -0,0 +1,279 @@ +package ingest + +import ( + "errors" + "fmt" + "io" + "slices" + + deltaapply "codeberg.org/lindenii/furgit/format/delta/apply" + packfmt "codeberg.org/lindenii/furgit/format/pack" + "codeberg.org/lindenii/furgit/internal/compress/zlib" + "codeberg.org/lindenii/furgit/objectheader" + "codeberg.org/lindenii/furgit/objectid" + "codeberg.org/lindenii/furgit/objecttype" +) + +var errExternalThinBase = errors.New("format/pack/ingest: external thin base required") + +// resolveAll resolves all delta records and finalizes ObjectID/RealType for every record. +func resolveAll(state *ingestState) error { + state.unresolvedRefDeltas = state.unresolvedRefDeltas[:0] + + for idx := range state.records { + if state.records[idx].resolved { + continue + } + + visiting := make(map[int]struct{}) + ty, content, err := resolveRecord(state, idx, visiting) + if err != nil { + if errors.Is(err, errExternalThinBase) { + state.unresolvedRefDeltas = append(state.unresolvedRefDeltas, idx) + continue + } + + return err + } + + id, err := hashCanonicalObject(state.algo, ty, content) + if err != nil { + return err + } + + record := &state.records[idx] + record.realType = ty + record.objectID = id + record.resolved = true + state.objectToRecord[id.String()] = idx + state.baseCache.add(idx, ty, content) + } + + return nil +} + +// resolveRecord resolves one record and returns canonical type/content. +func resolveRecord(state *ingestState, idx int, visiting map[int]struct{}) (objecttype.Type, []byte, error) { + if idx < 0 || idx >= len(state.records) { + return objecttype.TypeInvalid, nil, fmt.Errorf("format/pack/ingest: record index out of bounds") + } + + if _, ok := visiting[idx]; ok { + return objecttype.TypeInvalid, nil, &ErrDeltaCycle{Offset: state.records[idx].offset} + } + visiting[idx] = struct{}{} + defer delete(visiting, idx) + + record := &state.records[idx] + if ty, content, ok := state.baseCache.get(idx); ok { + return ty, content, nil + } + + if packfmt.IsBaseObjectType(record.packedType) { + ty, content, err := readBaseRecordContent(state, idx) + if err != nil { + return objecttype.TypeInvalid, nil, err + } + if record.resolved { + state.baseCache.add(idx, record.realType, content) + + return record.realType, content, nil + } + + id, err := hashCanonicalObject(state.algo, ty, content) + if err != nil { + return objecttype.TypeInvalid, nil, err + } + record.objectID = id + record.realType = ty + record.resolved = true + state.objectToRecord[id.String()] = idx + state.baseCache.add(idx, ty, content) + + return ty, content, nil + } + + var ( + baseType objecttype.Type + baseContent []byte + err error + ) + switch record.packedType { + case objecttype.TypeOfsDelta: + baseIdx, ok := state.offsetToRecord[record.baseOffset] + if !ok { + return objecttype.TypeInvalid, nil, &ErrMalformedPackEntry{ + Offset: record.offset, + Reason: "missing ofs-delta base entry", + } + } + baseType, baseContent, err = resolveRecord(state, baseIdx, visiting) + if err != nil { + return objecttype.TypeInvalid, nil, err + } + case objecttype.TypeRefDelta: + baseIdx, ok := state.objectToRecord[record.baseObject.String()] + if ok { + baseType, baseContent, err = resolveRecord(state, baseIdx, visiting) + if err != nil { + return objecttype.TypeInvalid, nil, err + } + } else { + return objecttype.TypeInvalid, nil, errExternalThinBase + } + default: + return objecttype.TypeInvalid, nil, &ErrMalformedPackEntry{ + Offset: record.offset, + Reason: "unsupported delta type", + } + } + + ty, content, err := applyDeltaRecord(state, idx, baseType, baseContent) + if err != nil { + return objecttype.TypeInvalid, nil, err + } + + id, err := hashCanonicalObject(state.algo, ty, content) + if err != nil { + return objecttype.TypeInvalid, nil, err + } + record.objectID = id + record.realType = ty + record.resolved = true + state.objectToRecord[id.String()] = idx + state.baseCache.add(idx, ty, content) + + return ty, content, nil +} + +// readBaseRecordContent reads canonical base content for one non-delta record. +func readBaseRecordContent(state *ingestState, idx int) (objecttype.Type, []byte, error) { + record := state.records[idx] + if !packfmt.IsBaseObjectType(record.packedType) { + return objecttype.TypeInvalid, nil, fmt.Errorf("format/pack/ingest: record %d is not a base object", idx) + } + + content, err := inflateRecordPayload(state, idx) + if err != nil { + return objecttype.TypeInvalid, nil, err + } + if int64(len(content)) != record.declaredSize { + return objecttype.TypeInvalid, nil, &ErrMalformedPackEntry{ + Offset: record.offset, + Reason: fmt.Sprintf("base content size mismatch got %d want %d", len(content), record.declaredSize), + } + } + + return record.packedType, content, nil +} + +// applyDeltaRecord applies one delta record onto base content. +func applyDeltaRecord(state *ingestState, idx int, baseType objecttype.Type, baseContent []byte) (objecttype.Type, []byte, error) { + record := state.records[idx] + if record.packedType != objecttype.TypeOfsDelta && record.packedType != objecttype.TypeRefDelta { + return objecttype.TypeInvalid, nil, fmt.Errorf("format/pack/ingest: record %d is not a delta record", idx) + } + + deltaPayload, err := inflateRecordPayload(state, idx) + if err != nil { + return objecttype.TypeInvalid, nil, err + } + if int64(len(deltaPayload)) != record.declaredSize { + return objecttype.TypeInvalid, nil, &ErrMalformedPackEntry{ + Offset: record.offset, + Reason: fmt.Sprintf("delta payload size mismatch got %d want %d", len(deltaPayload), record.declaredSize), + } + } + srcSize, dstSize, err := readDeltaHeaderSizes(deltaPayload) + if err != nil { + return objecttype.TypeInvalid, nil, &ErrMalformedPackEntry{ + Offset: record.offset, + Reason: fmt.Sprintf("read delta header: %v", err), + } + } + if srcSize != len(baseContent) { + return objecttype.TypeInvalid, nil, &ErrMalformedPackEntry{ + Offset: record.offset, + Reason: fmt.Sprintf("delta source size mismatch got %d want %d", srcSize, len(baseContent)), + } + } + + content, err := deltaapply.Apply(baseContent, deltaPayload) + if err != nil { + return objecttype.TypeInvalid, nil, &ErrMalformedPackEntry{ + Offset: record.offset, + Reason: fmt.Sprintf("apply delta: %v", err), + } + } + if len(content) != dstSize { + return objecttype.TypeInvalid, nil, &ErrMalformedPackEntry{ + Offset: record.offset, + Reason: fmt.Sprintf("delta result size mismatch got %d want %d", len(content), dstSize), + } + } + + return baseType, content, nil +} + +// inflateRecordPayload inflates one record's zlib payload from pack file. +func inflateRecordPayload(state *ingestState, idx int) ([]byte, error) { + record := state.records[idx] + if record.packedLen < uint64(record.headerLen) { + return nil, &ErrMalformedPackEntry{Offset: record.offset, Reason: "entry packed span underflow"} + } + compressedOffset := record.offset + uint64(record.headerLen) + compressedLen := record.packedLen - uint64(record.headerLen) + section := io.NewSectionReader(state.packFile, int64(compressedOffset), int64(compressedLen)) + + reader, err := zlib.NewReader(section) + if err != nil { + return nil, &ErrMalformedPackEntry{Offset: record.offset, Reason: fmt.Sprintf("open payload zlib: %v", err)} + } + defer func() { _ = reader.Close() }() + + out, err := io.ReadAll(reader) + if err != nil { + return nil, &ErrMalformedPackEntry{Offset: record.offset, Reason: fmt.Sprintf("inflate payload: %v", err)} + } + + return out, nil +} + +// hashCanonicalObject hashes canonical object bytes (header+content). +func hashCanonicalObject(algo objectid.Algorithm, ty objecttype.Type, content []byte) (objectid.ObjectID, error) { + header, ok := objectheader.Encode(ty, int64(len(content))) + if !ok { + return objectid.ObjectID{}, fmt.Errorf("format/pack/ingest: encode object header for type %d", ty) + } + + hashImpl, err := algo.New() + if err != nil { + return objectid.ObjectID{}, err + } + _, _ = hashImpl.Write(header) + _, _ = hashImpl.Write(content) + + return objectid.FromBytes(algo, hashImpl.Sum(nil)) +} + +// unresolvedThinBaseIDs returns sorted unique unresolved ref base IDs. +func unresolvedThinBaseIDs(state *ingestState) []objectid.ObjectID { + seen := make(map[string]objectid.ObjectID) + for _, idx := range state.unresolvedRefDeltas { + record := state.records[idx] + if record.packedType != objecttype.TypeRefDelta { + continue + } + seen[record.baseObject.String()] = record.baseObject + } + + out := make([]objectid.ObjectID, 0, len(seen)) + for _, id := range seen { + out = append(out, id) + } + slices.SortFunc(out, func(a, b objectid.ObjectID) int { + return slices.Compare(a.Bytes(), b.Bytes()) + }) + + return out +} diff --git a/format/pack/ingest/rev_write.go b/format/pack/ingest/rev_write.go new file mode 100644 index 00000000..cf95c782 --- /dev/null +++ b/format/pack/ingest/rev_write.go @@ -0,0 +1,97 @@ +package ingest + +import ( + "encoding/binary" + "slices" + + "codeberg.org/lindenii/furgit/objectid" +) + +const ( + revMagic = 0x52494458 + revVersion = 1 +) + +// writeRev writes rev index for resolved records. +func writeRev(state *ingestState) error { + if !state.writeRev { + return nil + } + + idxOrder := buildIdxOrder(state) + recordToIdxPos := make([]int, len(state.records)) + for pos, recordIdx := range idxOrder { + recordToIdxPos[recordIdx] = pos + } + packOrder := buildPackOrder(state) + + hashImpl, err := state.algo.New() + if err != nil { + return err + } + + var scratch [8]byte + binary.BigEndian.PutUint32(scratch[:4], revMagic) + if err := writeAndHash(state.revFile, hashImpl, scratch[:4]); err != nil { + return err + } + binary.BigEndian.PutUint32(scratch[:4], revVersion) + if err := writeAndHash(state.revFile, hashImpl, scratch[:4]); err != nil { + return err + } + binary.BigEndian.PutUint32(scratch[:4], hashID(state.algo)) + if err := writeAndHash(state.revFile, hashImpl, scratch[:4]); err != nil { + return err + } + + for _, recordIdx := range packOrder { + binary.BigEndian.PutUint32(scratch[:4], uint32(recordToIdxPos[recordIdx])) + if err := writeAndHash(state.revFile, hashImpl, scratch[:4]); err != nil { + return err + } + } + + if err := writeAndHash(state.revFile, hashImpl, state.packHash.Bytes()); err != nil { + return err + } + revHash := hashImpl.Sum(nil) + if _, err := state.revFile.Write(revHash); err != nil { + return err + } + + return state.revFile.Sync() +} + +// buildPackOrder returns record indexes sorted by pack offset. +func buildPackOrder(state *ingestState) []int { + out := make([]int, 0, len(state.records)) + for idx := range state.records { + out = append(out, idx) + } + slices.SortFunc(out, func(a, b int) int { + offA := state.records[a].offset + offB := state.records[b].offset + switch { + case offA < offB: + return -1 + case offA > offB: + return 1 + default: + return 0 + } + }) + + return out +} + +// hashID converts object algorithm to pack hash-id encoding. +func hashID(algo objectid.Algorithm) uint32 { + switch algo { + case objectid.AlgorithmSHA1: + return 1 + case objectid.AlgorithmSHA256: + return 2 + default: + return 0 + } +} diff --git a/format/pack/ingest/state.go b/format/pack/ingest/state.go new file mode 100644 index 00000000..7033a6f6 --- /dev/null +++ b/format/pack/ingest/state.go @@ -0,0 +1,71 @@ +package ingest + +import ( + "io" + "os" + + "codeberg.org/lindenii/furgit/objectid" + "codeberg.org/lindenii/furgit/objectstore" +) + +const ( + defaultDeltaBaseCacheMaxBytes = 32 << 20 +) + +// ingestState holds mutable state for one Ingest call. +type ingestState struct { + src io.Reader + destination *os.Root + algo objectid.Algorithm + fixThin bool + writeRev bool + base objectstore.Store + + packFile *os.File + packTmpName string + idxFile *os.File + idxTmpName string + revFile *os.File + revTmpName string + + stream *streamCopier + + records []objectRecord + ofsDeltas []ofsDeltaRef + refDeltas []refDeltaRef + unresolvedRefDeltas []int + offsetToRecord map[uint64]int + objectToRecord map[string]int + + baseCache *deltaBaseCache + packHash objectid.ObjectID + + objectCountHeader uint32 + thinFixed bool +} + +// newIngestState constructs one call-local ingest state. +func newIngestState( + src io.Reader, + destination *os.Root, + algo objectid.Algorithm, + fixThin bool, + writeRev bool, + base objectstore.Store, +) (*ingestState, error) { + if algo.Size() == 0 { + return nil, objectid.ErrInvalidAlgorithm + } + + return &ingestState{ + src: src, + destination: destination, + algo: algo, + fixThin: fixThin, + writeRev: writeRev, + base: base, + offsetToRecord: make(map[uint64]int), + objectToRecord: make(map[string]int), + baseCache: newDeltaBaseCache(defaultDeltaBaseCacheMaxBytes), + }, nil +} diff --git a/format/pack/ingest/stream.go b/format/pack/ingest/stream.go new file mode 100644 index 00000000..17a19d8d --- /dev/null +++ b/format/pack/ingest/stream.go @@ -0,0 +1,97 @@ +package ingest + +import ( + "bufio" + "fmt" + "hash" + "hash/crc32" + "io" + "os" +) + +// streamCopier reads bytes from src, writes them to packFile, and updates +// trailer verification state. +type streamCopier struct { + reader *bufio.Reader + packFile *os.File + verifier *trailerVerifier + offset uint64 + entryCRC hash.Hash32 +} + +// newStreamCopier constructs one stream copier. +func newStreamCopier(src io.Reader, packFile *os.File, verifier *trailerVerifier) *streamCopier { + return &streamCopier{ + reader: bufio.NewReaderSize(src, 64<<10), + packFile: packFile, + verifier: verifier, + } +} + +// Read implements io.Reader. +func (stream *streamCopier) Read(dst []byte) (int, error) { + n, err := stream.reader.Read(dst) + if n > 0 { + if writeErr := stream.writeChunk(dst[:n]); writeErr != nil { + return 0, writeErr + } + } + + return n, err +} + +// ReadByte implements io.ByteReader. +func (stream *streamCopier) ReadByte() (byte, error) { + b, err := stream.reader.ReadByte() + if err != nil { + return 0, err + } + + if writeErr := stream.writeChunk([]byte{b}); writeErr != nil { + return 0, writeErr + } + + return b, nil +} + +// readFull reads exactly len(dst) bytes through stream. +func (stream *streamCopier) readFull(dst []byte) error { + _, err := io.ReadFull(stream, dst) + if err != nil { + return err + } + + return nil +} + +// writeChunk mirrors src bytes to destination artifacts and accounting. +func (stream *streamCopier) writeChunk(src []byte) error { + _, err := stream.packFile.WriteAt(src, int64(stream.offset)) + if err != nil { + return &ErrDestinationWrite{Op: fmt.Sprintf("write pack: %v", err)} + } + + if stream.entryCRC != nil { + _, _ = stream.entryCRC.Write(src) + } + stream.verifier.write(src) + stream.offset += uint64(len(src)) + + return nil +} + +// beginEntryCRC starts inline CRC accumulation for one packed entry. +func (stream *streamCopier) beginEntryCRC() { + stream.entryCRC = crc32.NewIEEE() +} + +// endEntryCRC finishes inline CRC accumulation for one packed entry. +func (stream *streamCopier) endEntryCRC() (uint32, error) { + if stream.entryCRC == nil { + return 0, fmt.Errorf("format/pack/ingest: entry CRC not started") + } + crc := stream.entryCRC.Sum32() + stream.entryCRC = nil + + return crc, nil +} diff --git a/format/pack/ingest/stream_scan.go b/format/pack/ingest/stream_scan.go new file mode 100644 index 00000000..2c2389d8 --- /dev/null +++ b/format/pack/ingest/stream_scan.go @@ -0,0 +1,335 @@ +package ingest + +import ( + "encoding/binary" + "fmt" + "io" + + deltaapply "codeberg.org/lindenii/furgit/format/delta/apply" + packfmt "codeberg.org/lindenii/furgit/format/pack" + "codeberg.org/lindenii/furgit/internal/compress/zlib" + "codeberg.org/lindenii/furgit/objectheader" + "codeberg.org/lindenii/furgit/objectid" + "codeberg.org/lindenii/furgit/objecttype" +) + +// streamPackAndScan copies src into temp .pack while scanning packed entries. +func streamPackAndScan(state *ingestState) error { + hashImpl, err := state.algo.New() + if err != nil { + return err + } + + state.stream = newStreamCopier( + state.src, + state.packFile, + newTrailerVerifier(hashImpl, state.algo.Size()), + ) + + if err := readAndValidatePackHeader(state); err != nil { + return err + } + + state.records = make([]objectRecord, 0, state.objectCountHeader) + state.ofsDeltas = make([]ofsDeltaRef, 0, state.objectCountHeader) + state.refDeltas = make([]refDeltaRef, 0, state.objectCountHeader) + + for range state.objectCountHeader { + nextOffset, err := scanOneEntry(state, state.stream.offset) + if err != nil { + return err + } + + if nextOffset != state.stream.offset { + return fmt.Errorf("format/pack/ingest: internal stream offset mismatch") + } + } + + return finalizeStreamPackHash(state) +} + +// readAndValidatePackHeader reads and validates PACK header from the stream. +func readAndValidatePackHeader(state *ingestState) error { + var hdr [12]byte + if err := state.stream.readFull(hdr[:]); err != nil { + return &ErrInvalidPackHeader{Reason: fmt.Sprintf("read header: %v", err)} + } + + if binary.BigEndian.Uint32(hdr[:4]) != packfmt.Signature { + return &ErrInvalidPackHeader{Reason: "signature mismatch"} + } + + version := binary.BigEndian.Uint32(hdr[4:8]) + if !packfmt.VersionSupported(version) { + return &ErrInvalidPackHeader{Reason: fmt.Sprintf("unsupported version %d", version)} + } + + state.objectCountHeader = binary.BigEndian.Uint32(hdr[8:12]) + if state.objectCountHeader == 0 { + return &ErrInvalidPackHeader{Reason: "zero objects"} + } + + return nil +} + +// scanOneEntry scans one pack entry from stream and appends one record. +func scanOneEntry(state *ingestState, startOffset uint64) (uint64, error) { + state.stream.beginEntryCRC() + + record, err := parseEntryPrefix(state, startOffset) + if err != nil { + return 0, err + } + + contentLen, consumedInput, oid, err := drainEntryPayload(state, record) + if err != nil { + return 0, err + } + + if contentLen != record.declaredSize { + return 0, &ErrMalformedPackEntry{ + Offset: startOffset, + Reason: fmt.Sprintf("inflated size mismatch got %d want %d", contentLen, record.declaredSize), + } + } + + endOffset := startOffset + uint64(record.headerLen) + consumedInput + if endOffset > state.stream.offset { + return 0, &ErrMalformedPackEntry{ + Offset: startOffset, + Reason: fmt.Sprintf("entry end offset overflow got %d > stream %d", endOffset, state.stream.offset), + } + } + + record.packedLen = endOffset - startOffset + record.dataOffset = startOffset + uint64(record.headerLen) + if record.packedLen < uint64(record.headerLen) { + return 0, &ErrMalformedPackEntry{Offset: startOffset, Reason: "negative payload span"} + } + + crc, err := state.stream.endEntryCRC() + if err != nil { + return 0, err + } + record.crc32 = crc + + if packfmt.IsBaseObjectType(record.packedType) { + record.objectID = oid + record.realType = record.packedType + record.resolved = true + } + + recordIdx := len(state.records) + state.records = append(state.records, record) + state.offsetToRecord[record.offset] = recordIdx + if record.resolved { + state.objectToRecord[record.objectID.String()] = recordIdx + } + + switch record.packedType { + case objecttype.TypeOfsDelta: + state.ofsDeltas = append(state.ofsDeltas, ofsDeltaRef{ + baseOffset: record.baseOffset, + recordIdx: recordIdx, + }) + case objecttype.TypeRefDelta: + state.refDeltas = append(state.refDeltas, refDeltaRef{ + baseObject: record.baseObject, + recordIdx: recordIdx, + }) + } + + return endOffset, nil +} + +// parseEntryPrefix parses one entry prefix from stream. +func parseEntryPrefix(state *ingestState, startOffset uint64) (objectRecord, error) { + var record objectRecord + record.offset = startOffset + + first, err := state.stream.ReadByte() + if err != nil { + return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: fmt.Sprintf("read first header byte: %v", err)} + } + + record.packedType = objecttype.Type((first >> 4) & 0x07) + size := int64(first & 0x0f) + headerLen := uint32(1) + shift := uint(4) + b := first + + for b&0x80 != 0 { + b, err = state.stream.ReadByte() + if err != nil { + return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: fmt.Sprintf("read size continuation: %v", err)} + } + headerLen++ + size |= int64(b&0x7f) << shift + shift += 7 + } + if size < 0 { + return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: "negative declared size"} + } + record.declaredSize = size + + switch record.packedType { + case objecttype.TypeCommit, objecttype.TypeTree, objecttype.TypeBlob, objecttype.TypeTag: + case objecttype.TypeRefDelta: + baseRaw := make([]byte, state.algo.Size()) + if err := state.stream.readFull(baseRaw); err != nil { + return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: fmt.Sprintf("read ref base: %v", err)} + } + baseID, err := objectid.FromBytes(state.algo, baseRaw) + if err != nil { + return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: fmt.Sprintf("parse ref base: %v", err)} + } + record.baseObject = baseID + headerLen += uint32(len(baseRaw)) + case objecttype.TypeOfsDelta: + dist, consumed, err := readOfsDistanceFromStream(state.stream) + if err != nil { + return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: err.Error()} + } + if startOffset <= dist { + return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: "ofs base offset out of bounds"} + } + record.baseOffset = startOffset - dist + headerLen += uint32(consumed) + case objecttype.TypeInvalid, objecttype.TypeFuture: + return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: fmt.Sprintf("unsupported object type %d", record.packedType)} + default: + return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: fmt.Sprintf("unsupported object type %d", record.packedType)} + } + + record.headerLen = headerLen + + return record, nil +} + +// drainEntryPayload inflates one entry payload from stream and returns +// (inflatedLength, consumedInput, oidForBaseEntry). +func drainEntryPayload(state *ingestState, record objectRecord) (int64, uint64, objectid.ObjectID, error) { + var zero objectid.ObjectID + reader, err := zlib.NewReader(state.stream) + if err != nil { + return 0, 0, zero, &ErrMalformedPackEntry{Offset: record.offset, Reason: fmt.Sprintf("open zlib stream: %v", err)} + } + defer func() { _ = reader.Close() }() + + var total int64 + if packfmt.IsBaseObjectType(record.packedType) { + header, ok := objectheader.Encode(record.packedType, record.declaredSize) + if !ok { + return 0, 0, zero, &ErrMalformedPackEntry{Offset: record.offset, Reason: "encode object header"} + } + + hashImpl, err := state.algo.New() + if err != nil { + return 0, 0, zero, err + } + _, _ = hashImpl.Write(header) + + n, err := io.Copy(hashImpl, reader) + if err != nil { + return 0, 0, zero, &ErrMalformedPackEntry{Offset: record.offset, Reason: fmt.Sprintf("inflate base object: %v", err)} + } + total = n + + oid, err := objectid.FromBytes(state.algo, hashImpl.Sum(nil)) + if err != nil { + return 0, 0, zero, err + } + + return total, reader.InputConsumed(), oid, nil + } + + if record.packedType == objecttype.TypeOfsDelta || record.packedType == objecttype.TypeRefDelta { + n, err := io.Copy(io.Discard, reader) + if err != nil { + return 0, 0, zero, &ErrMalformedPackEntry{Offset: record.offset, Reason: fmt.Sprintf("inflate delta payload: %v", err)} + } + total = n + + return total, reader.InputConsumed(), zero, nil + } + + return 0, 0, zero, &ErrMalformedPackEntry{Offset: record.offset, Reason: "unsupported payload type"} +} + +// readOfsDistanceFromStream reads one ofs-delta encoded distance. +func readOfsDistanceFromStream(reader io.ByteReader) (uint64, int, error) { + first, err := reader.ReadByte() + if err != nil { + return 0, 0, fmt.Errorf("read ofs distance first byte: %w", err) + } + + dist := uint64(first & 0x7f) + consumed := 1 + b := first + for b&0x80 != 0 { + b, err = reader.ReadByte() + if err != nil { + return 0, 0, fmt.Errorf("read ofs distance continuation: %w", err) + } + consumed++ + dist = ((dist + 1) << 7) + uint64(b&0x7f) + } + + return dist, consumed, nil +} + +// finalizeStreamPackHash consumes trailer bytes and verifies stream integrity. +func finalizeStreamPackHash(state *ingestState) error { + // We have already consumed object entries. Drain exactly the hash trailer. + trailer := make([]byte, state.algo.Size()) + if err := state.stream.readFull(trailer); err != nil { + return &ErrPackTrailerMismatch{} + } + + // Ensure no trailing garbage. + var probe [1]byte + n, err := state.stream.Read(probe[:]) + if n > 0 || err == nil { + return fmt.Errorf("format/pack/ingest: pack has trailing garbage") + } + if err != io.EOF { + return err + } + + if err := state.stream.verifier.verify(); err != nil { + return err + } + + packHash, err := objectid.FromBytes(state.algo, trailer) + if err != nil { + return err + } + state.packHash = packHash + + return nil +} + +// readDeltaHeaderSizes reads source and destination sizes from one delta payload. +func readDeltaHeaderSizes(payload []byte) (int, int, error) { + reader := &byteSliceReader{data: payload} + + return deltaapply.ReadHeaderSizes(reader) +} + +// byteSliceReader implements io.ByteReader on []byte. +type byteSliceReader struct { + data []byte + pos int +} + +// ReadByte reads one byte from receiver. +func (reader *byteSliceReader) ReadByte() (byte, error) { + if reader.pos >= len(reader.data) { + return 0, io.EOF + } + + b := reader.data[reader.pos] + reader.pos++ + + return b, nil +} diff --git a/format/pack/ingest/temp.go b/format/pack/ingest/temp.go new file mode 100644 index 00000000..cdda06c6 --- /dev/null +++ b/format/pack/ingest/temp.go @@ -0,0 +1,80 @@ +package ingest + +import ( + "crypto/rand" + "errors" + "fmt" + "io/fs" + "os" +) + +// openTemporaryArtifacts creates/open temp pack/idx/(rev) files under destination. +func openTemporaryArtifacts(state *ingestState) error { + packName, packFile, err := createTempFile(state.destination, "tmp_pack_") + if err != nil { + return err + } + state.packTmpName = packName + state.packFile = packFile + + idxName, idxFile, err := createTempFile(state.destination, "tmp_idx_") + if err != nil { + return err + } + state.idxTmpName = idxName + state.idxFile = idxFile + + if state.writeRev { + revName, revFile, err := createTempFile(state.destination, "tmp_rev_") + if err != nil { + return err + } + state.revTmpName = revName + state.revFile = revFile + } + + return nil +} + +// closeTemporaryArtifacts closes all temporary artifact file descriptors. +func closeTemporaryArtifacts(state *ingestState) error { + var out error + if state.packFile != nil { + if err := state.packFile.Close(); err != nil && out == nil { + out = err + } + state.packFile = nil + } + if state.idxFile != nil { + if err := state.idxFile.Close(); err != nil && out == nil { + out = err + } + state.idxFile = nil + } + if state.revFile != nil { + if err := state.revFile.Close(); err != nil && out == nil { + out = err + } + state.revFile = nil + } + + return out +} + +// createTempFile creates one temporary file under root using prefix. +func createTempFile(root *os.Root, prefix string) (string, *os.File, error) { + for range 32 { + name := prefix + rand.Text() + file, err := root.OpenFile(name, os.O_CREATE|os.O_EXCL|os.O_RDWR, 0o644) + if err == nil { + return name, file, nil + } + if errors.Is(err, fs.ErrExist) { + continue + } + + return "", nil, fmt.Errorf("format/pack/ingest: create temp file %q: %w", name, err) + } + + return "", nil, fmt.Errorf("format/pack/ingest: unable to create temporary file for prefix %q", prefix) +} diff --git a/format/pack/ingest/thin_fix.go b/format/pack/ingest/thin_fix.go new file mode 100644 index 00000000..e605c3f2 --- /dev/null +++ b/format/pack/ingest/thin_fix.go @@ -0,0 +1,211 @@ +package ingest + +import ( + "encoding/binary" + "fmt" + "hash/crc32" + "io" + "os" + + "codeberg.org/lindenii/furgit/internal/compress/zlib" + "codeberg.org/lindenii/furgit/objectid" + "codeberg.org/lindenii/furgit/objecttype" +) + +// maybeFixThin appends missing bases and rewrites pack header/trailer when needed. +func maybeFixThin(state *ingestState) error { + if len(state.unresolvedRefDeltas) == 0 { + return nil + } + if !state.fixThin { + return &ErrThinPackUnresolved{Count: len(state.unresolvedRefDeltas)} + } + if state.base == nil { + return &ErrThinPackUnresolved{Count: len(state.unresolvedRefDeltas)} + } + + hashSize := int64(state.algo.Size()) + info, err := state.packFile.Stat() + if err != nil { + return err + } + size := info.Size() + if size < hashSize { + return fmt.Errorf("format/pack/ingest: pack too short to trim trailer") + } + newEnd := size - hashSize + if err := state.packFile.Truncate(newEnd); err != nil { + return err + } + state.stream.offset = uint64(newEnd) + + baseIDs := unresolvedThinBaseIDs(state) + for _, id := range baseIDs { + ty, content, err := state.base.ReadBytesContent(id) + if err != nil { + continue + } + if _, err := appendBaseObject(state, id, ty, content); err != nil { + return err + } + state.thinFixed = true + } + + if err := rewritePackHeaderAndTrailer(state); err != nil { + return err + } + + return nil +} + +// appendBaseObject appends one base object as a new packed non-delta entry. +func appendBaseObject(state *ingestState, id objectid.ObjectID, realType objecttype.Type, content []byte) (int, error) { + start := state.stream.offset + header := encodePackEntryHeader(realType, int64(len(content))) + if _, err := state.packFile.WriteAt(header, int64(start)); err != nil { + return 0, err + } + + section := &fileSectionWriter{file: state.packFile, off: int64(start) + int64(len(header))} + crc := crc32.NewIEEE() + _, _ = crc.Write(header) + counting := &countingWriter{dst: section} + zw := zlib.NewWriter(io.MultiWriter(counting, crc)) + if _, err := zw.Write(content); err != nil { + return 0, err + } + if err := zw.Close(); err != nil { + return 0, err + } + + packedLen := uint64(len(header)) + uint64(counting.n) + end := start + packedLen + state.stream.offset = end + + record := objectRecord{ + offset: start, + headerLen: uint32(len(header)), + packedLen: packedLen, + crc32: crc.Sum32(), + packedType: realType, + realType: realType, + declaredSize: int64(len(content)), + dataOffset: start + uint64(len(header)), + objectID: id, + resolved: true, + } + + recordIdx := len(state.records) + state.records = append(state.records, record) + state.offsetToRecord[start] = recordIdx + state.objectToRecord[id.String()] = recordIdx + state.baseCache.add(recordIdx, realType, content) + + return recordIdx, nil +} + +// fileSectionWriter writes sequentially to file via WriteAt at one base offset. +type fileSectionWriter struct { + file *os.File + off int64 + pos int64 +} + +// Write writes src at current section position. +func (writer *fileSectionWriter) Write(src []byte) (int, error) { + if len(src) == 0 { + return 0, nil + } + n, err := writer.file.WriteAt(src, writer.off+writer.pos) + writer.pos += int64(n) + + return n, err +} + +// countingWriter counts bytes written to dst. +type countingWriter struct { + dst io.Writer + n int +} + +// Write writes src to dst and tracks output byte count. +func (writer *countingWriter) Write(src []byte) (int, error) { + n, err := writer.dst.Write(src) + writer.n += n + + return n, err +} + +// rewritePackHeaderAndTrailer rewrites object count and trailer hash using ReadAt/WriteAt. +func rewritePackHeaderAndTrailer(state *ingestState) error { + var countRaw [4]byte + binary.BigEndian.PutUint32(countRaw[:], uint32(len(state.records))) + if _, err := state.packFile.WriteAt(countRaw[:], 8); err != nil { + return err + } + + info, err := state.packFile.Stat() + if err != nil { + return err + } + endWithoutTrailer := info.Size() + + hashImpl, err := state.algo.New() + if err != nil { + return err + } + var ( + buf [128 << 10]byte + pos int64 + ) + for pos < endWithoutTrailer { + want := int64(len(buf)) + remaining := endWithoutTrailer - pos + if remaining < want { + want = remaining + } + n, err := state.packFile.ReadAt(buf[:want], pos) + if err != nil && err != io.EOF { + return err + } + if n == 0 { + return io.ErrUnexpectedEOF + } + _, _ = hashImpl.Write(buf[:n]) + pos += int64(n) + } + + sum := hashImpl.Sum(nil) + if _, err := state.packFile.WriteAt(sum, endWithoutTrailer); err != nil { + return err + } + + packHash, err := objectid.FromBytes(state.algo, sum) + if err != nil { + return err + } + state.packHash = packHash + state.objectCountHeader = uint32(len(state.records)) + state.stream.offset = uint64(endWithoutTrailer + int64(len(sum))) + + return nil +} + +// encodePackEntryHeader encodes one non-delta packed entry header. +func encodePackEntryHeader(ty objecttype.Type, size int64) []byte { + var out [16]byte + n := 0 + s := uint64(size) + c := byte((uint8(ty) << 4) | byte(s&0x0f)) + s >>= 4 + for s != 0 { + out[n] = c | 0x80 + n++ + c = byte(s & 0x7f) + s >>= 7 + } + out[n] = c + n++ + + return append([]byte(nil), out[:n]...) +} diff --git a/format/pack/ingest/trailer.go b/format/pack/ingest/trailer.go new file mode 100644 index 00000000..be8156d3 --- /dev/null +++ b/format/pack/ingest/trailer.go @@ -0,0 +1,65 @@ +package ingest + +import ( + "bytes" + "fmt" + "hash" +) + +// trailerVerifier incrementally verifies trailing hash bytes in a stream. +type trailerVerifier struct { + hash hash.Hash + hashSize int + tail []byte + seen int64 +} + +// newTrailerVerifier creates a trailing hash verifier. +func newTrailerVerifier(hash hash.Hash, hashSize int) *trailerVerifier { + return &trailerVerifier{ + hash: hash, + hashSize: hashSize, + tail: make([]byte, 0, hashSize), + } +} + +// write feeds one chunk of stream bytes into the verifier. +func (verifier *trailerVerifier) write(src []byte) { + if len(src) == 0 { + return + } + + verifier.seen += int64(len(src)) + if len(verifier.tail) == 0 && len(src) <= verifier.hashSize { + verifier.tail = append(verifier.tail, src...) + + return + } + + tmp := make([]byte, 0, len(verifier.tail)+len(src)) + tmp = append(tmp, verifier.tail...) + tmp = append(tmp, src...) + if len(tmp) <= verifier.hashSize { + verifier.tail = tmp + + return + } + + flushN := len(tmp) - verifier.hashSize + _, _ = verifier.hash.Write(tmp[:flushN]) + verifier.tail = append(verifier.tail[:0], tmp[flushN:]...) +} + +// verify finalizes verification against the stream trailer. +func (verifier *trailerVerifier) verify() error { + if len(verifier.tail) != verifier.hashSize { + return fmt.Errorf("format/pack/ingest: stream too short for trailer hash") + } + + computed := verifier.hash.Sum(nil) + if !bytes.Equal(computed, verifier.tail) { + return &ErrPackTrailerMismatch{} + } + + return nil +} |
