package loose

import (
	"bytes"
	"crypto/rand"
	"errors"
	"hash"
	"io/fs"
	"os"
	"path/filepath"

	"codeberg.org/lindenii/furgit/internal/zlib"
	"codeberg.org/lindenii/furgit/objectheader"
	"codeberg.org/lindenii/furgit/objectid"
)

// tempObjectFilePrefix is prepended to the random suffix of every temporary
// object file name.
const tempObjectFilePrefix = "tmp_obj_"

// streamWriter feeds object bytes to a hash and a zlib stream at the same
// time, spooling the compressed output into a temporary file. finalize checks
// the size accounting and publishes the temporary file under its object path.
type streamWriter struct {
	// store provides the root, hash algorithm and path helpers for this write.
	store *Store
	// file is the open temporary file receiving compressed output; Close sets
	// it to nil.
	file *os.File
	// zw is the zlib writer layered on top of file.
	zw *zlib.Writer
	// hash is fed exactly the bytes that are fed to zw; its sum becomes the
	// object ID.
	hash hash.Hash
	// tmpRelPath is the temporary file's path relative to the store root.
	tmpRelPath string

	// fullMode is true when callers stream "type size\0content" and false
	// when they stream content only.
	fullMode bool
	// headerBuf collects header bytes in full mode until the first NUL is seen.
	headerBuf []byte
	// headerDone is set once the full-mode header has been parsed.
	headerDone bool

	// expectedContentLeft is the number of content bytes still owed before
	// the declared size is reached.
	expectedContentLeft int64

	// closed is set by Close; finalized is set by finalize.
	closed    bool
	finalized bool
	// finalID and finalErr cache the outcome of finalize for repeated calls.
	finalID  objectid.ObjectID
	finalErr error
}

// newStreamWriter builds a streamWriter backed by a freshly created temporary
// file in the store root. fullMode selects header-plus-content input.
func (store *Store) newStreamWriter(fullMode bool) (*streamWriter, error) {
	hasher, err := store.algo.New()
	if err != nil {
		return nil, err
	}

	relPath, tmpFile, err := store.createTempObjectFile(".")
	if err != nil {
		return nil, err
	}

	writer := new(streamWriter)
	writer.store = store
	writer.file = tmpFile
	writer.zw = zlib.NewWriter(tmpFile)
	writer.hash = hasher
	writer.tmpRelPath = relPath
	writer.fullMode = fullMode
	writer.headerBuf = make([]byte, 0, 64)
	return writer, nil
}

// Write checks src against the size accounting and then forwards it to the
// hash and the compressed stream. In full mode the leading header is parsed
// and the content size it declares is enforced.
func (writer *streamWriter) Write(src []byte) (int, error) { if writer.finalized { return 0, errors.New("objectstore/loose: write after finalize") } if writer.closed { return 0, errors.New("objectstore/loose: write after close") } if writer.fullMode { if err := writer.acceptFull(src); err != nil { return 0, err } } else { if err := writer.acceptContent(int64(len(src))); err != nil { return 0, err } } if err := writer.writeRawChunk(src); err != nil { return 0, err } return len(src), nil } // Close flushes and closes the underlying zlib stream and temp file. // It is safe to call multiple times. func (writer *streamWriter) Close() error { if writer.closed { return nil } writer.closed = true errZlib := writer.zw.Close() errSync := writer.file.Sync() errFile := writer.file.Close() writer.file = nil return errors.Join(errZlib, errSync, errFile) } // finalize validates write completeness and atomically publishes the object. // Publication is no-clobber: it links tmpRelPath to the object path and treats // existing destination objects as success. 
func (writer *streamWriter) finalize() (objectid.ObjectID, error) { if writer.finalized { return writer.finalID, writer.finalErr } writer.finalized = true var zero objectid.ObjectID if !writer.closed { if err := writer.Close(); err != nil { writer.finalErr = err return zero, err } } if writer.fullMode && !writer.headerDone { writer.finalErr = errors.New("objectstore/loose: missing full object header") return zero, writer.finalErr } if writer.expectedContentLeft != 0 { writer.finalErr = errors.New("objectstore/loose: object content shorter than declared size") return zero, writer.finalErr } idBytes := writer.hash.Sum(nil) id, err := objectid.FromBytes(writer.store.algo, idBytes) if err != nil { writer.finalErr = err return zero, err } relPath, err := writer.store.objectPath(id) if err != nil { writer.finalErr = err return zero, err } dir := filepath.Dir(relPath) if err := writer.store.root.MkdirAll(dir, 0o755); err != nil { writer.finalErr = err return zero, err } cleanup := true defer func() { if cleanup { _ = writer.store.root.Remove(writer.tmpRelPath) } }() if err := writer.store.root.Link(writer.tmpRelPath, relPath); err != nil { if errors.Is(err, fs.ErrExist) { writer.finalID = id cleanup = false _ = writer.store.root.Remove(writer.tmpRelPath) return id, nil } writer.finalErr = err return zero, err } writer.finalID = id cleanup = false return id, nil } // acceptFull validates and accounts raw full-object input. func (writer *streamWriter) acceptFull(src []byte) error { if !writer.headerDone { if nul := bytes.IndexByte(src, 0); nul >= 0 { headerChunkLen := nul + 1 writer.headerBuf = append(writer.headerBuf, src[:headerChunkLen]...) _, size, _, ok := objectheader.Parse(writer.headerBuf) if !ok { return errors.New("objectstore/loose: malformed object header") } writer.headerDone = true writer.expectedContentLeft = size return writer.acceptContent(int64(len(src) - headerChunkLen)) } writer.headerBuf = append(writer.headerBuf, src...) 
return nil } return writer.acceptContent(int64(len(src))) } // acceptContent validates and accounts content byte counts. func (writer *streamWriter) acceptContent(n int64) error { if n > writer.expectedContentLeft { return errors.New("objectstore/loose: object content exceeds declared size") } writer.expectedContentLeft -= n return nil } // writeRawChunk forwards raw bytes to the hash and deflate pipeline. func (writer *streamWriter) writeRawChunk(src []byte) error { if _, err := writer.hash.Write(src); err != nil { return err } if _, err := writer.zw.Write(src); err != nil { return err } return nil } // createTempObjectFile creates a unique temporary object file within dir. // The returned path is relative to the objects root. func (store *Store) createTempObjectFile(dir string) (string, *os.File, error) { for range 16 { relPath := filepath.Join(dir, tempObjectFilePrefix+rand.Text()) file, err := store.root.OpenFile(relPath, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0o644) if err == nil { return relPath, file, nil } if errors.Is(err, fs.ErrExist) { continue } return "", nil, err } return "", nil, errors.New("objectstore/loose: failed to create temporary object file") }