mirror of
https://github.com/livekit/livekit.git
synced 2026-03-30 13:25:42 +00:00
Catching a few more races reported by go test -race ./... (#713)
* Fix a few more races * Make sure room store on leave has correct count * Revert logger change to reduce diff
This commit is contained in:
@@ -36,7 +36,7 @@ type RedisRouter struct {
|
||||
rc *redis.Client
|
||||
ctx context.Context
|
||||
isStarted atomic.Bool
|
||||
statsMu sync.Mutex
|
||||
nodeMu sync.RWMutex
|
||||
// previous stats for computing averages
|
||||
prevStats *livekit.NodeStats
|
||||
|
||||
@@ -54,7 +54,9 @@ func NewRedisRouter(currentNode LocalNode, rc *redis.Client) *RedisRouter {
|
||||
}
|
||||
|
||||
func (r *RedisRouter) RegisterNode() error {
|
||||
r.nodeMu.RLock()
|
||||
data, err := proto.Marshal((*livekit.Node)(r.currentNode))
|
||||
r.nodeMu.RUnlock()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -287,7 +289,9 @@ func (r *RedisRouter) Start() error {
|
||||
}
|
||||
|
||||
func (r *RedisRouter) Drain() {
|
||||
r.nodeMu.Lock()
|
||||
r.currentNode.State = livekit.NodeState_SHUTTING_DOWN
|
||||
r.nodeMu.Unlock()
|
||||
if err := r.RegisterNode(); err != nil {
|
||||
logger.Errorw("failed to mark as draining", err, "nodeID", r.currentNode.Id)
|
||||
}
|
||||
@@ -344,9 +348,9 @@ func (r *RedisRouter) statsWorker() {
|
||||
_ = r.WriteNodeRTC(context.Background(), r.currentNode.Id, &livekit.RTCNodeMessage{
|
||||
Message: &livekit.RTCNodeMessage_KeepAlive{},
|
||||
})
|
||||
r.statsMu.Lock()
|
||||
r.nodeMu.RLock()
|
||||
stats := r.currentNode.Stats
|
||||
r.statsMu.Unlock()
|
||||
r.nodeMu.RUnlock()
|
||||
|
||||
delaySeconds := time.Now().Unix() - stats.UpdatedAt
|
||||
if delaySeconds > statsMaxDelaySeconds {
|
||||
@@ -470,21 +474,21 @@ func (r *RedisRouter) handleRTCMessage(rm *livekit.RTCNodeMessage) error {
|
||||
break
|
||||
}
|
||||
|
||||
r.statsMu.Lock()
|
||||
r.nodeMu.Lock()
|
||||
if r.prevStats == nil {
|
||||
r.prevStats = r.currentNode.Stats
|
||||
}
|
||||
updated, computedAvg, err := prometheus.GetUpdatedNodeStats(r.currentNode.Stats, r.prevStats)
|
||||
if err != nil {
|
||||
logger.Errorw("could not update node stats", err)
|
||||
r.statsMu.Unlock()
|
||||
r.nodeMu.Unlock()
|
||||
return err
|
||||
}
|
||||
r.currentNode.Stats = updated
|
||||
if computedAvg {
|
||||
r.prevStats = updated
|
||||
}
|
||||
r.statsMu.Unlock()
|
||||
r.nodeMu.Unlock()
|
||||
|
||||
// TODO: check stats against config.Limit values
|
||||
if err := r.RegisterNode(); err != nil {
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
package rtc_test
|
||||
package rtc
|
||||
|
||||
import (
|
||||
"github.com/livekit/protocol/livekit"
|
||||
|
||||
@@ -36,8 +36,8 @@ type broadcastOptions struct {
|
||||
type Room struct {
|
||||
lock sync.RWMutex
|
||||
|
||||
Room *livekit.Room
|
||||
Logger logger.Logger
|
||||
protoRoom *livekit.Room
|
||||
Logger logger.Logger
|
||||
|
||||
config WebRTCConfig
|
||||
audioConfig *config.AudioConfig
|
||||
@@ -70,7 +70,7 @@ type ParticipantOptions struct {
|
||||
|
||||
func NewRoom(room *livekit.Room, config WebRTCConfig, audioConfig *config.AudioConfig, telemetry telemetry.TelemetryService) *Room {
|
||||
r := &Room{
|
||||
Room: proto.Clone(room).(*livekit.Room),
|
||||
protoRoom: proto.Clone(room).(*livekit.Room),
|
||||
Logger: LoggerWithRoom(logger.GetDefaultLogger(), livekit.RoomName(room.Name), livekit.RoomID(room.Sid)),
|
||||
config: config,
|
||||
audioConfig: audioConfig,
|
||||
@@ -81,11 +81,11 @@ func NewRoom(room *livekit.Room, config WebRTCConfig, audioConfig *config.AudioC
|
||||
batchedUpdates: make(map[livekit.ParticipantIdentity]*livekit.ParticipantInfo),
|
||||
closed: make(chan struct{}),
|
||||
}
|
||||
if r.Room.EmptyTimeout == 0 {
|
||||
r.Room.EmptyTimeout = DefaultEmptyTimeout
|
||||
if r.protoRoom.EmptyTimeout == 0 {
|
||||
r.protoRoom.EmptyTimeout = DefaultEmptyTimeout
|
||||
}
|
||||
if r.Room.CreationTime == 0 {
|
||||
r.Room.CreationTime = time.Now().Unix()
|
||||
if r.protoRoom.CreationTime == 0 {
|
||||
r.protoRoom.CreationTime = time.Now().Unix()
|
||||
}
|
||||
|
||||
go r.audioUpdateWorker()
|
||||
@@ -95,12 +95,19 @@ func NewRoom(room *livekit.Room, config WebRTCConfig, audioConfig *config.AudioC
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *Room) ToProto() *livekit.Room {
|
||||
r.lock.RLock()
|
||||
defer r.lock.RUnlock()
|
||||
|
||||
return proto.Clone(r.protoRoom).(*livekit.Room)
|
||||
}
|
||||
|
||||
func (r *Room) Name() livekit.RoomName {
|
||||
return livekit.RoomName(r.Room.Name)
|
||||
return livekit.RoomName(r.protoRoom.Name)
|
||||
}
|
||||
|
||||
func (r *Room) ID() livekit.RoomID {
|
||||
return livekit.RoomID(r.Room.Sid)
|
||||
return livekit.RoomID(r.protoRoom.Sid)
|
||||
}
|
||||
|
||||
func (r *Room) GetParticipant(identity livekit.ParticipantIdentity) types.LocalParticipant {
|
||||
@@ -195,7 +202,7 @@ func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions
|
||||
return ErrAlreadyJoined
|
||||
}
|
||||
|
||||
if r.Room.MaxParticipants > 0 && len(r.participants) >= int(r.Room.MaxParticipants) {
|
||||
if r.protoRoom.MaxParticipants > 0 && len(r.participants) >= int(r.protoRoom.MaxParticipants) {
|
||||
prometheus.ServiceOperationCounter.WithLabelValues("participant_join", "error", "max_exceeded").Add(1)
|
||||
return ErrMaxParticipantsExceeded
|
||||
}
|
||||
@@ -204,7 +211,7 @@ func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions
|
||||
r.joinedAt.Store(time.Now().Unix())
|
||||
}
|
||||
if !participant.Hidden() {
|
||||
r.Room.NumParticipants++
|
||||
r.protoRoom.NumParticipants++
|
||||
}
|
||||
|
||||
// it's important to set this before connection, we don't want to miss out on any publishedTracks
|
||||
@@ -228,7 +235,7 @@ func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions
|
||||
// start the workers once connectivity is established
|
||||
p.Start()
|
||||
|
||||
r.telemetry.ParticipantActive(context.Background(), r.Room, p.ToProto(), &livekit.AnalyticsClientMeta{ClientConnectTime: uint32(time.Since(p.ConnectedAt()).Milliseconds())})
|
||||
r.telemetry.ParticipantActive(context.Background(), r.ToProto(), p.ToProto(), &livekit.AnalyticsClientMeta{ClientConnectTime: uint32(time.Since(p.ConnectedAt()).Milliseconds())})
|
||||
} else if state == livekit.ParticipantInfo_DISCONNECTED {
|
||||
// remove participant from room
|
||||
go r.RemoveParticipant(p.Identity())
|
||||
@@ -256,8 +263,8 @@ func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions
|
||||
"protocol", participant.ProtocolVersion(),
|
||||
"options", opts)
|
||||
|
||||
if participant.IsRecorder() && !r.Room.ActiveRecording {
|
||||
r.Room.ActiveRecording = true
|
||||
if participant.IsRecorder() && !r.protoRoom.ActiveRecording {
|
||||
r.protoRoom.ActiveRecording = true
|
||||
r.sendRoomUpdateLocked()
|
||||
}
|
||||
|
||||
@@ -283,7 +290,7 @@ func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions
|
||||
}
|
||||
})
|
||||
|
||||
if err := participant.SendJoinResponse(r.Room, otherParticipants, iceServers, region); err != nil {
|
||||
if err := participant.SendJoinResponse(r.protoRoom, otherParticipants, iceServers, region); err != nil {
|
||||
prometheus.ServiceOperationCounter.WithLabelValues("participant_join", "error", "send_response").Add(1)
|
||||
return err
|
||||
}
|
||||
@@ -325,12 +332,12 @@ func (r *Room) RemoveParticipant(identity livekit.ParticipantIdentity) {
|
||||
delete(r.participants, identity)
|
||||
delete(r.participantOpts, identity)
|
||||
if !p.Hidden() {
|
||||
r.Room.NumParticipants--
|
||||
r.protoRoom.NumParticipants--
|
||||
}
|
||||
}
|
||||
|
||||
activeRecording := false
|
||||
if (p != nil && p.IsRecorder()) || p == nil && r.Room.ActiveRecording {
|
||||
if (p != nil && p.IsRecorder()) || p == nil && r.protoRoom.ActiveRecording {
|
||||
for _, op := range r.participants {
|
||||
if op.IsRecorder() {
|
||||
activeRecording = true
|
||||
@@ -339,8 +346,8 @@ func (r *Room) RemoveParticipant(identity livekit.ParticipantIdentity) {
|
||||
}
|
||||
}
|
||||
|
||||
if r.Room.ActiveRecording != activeRecording {
|
||||
r.Room.ActiveRecording = activeRecording
|
||||
if r.protoRoom.ActiveRecording != activeRecording {
|
||||
r.protoRoom.ActiveRecording = activeRecording
|
||||
r.sendRoomUpdateLocked()
|
||||
}
|
||||
r.lock.Unlock()
|
||||
@@ -484,7 +491,7 @@ func (r *Room) CloseIfEmpty() {
|
||||
}
|
||||
}
|
||||
|
||||
timeout := r.Room.EmptyTimeout
|
||||
timeout := r.protoRoom.EmptyTimeout
|
||||
var elapsed int64
|
||||
if r.FirstJoinedAt() > 0 {
|
||||
// exit 20s after
|
||||
@@ -493,7 +500,7 @@ func (r *Room) CloseIfEmpty() {
|
||||
timeout = DefaultRoomDepartureGrace
|
||||
}
|
||||
} else {
|
||||
elapsed = time.Now().Unix() - r.Room.CreationTime
|
||||
elapsed = time.Now().Unix() - r.protoRoom.CreationTime
|
||||
}
|
||||
r.lock.Unlock()
|
||||
|
||||
@@ -538,7 +545,9 @@ func (r *Room) SendDataPacket(up *livekit.UserPacket, kind livekit.DataPacket_Ki
|
||||
}
|
||||
|
||||
func (r *Room) SetMetadata(metadata string) {
|
||||
r.Room.Metadata = metadata
|
||||
r.lock.Lock()
|
||||
r.protoRoom.Metadata = metadata
|
||||
r.lock.Unlock()
|
||||
|
||||
r.lock.RLock()
|
||||
r.sendRoomUpdateLocked()
|
||||
@@ -556,7 +565,7 @@ func (r *Room) sendRoomUpdateLocked() {
|
||||
continue
|
||||
}
|
||||
|
||||
err := p.SendRoomUpdate(r.Room)
|
||||
err := p.SendRoomUpdate(r.protoRoom)
|
||||
if err != nil {
|
||||
r.Logger.Warnw("failed to send room update", err, "participant", p.Identity())
|
||||
}
|
||||
@@ -930,9 +939,9 @@ func (r *Room) connectionQualityWorker() {
|
||||
|
||||
func (r *Room) DebugInfo() map[string]interface{} {
|
||||
info := map[string]interface{}{
|
||||
"Name": r.Room.Name,
|
||||
"Sid": r.Room.Sid,
|
||||
"CreatedAt": r.Room.CreationTime,
|
||||
"Name": r.protoRoom.Name,
|
||||
"Sid": r.protoRoom.Sid,
|
||||
"CreatedAt": r.protoRoom.CreationTime,
|
||||
}
|
||||
|
||||
participants := r.GetParticipants()
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
package rtc_test
|
||||
package rtc
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
@@ -13,7 +13,6 @@ import (
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/config"
|
||||
serverlogger "github.com/livekit/livekit-server/pkg/logger"
|
||||
"github.com/livekit/livekit-server/pkg/rtc"
|
||||
"github.com/livekit/livekit-server/pkg/rtc/types"
|
||||
"github.com/livekit/livekit-server/pkg/rtc/types/typesfakes"
|
||||
"github.com/livekit/livekit-server/pkg/sfu/audio"
|
||||
@@ -75,7 +74,7 @@ func TestRoomJoin(t *testing.T) {
|
||||
|
||||
// expect new participant to get a JoinReply
|
||||
info, participants, iceServers, _ := pNew.SendJoinResponseArgsForCall(0)
|
||||
require.Equal(t, info.Sid, rm.Room.Sid)
|
||||
require.Equal(t, livekit.RoomID(info.Sid), rm.ID())
|
||||
require.Len(t, participants, numParticipants)
|
||||
require.Len(t, rm.GetParticipants(), numParticipants+1)
|
||||
require.NotEmpty(t, iceServers)
|
||||
@@ -86,7 +85,7 @@ func TestRoomJoin(t *testing.T) {
|
||||
rm := newRoomWithParticipants(t, testRoomOpts{num: numExisting})
|
||||
p := newMockParticipant("new", types.DefaultProtocol, false, false)
|
||||
|
||||
err := rm.Join(p, &rtc.ParticipantOptions{AutoSubscribe: true}, iceServersForRoom, "")
|
||||
err := rm.Join(p, &ParticipantOptions{AutoSubscribe: true}, iceServersForRoom, "")
|
||||
require.NoError(t, err)
|
||||
|
||||
stateChangeCB := p.OnStateChangeArgsForCall(0)
|
||||
@@ -139,11 +138,11 @@ func TestRoomJoin(t *testing.T) {
|
||||
|
||||
t.Run("cannot exceed max participants", func(t *testing.T) {
|
||||
rm := newRoomWithParticipants(t, testRoomOpts{num: 1})
|
||||
rm.Room.MaxParticipants = 1
|
||||
rm.protoRoom.MaxParticipants = 1
|
||||
p := newMockParticipant("second", types.ProtocolVersion(0), false, false)
|
||||
|
||||
err := rm.Join(p, nil, iceServersForRoom, "")
|
||||
require.Equal(t, rtc.ErrMaxParticipantsExceeded, err)
|
||||
require.Equal(t, ErrMaxParticipantsExceeded, err)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -214,7 +213,7 @@ func TestRoomClosure(t *testing.T) {
|
||||
})
|
||||
p := rm.GetParticipants()[0]
|
||||
// allows immediate close after
|
||||
rm.Room.EmptyTimeout = 0
|
||||
rm.protoRoom.EmptyTimeout = 0
|
||||
rm.RemoveParticipant(p.Identity())
|
||||
|
||||
time.Sleep(defaultDelay)
|
||||
@@ -223,7 +222,7 @@ func TestRoomClosure(t *testing.T) {
|
||||
require.Len(t, rm.GetParticipants(), 0)
|
||||
require.True(t, isClosed)
|
||||
|
||||
require.Equal(t, rtc.ErrRoomClosed, rm.Join(p, nil, iceServersForRoom, ""))
|
||||
require.Equal(t, ErrRoomClosed, rm.Join(p, nil, iceServersForRoom, ""))
|
||||
})
|
||||
|
||||
t.Run("room does not close before empty timeout", func(t *testing.T) {
|
||||
@@ -232,7 +231,7 @@ func TestRoomClosure(t *testing.T) {
|
||||
rm.OnClose(func() {
|
||||
isClosed = true
|
||||
})
|
||||
require.NotZero(t, rm.Room.EmptyTimeout)
|
||||
require.NotZero(t, rm.protoRoom.EmptyTimeout)
|
||||
rm.CloseIfEmpty()
|
||||
require.False(t, isClosed)
|
||||
})
|
||||
@@ -243,7 +242,7 @@ func TestRoomClosure(t *testing.T) {
|
||||
rm.OnClose(func() {
|
||||
isClosed = true
|
||||
})
|
||||
rm.Room.EmptyTimeout = 1
|
||||
rm.protoRoom.EmptyTimeout = 1
|
||||
|
||||
time.Sleep(1010 * time.Millisecond)
|
||||
rm.CloseIfEmpty()
|
||||
@@ -507,7 +506,7 @@ func TestHiddenParticipants(t *testing.T) {
|
||||
|
||||
// expect new participant to get a JoinReply
|
||||
info, participants, iceServers, region := pNew.SendJoinResponseArgsForCall(0)
|
||||
require.Equal(t, info.Sid, rm.Room.Sid)
|
||||
require.Equal(t, livekit.RoomID(info.Sid), rm.ID())
|
||||
require.Len(t, participants, 2)
|
||||
require.Len(t, rm.GetParticipants(), 4)
|
||||
require.NotEmpty(t, iceServers)
|
||||
@@ -518,7 +517,7 @@ func TestHiddenParticipants(t *testing.T) {
|
||||
rm := newRoomWithParticipants(t, testRoomOpts{num: 2, numHidden: 1})
|
||||
p := newMockParticipant("new", types.DefaultProtocol, false, true)
|
||||
|
||||
err := rm.Join(p, &rtc.ParticipantOptions{AutoSubscribe: true}, iceServersForRoom, "")
|
||||
err := rm.Join(p, &ParticipantOptions{AutoSubscribe: true}, iceServersForRoom, "")
|
||||
require.NoError(t, err)
|
||||
|
||||
stateChangeCB := p.OnStateChangeArgsForCall(0)
|
||||
@@ -562,10 +561,10 @@ type testRoomOpts struct {
|
||||
audioSmoothIntervals uint32
|
||||
}
|
||||
|
||||
func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *rtc.Room {
|
||||
rm := rtc.NewRoom(
|
||||
func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *Room {
|
||||
rm := NewRoom(
|
||||
&livekit.Room{Name: "room"},
|
||||
rtc.WebRTCConfig{},
|
||||
WebRTCConfig{},
|
||||
&config.AudioConfig{
|
||||
UpdateInterval: audioUpdateInterval,
|
||||
SmoothIntervals: opts.audioSmoothIntervals,
|
||||
@@ -575,7 +574,7 @@ func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *rtc.Room {
|
||||
for i := 0; i < opts.num+opts.numHidden; i++ {
|
||||
identity := livekit.ParticipantIdentity(fmt.Sprintf("p%d", i))
|
||||
participant := newMockParticipant(identity, opts.protocol, i >= opts.num, true)
|
||||
err := rm.Join(participant, &rtc.ParticipantOptions{AutoSubscribe: true}, iceServersForRoom, "")
|
||||
err := rm.Join(participant, &ParticipantOptions{AutoSubscribe: true}, iceServersForRoom, "")
|
||||
require.NoError(t, err)
|
||||
participant.StateReturns(livekit.ParticipantInfo_ACTIVE)
|
||||
participant.IsReadyReturns(true)
|
||||
|
||||
@@ -75,7 +75,7 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant
|
||||
}
|
||||
case *livekit.SignalRequest_Leave:
|
||||
pLogger.Infow("client leaving room")
|
||||
_ = participant.Close(true)
|
||||
room.RemoveParticipant(participant.Identity())
|
||||
case *livekit.SignalRequest_SubscriptionPermission:
|
||||
err := room.UpdateSubscriptionPermission(participant, msg.SubscriptionPermission)
|
||||
if err != nil {
|
||||
|
||||
@@ -170,6 +170,7 @@ type LocalParticipant interface {
|
||||
type Room interface {
|
||||
Name() livekit.RoomName
|
||||
ID() livekit.RoomID
|
||||
RemoveParticipant(identity livekit.ParticipantIdentity)
|
||||
UpdateSubscriptions(participant LocalParticipant, trackIDs []livekit.TrackID, participantTracks []*livekit.ParticipantTracks, subscribe bool) error
|
||||
UpdateSubscriptionPermission(participant LocalParticipant, permissions *livekit.SubscriptionPermission) error
|
||||
SyncState(participant LocalParticipant, state *livekit.SyncState) error
|
||||
|
||||
@@ -29,6 +29,11 @@ type FakeRoom struct {
|
||||
nameReturnsOnCall map[int]struct {
|
||||
result1 livekit.RoomName
|
||||
}
|
||||
RemoveParticipantStub func(livekit.ParticipantIdentity)
|
||||
removeParticipantMutex sync.RWMutex
|
||||
removeParticipantArgsForCall []struct {
|
||||
arg1 livekit.ParticipantIdentity
|
||||
}
|
||||
SetParticipantPermissionStub func(types.LocalParticipant, *livekit.ParticipantPermission) error
|
||||
setParticipantPermissionMutex sync.RWMutex
|
||||
setParticipantPermissionArgsForCall []struct {
|
||||
@@ -213,6 +218,38 @@ func (fake *FakeRoom) NameReturnsOnCall(i int, result1 livekit.RoomName) {
|
||||
}{result1}
|
||||
}
|
||||
|
||||
func (fake *FakeRoom) RemoveParticipant(arg1 livekit.ParticipantIdentity) {
|
||||
fake.removeParticipantMutex.Lock()
|
||||
fake.removeParticipantArgsForCall = append(fake.removeParticipantArgsForCall, struct {
|
||||
arg1 livekit.ParticipantIdentity
|
||||
}{arg1})
|
||||
stub := fake.RemoveParticipantStub
|
||||
fake.recordInvocation("RemoveParticipant", []interface{}{arg1})
|
||||
fake.removeParticipantMutex.Unlock()
|
||||
if stub != nil {
|
||||
fake.RemoveParticipantStub(arg1)
|
||||
}
|
||||
}
|
||||
|
||||
func (fake *FakeRoom) RemoveParticipantCallCount() int {
|
||||
fake.removeParticipantMutex.RLock()
|
||||
defer fake.removeParticipantMutex.RUnlock()
|
||||
return len(fake.removeParticipantArgsForCall)
|
||||
}
|
||||
|
||||
func (fake *FakeRoom) RemoveParticipantCalls(stub func(livekit.ParticipantIdentity)) {
|
||||
fake.removeParticipantMutex.Lock()
|
||||
defer fake.removeParticipantMutex.Unlock()
|
||||
fake.RemoveParticipantStub = stub
|
||||
}
|
||||
|
||||
func (fake *FakeRoom) RemoveParticipantArgsForCall(i int) livekit.ParticipantIdentity {
|
||||
fake.removeParticipantMutex.RLock()
|
||||
defer fake.removeParticipantMutex.RUnlock()
|
||||
argsForCall := fake.removeParticipantArgsForCall[i]
|
||||
return argsForCall.arg1
|
||||
}
|
||||
|
||||
func (fake *FakeRoom) SetParticipantPermission(arg1 types.LocalParticipant, arg2 *livekit.ParticipantPermission) error {
|
||||
fake.setParticipantPermissionMutex.Lock()
|
||||
ret, specificReturn := fake.setParticipantPermissionReturnsOnCall[len(fake.setParticipantPermissionArgsForCall)]
|
||||
@@ -604,6 +641,8 @@ func (fake *FakeRoom) Invocations() map[string][][]interface{} {
|
||||
defer fake.iDMutex.RUnlock()
|
||||
fake.nameMutex.RLock()
|
||||
defer fake.nameMutex.RUnlock()
|
||||
fake.removeParticipantMutex.RLock()
|
||||
defer fake.removeParticipantMutex.RUnlock()
|
||||
fake.setParticipantPermissionMutex.RLock()
|
||||
defer fake.setParticipantPermissionMutex.RUnlock()
|
||||
fake.simulateScenarioMutex.RLock()
|
||||
|
||||
@@ -239,6 +239,7 @@ func (r *RoomManager) StartSession(
|
||||
rtcConf.SetBufferFactory(room.GetBufferFactory())
|
||||
sid := livekit.ParticipantID(utils.NewGuid(utils.ParticipantPrefix))
|
||||
pLogger := rtc.LoggerWithParticipant(room.Logger, pi.Identity, sid)
|
||||
protoRoom := room.ToProto()
|
||||
participant, err = rtc.NewParticipant(rtc.ParticipantParams{
|
||||
Identity: pi.Identity,
|
||||
Name: pi.Name,
|
||||
@@ -251,7 +252,7 @@ func (r *RoomManager) StartSession(
|
||||
Telemetry: r.telemetry,
|
||||
PLIThrottleConfig: r.config.RTC.PLIThrottle,
|
||||
CongestionControlConfig: r.config.RTC.CongestionControl,
|
||||
EnabledCodecs: room.Room.EnabledCodecs,
|
||||
EnabledCodecs: protoRoom.EnabledCodecs,
|
||||
Grants: pi.Grants,
|
||||
Logger: pLogger,
|
||||
ClientConf: clientConf,
|
||||
@@ -266,7 +267,7 @@ func (r *RoomManager) StartSession(
|
||||
opts := rtc.ParticipantOptions{
|
||||
AutoSubscribe: pi.AutoSubscribe,
|
||||
}
|
||||
if err = room.Join(participant, &opts, r.iceServersForRoom(room.Room), r.currentNode.Region); err != nil {
|
||||
if err = room.Join(participant, &opts, r.iceServersForRoom(protoRoom), r.currentNode.Region); err != nil {
|
||||
pLogger.Errorw("could not join room", err)
|
||||
_ = participant.Close(true)
|
||||
return err
|
||||
@@ -275,9 +276,9 @@ func (r *RoomManager) StartSession(
|
||||
pLogger.Errorw("could not store participant", err)
|
||||
}
|
||||
|
||||
updateParticipantCount := func() {
|
||||
updateParticipantCount := func(proto *livekit.Room) {
|
||||
if !participant.Hidden() {
|
||||
err = r.roomStore.StoreRoom(ctx, room.Room)
|
||||
err = r.roomStore.StoreRoom(ctx, proto)
|
||||
if err != nil {
|
||||
logger.Errorw("could not store room", err)
|
||||
}
|
||||
@@ -285,18 +286,19 @@ func (r *RoomManager) StartSession(
|
||||
}
|
||||
|
||||
// update room store with new numParticipants
|
||||
updateParticipantCount()
|
||||
updateParticipantCount(protoRoom)
|
||||
|
||||
clientMeta := &livekit.AnalyticsClientMeta{Region: r.currentNode.Region, Node: r.currentNode.Id}
|
||||
r.telemetry.ParticipantJoined(ctx, room.Room, participant.ToProto(), pi.Client, clientMeta)
|
||||
r.telemetry.ParticipantJoined(ctx, protoRoom, participant.ToProto(), pi.Client, clientMeta)
|
||||
participant.OnClose(func(p types.LocalParticipant, disallowedSubscriptions map[livekit.TrackID]livekit.ParticipantID) {
|
||||
if err := r.roomStore.DeleteParticipant(ctx, roomName, p.Identity()); err != nil {
|
||||
pLogger.Errorw("could not delete participant", err)
|
||||
}
|
||||
|
||||
// update room store with new numParticipants
|
||||
updateParticipantCount()
|
||||
r.telemetry.ParticipantLeft(ctx, room.Room, p.ToProto())
|
||||
proto := room.ToProto()
|
||||
updateParticipantCount(proto)
|
||||
r.telemetry.ParticipantLeft(ctx, proto, p.ToProto())
|
||||
|
||||
room.RemoveDisallowedSubscriptions(p, disallowedSubscriptions)
|
||||
})
|
||||
@@ -345,7 +347,7 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room
|
||||
newRoom := rtc.NewRoom(ri, *r.rtcConfig, &r.config.Audio, r.telemetry)
|
||||
|
||||
newRoom.OnClose(func() {
|
||||
r.telemetry.RoomEnded(ctx, newRoom.Room)
|
||||
r.telemetry.RoomEnded(ctx, newRoom.ToProto())
|
||||
if err := r.DeleteRoom(ctx, roomName); err != nil {
|
||||
logger.Errorw("could not delete room", err)
|
||||
}
|
||||
@@ -354,7 +356,7 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room
|
||||
})
|
||||
|
||||
newRoom.OnMetadataUpdate(func(metadata string) {
|
||||
if err := r.roomStore.StoreRoom(ctx, newRoom.Room); err != nil {
|
||||
if err := r.roomStore.StoreRoom(ctx, newRoom.ToProto()); err != nil {
|
||||
logger.Errorw("could not handle metadata update", err)
|
||||
}
|
||||
})
|
||||
@@ -373,7 +375,7 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room
|
||||
|
||||
newRoom.Hold()
|
||||
|
||||
r.telemetry.RoomStarted(ctx, newRoom.Room)
|
||||
r.telemetry.RoomStarted(ctx, newRoom.ToProto())
|
||||
|
||||
return newRoom, nil
|
||||
}
|
||||
@@ -384,8 +386,8 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.LocalPa
|
||||
logger.Debugw("RTC session finishing",
|
||||
"participant", participant.Identity(),
|
||||
"pID", participant.ID(),
|
||||
"room", room.Room.Name,
|
||||
"roomID", room.Room.Sid,
|
||||
"room", room.Name(),
|
||||
"roomID", room.ID(),
|
||||
)
|
||||
_ = participant.Close(true)
|
||||
requestSource.Close()
|
||||
|
||||
@@ -154,7 +154,7 @@ type DownTrack struct {
|
||||
onTransportCCFeedback func(dt *DownTrack, cc *rtcp.TransportLayerCC)
|
||||
|
||||
// simulcast layer availability change callback
|
||||
onAvailableLayersChanged func(dt *DownTrack)
|
||||
onAvailableLayersChanged atomic.Value // func(dt *DownTrack)
|
||||
|
||||
// layer bitrate availability change callback
|
||||
onBitrateAvailabilityChanged func(dt *DownTrack)
|
||||
@@ -687,8 +687,8 @@ func (d *DownTrack) GetForwardingStatus() ForwardingStatus {
|
||||
func (d *DownTrack) UpTrackLayersChange(availableLayers []int32) {
|
||||
d.forwarder.UpTrackLayersChange(availableLayers)
|
||||
|
||||
if d.onAvailableLayersChanged != nil {
|
||||
d.onAvailableLayersChanged(d)
|
||||
if onAvailableLayersChanged, ok := d.onAvailableLayersChanged.Load().(func(dt *DownTrack)); ok {
|
||||
onAvailableLayersChanged(d)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -723,7 +723,7 @@ func (d *DownTrack) AddReceiverReportListener(listener ReceiverReportListener) {
|
||||
}
|
||||
|
||||
func (d *DownTrack) OnAvailableLayersChanged(fn func(dt *DownTrack)) {
|
||||
d.onAvailableLayersChanged = fn
|
||||
d.onAvailableLayersChanged.Store(fn)
|
||||
}
|
||||
|
||||
func (d *DownTrack) OnBitrateAvailabilityChanged(fn func(dt *DownTrack)) {
|
||||
|
||||
Reference in New Issue
Block a user