aboutsummaryrefslogtreecommitdiff
package ingest

import (
	"bytes"
	"errors"
	"fmt"
	"hash"
	"hash/crc32"
	"io"

	"lindenii.org/go/furgit/internal/compress/zlib"
	"lindenii.org/go/furgit/internal/format/packfile"
	"lindenii.org/go/furgit/internal/progress"
	"lindenii.org/go/furgit/object/header"
	"lindenii.org/go/furgit/object/id"
	"lindenii.org/go/lgo/intconv"
)

// scanBufferSize is the size in bytes (64 KiB) of the buffer backing
// the scanner's input window; a single peek can never see more than this.
const scanBufferSize = 1 << 16

// scanner reads one pack stream,
// mirroring consumed bytes into the destination pack file
// while maintaining the running pack hash and a per-entry CRC.
//
// Bytes in buf move through three regions:
// buf[:off] has been consumed (and hashed / CRC'd as enabled)
// but not yet written to dst; buf[off:n] has been read from src
// but not yet consumed; buf[n:] is free space for the next read.
// The consumed prefix is written to dst, and the buffer compacted,
// by flushPrefix.
//
// It implements [io.Reader] and [io.ByteReader]
// so a zlib reader can consume an entry payload through it
// without reading past the end of that compressed stream.
type scanner struct {
	// src is the incoming pack stream; dst receives every consumed byte.
	src io.Reader
	dst io.Writer

	// buf[off:n] is the unread window; buf[:off] is consumed but
	// not yet flushed to dst, and buf[n:] is free for reading.
	buf []byte
	off int
	n   int

	// consumed counts stream bytes consumed so far,
	// including the pack header that newScanner counts up front,
	// so it is also the pack offset of the next unconsumed byte.
	consumed int

	// hash accumulates the pack hash over consumed bytes
	// while hashing is true. finishTrailer clears hashing
	// so the trailer itself is not hashed.
	hash    hash.Hash
	hashing bool

	// crc accumulates the CRC-32 (IEEE table) of the current entry
	// while crcing is true; beginCRC and endCRC bracket one entry.
	crc    uint32
	crcing bool
}

// newScanner returns a scanner that reads src and mirrors every byte it
// consumes into dst.
//
// The pack header is treated as already consumed: the consumed counter
// starts at packfile.HeaderLen, but the header bytes are neither written
// to dst nor fed to packHash here. packHash is used as given (presumably
// the caller has already fed it the header) and hashing starts enabled;
// entry CRC accumulation starts disabled.
func newScanner(src io.Reader, dst io.Writer, packHash hash.Hash) *scanner {
	window := make([]byte, scanBufferSize)

	s := &scanner{
		src:      src,
		dst:      dst,
		buf:      window,
		off:      0,
		n:        0,
		consumed: packfile.HeaderLen,
		hash:     packHash,
		hashing:  true,
		crc:      0,
		crcing:   false,
	}

	return s
}

// readPackHeader reads exactly packfile.HeaderLen bytes from src, parses
// them as a pack header, and returns the raw bytes together with the
// declared object count converted to int.
//
// Every failure (short read, unparsable header, count not representable
// as int) is wrapped with [ErrMalformedPack]. The raw array is returned
// even on error, holding whatever was read.
func readPackHeader(src io.Reader) ([packfile.HeaderLen]byte, int, error) {
	var raw [packfile.HeaderLen]byte

	if _, err := io.ReadFull(src, raw[:]); err != nil {
		return raw, 0, fmt.Errorf("%w: reading header: %w", ErrMalformedPack, err)
	}

	parsed, err := packfile.ParseHeader(raw[:])
	if err != nil {
		return raw, 0, fmt.Errorf("%w: %w", ErrMalformedPack, err)
	}

	objects, err := intconv.Uint32ToInt(parsed.ObjectCount)
	if err != nil {
		return raw, 0, fmt.Errorf("%w: object count: %w", ErrMalformedPack, err)
	}

	return raw, objects, nil
}

// Read implements [io.Reader].
//
// It copies as many unread bytes as fit into dst, filling the window from
// the source first if it is empty, and consumes them (hashing and CRC'ing
// as enabled). It returns [io.EOF] only when the source is exhausted and
// nothing remains buffered.
func (scanner *scanner) Read(dst []byte) (int, error) {
	if len(dst) == 0 {
		return 0, nil
	}

	if err := scanner.ensureAvailable(); err != nil {
		return 0, err
	}

	count := copy(dst, scanner.buf[scanner.off:scanner.n])

	if err := scanner.use(count); err != nil {
		return 0, err
	}

	return count, nil
}

// ReadByte implements [io.ByteReader] without allocation.
//
// It returns and consumes the next unread byte, refilling the window
// from the source when it is empty.
func (scanner *scanner) ReadByte() (byte, error) {
	if err := scanner.ensureAvailable(); err != nil {
		return 0, err
	}

	next := scanner.buf[scanner.off]

	if err := scanner.use(1); err != nil {
		return 0, err
	}

	return next, nil
}

