package loose
import (
"bytes"
"crypto/rand"
"errors"
"fmt"
"hash"
"io/fs"
"os"
"path/filepath"
"lindenii.org/go/furgit/internal/compress/zlib"
"lindenii.org/go/furgit/object/header"
"lindenii.org/go/furgit/object/id"
"lindenii.org/go/furgit/object/store"
"lindenii.org/go/lgo/intconv"
)
const tempObjectFilePrefix = "tmp_obj_"
// streamWriter incrementally hashes and deflates an object into a temp file.
// finalize validates size accounting and atomically renames the temp file.
type streamWriter struct {
// loose owns path and root operations used by this write session.
loose *Loose
// file is the temporary destination file under objects/.
file *os.File
// zw compresses raw object bytes into file.
zw *zlib.Writer
// hash receives the same raw bytes used to compute the resulting object ID.
hash hash.Hash
// tmpRelPath is the relative path of file under the objects root.
tmpRelPath string
// fullMode selects full-object input ("type size\x00content")
// as opposed to content-only input.
fullMode bool
// headerBuf accumulates header bytes while fullMode parses up to the first NUL.
headerBuf []byte
// headerDone reports whether the full-object header has been parsed.
headerDone bool
// expectedContentLeft tracks remaining declared content bytes.
expectedContentLeft uint64
closed bool
finalized bool
}
// Write validates and writes raw bytes into the stream.
// In full mode, it parses and enforces the streamed header-declared content size.
func (writer *streamWriter) Write(src []byte) (int, error) {
if writer.finalized {
return 0, fmt.Errorf("%w: write after finalize", store.ErrInvalidObject)
}
if writer.closed {
return 0, fmt.Errorf("%w: write after close", store.ErrInvalidObject)
}
if writer.fullMode {
err := writer.acceptFull(src)
if err != nil {
return 0, err
}
} else {
n, err := intconv.IntToUint64(len(src))
if err != nil {
return 0, fmt.Errorf("object/store/loose: %w", err)
}
err = writer.acceptContent(n)
if err != nil {
return 0, err
}
}
err := writer.writeRawChunk(src)
if err != nil {
return 0, err
}
return len(src), nil
}
// Close flushes and closes the underlying zlib stream and temp file.
func (writer *streamWriter) Close() error {
errZlib := writer.zw.Close()
errSync := writer.file.Sync()
errFile := writer.file.Close()
writer.closed = true
writer.file = nil
return errors.Join(errZlib, errSync, errFile)
}
// acceptFull validates and accounts raw full-object input.
func (writer *streamWriter) acceptFull(src []byte) error {
if writer.headerDone {
n, err := intconv.IntToUint64(len(src))
if err != nil {
return fmt.Errorf("object/store/loose: %w", err)
}
return writer.acceptContent(n)
}
nul := bytes.IndexByte(src, 0)
if nul < 0 {
writer.headerBuf = append(writer.headerBuf, src...)
return nil
}
headerChunkLen := nul + 1
writer.headerBuf = append(writer.headerBuf, src[:headerChunkLen]...)
_, size, _, err := header.Parse(writer.headerBuf)
if err != nil {
return fmt.Errorf("%w: %w", store.ErrInvalidObject, err)
}
writer.headerDone = true
writer.expectedContentLeft = size
rest, err := intconv.IntToUint64(len(src) - headerChunkLen)
if err != nil {
return fmt.Errorf("object/store/loose: %w", err)
}
return writer.acceptContent(rest)
}
// acceptContent validates and accounts content byte counts.
func (writer *streamWriter) acceptContent(n uint64) error {
if n > writer.expectedContentLeft {
return fmt.Errorf("%w: object content exceeds declared size", store.ErrInvalidObject)
}
writer.expectedContentLeft -= n
return nil
}
// writeRawChunk forwards raw bytes to the hash and deflate pipeline.
func (writer *streamWriter) writeRawChunk(src []byte) error {
_, err := writer.hash.Write(src)
if err != nil {
return fmt.Errorf("object/store/loose: %w", err)
}
_, err = writer.zw.Write(src)
if err != nil {
return fmt.Errorf("object/store/loose: %w", err)
}
return nil
}
// finalize validates write completeness and atomically publishes the object.
// Publication is no-clobber: it links tmpRelPath to the object path and treats
// existing destination objects as success.
func (writer *streamWriter) finalize() (id.ObjectID, error) {
writer.finalized = true
var zero id.ObjectID
if !writer.closed {
err := writer.Close()
if err != nil {
return zero, err
}
}
if writer.fullMode && !writer.headerDone {
return zero, fmt.Errorf("%w: missing full object header", store.ErrInvalidObject)
}
if writer.expectedContentLeft != 0 {
return zero, fmt.Errorf("%w: object content shorter than declared size", store.ErrInvalidObject)
}
idBytes := writer.hash.Sum(nil)
objectID, err := writer.loose.objectFormat.FromBytes(idBytes)
if err != nil {
return zero, fmt.Errorf("object/store/loose: %w", err)
}
relPath, err := writer.loose.objectPath(objectID)
if err != nil {
return zero, err
}
dir := filepath.Dir(relPath)
err = writer.loose.root.MkdirAll(dir, 0o755)
if err != nil {
return zero, fmt.Errorf("object/store/loose: %w", err)
}
cleanup := true
defer func() {
if cleanup {
_ = writer.loose.root.Remove(writer.tmpRelPath)
}
}()
err = writer.loose.root.Link(writer.tmpRelPath, relPath)
if err != nil {
if errors.Is(err, fs.ErrExist) {
cleanup = false
_ = writer.loose.root.Remove(writer.tmpRelPath)
return objectID, nil
}
return zero, fmt.Errorf("object/store/loose: %w", err)
}
cleanup = false
_ = writer.loose.root.Remove(writer.tmpRelPath)
return objectID, nil
}
// newStreamWriter creates a stream writer with a temp file rooted in objects/.
func (loose *Loose) newStreamWriter(fullMode bool) (*streamWriter, error) {
hashFn, err := loose.objectFormat.New()
if err != nil {
return nil, fmt.Errorf("object/store/loose: %w", err)
}
tmpRelPath, file, err := loose.createTempObjectFile(".")
if err != nil {
return nil, err
}
return &streamWriter{
loose: loose,
file: file,
zw: zlib.NewWriter(file),
hash: hashFn,
tmpRelPath: tmpRelPath,
fullMode: fullMode,
headerBuf: make([]byte, 0, 64),
}, nil
}
// createTempObjectFile creates a unique temporary object file within dir.
// The returned path is relative to the objects root.
func (loose *Loose) createTempObjectFile(dir string) (string, *os.File, error) {
var lastErr error
for range 16 {
relPath := filepath.Join(dir, tempObjectFilePrefix+rand.Text())
file, err := loose.root.OpenFile(relPath, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0o644)
if err == nil {
return relPath, file, nil
}
lastErr = err
if errors.Is(err, fs.ErrExist) {
continue
}
return "", nil, fmt.Errorf("object/store/loose: %w", err)
}
return "", nil, fmt.Errorf("object/store/loose: failed to create temporary object file: %w", lastErr)
}