mirror of
https://github.com/livekit/livekit.git
synced 2026-03-29 15:49:55 +00:00
87 lines
1.9 KiB
Go
87 lines
1.9 KiB
Go
/*
|
|
* Copyright 2024 LiveKit, Inc
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package utils
|
|
|
|
import (
|
|
"sync"
|
|
|
|
"github.com/frostbyte73/core"
|
|
)
|
|
|
|
// IncrementalDispatcher is a dispatcher that allows multiple consumers to consume items as they become
|
|
// available, while producers can add items at anytime.
|
|
type IncrementalDispatcher[T any] struct {
|
|
done core.Fuse
|
|
lock sync.RWMutex
|
|
cond *sync.Cond
|
|
items []T
|
|
}
|
|
|
|
func NewIncrementalDispatcher[T any]() *IncrementalDispatcher[T] {
|
|
p := &IncrementalDispatcher[T]{}
|
|
p.cond = sync.NewCond(&p.lock)
|
|
return p
|
|
}
|
|
|
|
func (d *IncrementalDispatcher[T]) Add(item T) {
|
|
if d.done.IsBroken() {
|
|
return
|
|
}
|
|
d.lock.Lock()
|
|
d.items = append(d.items, item)
|
|
d.cond.Broadcast()
|
|
d.lock.Unlock()
|
|
}
|
|
|
|
func (d *IncrementalDispatcher[T]) Done() {
|
|
d.lock.Lock()
|
|
d.done.Break()
|
|
d.cond.Broadcast()
|
|
d.lock.Unlock()
|
|
}
|
|
|
|
func (d *IncrementalDispatcher[T]) ForEach(fn func(T)) {
|
|
idx := 0
|
|
dispatchFromIdx := func() {
|
|
var itemsToDispatch []T
|
|
d.lock.RLock()
|
|
for idx < len(d.items) {
|
|
itemsToDispatch = append(itemsToDispatch, d.items[idx])
|
|
idx++
|
|
}
|
|
d.lock.RUnlock()
|
|
for _, item := range itemsToDispatch {
|
|
fn(item)
|
|
}
|
|
}
|
|
for !d.done.IsBroken() {
|
|
dispatchFromIdx()
|
|
d.lock.Lock()
|
|
// need to check again because Done may have been called while dispatching
|
|
if d.done.IsBroken() {
|
|
d.lock.Unlock()
|
|
break
|
|
}
|
|
if idx == len(d.items) {
|
|
d.cond.Wait()
|
|
}
|
|
d.lock.Unlock()
|
|
}
|
|
|
|
dispatchFromIdx()
|
|
}
|