From 0712719c2fdb33fb8a5895601f39226df85e2c69 Mon Sep 17 00:00:00 2001 From: David Zhao Date: Sat, 6 Apr 2024 10:36:02 -0700 Subject: [PATCH] Fix deadlock in IncrementalDispatcher (#2632) --- pkg/utils/incrementaldispatcher.go | 7 +++++++ pkg/utils/incrementaldispatcher_test.go | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/pkg/utils/incrementaldispatcher.go b/pkg/utils/incrementaldispatcher.go index dcc23d55c..2aaaf9ffd 100644 --- a/pkg/utils/incrementaldispatcher.go +++ b/pkg/utils/incrementaldispatcher.go @@ -48,8 +48,10 @@ func (d *IncrementalDispatcher[T]) Add(item T) { } func (d *IncrementalDispatcher[T]) Done() { + d.lock.Lock() d.done.Break() d.cond.Broadcast() + d.lock.Unlock() } func (d *IncrementalDispatcher[T]) ForEach(fn func(T)) { @@ -69,6 +71,11 @@ func (d *IncrementalDispatcher[T]) ForEach(fn func(T)) { for !d.done.IsBroken() { dispatchFromIdx() d.lock.Lock() + // need to check again because Done may have been called while dispatching + if d.done.IsBroken() { + d.lock.Unlock() + break + } if idx == len(d.items) { d.cond.Wait() } diff --git a/pkg/utils/incrementaldispatcher_test.go b/pkg/utils/incrementaldispatcher_test.go index da1393c64..828019559 100644 --- a/pkg/utils/incrementaldispatcher_test.go +++ b/pkg/utils/incrementaldispatcher_test.go @@ -48,7 +48,7 @@ func TestForEach(t *testing.T) { func TestConcurrentConsumption(t *testing.T) { producer := utils.NewIncrementalDispatcher[int]() - numConsumers := 5 + numConsumers := 100 sums := make([]atomic.Int32, numConsumers) var wg sync.WaitGroup