Return ServerInfo to clients on join (#904)

* checkpoint

* Return ServerInfo in join response

* also include node information

* less verbose quality score

* update go modules
This commit is contained in:
David Zhao
2022-08-10 17:04:17 -07:00
committed by GitHub
parent 1b7c8ddba4
commit f09885825e
11 changed files with 183 additions and 99 deletions

2
go.mod
View File

@@ -16,7 +16,7 @@ require (
github.com/gorilla/websocket v1.5.0
github.com/hashicorp/go-version v1.6.0
github.com/hashicorp/golang-lru v0.5.4
github.com/livekit/protocol v1.0.1-0.20220809040042-d76090cba26e
github.com/livekit/protocol v1.0.1-0.20220810172733-df83c837695d
github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a
github.com/mackerelio/go-osstat v0.2.2
github.com/magefile/mage v1.13.0

7
go.sum
View File

@@ -240,10 +240,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8=
github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/livekit/protocol v1.0.0 h1:41aGhSIHtyPJBwzw4Aw1Y4BQpKxLBlS1wK31G8uME8A=
github.com/livekit/protocol v1.0.0/go.mod h1:x51sLXmdYpzHvw+xtaootF4EP5Tasg+CDOpv0UYA3DY=
github.com/livekit/protocol v1.0.1-0.20220809040042-d76090cba26e h1:J8Y/cu8AFW/1KbHXVaKpHEGxqActUQpmcaoxot2DmJQ=
github.com/livekit/protocol v1.0.1-0.20220809040042-d76090cba26e/go.mod h1:JBAOkbmwYmZc4yMTpDrjLFs4RVPApEmoWP1idrjBjdI=
github.com/livekit/protocol v1.0.1-0.20220810172733-df83c837695d h1:e0esC1DzNhhH4r9GZUQQzuaZd5/lb9pLZqBTdBTVAhI=
github.com/livekit/protocol v1.0.1-0.20220810172733-df83c837695d/go.mod h1:hN0rI0/QsnGXp3oYnFktdquU3FPetAl8/naweFo6oPs=
github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a h1:cENjhGfslLSDV07gt8ASy47Wd12Q0kBS7hsdunyQ62I=
github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a/go.mod h1:116ych8UaEs9vfIE8n6iZCZ30iagUFTls0vRmC+Ix5U=
github.com/mackerelio/go-osstat v0.2.2 h1:7jVyXGXTkQL3+6lDVUDBY+Fpo8VQPfyOkZeXxxsXX4c=
@@ -432,7 +430,6 @@ go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
go.uber.org/zap v1.21.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw=
go.uber.org/zap v1.22.0 h1:Zcye5DUgBloQ9BaT4qc9BnjOFog5TvBSAGkJ3Nf70c0=
go.uber.org/zap v1.22.0/go.mod h1:H4siCOZOrAolnUPJEkfaSjDqyP+BDS0DdDWzwcgt3+U=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=

View File

@@ -392,6 +392,10 @@ func (p *ParticipantImpl) ConnectedAt() time.Time {
return p.connectedAt
}
func (p *ParticipantImpl) GetClientConfiguration() *livekit.ClientConfiguration {
return p.params.ClientConf
}
// SetMetadata attaches metadata to the participant
func (p *ParticipantImpl) SetMetadata(metadata string) {
p.lock.Lock()

View File

@@ -9,7 +9,6 @@ import (
"github.com/livekit/protocol/livekit"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/version"
)
func (p *ParticipantImpl) GetResponseSink() routing.MessageSink {
@@ -31,12 +30,7 @@ func (p *ParticipantImpl) SetResponseSink(sink routing.MessageSink) {
}
}
func (p *ParticipantImpl) SendJoinResponse(
roomInfo *livekit.Room,
otherParticipants []*livekit.ParticipantInfo,
iceServers []*livekit.ICEServer,
region string,
) error {
func (p *ParticipantImpl) SendJoinResponse(joinResponse *livekit.JoinResponse) error {
if p.State() == livekit.ParticipantInfo_JOINING {
p.updateState(livekit.ParticipantInfo_JOINED)
}
@@ -44,20 +38,7 @@ func (p *ParticipantImpl) SendJoinResponse(
// send Join response
return p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_Join{
Join: &livekit.JoinResponse{
Room: roomInfo,
Participant: p.ToProto(),
OtherParticipants: otherParticipants,
ServerVersion: version.Version,
ServerRegion: region,
IceServers: iceServers,
// indicates both server and client support subscriber as primary
SubscriberPrimary: p.SubscriberAsPrimary(),
ClientConfiguration: p.params.ClientConf,
// sane defaults for ping interval & timeout
PingInterval: 10,
PingTimeout: 20,
},
Join: joinResponse,
},
})
}

View File

@@ -43,6 +43,7 @@ type Room struct {
config WebRTCConfig
audioConfig *config.AudioConfig
serverInfo *livekit.ServerInfo
telemetry telemetry.TelemetryService
// map of identity -> Participant
@@ -70,13 +71,20 @@ type ParticipantOptions struct {
AutoSubscribe bool
}
func NewRoom(room *livekit.Room, config WebRTCConfig, audioConfig *config.AudioConfig, telemetry telemetry.TelemetryService) *Room {
func NewRoom(
room *livekit.Room,
config WebRTCConfig,
audioConfig *config.AudioConfig,
serverInfo *livekit.ServerInfo,
telemetry telemetry.TelemetryService,
) *Room {
r := &Room{
protoRoom: proto.Clone(room).(*livekit.Room),
Logger: LoggerWithRoom(logger.GetDefaultLogger(), livekit.RoomName(room.Name), livekit.RoomID(room.Sid)),
config: config,
audioConfig: audioConfig,
telemetry: telemetry,
serverInfo: serverInfo,
participants: make(map[livekit.ParticipantIdentity]types.LocalParticipant),
participantOpts: make(map[livekit.ParticipantIdentity]*ParticipantOptions),
bufferFactory: buffer.NewBufferFactory(config.Receiver.PacketBufferSize),
@@ -196,7 +204,7 @@ func (r *Room) Release() {
r.holds.Dec()
}
func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions, iceServers []*livekit.ICEServer, region string) error {
func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions, iceServers []*livekit.ICEServer) error {
r.lock.Lock()
defer r.lock.Unlock()
@@ -259,7 +267,7 @@ func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions
speakers := r.GetActiveSpeakers()
for _, speaker := range speakers {
if livekit.ParticipantID(speaker.Sid) == publisherID {
p.SendSpeakerUpdate(speakers)
_ = p.SendSpeakerUpdate(speakers)
break
}
}
@@ -269,7 +277,7 @@ func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions
if pub != nil && pub.State() == livekit.ParticipantInfo_ACTIVE {
update := &livekit.ConnectionQualityUpdate{}
update.Updates = append(update.Updates, pub.GetConnectionQuality())
p.SendConnectionQualityUpdate(update)
_ = p.SendConnectionQualityUpdate(update)
}
}()
})
@@ -287,14 +295,6 @@ func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions
r.participants[participant.Identity()] = participant
r.participantOpts[participant.Identity()] = opts
// gather other participants and send join response
otherParticipants := make([]*livekit.ParticipantInfo, 0, len(r.participants))
for _, p := range r.participants {
if p.ID() != participant.ID() && !p.Hidden() {
otherParticipants = append(otherParticipants, p.ToProto())
}
}
if r.onParticipantChanged != nil {
r.onParticipantChanged(participant)
}
@@ -306,7 +306,8 @@ func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions
}
})
if err := participant.SendJoinResponse(proto.Clone(r.protoRoom).(*livekit.Room), otherParticipants, iceServers, region); err != nil {
joinResponse := r.createJoinResponseLocked(participant, iceServers)
if err := participant.SendJoinResponse(joinResponse); err != nil {
prometheus.ServiceOperationCounter.WithLabelValues("participant_join", "error", "send_response").Add(1)
return err
}
@@ -663,6 +664,32 @@ func (r *Room) autoSubscribe(participant types.LocalParticipant) bool {
return true
}
func (r *Room) createJoinResponseLocked(participant types.LocalParticipant, iceServers []*livekit.ICEServer) *livekit.JoinResponse {
// gather other participants and send join response
otherParticipants := make([]*livekit.ParticipantInfo, 0, len(r.participants))
for _, p := range r.participants {
if p.ID() != participant.ID() && !p.Hidden() {
otherParticipants = append(otherParticipants, p.ToProto())
}
}
return &livekit.JoinResponse{
Room: r.protoRoom,
Participant: participant.ToProto(),
OtherParticipants: otherParticipants,
ServerVersion: r.serverInfo.Version,
ServerRegion: r.serverInfo.Region,
IceServers: iceServers,
// indicates both server and client support subscriber as primary
SubscriberPrimary: participant.SubscriberAsPrimary(),
ClientConfiguration: participant.GetClientConfiguration(),
// sane defaults for ping interval & timeout
PingInterval: 10,
PingTimeout: 20,
ServerInfo: r.serverInfo,
}
}
// a ParticipantImpl in the room added a new remoteTrack, subscribe other participants to it
func (r *Room) onTrackPublished(participant types.LocalParticipant, track types.MediaTrack) {
// publish participant update, since track state is changed

View File

@@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
"github.com/livekit/livekit-server/version"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/webhook"
@@ -69,24 +70,24 @@ func TestJoinedState(t *testing.T) {
func TestRoomJoin(t *testing.T) {
t.Run("joining returns existing participant data", func(t *testing.T) {
rm := newRoomWithParticipants(t, testRoomOpts{num: numParticipants})
pNew := newMockParticipant("new", types.DefaultProtocol, false, false)
pNew := newMockParticipant("new", types.CurrentProtocol, false, false)
_ = rm.Join(pNew, nil, iceServersForRoom, "test")
_ = rm.Join(pNew, nil, iceServersForRoom)
// expect new participant to get a JoinReply
info, participants, iceServers, _ := pNew.SendJoinResponseArgsForCall(0)
require.Equal(t, livekit.RoomID(info.Sid), rm.ID())
require.Len(t, participants, numParticipants)
res := pNew.SendJoinResponseArgsForCall(0)
require.Equal(t, livekit.RoomID(res.Room.Sid), rm.ID())
require.Len(t, res.OtherParticipants, numParticipants)
require.Len(t, rm.GetParticipants(), numParticipants+1)
require.NotEmpty(t, iceServers)
require.NotEmpty(t, res.IceServers)
})
t.Run("subscribe to existing channels upon join", func(t *testing.T) {
numExisting := 3
rm := newRoomWithParticipants(t, testRoomOpts{num: numExisting})
p := newMockParticipant("new", types.DefaultProtocol, false, false)
p := newMockParticipant("new", types.CurrentProtocol, false, false)
err := rm.Join(p, &ParticipantOptions{AutoSubscribe: true}, iceServersForRoom, "")
err := rm.Join(p, &ParticipantOptions{AutoSubscribe: true}, iceServersForRoom)
require.NoError(t, err)
stateChangeCB := p.OnStateChangeArgsForCall(0)
@@ -142,7 +143,7 @@ func TestRoomJoin(t *testing.T) {
rm.protoRoom.MaxParticipants = 1
p := newMockParticipant("second", types.ProtocolVersion(0), false, false)
err := rm.Join(p, nil, iceServersForRoom, "")
err := rm.Join(p, nil, iceServersForRoom)
require.Equal(t, ErrMaxParticipantsExceeded, err)
})
}
@@ -340,7 +341,7 @@ func TestRoomClosure(t *testing.T) {
require.Len(t, rm.GetParticipants(), 0)
require.True(t, isClosed)
require.Equal(t, ErrRoomClosed, rm.Join(p, nil, iceServersForRoom, ""))
require.Equal(t, ErrRoomClosed, rm.Join(p, nil, iceServersForRoom))
})
t.Run("room does not close before empty timeout", func(t *testing.T) {
@@ -619,23 +620,23 @@ func TestHiddenParticipants(t *testing.T) {
rm := newRoomWithParticipants(t, testRoomOpts{num: 2, numHidden: 1})
defer rm.Close()
pNew := newMockParticipant("new", types.DefaultProtocol, false, false)
rm.Join(pNew, nil, iceServersForRoom, "testregion")
pNew := newMockParticipant("new", types.CurrentProtocol, false, false)
rm.Join(pNew, nil, iceServersForRoom)
// expect new participant to get a JoinReply
info, participants, iceServers, region := pNew.SendJoinResponseArgsForCall(0)
require.Equal(t, livekit.RoomID(info.Sid), rm.ID())
require.Len(t, participants, 2)
res := pNew.SendJoinResponseArgsForCall(0)
require.Equal(t, livekit.RoomID(res.Room.Sid), rm.ID())
require.Len(t, res.OtherParticipants, 2)
require.Len(t, rm.GetParticipants(), 4)
require.NotEmpty(t, iceServers)
require.Equal(t, "testregion", region)
require.NotEmpty(t, res.IceServers)
require.Equal(t, "testregion", res.ServerRegion)
})
t.Run("hidden participant subscribes to tracks", func(t *testing.T) {
rm := newRoomWithParticipants(t, testRoomOpts{num: 2, numHidden: 1})
p := newMockParticipant("new", types.DefaultProtocol, false, true)
p := newMockParticipant("new", types.CurrentProtocol, false, true)
err := rm.Join(p, &ParticipantOptions{AutoSubscribe: true}, iceServersForRoom, "")
err := rm.Join(p, &ParticipantOptions{AutoSubscribe: true}, iceServersForRoom)
require.NoError(t, err)
stateChangeCB := p.OnStateChangeArgsForCall(0)
@@ -687,12 +688,19 @@ func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *Room {
UpdateInterval: audioUpdateInterval,
SmoothIntervals: opts.audioSmoothIntervals,
},
&livekit.ServerInfo{
Edition: livekit.ServerInfo_Standard,
Version: version.Version,
Protocol: types.CurrentProtocol,
NodeId: "testnode",
Region: "testregion",
},
telemetry.NewTelemetryService(webhook.NewNotifier("", "", nil), &telemetryfakes.FakeAnalyticsService{}),
)
for i := 0; i < opts.num+opts.numHidden; i++ {
identity := livekit.ParticipantIdentity(fmt.Sprintf("p%d", i))
participant := newMockParticipant(identity, opts.protocol, i >= opts.num, true)
err := rm.Join(participant, &ParticipantOptions{AutoSubscribe: true}, iceServersForRoom, "")
err := rm.Join(participant, &ParticipantOptions{AutoSubscribe: true}, iceServersForRoom)
require.NoError(t, err)
participant.StateReturns(livekit.ParticipantInfo_ACTIVE)
participant.IsReadyReturns(true)

View File

@@ -204,16 +204,15 @@ type IceConfig struct {
type LocalParticipant interface {
Participant
// getters
GetLogger() logger.Logger
GetAdaptiveStream() bool
ProtocolVersion() ProtocolVersion
ConnectedAt() time.Time
State() livekit.ParticipantInfo_State
IsReady() bool
SubscriberAsPrimary() bool
GetClientConfiguration() *livekit.ClientConfiguration
GetResponseSink() routing.MessageSink
SetResponseSink(sink routing.MessageSink)
@@ -225,13 +224,11 @@ type LocalParticipant interface {
CanSubscribe() bool
CanPublishData() bool
// PeerConnection
AddICECandidate(candidate webrtc.ICECandidateInit, target livekit.SignalTarget) error
HandleOffer(sdp webrtc.SessionDescription) error
AddTrack(req *livekit.AddTrackRequest)
SetTrackMuted(trackID livekit.TrackID, muted bool, fromAdmin bool)
SubscriberMediaEngine() *webrtc.MediaEngine
SubscriberPC() *webrtc.PeerConnection
HandleAnswer(sdp webrtc.SessionDescription) error
@@ -239,6 +236,8 @@ type LocalParticipant interface {
AddNegotiationPending(publisherID livekit.ParticipantID)
IsNegotiationPending(publisherID livekit.ParticipantID) bool
ICERestart(iceConfig *IceConfig) error
// subscriptions
AddSubscribedTrack(st SubscribedTrack)
RemoveSubscribedTrack(st SubscribedTrack)
UpdateSubscribedTrackSettings(trackID livekit.TrackID, settings *livekit.UpdateTrackSettings) error
@@ -253,7 +252,7 @@ type LocalParticipant interface {
GetConnectionQuality() *livekit.ConnectionQualityInfo
// server sent messages
SendJoinResponse(info *livekit.Room, otherParticipants []*livekit.ParticipantInfo, iceServers []*livekit.ICEServer, region string) error
SendJoinResponse(joinResponse *livekit.JoinResponse) error
SendParticipantUpdate(participants []*livekit.ParticipantInfo) error
SendSpeakerUpdate(speakers []*livekit.SpeakerInfo) error
SendDataPacket(packet *livekit.DataPacket) error
@@ -299,6 +298,7 @@ type LocalParticipant interface {
}
// Room is a container of participants, and can provide room-level actions
//
//counterfeiter:generate . Room
type Room interface {
Name() livekit.RoomName
@@ -313,6 +313,7 @@ type Room interface {
}
// MediaTrack represents a media track
//
//counterfeiter:generate . MediaTrack
type MediaTrack interface {
ID() livekit.TrackID
@@ -369,6 +370,7 @@ type LocalMediaTrack interface {
}
// MediaTrack is the main interface representing a track published to the room
//
//counterfeiter:generate . SubscribedTrack
type SubscribedTrack interface {
OnBind(f func())

View File

@@ -2,7 +2,7 @@ package types
type ProtocolVersion int
const DefaultProtocol = 6
const CurrentProtocol = 8
func (v ProtocolVersion) SupportsPackedStreamId() bool {
return v > 0

View File

@@ -188,6 +188,16 @@ type FakeLocalParticipant struct {
result1 *webrtc.RTPTransceiver
result2 sfu.ForwarderState
}
GetClientConfigurationStub func() *livekit.ClientConfiguration
getClientConfigurationMutex sync.RWMutex
getClientConfigurationArgsForCall []struct {
}
getClientConfigurationReturns struct {
result1 *livekit.ClientConfiguration
}
getClientConfigurationReturnsOnCall map[int]struct {
result1 *livekit.ClientConfiguration
}
GetConnectionQualityStub func() *livekit.ConnectionQualityInfo
getConnectionQualityMutex sync.RWMutex
getConnectionQualityArgsForCall []struct {
@@ -483,13 +493,10 @@ type FakeLocalParticipant struct {
sendDataPacketReturnsOnCall map[int]struct {
result1 error
}
SendJoinResponseStub func(*livekit.Room, []*livekit.ParticipantInfo, []*livekit.ICEServer, string) error
SendJoinResponseStub func(*livekit.JoinResponse) error
sendJoinResponseMutex sync.RWMutex
sendJoinResponseArgsForCall []struct {
arg1 *livekit.Room
arg2 []*livekit.ParticipantInfo
arg3 []*livekit.ICEServer
arg4 string
arg1 *livekit.JoinResponse
}
sendJoinResponseReturns struct {
result1 error
@@ -1645,6 +1652,59 @@ func (fake *FakeLocalParticipant) GetCachedDownTrackReturnsOnCall(i int, result1
}{result1, result2}
}
func (fake *FakeLocalParticipant) GetClientConfiguration() *livekit.ClientConfiguration {
fake.getClientConfigurationMutex.Lock()
ret, specificReturn := fake.getClientConfigurationReturnsOnCall[len(fake.getClientConfigurationArgsForCall)]
fake.getClientConfigurationArgsForCall = append(fake.getClientConfigurationArgsForCall, struct {
}{})
stub := fake.GetClientConfigurationStub
fakeReturns := fake.getClientConfigurationReturns
fake.recordInvocation("GetClientConfiguration", []interface{}{})
fake.getClientConfigurationMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeLocalParticipant) GetClientConfigurationCallCount() int {
fake.getClientConfigurationMutex.RLock()
defer fake.getClientConfigurationMutex.RUnlock()
return len(fake.getClientConfigurationArgsForCall)
}
func (fake *FakeLocalParticipant) GetClientConfigurationCalls(stub func() *livekit.ClientConfiguration) {
fake.getClientConfigurationMutex.Lock()
defer fake.getClientConfigurationMutex.Unlock()
fake.GetClientConfigurationStub = stub
}
func (fake *FakeLocalParticipant) GetClientConfigurationReturns(result1 *livekit.ClientConfiguration) {
fake.getClientConfigurationMutex.Lock()
defer fake.getClientConfigurationMutex.Unlock()
fake.GetClientConfigurationStub = nil
fake.getClientConfigurationReturns = struct {
result1 *livekit.ClientConfiguration
}{result1}
}
func (fake *FakeLocalParticipant) GetClientConfigurationReturnsOnCall(i int, result1 *livekit.ClientConfiguration) {
fake.getClientConfigurationMutex.Lock()
defer fake.getClientConfigurationMutex.Unlock()
fake.GetClientConfigurationStub = nil
if fake.getClientConfigurationReturnsOnCall == nil {
fake.getClientConfigurationReturnsOnCall = make(map[int]struct {
result1 *livekit.ClientConfiguration
})
}
fake.getClientConfigurationReturnsOnCall[i] = struct {
result1 *livekit.ClientConfiguration
}{result1}
}
func (fake *FakeLocalParticipant) GetConnectionQuality() *livekit.ConnectionQualityInfo {
fake.getConnectionQualityMutex.Lock()
ret, specificReturn := fake.getConnectionQualityReturnsOnCall[len(fake.getConnectionQualityArgsForCall)]
@@ -3293,31 +3353,18 @@ func (fake *FakeLocalParticipant) SendDataPacketReturnsOnCall(i int, result1 err
}{result1}
}
func (fake *FakeLocalParticipant) SendJoinResponse(arg1 *livekit.Room, arg2 []*livekit.ParticipantInfo, arg3 []*livekit.ICEServer, arg4 string) error {
var arg2Copy []*livekit.ParticipantInfo
if arg2 != nil {
arg2Copy = make([]*livekit.ParticipantInfo, len(arg2))
copy(arg2Copy, arg2)
}
var arg3Copy []*livekit.ICEServer
if arg3 != nil {
arg3Copy = make([]*livekit.ICEServer, len(arg3))
copy(arg3Copy, arg3)
}
func (fake *FakeLocalParticipant) SendJoinResponse(arg1 *livekit.JoinResponse) error {
fake.sendJoinResponseMutex.Lock()
ret, specificReturn := fake.sendJoinResponseReturnsOnCall[len(fake.sendJoinResponseArgsForCall)]
fake.sendJoinResponseArgsForCall = append(fake.sendJoinResponseArgsForCall, struct {
arg1 *livekit.Room
arg2 []*livekit.ParticipantInfo
arg3 []*livekit.ICEServer
arg4 string
}{arg1, arg2Copy, arg3Copy, arg4})
arg1 *livekit.JoinResponse
}{arg1})
stub := fake.SendJoinResponseStub
fakeReturns := fake.sendJoinResponseReturns
fake.recordInvocation("SendJoinResponse", []interface{}{arg1, arg2Copy, arg3Copy, arg4})
fake.recordInvocation("SendJoinResponse", []interface{}{arg1})
fake.sendJoinResponseMutex.Unlock()
if stub != nil {
return stub(arg1, arg2, arg3, arg4)
return stub(arg1)
}
if specificReturn {
return ret.result1
@@ -3331,17 +3378,17 @@ func (fake *FakeLocalParticipant) SendJoinResponseCallCount() int {
return len(fake.sendJoinResponseArgsForCall)
}
func (fake *FakeLocalParticipant) SendJoinResponseCalls(stub func(*livekit.Room, []*livekit.ParticipantInfo, []*livekit.ICEServer, string) error) {
func (fake *FakeLocalParticipant) SendJoinResponseCalls(stub func(*livekit.JoinResponse) error) {
fake.sendJoinResponseMutex.Lock()
defer fake.sendJoinResponseMutex.Unlock()
fake.SendJoinResponseStub = stub
}
func (fake *FakeLocalParticipant) SendJoinResponseArgsForCall(i int) (*livekit.Room, []*livekit.ParticipantInfo, []*livekit.ICEServer, string) {
func (fake *FakeLocalParticipant) SendJoinResponseArgsForCall(i int) *livekit.JoinResponse {
fake.sendJoinResponseMutex.RLock()
defer fake.sendJoinResponseMutex.RUnlock()
argsForCall := fake.sendJoinResponseArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) SendJoinResponseReturns(result1 error) {
@@ -4690,6 +4737,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.getAudioLevelMutex.RUnlock()
fake.getCachedDownTrackMutex.RLock()
defer fake.getCachedDownTrackMutex.RUnlock()
fake.getClientConfigurationMutex.RLock()
defer fake.getClientConfigurationMutex.RUnlock()
fake.getConnectionQualityMutex.RLock()
defer fake.getConnectionQualityMutex.RUnlock()
fake.getLoggerMutex.RLock()

View File

@@ -8,6 +8,7 @@ import (
"github.com/pkg/errors"
"github.com/livekit/livekit-server/version"
"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
@@ -40,6 +41,7 @@ type RoomManager struct {
config *config.Config
rtcConfig *rtc.WebRTCConfig
serverInfo *livekit.ServerInfo
currentNode routing.LocalNode
router routing.Router
roomStore ObjectStore
@@ -77,6 +79,14 @@ func NewLocalRoomManager(
rooms: make(map[livekit.RoomName]*rtc.Room),
iceConfigCache: make(map[livekit.ParticipantIdentity]*iceConfigCacheEntry),
serverInfo: &livekit.ServerInfo{
Edition: livekit.ServerInfo_Standard,
Version: version.Version,
Protocol: types.CurrentProtocol,
Region: conf.Region,
NodeId: currentNode.Id,
},
}
// hook up to router
@@ -281,7 +291,7 @@ func (r *RoomManager) StartSession(
opts := rtc.ParticipantOptions{
AutoSubscribe: pi.AutoSubscribe,
}
if err = room.Join(participant, &opts, r.iceServersForRoom(protoRoom), r.currentNode.Region); err != nil {
if err = room.Join(participant, &opts, r.iceServersForRoom(protoRoom)); err != nil {
pLogger.Errorw("could not join room", err)
_ = participant.Close(true, types.ParticipantCloseReasonJoinFailed)
return err
@@ -366,7 +376,7 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room
}
// construct ice servers
newRoom := rtc.NewRoom(ri, *r.rtcConfig, &r.config.Audio, r.telemetry)
newRoom := rtc.NewRoom(ri, *r.rtcConfig, &r.config.Audio, r.serverInfo, r.telemetry)
newRoom.OnClose(func() {
r.telemetry.RoomEnded(ctx, newRoom.ToProto())

View File

@@ -41,6 +41,7 @@ type ConnectionStats struct {
lock sync.RWMutex
score float32
lastUpdate time.Time
isLowQuality atomic.Bool
maxExpectedLayer int32
done chan struct{}
@@ -162,10 +163,15 @@ func (cs *ConnectionStats) updateScore(streams map[uint32]*buffer.StreamStatsWit
params.ExpectedWidth = expectedWidth
params.ExpectedHeight = expectedHeight
cs.score = VideoTrackScore(params)
}
if cs.score < 4.0 {
cs.params.Logger.Debugw("low connection quality score", "score", cs.score, "params", params)
if cs.score < 3.5 {
if !cs.isLowQuality.Swap(true) {
// changed from good to low quality, log
cs.params.Logger.Debugw("low connection quality", "score", cs.score, "params", params)
}
} else {
cs.isLowQuality.Store(false)
}
}
return cs.score