package loose import ( "bytes" "crypto/rand" "errors" "fmt" "hash" "io/fs" "os" "path/filepath" "lindenii.org/go/furgit/internal/compress/zlib" "lindenii.org/go/furgit/object/header" "lindenii.org/go/furgit/object/id" "lindenii.org/go/furgit/object/store" "lindenii.org/go/lgo/intconv" ) const tempObjectFilePrefix = "tmp_obj_" // streamWriter incrementally hashes and deflates an object into a temp file. // finalize validates size accounting and atomically renames the temp file. type streamWriter struct { // loose owns path and root operations used by this write session. loose *Loose // file is the temporary destination file under objects/. file *os.File // zw compresses raw object bytes into file. zw *zlib.Writer // hash receives the same raw bytes used to compute the resulting object ID. hash hash.Hash // tmpRelPath is the relative path of file under the objects root. tmpRelPath string // fullMode selects full-object input ("type size\x00content") // as opposed to content-only input. fullMode bool // headerBuf accumulates header bytes while fullMode parses up to the first NUL. headerBuf []byte // headerDone reports whether the full-object header has been parsed. headerDone bool // expectedContentLeft tracks remaining declared content bytes. expectedContentLeft uint64 closed bool finalized bool } // Write validates and writes raw bytes into the stream. // In full mode, it parses and enforces the streamed header-declared content size. func (writer *streamWriter) Write(src []byte) (int, error) { if writer.finalized { return 0, fmt.Errorf("%w: write after finalize", store.ErrInvalidObject) } if writer.closed { return 0, fmt.Errorf("%w: write after close", store.ErrInvalidObject) } if writer.fullMode { err := writer.acceptFull(src) if err != nil { return 0, err } } else { n, err := intconv.IntToUint64(len(src)) if err != nil { return 0, fmt.Errorf("object/store/loose: %w", err) } err = writer.acceptContent(n) if err != nil { return 0, err } } err := writer.writeRawChunk(src) if err != nil { return 0, err } return len(src), nil } // Close flushes and closes the underlying zlib stream and temp file. func (writer *streamWriter) Close() error { errZlib := writer.zw.Close() errSync := writer.file.Sync() errFile := writer.file.Close() writer.closed = true writer.file = nil return errors.Join(errZlib, errSync, errFile) } // acceptFull validates and accounts raw full-object input. func (writer *streamWriter) acceptFull(src []byte) error { if writer.headerDone { n, err := intconv.IntToUint64(len(src)) if err != nil { return fmt.Errorf("object/store/loose: %w", err) } return writer.acceptContent(n) } nul := bytes.IndexByte(src, 0) if nul < 0 { writer.headerBuf = append(writer.headerBuf, src...) return nil } headerChunkLen := nul + 1 writer.headerBuf = append(writer.headerBuf, src[:headerChunkLen]...) _, size, _, err := header.Parse(writer.headerBuf) if err != nil { return fmt.Errorf("%w: %w", store.ErrInvalidObject, err) } writer.headerDone = true writer.expectedContentLeft = size rest, err := intconv.IntToUint64(len(src) - headerChunkLen) if err != nil { return fmt.Errorf("object/store/loose: %w", err) } return writer.acceptContent(rest) } // acceptContent validates and accounts content byte counts. func (writer *streamWriter) acceptContent(n uint64) error { if n > writer.expectedContentLeft { return fmt.Errorf("%w: object content exceeds declared size", store.ErrInvalidObject) } writer.expectedContentLeft -= n return nil } // writeRawChunk forwards raw bytes to the hash and deflate pipeline. func (writer *streamWriter) writeRawChunk(src []byte) error { _, err := writer.hash.Write(src) if err != nil { return fmt.Errorf("object/store/loose: %w", err) } _, err = writer.zw.Write(src) if err != nil { return fmt.Errorf("object/store/loose: %w", err) } return nil } // finalize validates write completeness and atomically publishes the object. // Publication is no-clobber: it links tmpRelPath to the object path and treats // existing destination objects as success. func (writer *streamWriter) finalize() (id.ObjectID, error) { writer.finalized = true var zero id.ObjectID if !writer.closed { err := writer.Close() if err != nil { return zero, err } } if writer.fullMode && !writer.headerDone { return zero, fmt.Errorf("%w: missing full object header", store.ErrInvalidObject) } if writer.expectedContentLeft != 0 { return zero, fmt.Errorf("%w: object content shorter than declared size", store.ErrInvalidObject) } idBytes := writer.hash.Sum(nil) objectID, err := writer.loose.objectFormat.FromBytes(idBytes) if err != nil { return zero, fmt.Errorf("object/store/loose: %w", err) } relPath, err := writer.loose.objectPath(objectID) if err != nil { return zero, err } dir := filepath.Dir(relPath) err = writer.loose.root.MkdirAll(dir, 0o755) if err != nil { return zero, fmt.Errorf("object/store/loose: %w", err) } cleanup := true defer func() { if cleanup { _ = writer.loose.root.Remove(writer.tmpRelPath) } }() err = writer.loose.root.Link(writer.tmpRelPath, relPath) if err != nil { if errors.Is(err, fs.ErrExist) { cleanup = false _ = writer.loose.root.Remove(writer.tmpRelPath) return objectID, nil } return zero, fmt.Errorf("object/store/loose: %w", err) } cleanup = false _ = writer.loose.root.Remove(writer.tmpRelPath) return objectID, nil } // newStreamWriter creates a stream writer with a temp file rooted in objects/. func (loose *Loose) newStreamWriter(fullMode bool) (*streamWriter, error) { hashFn, err := loose.objectFormat.New() if err != nil { return nil, fmt.Errorf("object/store/loose: %w", err) } tmpRelPath, file, err := loose.createTempObjectFile(".") if err != nil { return nil, err } return &streamWriter{ loose: loose, file: file, zw: zlib.NewWriter(file), hash: hashFn, tmpRelPath: tmpRelPath, fullMode: fullMode, headerBuf: make([]byte, 0, 64), }, nil } // createTempObjectFile creates a unique temporary object file within dir. // The returned path is relative to the objects root. func (loose *Loose) createTempObjectFile(dir string) (string, *os.File, error) { var lastErr error for range 16 { relPath := filepath.Join(dir, tempObjectFilePrefix+rand.Text()) file, err := loose.root.OpenFile(relPath, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0o644) if err == nil { return relPath, file, nil } lastErr = err if errors.Is(err, fs.ErrExist) { continue } return "", nil, fmt.Errorf("object/store/loose: %w", err) } return "", nil, fmt.Errorf("object/store/loose: failed to create temporary object file: %w", lastErr) }