package ingest

import (
	"bytes"
	"crypto/rand"
	"errors"
	"fmt"
	"io"
	"io/fs"
	"os"

	"lindenii.org/go/furgit/internal/format/packfile"
	"lindenii.org/go/furgit/object/id"
	"lindenii.org/go/furgit/object/store"
)

// errTempNamesExhausted is returned by createTemp
// when every attempted random temporary name already exists.
var errTempNamesExhausted = errors.New("object/store/packed/internal/ingest: exhausted temporary file names")

// ingestion holds the state for one WritePack call.
//
// WritePack builds one value per call,
// and defers cleanup on it so the pack file is always closed
// and temporary files are removed unless the write committed.
type ingestion struct {
	// root is the destination objects/pack directory.
	// Temporary files are created and removed relative to it.
	root *os.Root

	// objectFormat is the pack's object format.
	// It supplies the trailer size and the hash implementation.
	objectFormat id.ObjectFormat

	// opts carries the thin base reader and progress sink.
	opts store.PackWriteOptions

	// src is the pack stream, positioned just past the header.
	// WritePack reads the header from it before building this value.
	src io.Reader

	// packFile is the temporary pack file being written,
	// and packTmp is its destination-relative name.
	// Both are set by openPackTemp; packFile stays nil for an empty pack.
	packFile *os.File
	packTmp  string

	// temps lists every temporary file to remove on failure.
	// createTemp appends each name it successfully creates.
	temps []string

	// scanner streams src into packFile while scanning entries.
	// openPackTemp builds it with a hash already fed the pack header.
	scanner *scanner

	// records holds one entry per object, in pack-offset order.
	records []record

	// byOffset maps an entry offset to its record index,
	// and byOID maps a resolved object ID to its record index.
	byOffset map[int]int
	byOID    map[id.ObjectID]int

	// headerCount is the object count declared by the pack header.
	headerCount int

	// deltaCount counts delta records, accumulated during scanning.
	deltaCount int

	// deltasResolved counts resolved delta records, for progress.
	deltasResolved int

	// packHash is the final pack trailer hash.
	packHash id.ObjectID

	// thinFixed reports whether thin completion appended local bases.
	thinFixed bool

	// committed suppresses temporary file removal once artifacts are published.
	// WritePack sets it only after finalize returns without error.
	committed bool
}

// WritePack reads one pack stream from src and stores it under root
// as a pack, an index, and a reverse index,
// each named after the hash found in the pack trailer.
//
// Reading stops at the trailer.
// WritePack never waits for EOF on src,
// which makes it usable on a transport connection that stays open,
// such as receive-pack,
// where the peer is still waiting to read the response.
//
// The pack must be the final thing the peer sends before that response:
// bytes that arrive immediately after the trailer
// cause the pack to be rejected as malformed.
//
// An objectFormat whose Size is zero yields id.ErrInvalidObjectFormat.
// A pack declaring zero objects only has its trailer verified;
// no artifacts are written for it.
func WritePack(root *os.Root, objectFormat id.ObjectFormat, src io.Reader, opts store.PackWriteOptions) (Result, error) {
	if objectFormat.Size() == 0 {
		return Result{}, id.ErrInvalidObjectFormat
	}

	header, objectCount, err := readPackHeader(src)
	if err != nil {
		return Result{}, err
	}

	state := &ingestion{
		root:           root,
		objectFormat:   objectFormat,
		opts:           opts,
		src:            src,
		packFile:       nil,
		packTmp:        "",
		temps:          nil,
		scanner:        nil,
		records:        nil,
		byOffset:       make(map[int]int),
		byOID:          make(map[id.ObjectID]int),
		headerCount:    objectCount,
		deltaCount:     0,
		deltasResolved: 0,
		packHash:       id.ObjectID{},
		thinFixed:      false,
		committed:      false,
	}

	// Runs on every exit path: closes the pack file,
	// and drops temporary files unless committed was set below.
	defer state.cleanup()

	// Nothing to store for an empty pack; only the trailer is checked.
	if objectCount == 0 {
		return state.finishEmpty(header)
	}

	if err = state.openPackTemp(header); err != nil {
		return Result{}, err
	}

	if err = state.streamAndScan(); err != nil {
		return Result{}, err
	}

	if err = state.resolveDeltas(); err != nil {
		return Result{}, err
	}

	// Flush the pack to stable storage before finalizing.
	if err = state.packFile.Sync(); err != nil {
		return Result{}, fmt.Errorf("object/store/packed/internal/ingest: syncing pack: %w", err)
	}

	published, err := state.finalize()
	if err != nil {
		return Result{}, err
	}

	state.committed = true

	return published, nil
}

