package ingest
import (
"bytes"
"crypto/rand"
"errors"
"fmt"
"io"
"io/fs"
"os"
"lindenii.org/go/furgit/internal/format/packfile"
"lindenii.org/go/furgit/object/id"
"lindenii.org/go/furgit/object/store"
)
// errTempNamesExhausted is returned by createTemp when every randomly
// generated candidate name it tried already existed under root.
var (
	errTempNamesExhausted = errors.New("object/store/packed/internal/ingest: exhausted temporary file names")
)
// ingestion carries the per-call state of a single WritePack invocation.
type ingestion struct {
	// root is the objects/pack directory that receives the published artifacts.
	root *os.Root
	// objectFormat supplies the hash constructor and object ID size for the pack.
	objectFormat id.ObjectFormat
	// opts provides the thin-pack base reader and the progress sink.
	opts store.PackWriteOptions
	// src is the incoming pack stream; its header has already been consumed.
	src io.Reader

	// packFile is the temporary file that receives the pack bytes.
	packFile *os.File
	// packTmp is packFile's name relative to root.
	packTmp string
	// temps names every temporary file that cleanup deletes if the write fails.
	temps []string

	// scanner copies src into packFile and parses entries as they stream past.
	scanner *scanner
	// records has one element per object, ordered by pack offset.
	records []record
	// byOffset maps an entry's pack offset to its index in records.
	byOffset map[int]int
	// byOID maps a resolved object ID to its index in records.
	byOID map[id.ObjectID]int

	// headerCount is the object count announced by the pack header.
	headerCount int
	// deltaCount is the number of delta records found while scanning.
	// deltasResolved is how many of them have been resolved so far, for progress reporting.
	deltaCount, deltasResolved int

	// packHash is the trailer hash of the final pack.
	packHash id.ObjectID
	// thinFixed is set when thin-pack completion appended local base objects.
	// committed is set once the artifacts are published; after that, cleanup keeps the temporaries.
	thinFixed, committed bool
}
// WritePack reads one pack stream from src and stores it under root as a pack,
// its index, and its reverse index. All three are named after the pack's
// trailer hash.
//
// Reading stops at the end of the trailer, and src is not required to reach
// EOF. That makes WritePack safe on a transport connection the peer keeps open
// to read a reply, as receive-pack does.
//
// Because of this, the pack must be the final thing the peer sends before
// awaiting that reply. Bytes that immediately follow the trailer cause the
// pack to be rejected as malformed.
//
// A pack whose header declares zero objects only has its trailer verified,
// and no artifacts are written for it.
func WritePack(root *os.Root, objectFormat id.ObjectFormat, src io.Reader, opts store.PackWriteOptions) (Result, error) {
	if objectFormat.Size() == 0 {
		return Result{}, id.ErrInvalidObjectFormat
	}

	headerRaw, count, err := readPackHeader(src)
	if err != nil {
		return Result{}, err
	}

	state := &ingestion{
		root:           root,
		objectFormat:   objectFormat,
		opts:           opts,
		src:            src,
		packFile:       nil,
		packTmp:        "",
		temps:          nil,
		scanner:        nil,
		records:        nil,
		byOffset:       make(map[int]int),
		byOID:          make(map[id.ObjectID]int),
		headerCount:    count,
		deltaCount:     0,
		deltasResolved: 0,
		packHash:       id.ObjectID{},
		thinFixed:      false,
		committed:      false,
	}
	// Runs on every return path. Temporaries are kept only once state.committed is set.
	defer state.cleanup()

	if count == 0 {
		return state.finishEmpty(headerRaw)
	}

	if err := state.openPackTemp(headerRaw); err != nil {
		return Result{}, err
	}
	if err := state.streamAndScan(); err != nil {
		return Result{}, err
	}
	if err := state.resolveDeltas(); err != nil {
		return Result{}, err
	}
	// Flush the pack bytes to stable storage before finalize runs.
	if err := state.packFile.Sync(); err != nil {
		return Result{}, fmt.Errorf("object/store/packed/internal/ingest: syncing pack: %w", err)
	}

	result, err := state.finalize()
	if err != nil {
		return Result{}, err
	}
	state.committed = true
	return result, nil
}
// finishEmpty completes ingestion of a pack whose header announces zero objects.
// It reads the trailer that directly follows the header and checks it against
// the hash of the header bytes. On success it returns a Result that carries
// only the pack hash, because no pack, index, or reverse index is written.
func (ingestion *ingestion) finishEmpty(headerRaw [packfile.HeaderLen]byte) (Result, error) {
	hasher, err := ingestion.objectFormat.New()
	if err != nil {
		return Result{}, fmt.Errorf("object/store/packed/internal/ingest: %w", err)
	}
	// With no entries, the trailer immediately follows the header, so the
	// expected trailer is the digest of the header alone.
	// NOTE(review): the write error is discarded on the assumption that this
	// is a hash.Hash-style writer that never fails; confirm.
	_, _ = hasher.Write(headerRaw[:])
	expected := hasher.Sum(nil)

	got := make([]byte, ingestion.objectFormat.Size())
	if _, err := io.ReadFull(ingestion.src, got); err != nil {
		return Result{}, fmt.Errorf("%w: reading trailer: %w", ErrMalformedPack, err)
	}
	if !bytes.Equal(expected, got) {
		return Result{}, fmt.Errorf("%w: trailer hash mismatch", ErrMalformedPack)
	}

	packHash, err := ingestion.objectFormat.FromBytes(got)
	if err != nil {
		return Result{}, fmt.Errorf("object/store/packed/internal/ingest: %w", err)
	}

	return Result{
		PackName:    "",
		IdxName:     "",
		RevName:     "",
		PackHash:    packHash,
		ObjectCount: 0,
		ThinFixed:   false,
	}, nil
}
// openPackTemp allocates the temporary pack file and copies the already
// validated header bytes into it. It then builds the stream scanner over src
// and that file, handing it a hash that has already absorbed the header.
func (ingestion *ingestion) openPackTemp(headerRaw [packfile.HeaderLen]byte) error {
	tmpName, tmpFile, err := ingestion.createTemp("tmp_pack_")
	if err != nil {
		return err
	}
	// Store the handle right away so cleanup closes it if a later step fails.
	ingestion.packTmp, ingestion.packFile = tmpName, tmpFile

	if _, err := tmpFile.Write(headerRaw[:]); err != nil {
		return fmt.Errorf("object/store/packed/internal/ingest: writing pack header: %w", err)
	}

	hasher, err := ingestion.objectFormat.New()
	if err != nil {
		return fmt.Errorf("object/store/packed/internal/ingest: %w", err)
	}
	// The header was read before the scanner existed, so feed it to the hash here.
	// The scanner presumably continues this hash over the remaining stream.
	_, _ = hasher.Write(headerRaw[:])
	ingestion.scanner = newScanner(ingestion.src, tmpFile, hasher)
	return nil
}
// createTemp creates a new file under root, named prefix plus a random suffix.
// The file is opened read-write with O_EXCL, so an existing file is never
// reused. Its name is added to the cleanup list before it is returned.
// A name collision triggers another attempt, up to 32 in total. Any other
// error is returned immediately.
func (ingestion *ingestion) createTemp(prefix string) (string, *os.File, error) {
	const maxAttempts = 32
	for attempt := 0; attempt < maxAttempts; attempt++ {
		candidate := prefix + rand.Text()
		f, err := ingestion.root.OpenFile(candidate, os.O_CREATE|os.O_EXCL|os.O_RDWR, 0o644)
		switch {
		case err == nil:
			ingestion.temps = append(ingestion.temps, candidate)
			return candidate, f, nil
		case errors.Is(err, fs.ErrExist):
			// The name is taken; draw a fresh suffix.
			continue
		default:
			return "", nil, fmt.Errorf("object/store/packed/internal/ingest: creating temp file: %w", err)
		}
	}
	return "", nil, errTempNamesExhausted
}
// cleanup is WritePack's deferred teardown.
// It always closes the temporary pack file, if one was opened.
// If the ingestion never committed, it also deletes every recorded temporary
// file. It does this after the close, so none of them is still held open here.
// All errors are deliberately ignored because this is best-effort cleanup.
func (ingestion *ingestion) cleanup() {
	if f := ingestion.packFile; f != nil {
		_ = f.Close()
	}
	if !ingestion.committed {
		for _, tmp := range ingestion.temps {
			_ = ingestion.root.Remove(tmp)
		}
	}
}