diff --git a/pkg/routing/redisrouter.go b/pkg/routing/redisrouter.go index c15fd6718..424cb1d2e 100644 --- a/pkg/routing/redisrouter.go +++ b/pkg/routing/redisrouter.go @@ -36,7 +36,7 @@ type RedisRouter struct { rc *redis.Client ctx context.Context isStarted atomic.Bool - statsMu sync.Mutex + nodeMu sync.RWMutex // previous stats for computing averages prevStats *livekit.NodeStats @@ -54,7 +54,9 @@ func NewRedisRouter(currentNode LocalNode, rc *redis.Client) *RedisRouter { } func (r *RedisRouter) RegisterNode() error { + r.nodeMu.RLock() data, err := proto.Marshal((*livekit.Node)(r.currentNode)) + r.nodeMu.RUnlock() if err != nil { return err } @@ -287,7 +289,9 @@ func (r *RedisRouter) Start() error { } func (r *RedisRouter) Drain() { + r.nodeMu.Lock() r.currentNode.State = livekit.NodeState_SHUTTING_DOWN + r.nodeMu.Unlock() if err := r.RegisterNode(); err != nil { logger.Errorw("failed to mark as draining", err, "nodeID", r.currentNode.Id) } @@ -344,9 +348,9 @@ func (r *RedisRouter) statsWorker() { _ = r.WriteNodeRTC(context.Background(), r.currentNode.Id, &livekit.RTCNodeMessage{ Message: &livekit.RTCNodeMessage_KeepAlive{}, }) - r.statsMu.Lock() + r.nodeMu.RLock() stats := r.currentNode.Stats - r.statsMu.Unlock() + r.nodeMu.RUnlock() delaySeconds := time.Now().Unix() - stats.UpdatedAt if delaySeconds > statsMaxDelaySeconds { @@ -470,21 +474,21 @@ func (r *RedisRouter) handleRTCMessage(rm *livekit.RTCNodeMessage) error { break } - r.statsMu.Lock() + r.nodeMu.Lock() if r.prevStats == nil { r.prevStats = r.currentNode.Stats } updated, computedAvg, err := prometheus.GetUpdatedNodeStats(r.currentNode.Stats, r.prevStats) if err != nil { logger.Errorw("could not update node stats", err) - r.statsMu.Unlock() + r.nodeMu.Unlock() return err } r.currentNode.Stats = updated if computedAvg { r.prevStats = updated } - r.statsMu.Unlock() + r.nodeMu.Unlock() // TODO: check stats against config.Limit values if err := r.RegisterNode(); err != nil { diff --git a/pkg/rtc/helper_test.go b/pkg/rtc/helper_test.go index 4444139f5..398b67079 100644 --- a/pkg/rtc/helper_test.go +++ b/pkg/rtc/helper_test.go @@ -1,4 +1,4 @@ -package rtc_test +package rtc import ( "github.com/livekit/protocol/livekit" diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 982a79d26..0cd2acd96 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -36,8 +36,8 @@ type broadcastOptions struct { type Room struct { lock sync.RWMutex - Room *livekit.Room - Logger logger.Logger + protoRoom *livekit.Room + Logger logger.Logger config WebRTCConfig audioConfig *config.AudioConfig @@ -70,7 +70,7 @@ type ParticipantOptions struct { func NewRoom(room *livekit.Room, config WebRTCConfig, audioConfig *config.AudioConfig, telemetry telemetry.TelemetryService) *Room { r := &Room{ - Room: proto.Clone(room).(*livekit.Room), + protoRoom: proto.Clone(room).(*livekit.Room), Logger: LoggerWithRoom(logger.GetDefaultLogger(), livekit.RoomName(room.Name), livekit.RoomID(room.Sid)), config: config, audioConfig: audioConfig, @@ -81,11 +81,11 @@ func NewRoom(room *livekit.Room, config WebRTCConfig, audioConfig *config.AudioC batchedUpdates: make(map[livekit.ParticipantIdentity]*livekit.ParticipantInfo), closed: make(chan struct{}), } - if r.Room.EmptyTimeout == 0 { - r.Room.EmptyTimeout = DefaultEmptyTimeout + if r.protoRoom.EmptyTimeout == 0 { + r.protoRoom.EmptyTimeout = DefaultEmptyTimeout } - if r.Room.CreationTime == 0 { - r.Room.CreationTime = time.Now().Unix() + if r.protoRoom.CreationTime == 0 { + r.protoRoom.CreationTime = time.Now().Unix() } go r.audioUpdateWorker() @@ -95,12 +95,19 @@ func NewRoom(room *livekit.Room, config WebRTCConfig, audioConfig *config.AudioC return r } +func (r *Room) ToProto() *livekit.Room { + r.lock.RLock() + defer r.lock.RUnlock() + + return proto.Clone(r.protoRoom).(*livekit.Room) +} + func (r *Room) Name() livekit.RoomName { - return livekit.RoomName(r.Room.Name) + return livekit.RoomName(r.protoRoom.Name) } func (r *Room) ID() livekit.RoomID { - return livekit.RoomID(r.Room.Sid) + return livekit.RoomID(r.protoRoom.Sid) } func (r *Room) GetParticipant(identity livekit.ParticipantIdentity) types.LocalParticipant { @@ -195,7 +202,7 @@ func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions return ErrAlreadyJoined } - if r.Room.MaxParticipants > 0 && len(r.participants) >= int(r.Room.MaxParticipants) { + if r.protoRoom.MaxParticipants > 0 && len(r.participants) >= int(r.protoRoom.MaxParticipants) { prometheus.ServiceOperationCounter.WithLabelValues("participant_join", "error", "max_exceeded").Add(1) return ErrMaxParticipantsExceeded } @@ -204,7 +211,7 @@ func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions r.joinedAt.Store(time.Now().Unix()) } if !participant.Hidden() { - r.Room.NumParticipants++ + r.protoRoom.NumParticipants++ } // it's important to set this before connection, we don't want to miss out on any publishedTracks @@ -228,7 +235,7 @@ func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions // start the workers once connectivity is established p.Start() - r.telemetry.ParticipantActive(context.Background(), r.Room, p.ToProto(), &livekit.AnalyticsClientMeta{ClientConnectTime: uint32(time.Since(p.ConnectedAt()).Milliseconds())}) + r.telemetry.ParticipantActive(context.Background(), r.ToProto(), p.ToProto(), &livekit.AnalyticsClientMeta{ClientConnectTime: uint32(time.Since(p.ConnectedAt()).Milliseconds())}) } else if state == livekit.ParticipantInfo_DISCONNECTED { // remove participant from room go r.RemoveParticipant(p.Identity()) @@ -256,8 +263,8 @@ func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions "protocol", participant.ProtocolVersion(), "options", opts) - if participant.IsRecorder() && !r.Room.ActiveRecording { - r.Room.ActiveRecording = true + if participant.IsRecorder() && !r.protoRoom.ActiveRecording { + r.protoRoom.ActiveRecording = true r.sendRoomUpdateLocked() } @@ -283,7 +290,7 @@ func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions } }) - if err := participant.SendJoinResponse(r.Room, otherParticipants, iceServers, region); err != nil { + if err := participant.SendJoinResponse(r.protoRoom, otherParticipants, iceServers, region); err != nil { prometheus.ServiceOperationCounter.WithLabelValues("participant_join", "error", "send_response").Add(1) return err } @@ -325,12 +332,12 @@ func (r *Room) RemoveParticipant(identity livekit.ParticipantIdentity) { delete(r.participants, identity) delete(r.participantOpts, identity) if !p.Hidden() { - r.Room.NumParticipants-- + r.protoRoom.NumParticipants-- } } activeRecording := false - if (p != nil && p.IsRecorder()) || p == nil && r.Room.ActiveRecording { + if (p != nil && p.IsRecorder()) || p == nil && r.protoRoom.ActiveRecording { for _, op := range r.participants { if op.IsRecorder() { activeRecording = true @@ -339,8 +346,8 @@ func (r *Room) RemoveParticipant(identity livekit.ParticipantIdentity) { } } - if r.Room.ActiveRecording != activeRecording { - r.Room.ActiveRecording = activeRecording + if r.protoRoom.ActiveRecording != activeRecording { + r.protoRoom.ActiveRecording = activeRecording r.sendRoomUpdateLocked() } r.lock.Unlock() @@ -484,7 +491,7 @@ func (r *Room) CloseIfEmpty() { } } - timeout := r.Room.EmptyTimeout + timeout := r.protoRoom.EmptyTimeout var elapsed int64 if r.FirstJoinedAt() > 0 { // exit 20s after @@ -493,7 +500,7 @@ func (r *Room) CloseIfEmpty() { timeout = DefaultRoomDepartureGrace } } else { - elapsed = time.Now().Unix() - r.Room.CreationTime + elapsed = time.Now().Unix() - r.protoRoom.CreationTime } r.lock.Unlock() @@ -538,7 +545,9 @@ func (r *Room) SendDataPacket(up *livekit.UserPacket, kind livekit.DataPacket_Ki } func (r *Room) SetMetadata(metadata string) { - r.Room.Metadata = metadata + r.lock.Lock() + r.protoRoom.Metadata = metadata + r.lock.Unlock() r.lock.RLock() r.sendRoomUpdateLocked() @@ -556,7 +565,7 @@ func (r *Room) sendRoomUpdateLocked() { continue } - err := p.SendRoomUpdate(r.Room) + err := p.SendRoomUpdate(r.protoRoom) if err != nil { r.Logger.Warnw("failed to send room update", err, "participant", p.Identity()) } @@ -930,9 +939,9 @@ func (r *Room) connectionQualityWorker() { func (r *Room) DebugInfo() map[string]interface{} { info := map[string]interface{}{ - "Name": r.Room.Name, - "Sid": r.Room.Sid, - "CreatedAt": r.Room.CreationTime, + "Name": r.protoRoom.Name, + "Sid": r.protoRoom.Sid, + "CreatedAt": r.protoRoom.CreationTime, } participants := r.GetParticipants() diff --git a/pkg/rtc/room_test.go b/pkg/rtc/room_test.go index a1e06e81c..9e3fd0e16 100644 --- a/pkg/rtc/room_test.go +++ b/pkg/rtc/room_test.go @@ -1,4 +1,4 @@ -package rtc_test +package rtc import ( "fmt" @@ -13,7 +13,6 @@ import ( "github.com/livekit/livekit-server/pkg/config" serverlogger "github.com/livekit/livekit-server/pkg/logger" - "github.com/livekit/livekit-server/pkg/rtc" "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/rtc/types/typesfakes" "github.com/livekit/livekit-server/pkg/sfu/audio" @@ -75,7 +74,7 @@ func TestRoomJoin(t *testing.T) { // expect new participant to get a JoinReply info, participants, iceServers, _ := pNew.SendJoinResponseArgsForCall(0) - require.Equal(t, info.Sid, rm.Room.Sid) + require.Equal(t, livekit.RoomID(info.Sid), rm.ID()) require.Len(t, participants, numParticipants) require.Len(t, rm.GetParticipants(), numParticipants+1) require.NotEmpty(t, iceServers) @@ -86,7 +85,7 @@ func TestRoomJoin(t *testing.T) { rm := newRoomWithParticipants(t, testRoomOpts{num: numExisting}) p := newMockParticipant("new", types.DefaultProtocol, false, false) - err := rm.Join(p, &rtc.ParticipantOptions{AutoSubscribe: true}, iceServersForRoom, "") + err := rm.Join(p, &ParticipantOptions{AutoSubscribe: true}, iceServersForRoom, "") require.NoError(t, err) stateChangeCB := p.OnStateChangeArgsForCall(0) @@ -139,11 +138,11 @@ func TestRoomJoin(t *testing.T) { t.Run("cannot exceed max participants", func(t *testing.T) { rm := newRoomWithParticipants(t, testRoomOpts{num: 1}) - rm.Room.MaxParticipants = 1 + rm.protoRoom.MaxParticipants = 1 p := newMockParticipant("second", types.ProtocolVersion(0), false, false) err := rm.Join(p, nil, iceServersForRoom, "") - require.Equal(t, rtc.ErrMaxParticipantsExceeded, err) + require.Equal(t, ErrMaxParticipantsExceeded, err) }) } @@ -214,7 +213,7 @@ func TestRoomClosure(t *testing.T) { }) p := rm.GetParticipants()[0] // allows immediate close after - rm.Room.EmptyTimeout = 0 + rm.protoRoom.EmptyTimeout = 0 rm.RemoveParticipant(p.Identity()) time.Sleep(defaultDelay) @@ -223,7 +222,7 @@ func TestRoomClosure(t *testing.T) { require.Len(t, rm.GetParticipants(), 0) require.True(t, isClosed) - require.Equal(t, rtc.ErrRoomClosed, rm.Join(p, nil, iceServersForRoom, "")) + require.Equal(t, ErrRoomClosed, rm.Join(p, nil, iceServersForRoom, "")) }) t.Run("room does not close before empty timeout", func(t *testing.T) { @@ -232,7 +231,7 @@ func TestRoomClosure(t *testing.T) { rm.OnClose(func() { isClosed = true }) - require.NotZero(t, rm.Room.EmptyTimeout) + require.NotZero(t, rm.protoRoom.EmptyTimeout) rm.CloseIfEmpty() require.False(t, isClosed) }) @@ -243,7 +242,7 @@ func TestRoomClosure(t *testing.T) { rm.OnClose(func() { isClosed = true }) - rm.Room.EmptyTimeout = 1 + rm.protoRoom.EmptyTimeout = 1 time.Sleep(1010 * time.Millisecond) rm.CloseIfEmpty() @@ -507,7 +506,7 @@ func TestHiddenParticipants(t *testing.T) { // expect new participant to get a JoinReply info, participants, iceServers, region := pNew.SendJoinResponseArgsForCall(0) - require.Equal(t, info.Sid, rm.Room.Sid) + require.Equal(t, livekit.RoomID(info.Sid), rm.ID()) require.Len(t, participants, 2) require.Len(t, rm.GetParticipants(), 4) require.NotEmpty(t, iceServers) @@ -518,7 +517,7 @@ func TestHiddenParticipants(t *testing.T) { rm := newRoomWithParticipants(t, testRoomOpts{num: 2, numHidden: 1}) p := newMockParticipant("new", types.DefaultProtocol, false, true) - err := rm.Join(p, &rtc.ParticipantOptions{AutoSubscribe: true}, iceServersForRoom, "") + err := rm.Join(p, &ParticipantOptions{AutoSubscribe: true}, iceServersForRoom, "") require.NoError(t, err) stateChangeCB := p.OnStateChangeArgsForCall(0) @@ -562,10 +561,10 @@ type testRoomOpts struct { audioSmoothIntervals uint32 } -func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *rtc.Room { - rm := rtc.NewRoom( +func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *Room { + rm := NewRoom( &livekit.Room{Name: "room"}, - rtc.WebRTCConfig{}, + WebRTCConfig{}, &config.AudioConfig{ UpdateInterval: audioUpdateInterval, SmoothIntervals: opts.audioSmoothIntervals, @@ -575,7 +574,7 @@ func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *rtc.Room { for i := 0; i < opts.num+opts.numHidden; i++ { identity := livekit.ParticipantIdentity(fmt.Sprintf("p%d", i)) participant := newMockParticipant(identity, opts.protocol, i >= opts.num, true) - err := rm.Join(participant, &rtc.ParticipantOptions{AutoSubscribe: true}, iceServersForRoom, "") + err := rm.Join(participant, &ParticipantOptions{AutoSubscribe: true}, iceServersForRoom, "") require.NoError(t, err) participant.StateReturns(livekit.ParticipantInfo_ACTIVE) participant.IsReadyReturns(true) diff --git a/pkg/rtc/signalhandler.go b/pkg/rtc/signalhandler.go index 25fb36598..b3d0d698e 100644 --- a/pkg/rtc/signalhandler.go +++ b/pkg/rtc/signalhandler.go @@ -75,7 +75,7 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant } case *livekit.SignalRequest_Leave: pLogger.Infow("client leaving room") - _ = participant.Close(true) + room.RemoveParticipant(participant.Identity()) case *livekit.SignalRequest_SubscriptionPermission: err := room.UpdateSubscriptionPermission(participant, msg.SubscriptionPermission) if err != nil { diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index a138709b2..ddb24ef38 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -170,6 +170,7 @@ type LocalParticipant interface { type Room interface { Name() livekit.RoomName ID() livekit.RoomID + RemoveParticipant(identity livekit.ParticipantIdentity) UpdateSubscriptions(participant LocalParticipant, trackIDs []livekit.TrackID, participantTracks []*livekit.ParticipantTracks, subscribe bool) error UpdateSubscriptionPermission(participant LocalParticipant, permissions *livekit.SubscriptionPermission) error SyncState(participant LocalParticipant, state *livekit.SyncState) error diff --git a/pkg/rtc/types/typesfakes/fake_room.go b/pkg/rtc/types/typesfakes/fake_room.go index 6dcea7862..a4599b55a 100644 --- a/pkg/rtc/types/typesfakes/fake_room.go +++ b/pkg/rtc/types/typesfakes/fake_room.go @@ -29,6 +29,11 @@ type FakeRoom struct { nameReturnsOnCall map[int]struct { result1 livekit.RoomName } + RemoveParticipantStub func(livekit.ParticipantIdentity) + removeParticipantMutex sync.RWMutex + removeParticipantArgsForCall []struct { + arg1 livekit.ParticipantIdentity + } SetParticipantPermissionStub func(types.LocalParticipant, *livekit.ParticipantPermission) error setParticipantPermissionMutex sync.RWMutex setParticipantPermissionArgsForCall []struct { @@ -213,6 +218,38 @@ func (fake *FakeRoom) NameReturnsOnCall(i int, result1 livekit.RoomName) { }{result1} } +func (fake *FakeRoom) RemoveParticipant(arg1 livekit.ParticipantIdentity) { + fake.removeParticipantMutex.Lock() + fake.removeParticipantArgsForCall = append(fake.removeParticipantArgsForCall, struct { + arg1 livekit.ParticipantIdentity + }{arg1}) + stub := fake.RemoveParticipantStub + fake.recordInvocation("RemoveParticipant", []interface{}{arg1}) + fake.removeParticipantMutex.Unlock() + if stub != nil { + fake.RemoveParticipantStub(arg1) + } +} + +func (fake *FakeRoom) RemoveParticipantCallCount() int { + fake.removeParticipantMutex.RLock() + defer fake.removeParticipantMutex.RUnlock() + return len(fake.removeParticipantArgsForCall) +} + +func (fake *FakeRoom) RemoveParticipantCalls(stub func(livekit.ParticipantIdentity)) { + fake.removeParticipantMutex.Lock() + defer fake.removeParticipantMutex.Unlock() + fake.RemoveParticipantStub = stub +} + +func (fake *FakeRoom) RemoveParticipantArgsForCall(i int) livekit.ParticipantIdentity { + fake.removeParticipantMutex.RLock() + defer fake.removeParticipantMutex.RUnlock() + argsForCall := fake.removeParticipantArgsForCall[i] + return argsForCall.arg1 +} + func (fake *FakeRoom) SetParticipantPermission(arg1 types.LocalParticipant, arg2 *livekit.ParticipantPermission) error { fake.setParticipantPermissionMutex.Lock() ret, specificReturn := fake.setParticipantPermissionReturnsOnCall[len(fake.setParticipantPermissionArgsForCall)] @@ -604,6 +641,8 @@ func (fake *FakeRoom) Invocations() map[string][][]interface{} { defer fake.iDMutex.RUnlock() fake.nameMutex.RLock() defer fake.nameMutex.RUnlock() + fake.removeParticipantMutex.RLock() + defer fake.removeParticipantMutex.RUnlock() fake.setParticipantPermissionMutex.RLock() defer fake.setParticipantPermissionMutex.RUnlock() fake.simulateScenarioMutex.RLock() diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 9c4001ba1..de7e7b04d 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -239,6 +239,7 @@ func (r *RoomManager) StartSession( rtcConf.SetBufferFactory(room.GetBufferFactory()) sid := livekit.ParticipantID(utils.NewGuid(utils.ParticipantPrefix)) pLogger := rtc.LoggerWithParticipant(room.Logger, pi.Identity, sid) + protoRoom := room.ToProto() participant, err = rtc.NewParticipant(rtc.ParticipantParams{ Identity: pi.Identity, Name: pi.Name, @@ -251,7 +252,7 @@ func (r *RoomManager) StartSession( Telemetry: r.telemetry, PLIThrottleConfig: r.config.RTC.PLIThrottle, CongestionControlConfig: r.config.RTC.CongestionControl, - EnabledCodecs: room.Room.EnabledCodecs, + EnabledCodecs: protoRoom.EnabledCodecs, Grants: pi.Grants, Logger: pLogger, ClientConf: clientConf, @@ -266,7 +267,7 @@ func (r *RoomManager) StartSession( opts := rtc.ParticipantOptions{ AutoSubscribe: pi.AutoSubscribe, } - if err = room.Join(participant, &opts, r.iceServersForRoom(room.Room), r.currentNode.Region); err != nil { + if err = room.Join(participant, &opts, r.iceServersForRoom(protoRoom), r.currentNode.Region); err != nil { pLogger.Errorw("could not join room", err) _ = participant.Close(true) return err @@ -275,9 +276,9 @@ func (r *RoomManager) StartSession( pLogger.Errorw("could not store participant", err) } - updateParticipantCount := func() { + updateParticipantCount := func(proto *livekit.Room) { if !participant.Hidden() { - err = r.roomStore.StoreRoom(ctx, room.Room) + err = r.roomStore.StoreRoom(ctx, proto) if err != nil { logger.Errorw("could not store room", err) } @@ -285,18 +286,19 @@ func (r *RoomManager) StartSession( } // update room store with new numParticipants - updateParticipantCount() + updateParticipantCount(protoRoom) clientMeta := &livekit.AnalyticsClientMeta{Region: r.currentNode.Region, Node: r.currentNode.Id} - r.telemetry.ParticipantJoined(ctx, room.Room, participant.ToProto(), pi.Client, clientMeta) + r.telemetry.ParticipantJoined(ctx, protoRoom, participant.ToProto(), pi.Client, clientMeta) participant.OnClose(func(p types.LocalParticipant, disallowedSubscriptions map[livekit.TrackID]livekit.ParticipantID) { if err := r.roomStore.DeleteParticipant(ctx, roomName, p.Identity()); err != nil { pLogger.Errorw("could not delete participant", err) } // update room store with new numParticipants - updateParticipantCount() - r.telemetry.ParticipantLeft(ctx, room.Room, p.ToProto()) + proto := room.ToProto() + updateParticipantCount(proto) + r.telemetry.ParticipantLeft(ctx, proto, p.ToProto()) room.RemoveDisallowedSubscriptions(p, disallowedSubscriptions) }) @@ -345,7 +347,7 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room newRoom := rtc.NewRoom(ri, *r.rtcConfig, &r.config.Audio, r.telemetry) newRoom.OnClose(func() { - r.telemetry.RoomEnded(ctx, newRoom.Room) + r.telemetry.RoomEnded(ctx, newRoom.ToProto()) if err := r.DeleteRoom(ctx, roomName); err != nil { logger.Errorw("could not delete room", err) } @@ -354,7 +356,7 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room }) newRoom.OnMetadataUpdate(func(metadata string) { - if err := r.roomStore.StoreRoom(ctx, newRoom.Room); err != nil { + if err := r.roomStore.StoreRoom(ctx, newRoom.ToProto()); err != nil { logger.Errorw("could not handle metadata update", err) } }) @@ -373,7 +375,7 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room newRoom.Hold() - r.telemetry.RoomStarted(ctx, newRoom.Room) + r.telemetry.RoomStarted(ctx, newRoom.ToProto()) return newRoom, nil } @@ -384,8 +386,8 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.LocalPa logger.Debugw("RTC session finishing", "participant", participant.Identity(), "pID", participant.ID(), - "room", room.Room.Name, - "roomID", room.Room.Sid, + "room", room.Name(), + "roomID", room.ID(), ) _ = participant.Close(true) requestSource.Close() diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index bd55e1128..387784e30 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -154,7 +154,7 @@ type DownTrack struct { onTransportCCFeedback func(dt *DownTrack, cc *rtcp.TransportLayerCC) // simulcast layer availability change callback - onAvailableLayersChanged func(dt *DownTrack) + onAvailableLayersChanged atomic.Value // func(dt *DownTrack) // layer bitrate availability change callback onBitrateAvailabilityChanged func(dt *DownTrack) @@ -687,8 +687,8 @@ func (d *DownTrack) GetForwardingStatus() ForwardingStatus { func (d *DownTrack) UpTrackLayersChange(availableLayers []int32) { d.forwarder.UpTrackLayersChange(availableLayers) - if d.onAvailableLayersChanged != nil { - d.onAvailableLayersChanged(d) + if onAvailableLayersChanged, ok := d.onAvailableLayersChanged.Load().(func(dt *DownTrack)); ok { + onAvailableLayersChanged(d) } } @@ -723,7 +723,7 @@ func (d *DownTrack) AddReceiverReportListener(listener ReceiverReportListener) { } func (d *DownTrack) OnAvailableLayersChanged(fn func(dt *DownTrack)) { - d.onAvailableLayersChanged = fn + d.onAvailableLayersChanged.Store(fn) } func (d *DownTrack) OnBitrateAvailabilityChanged(fn func(dt *DownTrack)) {