// ensureAvailable reads from the source until the unread window holds at
// least one byte.
//
// Each time the window is empty it first flushes the consumed prefix to
// the destination (compacting the buffer), then reads into the free tail.
// It returns [io.EOF] when the source ends with nothing buffered,
// [io.ErrNoProgress] when a read yields no bytes and no error while the
// window is still empty, and a wrapped error for any other read failure.
func (scanner *scanner) ensureAvailable() error {
	for scanner.off == scanner.n {
		if err := scanner.flushPrefix(); err != nil {
			return err
		}

		got, readErr := scanner.src.Read(scanner.buf[scanner.n:])
		scanner.n += got
		buffered := scanner.n - scanner.off

		switch {
		case readErr == nil:
			if got == 0 && buffered == 0 {
				return io.ErrNoProgress
			}
		case errors.Is(readErr, io.EOF):
			if buffered == 0 {
				return io.EOF
			}

			return nil
		default:
			return fmt.Errorf("object/store/packed/internal/ingest: reading pack: %w", readErr)
		}
	}

	return nil
}

// peekHeader returns the whole unread window without consuming it, after
// trying to grow it to at least maxLen bytes (capped at the buffer size).
//
// The returned slice may be longer than maxLen if more was already
// buffered, or shorter if the source ended or a read made no progress.
// It aliases the scanner's buffer and is only valid until the next read.
// An empty window is reported as [ErrMalformedPack].
func (scanner *scanner) peekHeader(maxLen int) ([]byte, error) {
	// The window can never hold more than the whole buffer.
	target := min(maxLen, len(scanner.buf))

	for scanner.n-scanner.off < target {
		if err := scanner.flushPrefix(); err != nil {
			return nil, err
		}

		got, readErr := scanner.src.Read(scanner.buf[scanner.n:])
		scanner.n += got

		if readErr != nil && !errors.Is(readErr, io.EOF) {
			return nil, fmt.Errorf("object/store/packed/internal/ingest: reading pack: %w", readErr)
		}

		// End of stream, or a read that made no progress: settle for what
		// is buffered and let the header parser judge whether it suffices.
		if readErr != nil || got == 0 {
			break
		}
	}

	window := scanner.buf[scanner.off:scanner.n]
	if len(window) == 0 {
		return nil, fmt.Errorf("%w: unexpected end of stream", ErrMalformedPack)
	}

	return window, nil
}

// use consumes the next n unread bytes of the window.
//
// The bytes are fed to the running pack hash when hashing is enabled and
// folded into the entry CRC (IEEE table) when crcing is enabled. Then the
// read offset and the consumed counter advance by n. The bytes stay in
// the buffer until flushPrefix writes them to the destination.
//
// Callers are expected to pass n no larger than the unread window;
// this is not checked here.
func (scanner *scanner) use(n int) error {
	end := scanner.off + n
	taken := scanner.buf[scanner.off:end]

	if scanner.hashing {
		if _, err := scanner.hash.Write(taken); err != nil {
			return fmt.Errorf("object/store/packed/internal/ingest: hashing pack: %w", err)
		}
	}

	if scanner.crcing {
		scanner.crc = crc32.Update(scanner.crc, crc32.IEEETable, taken)
	}

	scanner.consumed += n
	scanner.off = end

	return nil
}

// flushPrefix writes the consumed prefix buf[:off] to the destination,
// then slides the unread window buf[off:n] to the front of the buffer so
// the free tail is as large as possible.
//
// It does nothing when no bytes have been consumed since the last flush.
// On a write error the buffer state is left untouched.
func (scanner *scanner) flushPrefix() error {
	done := scanner.off
	if done == 0 {
		return nil
	}

	if _, err := scanner.dst.Write(scanner.buf[:done]); err != nil {
		return fmt.Errorf("object/store/packed/internal/ingest: writing pack: %w", err)
	}

	// copy tolerates the overlap between source and destination here.
	remaining := copy(scanner.buf, scanner.buf[done:scanner.n])
	scanner.off, scanner.n = 0, remaining

	return nil
}

// beginCRC resets the entry CRC and enables accumulation, so every byte
// consumed from here until endCRC is folded into it.
func (scanner *scanner) beginCRC() {
	scanner.crc, scanner.crcing = 0, true
}

// endCRC stops CRC accumulation, clears the accumulator, and returns the
// CRC gathered since the matching beginCRC.
func (scanner *scanner) endCRC() uint32 {
	sum := scanner.crc
	scanner.crc, scanner.crcing = 0, false

	return sum
}

