Files
livekit/pkg/utils/incrementaldispatcher.go
2025-04-25 12:21:30 +05:30

87 lines
1.9 KiB
Go

/*
* Copyright 2024 LiveKit, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package utils
import (
"sync"
"github.com/frostbyte73/core"
)
// IncrementalDispatcher is a dispatcher that allows multiple consumers to consume items as they become
// available, while producers can add items at anytime.
type IncrementalDispatcher[T any] struct {
done core.Fuse
lock sync.RWMutex
cond *sync.Cond
items []T
}
func NewIncrementalDispatcher[T any]() *IncrementalDispatcher[T] {
p := &IncrementalDispatcher[T]{}
p.cond = sync.NewCond(&p.lock)
return p
}
func (d *IncrementalDispatcher[T]) Add(item T) {
if d.done.IsBroken() {
return
}
d.lock.Lock()
d.items = append(d.items, item)
d.cond.Broadcast()
d.lock.Unlock()
}
func (d *IncrementalDispatcher[T]) Done() {
d.lock.Lock()
d.done.Break()
d.cond.Broadcast()
d.lock.Unlock()
}
func (d *IncrementalDispatcher[T]) ForEach(fn func(T)) {
idx := 0
dispatchFromIdx := func() {
var itemsToDispatch []T
d.lock.RLock()
for idx < len(d.items) {
itemsToDispatch = append(itemsToDispatch, d.items[idx])
idx++
}
d.lock.RUnlock()
for _, item := range itemsToDispatch {
fn(item)
}
}
for !d.done.IsBroken() {
dispatchFromIdx()
d.lock.Lock()
// need to check again because Done may have been called while dispatching
if d.done.IsBroken() {
d.lock.Unlock()
break
}
if idx == len(d.items) {
d.cond.Wait()
}
d.lock.Unlock()
}
dispatchFromIdx()
}