package ingest import ( "bytes" "errors" "fmt" "hash" "hash/crc32" "io" "lindenii.org/go/furgit/internal/compress/zlib" "lindenii.org/go/furgit/internal/format/packfile" "lindenii.org/go/furgit/internal/progress" "lindenii.org/go/furgit/object/header" "lindenii.org/go/furgit/object/id" "lindenii.org/go/lgo/intconv" ) // scanBufferSize is the stream scanner's fixed input window size. const scanBufferSize = 64 << 10 // scanner reads one pack stream, // mirroring consumed bytes into the destination pack file // while maintaining the running pack hash and a per-entry CRC. // // It implements [io.Reader] and [io.ByteReader] // so a zlib reader can consume an entry payload through it // without reading past the end of that compressed stream. type scanner struct { src io.Reader dst io.Writer // buf[off:n] is the unread window. buf []byte off int n int // consumed counts stream bytes consumed so far. consumed int // hash accumulates the pack hash over consumed bytes // while hashing is true. hash hash.Hash hashing bool // crc accumulates the CRC of the current entry // while crcing is true. crc uint32 crcing bool } // newScanner constructs one scanner mirroring src into dst, // seeding the running hash from the already-consumed pack header. func newScanner(src io.Reader, dst io.Writer, packHash hash.Hash) *scanner { return &scanner{ src: src, dst: dst, buf: make([]byte, scanBufferSize), consumed: packfile.HeaderLen, hash: packHash, hashing: true, crc: 0, crcing: false, } } // readPackHeader reads and validates the pack header from src, // returning the raw header and its declared object count. func readPackHeader(src io.Reader) ([packfile.HeaderLen]byte, int, error) { var raw [packfile.HeaderLen]byte _, err := io.ReadFull(src, raw[:]) if err != nil { return raw, 0, fmt.Errorf("%w: reading header: %w", ErrMalformedPack, err) } packHeader, err := packfile.ParseHeader(raw[:]) if err != nil { return raw, 0, fmt.Errorf("%w: %w", ErrMalformedPack, err) } count, err := intconv.Uint32ToInt(packHeader.ObjectCount) if err != nil { return raw, 0, fmt.Errorf("%w: object count: %w", ErrMalformedPack, err) } return raw, count, nil } // Read implements [io.Reader]. func (scanner *scanner) Read(dst []byte) (int, error) { if len(dst) == 0 { return 0, nil } err := scanner.ensureAvailable() if err != nil { return 0, err } read := min(len(dst), scanner.n-scanner.off) copy(dst, scanner.buf[scanner.off:scanner.off+read]) err = scanner.use(read) if err != nil { return 0, err } return read, nil } // ReadByte implements [io.ByteReader] without allocation. func (scanner *scanner) ReadByte() (byte, error) { err := scanner.ensureAvailable() if err != nil { return 0, err } b := scanner.buf[scanner.off] err = scanner.use(1) if err != nil { return 0, err } return b, nil } // ensureAvailable makes at least one unread byte available, // returning [io.EOF] once the source is exhausted. func (scanner *scanner) ensureAvailable() error { for scanner.n-scanner.off == 0 { err := scanner.flushPrefix() if err != nil { return err } read, err := scanner.src.Read(scanner.buf[scanner.n:]) scanner.n += read if err != nil { if errors.Is(err, io.EOF) { if scanner.n-scanner.off == 0 { return io.EOF } return nil } return fmt.Errorf("object/store/packed/internal/ingest: reading pack: %w", err) } if read == 0 && scanner.n-scanner.off == 0 { return io.ErrNoProgress } } return nil } // peekHeader returns the unread window grown to at most maxLen bytes // without consuming, tolerating an early end of stream. func (scanner *scanner) peekHeader(maxLen int) ([]byte, error) { maxLen = min(maxLen, len(scanner.buf)) for scanner.n-scanner.off < maxLen { err := scanner.flushPrefix() if err != nil { return nil, err } read, err := scanner.src.Read(scanner.buf[scanner.n:]) scanner.n += read if err != nil { if errors.Is(err, io.EOF) { break } return nil, fmt.Errorf("object/store/packed/internal/ingest: reading pack: %w", err) } if read == 0 { break } } if scanner.n-scanner.off == 0 { return nil, fmt.Errorf("%w: unexpected end of stream", ErrMalformedPack) } return scanner.buf[scanner.off:scanner.n], nil } // use consumes n unread bytes, // folding them into the running hash and entry CRC as enabled. func (scanner *scanner) use(n int) error { chunk := scanner.buf[scanner.off : scanner.off+n] if scanner.hashing { _, err := scanner.hash.Write(chunk) if err != nil { return fmt.Errorf("object/store/packed/internal/ingest: hashing pack: %w", err) } } if scanner.crcing { scanner.crc = crc32.Update(scanner.crc, crc32.IEEETable, chunk) } scanner.off += n scanner.consumed += n return nil } // flushPrefix writes the consumed buffer prefix to the destination // and compacts the unread window to the start of the buffer. func (scanner *scanner) flushPrefix() error { if scanner.off == 0 { return nil } _, err := scanner.dst.Write(scanner.buf[:scanner.off]) if err != nil { return fmt.Errorf("object/store/packed/internal/ingest: writing pack: %w", err) } unread := scanner.n - scanner.off copy(scanner.buf, scanner.buf[scanner.off:scanner.n]) scanner.off = 0 scanner.n = unread return nil } // beginCRC starts CRC accumulation for one entry. func (scanner *scanner) beginCRC() { scanner.crc = 0 scanner.crcing = true } // endCRC ends CRC accumulation and returns the entry CRC. func (scanner *scanner) endCRC() uint32 { crc := scanner.crc scanner.crc = 0 scanner.crcing = false return crc } // finishTrailer reads and verifies the pack trailer hash, // flushing the remaining buffered pack bytes to the destination. // // The trailer is mirrored to the destination but excluded from the pack hash. func (scanner *scanner) finishTrailer(hashSize int) ([]byte, error) { trailer := make([]byte, hashSize) scanner.hashing = false _, err := io.ReadFull(scanner, trailer) if err != nil { return nil, fmt.Errorf("%w: reading trailer: %w", ErrMalformedPack, err) } if scanner.n-scanner.off > 0 { return nil, fmt.Errorf("%w: trailing data after pack", ErrMalformedPack) } if !bytes.Equal(scanner.hash.Sum(nil), trailer) { return nil, fmt.Errorf("%w: trailer hash mismatch", ErrMalformedPack) } err = scanner.flushPrefix() if err != nil { return nil, err } return trailer, nil } // streamAndScan streams the pack body to the temporary pack file, // scanning one record per declared object and verifying the trailer. func (ingestion *ingestion) streamAndScan() error { meter := progress.New(progress.Options{ Writer: ingestion.opts.Progress, Title: "receiving objects", Total: ingestion.headerCount, Delay: 0, Sparse: false, Throughput: true, }) for done := range ingestion.headerCount { err := ingestion.scanEntry(ingestion.scanner.consumed) if err != nil { return err } meter.Set(done+1, ingestion.scanner.consumed) } meter.Stop("done") trailer, err := ingestion.scanner.finishTrailer(ingestion.objectFormat.Size()) if err != nil { return err } packHash, err := ingestion.objectFormat.FromBytes(trailer) if err != nil { return fmt.Errorf("object/store/packed/internal/ingest: %w", err) } ingestion.packHash = packHash return nil } // scanEntry scans the entry beginning at start into one record. func (ingestion *ingestion) scanEntry(start int) error { ingestion.scanner.beginCRC() rec, err := ingestion.scanHeader(start) if err != nil { return err } inflated, oid, err := ingestion.drainPayload(&rec) if err != nil { return err } if inflated != int64(rec.declaredSize) { return fmt.Errorf( "%w: entry at %d: inflated size %d differs from declared %d", ErrMalformedPack, start, inflated, rec.declaredSize, ) } rec.packedLen = ingestion.scanner.consumed - start rec.crc32 = ingestion.scanner.endCRC() if rec.packedType.IsBase() { rec.objectType = rec.packedType rec.oid = oid rec.resolved = true } else { ingestion.deltaCount++ } index := len(ingestion.records) ingestion.records = append(ingestion.records, rec) ingestion.byOffset[rec.offset] = index if rec.resolved { ingestion.byOID[rec.oid] = index } return nil } // scanHeader parses and consumes the entry header at start. func (ingestion *ingestion) scanHeader(start int) (record, error) { var rec record rec.offset = start window, err := ingestion.scanner.peekHeader(packfile.MaxEntryHeaderLen(ingestion.objectFormat.Size())) if err != nil { return rec, err } entryHeader, err := packfile.ParseEntryHeader(window, ingestion.objectFormat.Size()) if err != nil { return rec, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, start, err) } declaredSize, err := intconv.Uint64ToInt(entryHeader.Size) if err != nil { return rec, fmt.Errorf("%w: entry at %d: declared size overflows int: %w", ErrMalformedPack, start, err) } rec.packedType = entryHeader.Type rec.declaredSize = declaredSize rec.headerLen = entryHeader.HeaderLen switch entryHeader.Type { case packfile.EntryTypeCommit, packfile.EntryTypeTree, packfile.EntryTypeBlob, packfile.EntryTypeTag: case packfile.EntryTypeOfsDelta: dist, err := intconv.Uint64ToInt(entryHeader.OfsDistance) if err != nil || dist == 0 || dist > start { return rec, fmt.Errorf("%w: entry at %d: ofs-delta base out of bounds", ErrMalformedPack, start) } rec.baseOffset = start - dist case packfile.EntryTypeRefDelta: baseID, err := ingestion.objectFormat.FromBytes(entryHeader.RefBase[:ingestion.objectFormat.Size()]) if err != nil { return rec, fmt.Errorf("object/store/packed/internal/ingest: %w", err) } rec.baseOID = baseID case packfile.EntryTypeInvalid, packfile.EntryTypeFuture: return rec, fmt.Errorf("%w: entry at %d: unsupported entry type", ErrMalformedPack, start) } err = ingestion.scanner.use(entryHeader.HeaderLen) if err != nil { return rec, err } return rec, nil } // drainPayload consumes one entry's compressed payload from the stream, // returning its inflated length and, for base entries, its object ID. func (ingestion *ingestion) drainPayload(rec *record) (int64, id.ObjectID, error) { var zero id.ObjectID zr, err := zlib.NewReader(ingestion.scanner) if err != nil { return 0, zero, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err) } defer func() { _ = zr.Close() }() if !rec.packedType.IsBase() { read, err := io.Copy(io.Discard, zr) if err != nil { return 0, zero, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err) } return read, zero, nil } objectType, err := rec.packedType.ObjectType() if err != nil { return 0, zero, fmt.Errorf("object/store/packed/internal/ingest: %w", err) } hashImpl, err := ingestion.objectFormat.New() if err != nil { return 0, zero, fmt.Errorf("object/store/packed/internal/ingest: %w", err) } _, _ = hashImpl.Write(header.Append(nil, objectType, rec.declaredSize)) read, err := io.Copy(hashImpl, zr) if err != nil { return 0, zero, fmt.Errorf("%w: entry at %d: %w", ErrMalformedPack, rec.offset, err) } oid, err := ingestion.objectFormat.FromBytes(hashImpl.Sum(nil)) if err != nil { return 0, zero, fmt.Errorf("object/store/packed/internal/ingest: %w", err) } return read, oid, nil }