// finishTrailer reads the hashSize-byte pack trailer, checks it against
// the running pack hash, and flushes every remaining buffered byte,
// trailer included, to the destination.
//
// Hashing is switched off before the trailer is read, since the trailer
// is the hash of everything before it. Only bytes already buffered after
// the trailer are detected as trailing data; the source is not read any
// further to look for more.
func (scanner *scanner) finishTrailer(hashSize int) ([]byte, error) {
	scanner.hashing = false

	trailer := make([]byte, hashSize)
	if _, err := io.ReadFull(scanner, trailer); err != nil {
		return nil, fmt.Errorf("%w: reading trailer: %w", ErrMalformedPack, err)
	}

	if scanner.off < scanner.n {
		return nil, fmt.Errorf("%w: trailing data after pack", ErrMalformedPack)
	}

	if computed := scanner.hash.Sum(nil); !bytes.Equal(computed, trailer) {
		return nil, fmt.Errorf("%w: trailer hash mismatch", ErrMalformedPack)
	}

	if err := scanner.flushPrefix(); err != nil {
		return nil, err
	}

	return trailer, nil
}

// streamAndScan consumes the pack body entry by entry, scanning exactly
// ingestion.headerCount entries, each starting at the scanner's current
// consumed offset. It then verifies the trailer and stores the decoded
// trailer hash in ingestion.packHash.
//
// Progress is reported as entries scanned and bytes consumed.
func (ingestion *ingestion) streamAndScan() error {
	meterOpts := progress.Options{
		Writer:     ingestion.opts.Progress,
		Title:      "receiving objects",
		Total:      ingestion.headerCount,
		Delay:      0,
		Sparse:     false,
		Throughput: true,
	}
	meter := progress.New(meterOpts)

	for scanned := range ingestion.headerCount {
		if err := ingestion.scanEntry(ingestion.scanner.consumed); err != nil {
			// NOTE(review): the meter is not stopped on this path;
			// confirm whether progress needs Stop to clean up.
			return err
		}

		meter.Set(scanned+1, ingestion.scanner.consumed)
	}

	meter.Stop("done")

	hashSize := ingestion.objectFormat.Size()

	trailer, err := ingestion.scanner.finishTrailer(hashSize)
	if err != nil {
		return err
	}

	packHash, err := ingestion.objectFormat.FromBytes(trailer)
	if err != nil {
		return fmt.Errorf("object/store/packed/internal/ingest: %w", err)
	}

	ingestion.packHash = packHash

	return nil
}

// scanEntry scans the pack entry that begins at offset start and appends
// the resulting record to ingestion.records.
//
// The entry CRC covers the header and the compressed payload: accumulation
// starts before the header is consumed and stops after the payload is
// drained. The inflated payload length must equal the size declared in the
// header. Base entries are marked resolved with their object ID and indexed
// in byOID. Delta entries are left unresolved here and counted in
// deltaCount. Every entry is indexed in byOffset.
func (ingestion *ingestion) scanEntry(start int) error {
	ingestion.scanner.beginCRC()

	rec, err := ingestion.scanHeader(start)
	if err != nil {
		return err
	}

	inflated, oid, err := ingestion.drainPayload(&rec)
	if err != nil {
		return err
	}

	if want := int64(rec.declaredSize); inflated != want {
		return fmt.Errorf(
			"%w: entry at %d: inflated size %d differs from declared %d",
			ErrMalformedPack, start, inflated, rec.declaredSize,
		)
	}

	rec.packedLen = ingestion.scanner.consumed - start
	rec.crc32 = ingestion.scanner.endCRC()

	switch {
	case rec.packedType.IsBase():
		rec.objectType = rec.packedType
		rec.oid = oid
		rec.resolved = true
	default:
		ingestion.deltaCount++
	}

	position := len(ingestion.records)
	ingestion.records = append(ingestion.records, rec)
	ingestion.byOffset[rec.offset] = position

	if rec.resolved {
		ingestion.byOID[rec.oid] = position
	}

	return nil
}