// finishEmpty handles a pack that declares zero objects.
//
// It reads the trailer from src,
// checks it against the hash of headerRaw,
// and reports success with empty artifact names and the trailer as PackHash.
// Nothing is written to root.
// A short read or a hash mismatch is reported as ErrMalformedPack.
func (ingestion *ingestion) finishEmpty(headerRaw [packfile.HeaderLen]byte) (Result, error) {
	hasher, err := ingestion.objectFormat.New()
	if err != nil {
		return Result{}, fmt.Errorf("object/store/packed/internal/ingest: %w", err)
	}

	// The header is the only content an empty pack hashes.
	_, _ = hasher.Write(headerRaw[:])

	got := make([]byte, ingestion.objectFormat.Size())
	if _, err = io.ReadFull(ingestion.src, got); err != nil {
		return Result{}, fmt.Errorf("%w: reading trailer: %w", ErrMalformedPack, err)
	}

	want := hasher.Sum(nil)
	if !bytes.Equal(want, got) {
		return Result{}, fmt.Errorf("%w: trailer hash mismatch", ErrMalformedPack)
	}

	hash, err := ingestion.objectFormat.FromBytes(got)
	if err != nil {
		return Result{}, fmt.Errorf("object/store/packed/internal/ingest: %w", err)
	}

	empty := Result{
		PackName:    "",
		IdxName:     "",
		RevName:     "",
		PackHash:    hash,
		ObjectCount: 0,
		ThinFixed:   false,
	}

	return empty, nil
}

// openPackTemp prepares the destination for the incoming pack.
//
// It creates a temporary file with the "tmp_pack_" prefix,
// records it as packFile and packTmp,
// writes headerRaw to it,
// and builds the scanner over src and that file
// with a hash that has already consumed headerRaw.
func (ingestion *ingestion) openPackTemp(headerRaw [packfile.HeaderLen]byte) error {
	tmpName, tmpFile, err := ingestion.createTemp("tmp_pack_")
	if err != nil {
		return err
	}

	// Record the file before writing so cleanup closes it on any later failure.
	ingestion.packTmp, ingestion.packFile = tmpName, tmpFile

	if _, err = tmpFile.Write(headerRaw[:]); err != nil {
		return fmt.Errorf("object/store/packed/internal/ingest: writing pack header: %w", err)
	}

	hasher, err := ingestion.objectFormat.New()
	if err != nil {
		return fmt.Errorf("object/store/packed/internal/ingest: %w", err)
	}

	// The header was read before the scanner existed, so hash it here.
	_, _ = hasher.Write(headerRaw[:])

	ingestion.scanner = newScanner(ingestion.src, tmpFile, hasher)

	return nil
}

// createTemp makes a new, exclusively created temporary file under root
// whose name is prefix followed by random text.
//
// The name is appended to temps so cleanup can remove it on failure.
// A name collision is retried with a fresh name;
// any other open error is returned immediately.
// After 32 collisions it gives up with errTempNamesExhausted.
func (ingestion *ingestion) createTemp(prefix string) (string, *os.File, error) {
	const (
		maxAttempts = 32
		openFlags   = os.O_CREATE | os.O_EXCL | os.O_RDWR
		fileMode    = 0o644
	)

	for range maxAttempts {
		candidate := prefix + rand.Text()

		created, err := ingestion.root.OpenFile(candidate, openFlags, fileMode)

		switch {
		case err == nil:
			ingestion.temps = append(ingestion.temps, candidate)

			return candidate, created, nil
		case !errors.Is(err, fs.ErrExist):
			return "", nil, fmt.Errorf("object/store/packed/internal/ingest: creating temp file: %w", err)
		}

		// The name is taken; try another one.
	}

	return "", nil, errTempNamesExhausted
}

// cleanup releases what one WritePack call acquired.
//
// The pack file, if one was opened, is always closed.
// Every name in temps is then removed from root,
// except when committed is set.
// Close and remove errors are ignored.
func (ingestion *ingestion) cleanup() {
	if file := ingestion.packFile; file != nil {
		_ = file.Close()
	}

	if !ingestion.committed {
		for i := range ingestion.temps {
			_ = ingestion.root.Remove(ingestion.temps[i])
		}
	}
}