From f0fb5737fd6437c5776e8c5c25563e355f0ceef3 Mon Sep 17 00:00:00 2001 From: Runxi Yu Date: Wed, 24 Jun 2026 04:51:05 +0000 Subject: internal/progress: Concurrent --- internal/progress/meter.go | 126 +++++++++++++++++++++++++--------------- internal/progress/meter_test.go | 47 +++++++++++++++ internal/progress/render.go | 13 +++-- 3 files changed, 134 insertions(+), 52 deletions(-) create mode 100644 internal/progress/meter_test.go (limited to 'internal/progress') diff --git a/internal/progress/meter.go b/internal/progress/meter.go index e5e64fb4..9d4f1155 100644 --- a/internal/progress/meter.go +++ b/internal/progress/meter.go @@ -1,17 +1,22 @@ package progress import ( + "sync/atomic" "time" "lindenii.org/go/lgo/iowrap" ) const ( - updateInterval = time.Second + renderInterval = 100 * time.Millisecond + forceInterval = time.Second throughputInterval = 500 * time.Millisecond ) // Meter renders one in-place progress line. +// +// Add is safe for concurrent use; a single background goroutine renders. +// Stop must be called exactly once to flush the final line and release it. type Meter struct { writer iowrap.WriteFlusher @@ -21,24 +26,29 @@ type Meter struct { sparse bool throughput bool - startedAt time.Time - nextUpdateAt time.Time - nextThroughput time.Time + done atomic.Int64 + bytes atomic.Int64 + sawValue atomic.Bool - lastDone int - lastBytes int - lastPercent int - lastCounterW int - sawValue bool + startedAt time.Time + stop chan struct{} + exited chan struct{} + + // The following are owned by the render goroutine while it runs, + // then by Stop once exited is closed. + nextForceAt time.Time + nextThroughput time.Time + lastPercent int + lastCounterW int throughputSuffix string } -// New creates one progress meter. +// New creates one progress meter and starts its render goroutine. func New(opts Options) *Meter { now := time.Now() - return &Meter{ + meter := &Meter{ writer: opts.Writer, title: opts.Title, total: opts.Total, @@ -46,10 +56,20 @@ func New(opts Options) *Meter { sparse: opts.Sparse, throughput: opts.Throughput, startedAt: now, - nextUpdateAt: now.Add(updateInterval), + stop: make(chan struct{}), + exited: make(chan struct{}), + nextForceAt: now.Add(forceInterval), nextThroughput: now.Add(throughputInterval), lastPercent: -1, } + + if meter.writer != nil { + go meter.loop() + } else { + close(meter.exited) + } + + return meter } // Options configures one progress meter. @@ -67,59 +87,71 @@ type Options struct { Throughput bool } -// Set records current progress -// and renders when percent changed or the 1s tick elapsed. -func (meter *Meter) Set(done int, bytes int) { - meter.lastDone = done - meter.lastBytes = bytes - meter.sawValue = true +// Add increments the done and byte counters. +// +// Labels: MT-Safe. +func (meter *Meter) Add(done, bytes int64) { + meter.done.Add(done) + meter.bytes.Add(bytes) + meter.sawValue.Store(true) +} + +// Stop ends the render goroutine, forces the final line, and appends ", .". +func (meter *Meter) Stop(msg string) { + close(meter.stop) + <-meter.exited - if meter.writer == nil { + if !meter.sawValue.Load() || meter.writer == nil { return } - now := time.Now() - forced := meter.consumeUpdateTick(now) - - percentChanged := false - - if meter.total > 0 { - percent := int(int64(done) * 100 / int64(meter.total)) - percentChanged = percent != meter.lastPercent + if msg == "" { + msg = "done" } - if !percentChanged && !forced { - return + if meter.sparse && meter.total > 0 && int(meter.done.Load()) != meter.total { + meter.done.Store(int64(meter.total)) } - meter.render(now, "\r") + meter.render(time.Now(), ", "+msg+".\n") } -// Stop forces the final progress line and appends ", .". -func (meter *Meter) Stop(msg string) { - if !meter.sawValue || meter.writer == nil { - return - } +func (meter *Meter) loop() { + defer close(meter.exited) - if msg == "" { - msg = "done" + ticker := time.NewTicker(renderInterval) + defer ticker.Stop() + + for { + select { + case <-meter.stop: + return + case now := <-ticker.C: + meter.maybeRender(now) + } } +} - if meter.sparse && meter.total > 0 && meter.lastDone != meter.total { - meter.lastDone = meter.total +func (meter *Meter) maybeRender(now time.Time) { + if !meter.sawValue.Load() { + return } - meter.render(time.Now(), ", "+msg+".\n") -} + forced := false -func (meter *Meter) consumeUpdateTick(now time.Time) bool { - if now.Before(meter.nextUpdateAt) { - return false + for !now.Before(meter.nextForceAt) { + meter.nextForceAt = meter.nextForceAt.Add(forceInterval) + forced = true } - for !now.Before(meter.nextUpdateAt) { - meter.nextUpdateAt = meter.nextUpdateAt.Add(updateInterval) + percentChanged := false + + if meter.total > 0 { + percent := int(meter.done.Load() * 100 / int64(meter.total)) + percentChanged = percent != meter.lastPercent } - return true + if percentChanged || forced { + meter.render(now, "\r") + } } diff --git a/internal/progress/meter_test.go b/internal/progress/meter_test.go new file mode 100644 index 00000000..62b489e3 --- /dev/null +++ b/internal/progress/meter_test.go @@ -0,0 +1,47 @@ +package progress_test + +import ( + "bytes" + "strings" + "sync" + "testing" + "time" + + "lindenii.org/go/furgit/internal/progress" + "lindenii.org/go/lgo/iowrap" +) + +func TestMeterConcurrentAdd(t *testing.T) { + t.Parallel() + + var buf bytes.Buffer + + meter := progress.New(progress.Options{ + Writer: iowrap.NopFlush(&buf), + Title: "test", + Total: 1000, + }) + + var wg sync.WaitGroup + + for range 10 { + wg.Add(1) + + go func() { + defer wg.Done() + + for range 100 { + meter.Add(1, 0) + time.Sleep(time.Millisecond) + } + }() + } + + wg.Wait() + + meter.Stop("done") + + if got := buf.String(); !strings.Contains(got, "100% (1000/1000)") { + t.Fatalf("final line = %q, want it to contain %q", got, "100% (1000/1000)") + } +} diff --git a/internal/progress/render.go b/internal/progress/render.go index 814ced98..44b9c252 100644 --- a/internal/progress/render.go +++ b/internal/progress/render.go @@ -40,13 +40,15 @@ func (meter *Meter) render(now time.Time, eol string) { } func (meter *Meter) renderCounters() string { + done := meter.done.Load() + if meter.total > 0 { - meter.lastPercent = int(int64(meter.lastDone) * 100 / int64(meter.total)) + meter.lastPercent = int(done * 100 / int64(meter.total)) - return fmt.Sprintf("%3d%% (%d/%d)%s", meter.lastPercent, meter.lastDone, meter.total, meter.throughputSuffix) + return fmt.Sprintf("%3d%% (%d/%d)%s", meter.lastPercent, done, meter.total, meter.throughputSuffix) } - return fmt.Sprintf("%d%s", meter.lastDone, meter.throughputSuffix) + return fmt.Sprintf("%d%s", done, meter.throughputSuffix) } func (meter *Meter) refreshThroughput(now time.Time) { @@ -67,6 +69,7 @@ func (meter *Meter) refreshThroughput(now time.Time) { return } - rate := uint64(float64(meter.lastBytes) / elapsed.Seconds()) - meter.throughputSuffix = ", " + humanize.Bytes(uint64(meter.lastBytes)) + " | " + humanize.Bytes(rate) + "/s" //nolint:gosec + bytes := meter.bytes.Load() + rate := uint64(float64(bytes) / elapsed.Seconds()) + meter.throughputSuffix = ", " + humanize.Bytes(uint64(bytes)) + " | " + humanize.Bytes(rate) + "/s" //nolint:gosec } -- cgit v1.3.1-10-gc9f91