From f09885825ee06efd2a3a04ff4cedf7fdbbb98127 Mon Sep 17 00:00:00 2001 From: David Zhao Date: Wed, 10 Aug 2022 17:04:17 -0700 Subject: [PATCH] Return ServerInfo to clients on join (#904) * checkpoint * Return ServerInfo in join response * also include node information * less verbose quality score * update go modules --- go.mod | 2 +- go.sum | 7 +- pkg/rtc/participant.go | 4 + pkg/rtc/participant_signal.go | 23 +--- pkg/rtc/room.go | 53 ++++++--- pkg/rtc/room_test.go | 48 +++++---- pkg/rtc/types/interfaces.go | 16 +-- pkg/rtc/types/protocol_version.go | 2 +- .../typesfakes/fake_local_participant.go | 101 +++++++++++++----- pkg/service/roommanager.go | 14 ++- pkg/sfu/connectionquality/connectionstats.go | 12 ++- 11 files changed, 183 insertions(+), 99 deletions(-) diff --git a/go.mod b/go.mod index cc115bc1a..d2cf80a92 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/gorilla/websocket v1.5.0 github.com/hashicorp/go-version v1.6.0 github.com/hashicorp/golang-lru v0.5.4 - github.com/livekit/protocol v1.0.1-0.20220809040042-d76090cba26e + github.com/livekit/protocol v1.0.1-0.20220810172733-df83c837695d github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a github.com/mackerelio/go-osstat v0.2.2 github.com/magefile/mage v1.13.0 diff --git a/go.sum b/go.sum index 48b33b6a6..e24936bfd 100644 --- a/go.sum +++ b/go.sum @@ -240,10 +240,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8= github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= -github.com/livekit/protocol v1.0.0 h1:41aGhSIHtyPJBwzw4Aw1Y4BQpKxLBlS1wK31G8uME8A= -github.com/livekit/protocol v1.0.0/go.mod h1:x51sLXmdYpzHvw+xtaootF4EP5Tasg+CDOpv0UYA3DY= -github.com/livekit/protocol v1.0.1-0.20220809040042-d76090cba26e h1:J8Y/cu8AFW/1KbHXVaKpHEGxqActUQpmcaoxot2DmJQ= -github.com/livekit/protocol v1.0.1-0.20220809040042-d76090cba26e/go.mod h1:JBAOkbmwYmZc4yMTpDrjLFs4RVPApEmoWP1idrjBjdI= +github.com/livekit/protocol v1.0.1-0.20220810172733-df83c837695d h1:e0esC1DzNhhH4r9GZUQQzuaZd5/lb9pLZqBTdBTVAhI= +github.com/livekit/protocol v1.0.1-0.20220810172733-df83c837695d/go.mod h1:hN0rI0/QsnGXp3oYnFktdquU3FPetAl8/naweFo6oPs= github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a h1:cENjhGfslLSDV07gt8ASy47Wd12Q0kBS7hsdunyQ62I= github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a/go.mod h1:116ych8UaEs9vfIE8n6iZCZ30iagUFTls0vRmC+Ix5U= github.com/mackerelio/go-osstat v0.2.2 h1:7jVyXGXTkQL3+6lDVUDBY+Fpo8VQPfyOkZeXxxsXX4c= @@ -432,7 +430,6 @@ go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= -go.uber.org/zap v1.21.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw= go.uber.org/zap v1.22.0 h1:Zcye5DUgBloQ9BaT4qc9BnjOFog5TvBSAGkJ3Nf70c0= go.uber.org/zap v1.22.0/go.mod h1:H4siCOZOrAolnUPJEkfaSjDqyP+BDS0DdDWzwcgt3+U= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 8f15b139b..85e6ee71a 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -392,6 +392,10 @@ func (p *ParticipantImpl) ConnectedAt() time.Time { return p.connectedAt } +func (p *ParticipantImpl) GetClientConfiguration() *livekit.ClientConfiguration { + return p.params.ClientConf +} + // SetMetadata attaches metadata to the participant func (p *ParticipantImpl) SetMetadata(metadata string) { p.lock.Lock() diff --git a/pkg/rtc/participant_signal.go b/pkg/rtc/participant_signal.go index ea0090286..e416e6d45 100644 --- a/pkg/rtc/participant_signal.go +++ b/pkg/rtc/participant_signal.go @@ -9,7 +9,6 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/livekit-server/pkg/routing" - "github.com/livekit/livekit-server/version" ) func (p *ParticipantImpl) GetResponseSink() routing.MessageSink { @@ -31,12 +30,7 @@ func (p *ParticipantImpl) SetResponseSink(sink routing.MessageSink) { } } -func (p *ParticipantImpl) SendJoinResponse( - roomInfo *livekit.Room, - otherParticipants []*livekit.ParticipantInfo, - iceServers []*livekit.ICEServer, - region string, -) error { +func (p *ParticipantImpl) SendJoinResponse(joinResponse *livekit.JoinResponse) error { if p.State() == livekit.ParticipantInfo_JOINING { p.updateState(livekit.ParticipantInfo_JOINED) } @@ -44,20 +38,7 @@ func (p *ParticipantImpl) SendJoinResponse( // send Join response return p.writeMessage(&livekit.SignalResponse{ Message: &livekit.SignalResponse_Join{ - Join: &livekit.JoinResponse{ - Room: roomInfo, - Participant: p.ToProto(), - OtherParticipants: otherParticipants, - ServerVersion: version.Version, - ServerRegion: region, - IceServers: iceServers, - // indicates both server and client support subscriber as primary - SubscriberPrimary: p.SubscriberAsPrimary(), - ClientConfiguration: p.params.ClientConf, - // sane defaults for ping interval & timeout - PingInterval: 10, - PingTimeout: 20, - }, + Join: joinResponse, }, }) } diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 8eeaf35cf..38b8d8d5c 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -43,6 +43,7 @@ type Room struct { config WebRTCConfig audioConfig *config.AudioConfig + serverInfo *livekit.ServerInfo telemetry telemetry.TelemetryService // map of identity -> Participant @@ -70,13 +71,20 @@ type ParticipantOptions struct { AutoSubscribe bool } -func NewRoom(room *livekit.Room, config WebRTCConfig, audioConfig *config.AudioConfig, telemetry telemetry.TelemetryService) *Room { +func NewRoom( + room *livekit.Room, + config WebRTCConfig, + audioConfig *config.AudioConfig, + serverInfo *livekit.ServerInfo, + telemetry telemetry.TelemetryService, +) *Room { r := &Room{ protoRoom: proto.Clone(room).(*livekit.Room), Logger: LoggerWithRoom(logger.GetDefaultLogger(), livekit.RoomName(room.Name), livekit.RoomID(room.Sid)), config: config, audioConfig: audioConfig, telemetry: telemetry, + serverInfo: serverInfo, participants: make(map[livekit.ParticipantIdentity]types.LocalParticipant), participantOpts: make(map[livekit.ParticipantIdentity]*ParticipantOptions), bufferFactory: buffer.NewBufferFactory(config.Receiver.PacketBufferSize), @@ -196,7 +204,7 @@ func (r *Room) Release() { r.holds.Dec() } -func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions, iceServers []*livekit.ICEServer, region string) error { +func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions, iceServers []*livekit.ICEServer) error { r.lock.Lock() defer r.lock.Unlock() @@ -259,7 +267,7 @@ func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions speakers := r.GetActiveSpeakers() for _, speaker := range speakers { if livekit.ParticipantID(speaker.Sid) == publisherID { - p.SendSpeakerUpdate(speakers) + _ = p.SendSpeakerUpdate(speakers) break } } @@ -269,7 +277,7 @@ func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions if pub != nil && pub.State() == livekit.ParticipantInfo_ACTIVE { update := &livekit.ConnectionQualityUpdate{} update.Updates = append(update.Updates, pub.GetConnectionQuality()) - p.SendConnectionQualityUpdate(update) + _ = p.SendConnectionQualityUpdate(update) } }() }) @@ -287,14 +295,6 @@ func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions r.participants[participant.Identity()] = participant r.participantOpts[participant.Identity()] = opts - // gather other participants and send join response - otherParticipants := make([]*livekit.ParticipantInfo, 0, len(r.participants)) - for _, p := range r.participants { - if p.ID() != participant.ID() && !p.Hidden() { - otherParticipants = append(otherParticipants, p.ToProto()) - } - } - if r.onParticipantChanged != nil { r.onParticipantChanged(participant) } @@ -306,7 +306,8 @@ func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions } }) - if err := participant.SendJoinResponse(proto.Clone(r.protoRoom).(*livekit.Room), otherParticipants, iceServers, region); err != nil { + joinResponse := r.createJoinResponseLocked(participant, iceServers) + if err := participant.SendJoinResponse(joinResponse); err != nil { prometheus.ServiceOperationCounter.WithLabelValues("participant_join", "error", "send_response").Add(1) return err } @@ -663,6 +664,32 @@ func (r *Room) autoSubscribe(participant types.LocalParticipant) bool { return true } +func (r *Room) createJoinResponseLocked(participant types.LocalParticipant, iceServers []*livekit.ICEServer) *livekit.JoinResponse { + // gather other participants and send join response + otherParticipants := make([]*livekit.ParticipantInfo, 0, len(r.participants)) + for _, p := range r.participants { + if p.ID() != participant.ID() && !p.Hidden() { + otherParticipants = append(otherParticipants, p.ToProto()) + } + } + + return &livekit.JoinResponse{ + Room: r.protoRoom, + Participant: participant.ToProto(), + OtherParticipants: otherParticipants, + ServerVersion: r.serverInfo.Version, + ServerRegion: r.serverInfo.Region, + IceServers: iceServers, + // indicates both server and client support subscriber as primary + SubscriberPrimary: participant.SubscriberAsPrimary(), + ClientConfiguration: participant.GetClientConfiguration(), + // sane defaults for ping interval & timeout + PingInterval: 10, + PingTimeout: 20, + ServerInfo: r.serverInfo, + } +} + // a ParticipantImpl in the room added a new remoteTrack, subscribe other participants to it func (r *Room) onTrackPublished(participant types.LocalParticipant, track types.MediaTrack) { // publish participant update, since track state is changed diff --git a/pkg/rtc/room_test.go b/pkg/rtc/room_test.go index 8e1e3eb82..b81785c35 100644 --- a/pkg/rtc/room_test.go +++ b/pkg/rtc/room_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" + "github.com/livekit/livekit-server/version" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/webhook" @@ -69,24 +70,24 @@ func TestJoinedState(t *testing.T) { func TestRoomJoin(t *testing.T) { t.Run("joining returns existing participant data", func(t *testing.T) { rm := newRoomWithParticipants(t, testRoomOpts{num: numParticipants}) - pNew := newMockParticipant("new", types.DefaultProtocol, false, false) + pNew := newMockParticipant("new", types.CurrentProtocol, false, false) - _ = rm.Join(pNew, nil, iceServersForRoom, "test") + _ = rm.Join(pNew, nil, iceServersForRoom) // expect new participant to get a JoinReply - info, participants, iceServers, _ := pNew.SendJoinResponseArgsForCall(0) - require.Equal(t, livekit.RoomID(info.Sid), rm.ID()) - require.Len(t, participants, numParticipants) + res := pNew.SendJoinResponseArgsForCall(0) + require.Equal(t, livekit.RoomID(res.Room.Sid), rm.ID()) + require.Len(t, res.OtherParticipants, numParticipants) require.Len(t, rm.GetParticipants(), numParticipants+1) - require.NotEmpty(t, iceServers) + require.NotEmpty(t, res.IceServers) }) t.Run("subscribe to existing channels upon join", func(t *testing.T) { numExisting := 3 rm := newRoomWithParticipants(t, testRoomOpts{num: numExisting}) - p := newMockParticipant("new", types.DefaultProtocol, false, false) + p := newMockParticipant("new", types.CurrentProtocol, false, false) - err := rm.Join(p, &ParticipantOptions{AutoSubscribe: true}, iceServersForRoom, "") + err := rm.Join(p, &ParticipantOptions{AutoSubscribe: true}, iceServersForRoom) require.NoError(t, err) stateChangeCB := p.OnStateChangeArgsForCall(0) @@ -142,7 +143,7 @@ func TestRoomJoin(t *testing.T) { rm.protoRoom.MaxParticipants = 1 p := newMockParticipant("second", types.ProtocolVersion(0), false, false) - err := rm.Join(p, nil, iceServersForRoom, "") + err := rm.Join(p, nil, iceServersForRoom) require.Equal(t, ErrMaxParticipantsExceeded, err) }) } @@ -340,7 +341,7 @@ func TestRoomClosure(t *testing.T) { require.Len(t, rm.GetParticipants(), 0) require.True(t, isClosed) - require.Equal(t, ErrRoomClosed, rm.Join(p, nil, iceServersForRoom, "")) + require.Equal(t, ErrRoomClosed, rm.Join(p, nil, iceServersForRoom)) }) t.Run("room does not close before empty timeout", func(t *testing.T) { @@ -619,23 +620,23 @@ func TestHiddenParticipants(t *testing.T) { rm := newRoomWithParticipants(t, testRoomOpts{num: 2, numHidden: 1}) defer rm.Close() - pNew := newMockParticipant("new", types.DefaultProtocol, false, false) - rm.Join(pNew, nil, iceServersForRoom, "testregion") + pNew := newMockParticipant("new", types.CurrentProtocol, false, false) + rm.Join(pNew, nil, iceServersForRoom) // expect new participant to get a JoinReply - info, participants, iceServers, region := pNew.SendJoinResponseArgsForCall(0) - require.Equal(t, livekit.RoomID(info.Sid), rm.ID()) - require.Len(t, participants, 2) + res := pNew.SendJoinResponseArgsForCall(0) + require.Equal(t, livekit.RoomID(res.Room.Sid), rm.ID()) + require.Len(t, res.OtherParticipants, 2) require.Len(t, rm.GetParticipants(), 4) - require.NotEmpty(t, iceServers) - require.Equal(t, "testregion", region) + require.NotEmpty(t, res.IceServers) + require.Equal(t, "testregion", res.ServerRegion) }) t.Run("hidden participant subscribes to tracks", func(t *testing.T) { rm := newRoomWithParticipants(t, testRoomOpts{num: 2, numHidden: 1}) - p := newMockParticipant("new", types.DefaultProtocol, false, true) + p := newMockParticipant("new", types.CurrentProtocol, false, true) - err := rm.Join(p, &ParticipantOptions{AutoSubscribe: true}, iceServersForRoom, "") + err := rm.Join(p, &ParticipantOptions{AutoSubscribe: true}, iceServersForRoom) require.NoError(t, err) stateChangeCB := p.OnStateChangeArgsForCall(0) @@ -687,12 +688,19 @@ func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *Room { UpdateInterval: audioUpdateInterval, SmoothIntervals: opts.audioSmoothIntervals, }, + &livekit.ServerInfo{ + Edition: livekit.ServerInfo_Standard, + Version: version.Version, + Protocol: types.CurrentProtocol, + NodeId: "testnode", + Region: "testregion", + }, telemetry.NewTelemetryService(webhook.NewNotifier("", "", nil), &telemetryfakes.FakeAnalyticsService{}), ) for i := 0; i < opts.num+opts.numHidden; i++ { identity := livekit.ParticipantIdentity(fmt.Sprintf("p%d", i)) participant := newMockParticipant(identity, opts.protocol, i >= opts.num, true) - err := rm.Join(participant, &ParticipantOptions{AutoSubscribe: true}, iceServersForRoom, "") + err := rm.Join(participant, &ParticipantOptions{AutoSubscribe: true}, iceServersForRoom) require.NoError(t, err) participant.StateReturns(livekit.ParticipantInfo_ACTIVE) participant.IsReadyReturns(true) diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index a3928d1a7..472cef525 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -204,16 +204,15 @@ type IceConfig struct { type LocalParticipant interface { Participant + // getters GetLogger() logger.Logger GetAdaptiveStream() bool - ProtocolVersion() ProtocolVersion - ConnectedAt() time.Time - State() livekit.ParticipantInfo_State IsReady() bool SubscriberAsPrimary() bool + GetClientConfiguration() *livekit.ClientConfiguration GetResponseSink() routing.MessageSink SetResponseSink(sink routing.MessageSink) @@ -225,13 +224,11 @@ type LocalParticipant interface { CanSubscribe() bool CanPublishData() bool + // PeerConnection AddICECandidate(candidate webrtc.ICECandidateInit, target livekit.SignalTarget) error - HandleOffer(sdp webrtc.SessionDescription) error - AddTrack(req *livekit.AddTrackRequest) SetTrackMuted(trackID livekit.TrackID, muted bool, fromAdmin bool) - SubscriberMediaEngine() *webrtc.MediaEngine SubscriberPC() *webrtc.PeerConnection HandleAnswer(sdp webrtc.SessionDescription) error @@ -239,6 +236,8 @@ type LocalParticipant interface { AddNegotiationPending(publisherID livekit.ParticipantID) IsNegotiationPending(publisherID livekit.ParticipantID) bool ICERestart(iceConfig *IceConfig) error + + // subscriptions AddSubscribedTrack(st SubscribedTrack) RemoveSubscribedTrack(st SubscribedTrack) UpdateSubscribedTrackSettings(trackID livekit.TrackID, settings *livekit.UpdateTrackSettings) error @@ -253,7 +252,7 @@ type LocalParticipant interface { GetConnectionQuality() *livekit.ConnectionQualityInfo // server sent messages - SendJoinResponse(info *livekit.Room, otherParticipants []*livekit.ParticipantInfo, iceServers []*livekit.ICEServer, region string) error + SendJoinResponse(joinResponse *livekit.JoinResponse) error SendParticipantUpdate(participants []*livekit.ParticipantInfo) error SendSpeakerUpdate(speakers []*livekit.SpeakerInfo) error SendDataPacket(packet *livekit.DataPacket) error @@ -299,6 +298,7 @@ type LocalParticipant interface { } // Room is a container of participants, and can provide room-level actions +// //counterfeiter:generate . Room type Room interface { Name() livekit.RoomName @@ -313,6 +313,7 @@ type Room interface { } // MediaTrack represents a media track +// //counterfeiter:generate . MediaTrack type MediaTrack interface { ID() livekit.TrackID @@ -369,6 +370,7 @@ type LocalMediaTrack interface { } // MediaTrack is the main interface representing a track published to the room +// //counterfeiter:generate . SubscribedTrack type SubscribedTrack interface { OnBind(f func()) diff --git a/pkg/rtc/types/protocol_version.go b/pkg/rtc/types/protocol_version.go index 071ea512e..d1260cd90 100644 --- a/pkg/rtc/types/protocol_version.go +++ b/pkg/rtc/types/protocol_version.go @@ -2,7 +2,7 @@ package types type ProtocolVersion int -const DefaultProtocol = 6 +const CurrentProtocol = 8 func (v ProtocolVersion) SupportsPackedStreamId() bool { return v > 0 diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index d4331b7a3..6a900aa81 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -188,6 +188,16 @@ type FakeLocalParticipant struct { result1 *webrtc.RTPTransceiver result2 sfu.ForwarderState } + GetClientConfigurationStub func() *livekit.ClientConfiguration + getClientConfigurationMutex sync.RWMutex + getClientConfigurationArgsForCall []struct { + } + getClientConfigurationReturns struct { + result1 *livekit.ClientConfiguration + } + getClientConfigurationReturnsOnCall map[int]struct { + result1 *livekit.ClientConfiguration + } GetConnectionQualityStub func() *livekit.ConnectionQualityInfo getConnectionQualityMutex sync.RWMutex getConnectionQualityArgsForCall []struct { @@ -483,13 +493,10 @@ type FakeLocalParticipant struct { sendDataPacketReturnsOnCall map[int]struct { result1 error } - SendJoinResponseStub func(*livekit.Room, []*livekit.ParticipantInfo, []*livekit.ICEServer, string) error + SendJoinResponseStub func(*livekit.JoinResponse) error sendJoinResponseMutex sync.RWMutex sendJoinResponseArgsForCall []struct { - arg1 *livekit.Room - arg2 []*livekit.ParticipantInfo - arg3 []*livekit.ICEServer - arg4 string + arg1 *livekit.JoinResponse } sendJoinResponseReturns struct { result1 error @@ -1645,6 +1652,59 @@ func (fake *FakeLocalParticipant) GetCachedDownTrackReturnsOnCall(i int, result1 }{result1, result2} } +func (fake *FakeLocalParticipant) GetClientConfiguration() *livekit.ClientConfiguration { + fake.getClientConfigurationMutex.Lock() + ret, specificReturn := fake.getClientConfigurationReturnsOnCall[len(fake.getClientConfigurationArgsForCall)] + fake.getClientConfigurationArgsForCall = append(fake.getClientConfigurationArgsForCall, struct { + }{}) + stub := fake.GetClientConfigurationStub + fakeReturns := fake.getClientConfigurationReturns + fake.recordInvocation("GetClientConfiguration", []interface{}{}) + fake.getClientConfigurationMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeLocalParticipant) GetClientConfigurationCallCount() int { + fake.getClientConfigurationMutex.RLock() + defer fake.getClientConfigurationMutex.RUnlock() + return len(fake.getClientConfigurationArgsForCall) +} + +func (fake *FakeLocalParticipant) GetClientConfigurationCalls(stub func() *livekit.ClientConfiguration) { + fake.getClientConfigurationMutex.Lock() + defer fake.getClientConfigurationMutex.Unlock() + fake.GetClientConfigurationStub = stub +} + +func (fake *FakeLocalParticipant) GetClientConfigurationReturns(result1 *livekit.ClientConfiguration) { + fake.getClientConfigurationMutex.Lock() + defer fake.getClientConfigurationMutex.Unlock() + fake.GetClientConfigurationStub = nil + fake.getClientConfigurationReturns = struct { + result1 *livekit.ClientConfiguration + }{result1} +} + +func (fake *FakeLocalParticipant) GetClientConfigurationReturnsOnCall(i int, result1 *livekit.ClientConfiguration) { + fake.getClientConfigurationMutex.Lock() + defer fake.getClientConfigurationMutex.Unlock() + fake.GetClientConfigurationStub = nil + if fake.getClientConfigurationReturnsOnCall == nil { + fake.getClientConfigurationReturnsOnCall = make(map[int]struct { + result1 *livekit.ClientConfiguration + }) + } + fake.getClientConfigurationReturnsOnCall[i] = struct { + result1 *livekit.ClientConfiguration + }{result1} +} + func (fake *FakeLocalParticipant) GetConnectionQuality() *livekit.ConnectionQualityInfo { fake.getConnectionQualityMutex.Lock() ret, specificReturn := fake.getConnectionQualityReturnsOnCall[len(fake.getConnectionQualityArgsForCall)] @@ -3293,31 +3353,18 @@ func (fake *FakeLocalParticipant) SendDataPacketReturnsOnCall(i int, result1 err }{result1} } -func (fake *FakeLocalParticipant) SendJoinResponse(arg1 *livekit.Room, arg2 []*livekit.ParticipantInfo, arg3 []*livekit.ICEServer, arg4 string) error { - var arg2Copy []*livekit.ParticipantInfo - if arg2 != nil { - arg2Copy = make([]*livekit.ParticipantInfo, len(arg2)) - copy(arg2Copy, arg2) - } - var arg3Copy []*livekit.ICEServer - if arg3 != nil { - arg3Copy = make([]*livekit.ICEServer, len(arg3)) - copy(arg3Copy, arg3) - } +func (fake *FakeLocalParticipant) SendJoinResponse(arg1 *livekit.JoinResponse) error { fake.sendJoinResponseMutex.Lock() ret, specificReturn := fake.sendJoinResponseReturnsOnCall[len(fake.sendJoinResponseArgsForCall)] fake.sendJoinResponseArgsForCall = append(fake.sendJoinResponseArgsForCall, struct { - arg1 *livekit.Room - arg2 []*livekit.ParticipantInfo - arg3 []*livekit.ICEServer - arg4 string - }{arg1, arg2Copy, arg3Copy, arg4}) + arg1 *livekit.JoinResponse + }{arg1}) stub := fake.SendJoinResponseStub fakeReturns := fake.sendJoinResponseReturns - fake.recordInvocation("SendJoinResponse", []interface{}{arg1, arg2Copy, arg3Copy, arg4}) + fake.recordInvocation("SendJoinResponse", []interface{}{arg1}) fake.sendJoinResponseMutex.Unlock() if stub != nil { - return stub(arg1, arg2, arg3, arg4) + return stub(arg1) } if specificReturn { return ret.result1 @@ -3331,17 +3378,17 @@ func (fake *FakeLocalParticipant) SendJoinResponseCallCount() int { return len(fake.sendJoinResponseArgsForCall) } -func (fake *FakeLocalParticipant) SendJoinResponseCalls(stub func(*livekit.Room, []*livekit.ParticipantInfo, []*livekit.ICEServer, string) error) { +func (fake *FakeLocalParticipant) SendJoinResponseCalls(stub func(*livekit.JoinResponse) error) { fake.sendJoinResponseMutex.Lock() defer fake.sendJoinResponseMutex.Unlock() fake.SendJoinResponseStub = stub } -func (fake *FakeLocalParticipant) SendJoinResponseArgsForCall(i int) (*livekit.Room, []*livekit.ParticipantInfo, []*livekit.ICEServer, string) { +func (fake *FakeLocalParticipant) SendJoinResponseArgsForCall(i int) *livekit.JoinResponse { fake.sendJoinResponseMutex.RLock() defer fake.sendJoinResponseMutex.RUnlock() argsForCall := fake.sendJoinResponseArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4 + return argsForCall.arg1 } func (fake *FakeLocalParticipant) SendJoinResponseReturns(result1 error) { @@ -4690,6 +4737,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.getAudioLevelMutex.RUnlock() fake.getCachedDownTrackMutex.RLock() defer fake.getCachedDownTrackMutex.RUnlock() + fake.getClientConfigurationMutex.RLock() + defer fake.getClientConfigurationMutex.RUnlock() fake.getConnectionQualityMutex.RLock() defer fake.getConnectionQualityMutex.RUnlock() fake.getLoggerMutex.RLock() diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index d2c8d4b30..ccf5efb33 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -8,6 +8,7 @@ import ( "github.com/pkg/errors" + "github.com/livekit/livekit-server/version" "github.com/livekit/protocol/auth" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" @@ -40,6 +41,7 @@ type RoomManager struct { config *config.Config rtcConfig *rtc.WebRTCConfig + serverInfo *livekit.ServerInfo currentNode routing.LocalNode router routing.Router roomStore ObjectStore @@ -77,6 +79,14 @@ func NewLocalRoomManager( rooms: make(map[livekit.RoomName]*rtc.Room), iceConfigCache: make(map[livekit.ParticipantIdentity]*iceConfigCacheEntry), + + serverInfo: &livekit.ServerInfo{ + Edition: livekit.ServerInfo_Standard, + Version: version.Version, + Protocol: types.CurrentProtocol, + Region: conf.Region, + NodeId: currentNode.Id, + }, } // hook up to router @@ -281,7 +291,7 @@ func (r *RoomManager) StartSession( opts := rtc.ParticipantOptions{ AutoSubscribe: pi.AutoSubscribe, } - if err = room.Join(participant, &opts, r.iceServersForRoom(protoRoom), r.currentNode.Region); err != nil { + if err = room.Join(participant, &opts, r.iceServersForRoom(protoRoom)); err != nil { pLogger.Errorw("could not join room", err) _ = participant.Close(true, types.ParticipantCloseReasonJoinFailed) return err @@ -366,7 +376,7 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room } // construct ice servers - newRoom := rtc.NewRoom(ri, *r.rtcConfig, &r.config.Audio, r.telemetry) + newRoom := rtc.NewRoom(ri, *r.rtcConfig, &r.config.Audio, r.serverInfo, r.telemetry) newRoom.OnClose(func() { r.telemetry.RoomEnded(ctx, newRoom.ToProto()) diff --git a/pkg/sfu/connectionquality/connectionstats.go b/pkg/sfu/connectionquality/connectionstats.go index ea1f5b020..54e77b921 100644 --- a/pkg/sfu/connectionquality/connectionstats.go +++ b/pkg/sfu/connectionquality/connectionstats.go @@ -41,6 +41,7 @@ type ConnectionStats struct { lock sync.RWMutex score float32 lastUpdate time.Time + isLowQuality atomic.Bool maxExpectedLayer int32 done chan struct{} @@ -162,10 +163,15 @@ func (cs *ConnectionStats) updateScore(streams map[uint32]*buffer.StreamStatsWit params.ExpectedWidth = expectedWidth params.ExpectedHeight = expectedHeight cs.score = VideoTrackScore(params) - } - if cs.score < 4.0 { - cs.params.Logger.Debugw("low connection quality score", "score", cs.score, "params", params) + if cs.score < 3.5 { + if !cs.isLowQuality.Swap(true) { + // changed from good to low quality, log + cs.params.Logger.Debugw("low connection quality", "score", cs.score, "params", params) + } + } else { + cs.isLowQuality.Store(false) + } } return cs.score