// scanHeader parses and consumes the header of the entry that begins at
// pack offset start, returning a record describing it.
//
// The header is parsed from a peeked window first. Its bytes are consumed,
// which hashes and CRCs them, only after the entry type and any base
// reference have been validated.
//
// For an ofs-delta, the base must start at or after the first entry, which
// begins right after the pack header, and strictly before start. For a
// ref-delta, the base object ID is decoded from the header. Invalid,
// reserved, or unrecognized entry types are rejected with [ErrMalformedPack].
func (ingestion *ingestion) scanHeader(start int) (record, error) {
	var rec record

	rec.offset = start

	window, err := ingestion.scanner.peekHeader(packfile.MaxEntryHeaderLen(ingestion.objectFormat.Size()))
	if err != nil {
		return rec, err
	}

	entryHeader, err := packfile.ParseEntryHeader(window, ingestion.objectFormat.Size())
	if err != nil {
		return rec, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, start, err)
	}

	declaredSize, err := intconv.Uint64ToInt(entryHeader.Size)
	if err != nil {
		return rec, fmt.Errorf("%w: entry at %d: declared size overflows int: %w", ErrMalformedPack, start, err)
	}

	rec.packedType = entryHeader.Type
	rec.declaredSize = declaredSize
	rec.headerLen = entryHeader.HeaderLen

	switch entryHeader.Type {
	case packfile.EntryTypeCommit, packfile.EntryTypeTree, packfile.EntryTypeBlob, packfile.EntryTypeTag:
		// Base objects carry no base reference.
	case packfile.EntryTypeOfsDelta:
		// Entries begin right after the pack header, so a valid base lies in
		// [packfile.HeaderLen, start). Bounding dist by start-HeaderLen also
		// rejects dist == start (base offset 0), which lands inside the
		// header where no entry can begin.
		dist, err := intconv.Uint64ToInt(entryHeader.OfsDistance)
		if err != nil || dist == 0 || dist > start-packfile.HeaderLen {
			return rec, fmt.Errorf("%w: entry at %d: ofs-delta base out of bounds", ErrMalformedPack, start)
		}

		rec.baseOffset = start - dist
	case packfile.EntryTypeRefDelta:
		baseID, err := ingestion.objectFormat.FromBytes(entryHeader.RefBase[:ingestion.objectFormat.Size()])
		if err != nil {
			return rec, fmt.Errorf("object/store/packed/internal/ingest: %w", err)
		}

		rec.baseOID = baseID
	case packfile.EntryTypeInvalid, packfile.EntryTypeFuture:
		fallthrough
	default:
		// Defensive: also reject any type value not named above instead of
		// silently accepting it without base information.
		return rec, fmt.Errorf("%w: entry at %d: unsupported entry type", ErrMalformedPack, start)
	}

	err = ingestion.scanner.use(entryHeader.HeaderLen)
	if err != nil {
		return rec, err
	}

	return rec, nil
}

// drainPayload consumes one entry's compressed payload from the stream,
// returning its inflated length and, for base entries, its object ID.
//
// The payload is inflated by a zlib reader that pulls its input from the
// scanner, so every compressed byte it reads is consumed (hashed and CRC'd
// as enabled). Delta payloads are discarded. Base payloads are hashed,
// prefixed by the object header from header.Append, to form the object ID.
//
// Inflation is capped at one byte beyond rec.declaredSize. A payload that
// inflates to more than it declares is rejected with [ErrMalformedPack] as
// soon as it overshoots, rather than after inflating an arbitrarily large,
// untrusted stream in full. A payload shorter than declared is returned
// with its short length for the caller to reject.
func (ingestion *ingestion) drainPayload(rec *record) (int64, id.ObjectID, error) {
	var zero id.ObjectID

	zr, err := zlib.NewReader(ingestion.scanner)
	if err != nil {
		return 0, zero, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err)
	}

	defer func() { _ = zr.Close() }()

	declared := int64(rec.declaredSize)

	// Allow exactly one byte of overshoot so an oversized payload is
	// detected early. At exactly the declared size the limit is never
	// reached, so the zlib reader still runs to the end of its stream.
	// The guard keeps the increment from overflowing int64.
	limit := declared
	if limit < 1<<63-1 {
		limit++
	}

	payload := io.LimitReader(zr, limit)

	// inflate copies the bounded payload into sink and rejects overshoot.
	inflate := func(sink io.Writer) (int64, error) {
		read, err := io.Copy(sink, payload)
		if err != nil {
			return 0, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err)
		}

		if read > declared {
			return 0, fmt.Errorf(
				"%w: entry at %d: payload inflates beyond declared size %d",
				ErrMalformedPack, rec.offset, rec.declaredSize,
			)
		}

		return read, nil
	}

	if !rec.packedType.IsBase() {
		read, err := inflate(io.Discard)
		if err != nil {
			return 0, zero, err
		}

		return read, zero, nil
	}

	objectType, err := rec.packedType.ObjectType()
	if err != nil {
		return 0, zero, fmt.Errorf("object/store/packed/internal/ingest: %w", err)
	}

	hashImpl, err := ingestion.objectFormat.New()
	if err != nil {
		return 0, zero, fmt.Errorf("object/store/packed/internal/ingest: %w", err)
	}

	// Presumably a hash.Hash, whose Write never returns an error.
	_, _ = hashImpl.Write(header.Append(nil, objectType, rec.declaredSize))

	read, err := inflate(hashImpl)
	if err != nil {
		return 0, zero, err
	}

	oid, err := ingestion.objectFormat.FromBytes(hashImpl.Sum(nil))
	if err != nil {
		return 0, zero, fmt.Errorf("object/store/packed/internal/ingest: %w", err)
	}

	return read, oid, nil
}