Fix deadlock in IncrementalDispatcher (#2632)

This commit is contained in:
David Zhao
2024-04-06 10:36:02 -07:00
committed by GitHub
parent 8334149034
commit 0712719c2f
2 changed files with 8 additions and 1 deletions

View File

@@ -48,8 +48,10 @@ func (d *IncrementalDispatcher[T]) Add(item T) {
}
func (d *IncrementalDispatcher[T]) Done() {
d.lock.Lock()
d.done.Break()
d.cond.Broadcast()
d.lock.Unlock()
}
func (d *IncrementalDispatcher[T]) ForEach(fn func(T)) {
@@ -69,6 +71,11 @@ func (d *IncrementalDispatcher[T]) ForEach(fn func(T)) {
for !d.done.IsBroken() {
dispatchFromIdx()
d.lock.Lock()
// need to check again because Done may have been called while dispatching
if d.done.IsBroken() {
d.lock.Unlock()
break
}
if idx == len(d.items) {
d.cond.Wait()
}

View File

@@ -48,7 +48,7 @@ func TestForEach(t *testing.T) {
func TestConcurrentConsumption(t *testing.T) {
producer := utils.NewIncrementalDispatcher[int]()
numConsumers := 5
numConsumers := 100
sums := make([]atomic.Int32, numConsumers)
var wg sync.WaitGroup