diff --git a/go.mod b/go.mod index 1e924e5a0..bfa324f7b 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/c9s/goprocinfo v0.0.0-20210130143923-c95fcf8c64a8 github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect github.com/gammazero/workerpool v1.1.2 - github.com/go-logr/logr v1.1.0 + github.com/go-logr/logr v1.2.0 github.com/go-logr/zapr v1.1.0 github.com/go-redis/redis/v8 v8.11.3 github.com/google/subcommands v1.2.0 // indirect @@ -15,7 +15,7 @@ require ( github.com/gorilla/websocket v1.4.2 github.com/hashicorp/golang-lru v0.5.4 github.com/jxskiss/base62 v0.0.0-20191017122030-4f11678b909b - github.com/livekit/protocol v0.9.12-0.20211102204637-f3bd2c316e7b + github.com/livekit/protocol v0.10.0 github.com/magefile/mage v1.11.0 github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0 github.com/mitchellh/go-homedir v1.1.0 @@ -42,11 +42,8 @@ require ( go.uber.org/zap v1.19.1 golang.org/x/mod v0.5.1 // indirect golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac // indirect - golang.org/x/tools v0.1.7 // indirect google.golang.org/protobuf v1.27.1 gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b ) -replace github.com/pion/ion-sfu => github.com/livekit/ion-sfu v1.20.14 - -// replace github.com/livekit/protocol => ../protocol +replace github.com/pion/ion-sfu => github.com/livekit/ion-sfu v1.20.16 diff --git a/go.sum b/go.sum index 8bd71a7fb..a18406e21 100644 --- a/go.sum +++ b/go.sum @@ -112,11 +112,13 @@ github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vb github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= -github.com/go-logr/logr v1.0.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-logr/logr v1.1.0 h1:nAbevmWlS2Ic4m4+/An5NXkaGqlqpbBgdcuThZxnZyI= github.com/go-logr/logr v1.1.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.0 h1:QK40JKJyMdUDz+h+xvCsru/bJhvG0UxvePV0ufL/AcE= +github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/zapr v1.1.0 h1:rZHor2gcVGCG11UlKl+WUsfCMOOi2k/mTCDKDK6zZws= github.com/go-logr/zapr v1.1.0/go.mod h1:YShqdLLTU346TNVu8Tvwe3bOo6gc75oZ1joeE+1lYdQ= +github.com/go-logr/zerologr v1.2.0 h1:oS1fjSSEHwpv8Lam3SNmPTLTUw6V4DoB2ZzryqrkMB0= +github.com/go-logr/zerologr v1.2.0/go.mod h1:O82obOiXzyxiBNgAMRT1m+XbOvY8K18Kf6XhT52oqoc= github.com/go-redis/redis/v8 v8.11.3 h1:GCjoYp8c+yQTJfc0n69iwSiHjvuAdruxl7elnZCxgt8= github.com/go-redis/redis/v8 v8.11.3/go.mod h1:xNJ9xDG09FsIPwh3bWdk+0oDWHbtF9rPN0F/oD9XeKc= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= @@ -248,10 +250,10 @@ github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-b github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4= github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY/mKrcvA= github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= -github.com/livekit/ion-sfu v1.20.14 h1:/eCaRUIa8KxyaTdjOQFJJJGDrjIVClL14BkYO2NqVSw= -github.com/livekit/ion-sfu v1.20.14/go.mod h1:Nmf1dro+y5vr0laNSJSrYDMDYrjERcEWVbWyenEWZ0A= -github.com/livekit/protocol v0.9.12-0.20211102204637-f3bd2c316e7b h1:jdos5NJ+yOn6AdOLrGp6IeGit+hJyN6W9XtAP5wC9YY= -github.com/livekit/protocol v0.9.12-0.20211102204637-f3bd2c316e7b/go.mod h1:7ir9zSlgnrPQoGGNv4f8U/c9QrWh+ogC9B5xVbJNedM= +github.com/livekit/ion-sfu v1.20.16 h1:B4+z0sf4t3zZSXFIwHive8malNn6Vje+7b1OW4ETDOM= +github.com/livekit/ion-sfu v1.20.16/go.mod h1:sUjL3tZRROs3NjCm6ZLT+IsisdYVRtxfq4OhVFHVd/A= +github.com/livekit/protocol v0.10.0 h1:s2zf1+G1Tcx6UKIf8mbRzbQ4ELdyS0mlLGsFkTVT5Aw= +github.com/livekit/protocol v0.10.0/go.mod h1:7ir9zSlgnrPQoGGNv4f8U/c9QrWh+ogC9B5xVbJNedM= github.com/lucsky/cuid v1.0.2 h1:z4XlExeoderxoPj2/dxKOyPxe9RCOu7yNq9/XWxIUMQ= github.com/lucsky/cuid v1.0.2/go.mod h1:QaaJqckboimOmhRSJXSx/+IT+VTfxfPGSo/6mfgUfmE= github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ= @@ -418,9 +420,10 @@ github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqn github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= -github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= -github.com/rs/zerolog v1.23.0 h1:UskrK+saS9P9Y789yNNulYKdARjPZuS35B8gJF2x60g= -github.com/rs/zerolog v1.23.0/go.mod h1:6c7hFfxPOy7TacJc4Fcdi24/J0NKYGzjG8FWRI916Qo= +github.com/rs/xid v1.3.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= +github.com/rs/zerolog v1.25.0/go.mod h1:7KHcEGe0QZPOm2IE4Kpb5rTh6n1h2hIgS5OOnu1rUaI= +github.com/rs/zerolog v1.26.0 h1:ORM4ibhEZeTeQlCojCK2kPz1ogAY4bGs4tD+SaAdGaE= +github.com/rs/zerolog v1.26.0/go.mod h1:yBiM87lvSqX8h0Ww4sdzNSkVYZ8dL2xjZJG1lAuGZEo= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= @@ -615,14 +618,12 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201214210602-f9fddec55a1e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac h1:oN6lz7iLW/YC7un8pq+9bOLyXrprv2+DKfkJY+2LJJw= golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -664,7 +665,6 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20201023174141-c8cfbd0f21e6/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.7 h1:6j8CgantCy3yc8JGBqkDLMKWqZ0RDU2g1HVgacojGWQ= golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo= diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index 2fbd49084..51df002d3 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -28,6 +28,10 @@ var ( {Type: webrtc.TypeRTCPFBNACK, Parameter: "pli"}} ) +const ( + lostUpdateDelta = time.Second +) + // MediaTrack represents a WebRTC track that needs to be forwarded // Implements the PublishedTrack interface type MediaTrack struct { @@ -38,6 +42,7 @@ type MediaTrack struct { muted utils.AtomicFlag numUpTracks uint32 simulcasted utils.AtomicFlag + buffer *buffer.Buffer // channel to send RTCP packets to the source lock sync.RWMutex @@ -48,6 +53,14 @@ type MediaTrack struct { receiver sfu.Receiver lastPLI time.Time + // track audio fraction lost + fracLostLock sync.Mutex + maxDownFracLost uint8 + maxDownFracLostTs time.Time + currentUpFracLost uint32 + maxUpFracLost uint8 + maxUpFracLostTs time.Time + onClose func() } @@ -62,6 +75,7 @@ type MediaTrackParams struct { ReceiverConfig ReceiverConfig AudioConfig config.AudioConfig Stats *stats.RoomStatsReporter + Logger logger.Logger } func NewMediaTrack(track *webrtc.TrackRemote, params MediaTrackParams) *MediaTrack { @@ -130,6 +144,10 @@ func (t *MediaTrack) IsSubscriber(subId string) bool { return t.subscribedTracks[subId] != nil } +func (t *MediaTrack) PublishLossPercentage() uint32 { + return FixedPointToPercent(uint8(atomic.LoadUint32(&t.currentUpFracLost))) +} + // AddSubscriber subscribes sub to current mediaTrack func (t *MediaTrack) AddSubscriber(sub types.Participant) error { if !sub.CanSubscribe() { @@ -241,7 +259,7 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error { if sender == nil { return } - logger.Debugw("removing peerconnection track", + t.params.Logger.Debugw("removing peerconnection track", "track", t.ID(), "pIDs", []string{t.params.ParticipantID, sub.ID()}, "participant", sub.Identity(), @@ -252,15 +270,18 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error { return } if _, ok := err.(*rtcerr.InvalidStateError); !ok { - logger.Warnw("could not remove remoteTrack from forwarder", err, + t.params.Logger.Warnw("could not remove remoteTrack from forwarder", err, "participant", sub.Identity(), "pID", sub.ID()) } } - sub.RemoveSubscribedTrack(t.params.ParticipantID, subTrack) + sub.RemoveSubscribedTrack(subTrack) sub.Negotiate() }() }) + if t.Kind() == livekit.TrackType_AUDIO { + downTrack.AddReceiverReportListener(t.handleMaxLossFeedback) + } t.subscribedTracks[sub.ID()] = subTrack subTrack.SetPublisherMuted(t.IsMuted()) @@ -268,7 +289,7 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error { t.receiver.AddDownTrack(downTrack, t.shouldStartWithBestQuality()) // since sub will lock, run it in a goroutine to avoid deadlocks go func() { - sub.AddSubscribedTrack(t.params.ParticipantID, subTrack) + sub.AddSubscribedTrack(subTrack) sub.Negotiate() }() @@ -276,8 +297,24 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error { return nil } -func (t *MediaTrack) NumUpTracks() uint32 { - return atomic.LoadUint32(&t.numUpTracks) +func (t *MediaTrack) NumUpTracks() (uint32, uint32) { + numRegistered := atomic.LoadUint32(&t.numUpTracks) + numPublishing := uint32(0) + if t.simulcasted.Get() { + t.lock.RLock() + if t.receiver != nil { + for i := int32(0); i < 3; i++ { + if t.receiver.HasSpatialLayer(i) { + numPublishing += 1 + } + } + } + t.lock.RUnlock() + } else { + numPublishing = 1 + } + + return numPublishing, numRegistered } // AddReceiver adds a new RTP receiver to the track @@ -286,13 +323,7 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra defer t.lock.Unlock() buff, rtcpReader := t.params.BufferFactory.GetBufferPair(uint32(track.SSRC())) - buff.OnFeedback(func(fb []rtcp.Packet) { - if t.params.Stats != nil { - t.params.Stats.Incoming.HandleRTCP(fb) - } - // feedback for the source RTCP - t.params.RTCPChan <- fb - }) + buff.OnFeedback(t.handlePublisherFeedback) if t.Kind() == livekit.TrackType_AUDIO { t.audioLevel = NewAudioLevel(t.params.AudioConfig.ActiveLevel, t.params.AudioConfig.MinPercentile) @@ -310,7 +341,7 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra rtcpReader.OnPacket(func(bytes []byte) { pkts, err := rtcp.Unmarshal(bytes) if err != nil { - logger.Errorw("could not unmarshal RTCP", err) + t.params.Logger.Errorw("could not unmarshal RTCP", err) return } @@ -341,16 +372,17 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra onclose() } }) - t.receiver.OnFractionLostFB(func(lost uint8) { - buff.SetLastFractionLostReport(lost) - }) t.params.Stats.AddPublishedTrack(t.Kind().String()) + + if t.Kind() == livekit.TrackType_AUDIO { + t.buffer = buff + } } t.receiver.AddUpTrack(track, buff, t.shouldStartWithBestQuality()) - // when RID is set, track is simulcasted - // TODO: how does this work with FF, SSRC based simulcast? - t.simulcasted.TrySet(track.RID() != "") atomic.AddUint32(&t.numUpTracks, 1) + if atomic.LoadUint32(&t.numUpTracks) > 1 { + t.simulcasted.TrySet(true) + } buff.Bind(receiver.GetParameters(), buffer.Options{ MaxBitRate: t.params.ReceiverConfig.maxBitrate, @@ -369,7 +401,7 @@ func (t *MediaTrack) RemoveSubscriber(participantId string) { } func (t *MediaTrack) RemoveAllSubscribers() { - logger.Debugw("removing all subscribers", "track", t.ID()) + t.params.Logger.Debugw("removing all subscribers", "track", t.ID()) t.lock.Lock() defer t.lock.Unlock() for _, subTrack := range t.subscribedTracks { @@ -449,7 +481,7 @@ func (t *MediaTrack) sendDownTrackBindingReports(sub types.Participant) { i := 0 for { if err := sub.SubscriberPC().WriteRTCP(batch); err != nil { - logger.Errorw("could not write RTCP", err) + t.params.Logger.Errorw("could not write RTCP", err) return } if i > 5 { @@ -461,6 +493,72 @@ func (t *MediaTrack) sendDownTrackBindingReports(sub types.Participant) { }() } +func (t *MediaTrack) handlePublisherFeedback(packets []rtcp.Packet) { + var maxLost uint8 + var hasSenderReport bool + for _, p := range packets { + switch pkt := p.(type) { + case *rtcp.SenderReport: + for _, rr := range pkt.Reports { + if rr.FractionLost > maxLost { + maxLost = rr.FractionLost + } + hasSenderReport = true + } + } + } + + if hasSenderReport { + t.fracLostLock.Lock() + if maxLost > t.maxUpFracLost { + t.maxUpFracLost = maxLost + } + + now := time.Now() + if now.Sub(t.maxUpFracLostTs) > lostUpdateDelta { + atomic.StoreUint32(&t.currentUpFracLost, uint32(t.maxUpFracLost)) + t.maxUpFracLost = 0 + t.maxUpFracLostTs = now + } + t.fracLostLock.Unlock() + } + + if t.params.Stats != nil { + t.params.Stats.Incoming.HandleRTCP(packets) + } + // also look for sender reports + // feedback for the source RTCP + t.params.RTCPChan <- packets +} + +// handles max loss for audio packets +func (t *MediaTrack) handleMaxLossFeedback(_ *sfu.DownTrack, report *rtcp.ReceiverReport) { + var ( + shouldUpdate bool + maxLost uint8 + ) + t.fracLostLock.Lock() + for _, rr := range report.Reports { + if t.maxDownFracLost < rr.FractionLost { + t.maxDownFracLost = rr.FractionLost + } + } + + now := time.Now() + if now.Sub(t.maxDownFracLostTs) > lostUpdateDelta { + shouldUpdate = true + maxLost = t.maxDownFracLost + t.maxDownFracLost = 0 + t.maxDownFracLostTs = now + } + t.fracLostLock.Unlock() + + if shouldUpdate && t.buffer != nil { + // ok to access buffer since receivers are added before subscribers + t.buffer.SetLastFractionLostReport(maxLost) + } +} + func (t *MediaTrack) DebugInfo() map[string]interface{} { info := map[string]interface{}{ "ID": t.ID(), diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 8b5527616..3f83a22b8 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -78,6 +78,8 @@ type ParticipantImpl struct { publishedTracks map[string]types.PublishedTrack // client intended to publish, yet to be reconciled pendingTracks map[string]*livekit.TrackInfo + // keep track of other publishers identities that we are subscribed to + subscribedTo sync.Map // string => struct{} lock sync.RWMutex once sync.Once @@ -600,6 +602,14 @@ func (p *ParticipantImpl) SendRoomUpdate(room *livekit.Room) error { }) } +func (p *ParticipantImpl) SendConnectionQualityUpdate(update *livekit.ConnectionQualityUpdate) error { + return p.writeMessage(&livekit.SignalResponse{ + Message: &livekit.SignalResponse_ConnectionQuality{ + ConnectionQuality: update, + }, + }) +} + func (p *ParticipantImpl) SetTrackMuted(trackId string, muted bool, fromAdmin bool) { isPending := false p.lock.RLock() @@ -664,6 +674,68 @@ func (p *ParticipantImpl) GetAudioLevel() (level uint8, active bool) { return } +func (p *ParticipantImpl) GetConnectionQuality() livekit.ConnectionQuality { + // avg loss across all tracks, weigh published the same as subscribed + var pubLoss, subLoss uint32 + var reducedQualityPub bool + var reducedQualitySub bool + p.lock.RLock() + defer p.lock.RUnlock() + for _, pubTrack := range p.publishedTracks { + if pubTrack.IsMuted() { + continue + } + pubLoss += pubTrack.PublishLossPercentage() + publishing, registered := pubTrack.NumUpTracks() + if registered > 0 && publishing != registered { + reducedQualityPub = true + } + } + numTracks := uint32(len(p.publishedTracks)) + if numTracks > 0 { + pubLoss /= numTracks + } + + for _, subTrack := range p.subscribedTracks { + if subTrack.IsMuted() { + continue + } + if subTrack.DownTrack().TargetSpatialLayer() < subTrack.DownTrack().MaxSpatialLayer() { + reducedQualitySub = true + } + subLoss += subTrack.SubscribeLossPercentage() + } + numTracks = uint32(len(p.subscribedTracks)) + if numTracks > 0 { + subLoss /= numTracks + } + + avgLoss := (pubLoss + subLoss) / 2 + if avgLoss >= 4 { + return livekit.ConnectionQuality_POOR + } else if avgLoss <= 2 && !reducedQualityPub && !reducedQualitySub { + return livekit.ConnectionQuality_EXCELLENT + } + + return livekit.ConnectionQuality_GOOD +} + +func (p *ParticipantImpl) IsSubscribedTo(identity string) bool { + _, ok := p.subscribedTo.Load(identity) + return ok +} + +func (p *ParticipantImpl) GetSubscribedParticipants() []string { + var identities []string + p.subscribedTo.Range(func(key, _ interface{}) bool { + if identity, ok := key.(string); ok { + identities = append(identities, identity) + } + return true + }) + return identities +} + func (p *ParticipantImpl) CanPublish() bool { return p.permission == nil || p.permission.CanPublish } @@ -717,21 +789,32 @@ func (p *ParticipantImpl) GetSubscribedTracks() []types.SubscribedTrack { } // AddSubscribedTrack adds a track to the participant's subscribed list -func (p *ParticipantImpl) AddSubscribedTrack(pubId string, subTrack types.SubscribedTrack) { - p.params.Logger.Debugw("added subscribedTrack", "pIDs", []string{pubId, p.ID()}, +func (p *ParticipantImpl) AddSubscribedTrack(subTrack types.SubscribedTrack) { + p.params.Logger.Debugw("added subscribedTrack", "publisher", subTrack.PublisherIdentity(), "participant", p.Identity(), "track", subTrack.ID()) p.lock.Lock() p.subscribedTracks[subTrack.ID()] = subTrack p.lock.Unlock() + p.subscribedTo.Store(subTrack.PublisherIdentity(), struct{}{}) } // RemoveSubscribedTrack removes a track to the participant's subscribed list -func (p *ParticipantImpl) RemoveSubscribedTrack(pubId string, subTrack types.SubscribedTrack) { - p.params.Logger.Debugw("removed subscribedTrack", "pIDs", []string{pubId, p.ID()}, +func (p *ParticipantImpl) RemoveSubscribedTrack(subTrack types.SubscribedTrack) { + p.params.Logger.Debugw("removed subscribedTrack", "publisher", subTrack.PublisherIdentity(), "participant", p.Identity(), "track", subTrack.ID()) p.lock.Lock() delete(p.subscribedTracks, subTrack.ID()) + // remove from subscribed map + numRemaining := 0 + for _, st := range p.subscribedTracks { + if st.PublisherIdentity() == subTrack.PublisherIdentity() { + numRemaining++ + } + } p.lock.Unlock() + if numRemaining == 0 { + p.subscribedTo.Delete(subTrack.PublisherIdentity()) + } } func (p *ParticipantImpl) sendIceCandidate(c *webrtc.ICECandidate, target livekit.SignalTarget) { @@ -856,6 +939,7 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w ReceiverConfig: p.params.Config.Receiver, AudioConfig: p.params.AudioConfig, Stats: p.params.Stats, + Logger: p.params.Logger, }) // add to published and clean up pending diff --git a/pkg/rtc/participant_internal_test.go b/pkg/rtc/participant_internal_test.go index 1416b2955..69f05c0de 100644 --- a/pkg/rtc/participant_internal_test.go +++ b/pkg/rtc/participant_internal_test.go @@ -243,6 +243,34 @@ func TestMuteSetting(t *testing.T) { }) } +func TestConnectionQuality(t *testing.T) { + testPublishedTrack := func(loss, numPublishing, numRegistered uint32) *typesfakes.FakePublishedTrack { + t := &typesfakes.FakePublishedTrack{} + t.PublishLossPercentageReturns(loss) + t.NumUpTracksReturns(numPublishing, numRegistered) + return t + } + + // TODO: this test is rather limited since we cannot mock DownTrack's Target & Max spatial layers + // to improve this after split + + t.Run("smooth sailing", func(t *testing.T) { + p := newParticipantForTest("test") + p.publishedTracks["video"] = testPublishedTrack(1, 3, 3) + p.publishedTracks["audio"] = testPublishedTrack(0, 1, 1) + + require.Equal(t, livekit.ConnectionQuality_EXCELLENT, p.GetConnectionQuality()) + }) + + t.Run("reduced publishing", func(t *testing.T) { + p := newParticipantForTest("test") + p.publishedTracks["video"] = testPublishedTrack(3, 2, 3) + p.publishedTracks["audio"] = testPublishedTrack(3, 1, 1) + + require.Equal(t, livekit.ConnectionQuality_GOOD, p.GetConnectionQuality()) + }) +} + func newParticipantForTest(identity string) *ParticipantImpl { conf, _ := config.NewConfig("", nil) // disable mux, it doesn't play too well with unit test diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index d4e71d4cf..848297ca9 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -62,7 +62,7 @@ func NewRoom(room *livekit.Room, config WebRTCConfig, audioConfig *config.AudioC Logger: logger.Logger(logger.GetLogger().WithValues("room", room.Name)), config: config, audioConfig: audioConfig, - statsReporter: stats.NewRoomStatsReporter(room.Name), + statsReporter: stats.NewRoomStatsReporter(), participants: make(map[string]types.Participant), participantOpts: make(map[string]*ParticipantOptions), bufferFactory: buffer.NewBufferFactory(config.Receiver.packetBufferSize, logr.Logger{}), @@ -76,6 +76,7 @@ func NewRoom(room *livekit.Room, config WebRTCConfig, audioConfig *config.AudioC } r.statsReporter.RoomStarted() go r.audioUpdateWorker() + go r.connectionQualityWorker() return r } @@ -703,6 +704,47 @@ func (r *Room) audioUpdateWorker() { } } +func (r *Room) connectionQualityWorker() { + // send updates to only users that are subscribed to each other + for { + if r.IsClosed() { + return + } + + participants := r.GetParticipants() + connectionInfos := make(map[string]*livekit.ConnectionQualityInfo, len(participants)) + + for _, p := range participants { + connectionInfos[p.Identity()] = &livekit.ConnectionQualityInfo{ + ParticipantSid: p.ID(), + Quality: p.GetConnectionQuality(), + } + } + + for _, op := range participants { + update := &livekit.ConnectionQualityUpdate{} + + // send to user itself + if info, ok := connectionInfos[op.Identity()]; ok { + update.Updates = append(update.Updates, info) + } + + // send to other participants its subscribed to + for _, identity := range op.GetSubscribedParticipants() { + if info, ok := connectionInfos[identity]; ok { + update.Updates = append(update.Updates, info) + } + } + if err := op.SendConnectionQualityUpdate(update); err != nil { + r.Logger.Warnw("could not send connection quality update", err, + "participant", op.Identity()) + } + } + + time.Sleep(time.Second * 5) + } +} + func (r *Room) DebugInfo() map[string]interface{} { info := map[string]interface{}{ "Name": r.Room.Name, diff --git a/pkg/rtc/subscribedtrack.go b/pkg/rtc/subscribedtrack.go index eeb5ffdea..ec54e678f 100644 --- a/pkg/rtc/subscribedtrack.go +++ b/pkg/rtc/subscribedtrack.go @@ -19,7 +19,8 @@ type SubscribedTrack struct { publisherIdentity string subMuted utils.AtomicFlag pubMuted utils.AtomicFlag - debouncer func(func()) + + debouncer func(func()) } func NewSubscribedTrack(publisherIdentity string, dt *sfu.DownTrack) *SubscribedTrack { @@ -42,6 +43,10 @@ func (t *SubscribedTrack) DownTrack() *sfu.DownTrack { return t.dt } +func (t *SubscribedTrack) SubscribeLossPercentage() uint32 { + return FixedPointToPercent(t.DownTrack().CurrentMaxLossFraction()) +} + // has subscriber indicated it wants to mute this track func (t *SubscribedTrack) IsMuted() bool { return t.subMuted.Get() diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 4100c4302..56bda3f93 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -53,8 +53,13 @@ type Participant interface { SendSpeakerUpdate(speakers []*livekit.SpeakerInfo) error SendDataPacket(packet *livekit.DataPacket) error SendRoomUpdate(room *livekit.Room) error + SendConnectionQualityUpdate(update *livekit.ConnectionQualityUpdate) error SetTrackMuted(trackId string, muted bool, fromAdmin bool) GetAudioLevel() (level uint8, active bool) + GetConnectionQuality() livekit.ConnectionQuality + IsSubscribedTo(identity string) bool + // returns list of participant identities that the current participant is subscribed to + GetSubscribedParticipants() []string // permissions @@ -78,9 +83,8 @@ type Participant interface { OnClose(func(Participant)) // package methods - - AddSubscribedTrack(participantId string, st SubscribedTrack) - RemoveSubscribedTrack(participantId string, st SubscribedTrack) + AddSubscribedTrack(st SubscribedTrack) + RemoveSubscribedTrack(st SubscribedTrack) SubscriberPC() *webrtc.PeerConnection DebugInfo() map[string]interface{} @@ -104,6 +108,9 @@ type PublishedTrack interface { RemoveAllSubscribers() // returns quality information that's appropriate for width & height GetQualityForDimension(width, height uint32) livekit.VideoQuality + // returns number of uptracks that are publishing, registered + NumUpTracks() (uint32, uint32) + PublishLossPercentage() uint32 ToProto() *livekit.TrackInfo // callbacks @@ -118,6 +125,7 @@ type SubscribedTrack interface { IsMuted() bool SetPublisherMuted(muted bool) UpdateSubscriberSettings(enabled bool, quality livekit.VideoQuality) + SubscribeLossPercentage() uint32 } // interface for properties of webrtc.TrackRemote diff --git a/pkg/rtc/types/typesfakes/fake_participant.go b/pkg/rtc/types/typesfakes/fake_participant.go index dafa60412..61e66614c 100644 --- a/pkg/rtc/types/typesfakes/fake_participant.go +++ b/pkg/rtc/types/typesfakes/fake_participant.go @@ -25,11 +25,10 @@ type FakeParticipant struct { addICECandidateReturnsOnCall map[int]struct { result1 error } - AddSubscribedTrackStub func(string, types.SubscribedTrack) + AddSubscribedTrackStub func(types.SubscribedTrack) addSubscribedTrackMutex sync.RWMutex addSubscribedTrackArgsForCall []struct { - arg1 string - arg2 types.SubscribedTrack + arg1 types.SubscribedTrack } AddSubscriberStub func(types.Participant) (int, error) addSubscriberMutex sync.RWMutex @@ -121,6 +120,16 @@ type FakeParticipant struct { result1 uint8 result2 bool } + GetConnectionQualityStub func() livekit.ConnectionQuality + getConnectionQualityMutex sync.RWMutex + getConnectionQualityArgsForCall []struct { + } + getConnectionQualityReturns struct { + result1 livekit.ConnectionQuality + } + getConnectionQualityReturnsOnCall map[int]struct { + result1 livekit.ConnectionQuality + } GetPublishedTrackStub func(string) types.PublishedTrack getPublishedTrackMutex sync.RWMutex getPublishedTrackArgsForCall []struct { @@ -152,6 +161,16 @@ type FakeParticipant struct { getResponseSinkReturnsOnCall map[int]struct { result1 routing.MessageSink } + GetSubscribedParticipantsStub func() []string + getSubscribedParticipantsMutex sync.RWMutex + getSubscribedParticipantsArgsForCall []struct { + } + getSubscribedParticipantsReturns struct { + result1 []string + } + getSubscribedParticipantsReturnsOnCall map[int]struct { + result1 []string + } GetSubscribedTrackStub func(string) types.SubscribedTrack getSubscribedTrackMutex sync.RWMutex getSubscribedTrackArgsForCall []struct { @@ -247,6 +266,17 @@ type FakeParticipant struct { isReadyReturnsOnCall map[int]struct { result1 bool } + IsSubscribedToStub func(string) bool + isSubscribedToMutex sync.RWMutex + isSubscribedToArgsForCall []struct { + arg1 string + } + isSubscribedToReturns struct { + result1 bool + } + isSubscribedToReturnsOnCall map[int]struct { + result1 bool + } NegotiateStub func() negotiateMutex sync.RWMutex negotiateArgsForCall []struct { @@ -301,17 +331,27 @@ type FakeParticipant struct { rTCPChanReturnsOnCall map[int]struct { result1 chan []rtcp.Packet } - RemoveSubscribedTrackStub func(string, types.SubscribedTrack) + RemoveSubscribedTrackStub func(types.SubscribedTrack) removeSubscribedTrackMutex sync.RWMutex removeSubscribedTrackArgsForCall []struct { - arg1 string - arg2 types.SubscribedTrack + arg1 types.SubscribedTrack } RemoveSubscriberStub func(string) removeSubscriberMutex sync.RWMutex removeSubscriberArgsForCall []struct { arg1 string } + SendConnectionQualityUpdateStub func(*livekit.ConnectionQualityUpdate) error + sendConnectionQualityUpdateMutex sync.RWMutex + sendConnectionQualityUpdateArgsForCall []struct { + arg1 *livekit.ConnectionQualityUpdate + } + sendConnectionQualityUpdateReturns struct { + result1 error + } + sendConnectionQualityUpdateReturnsOnCall map[int]struct { + result1 error + } SendDataPacketStub func(*livekit.DataPacket) error sendDataPacketMutex sync.RWMutex sendDataPacketArgsForCall []struct { @@ -502,17 +542,16 @@ func (fake *FakeParticipant) AddICECandidateReturnsOnCall(i int, result1 error) }{result1} } -func (fake *FakeParticipant) AddSubscribedTrack(arg1 string, arg2 types.SubscribedTrack) { +func (fake *FakeParticipant) AddSubscribedTrack(arg1 types.SubscribedTrack) { fake.addSubscribedTrackMutex.Lock() fake.addSubscribedTrackArgsForCall = append(fake.addSubscribedTrackArgsForCall, struct { - arg1 string - arg2 types.SubscribedTrack - }{arg1, arg2}) + arg1 types.SubscribedTrack + }{arg1}) stub := fake.AddSubscribedTrackStub - fake.recordInvocation("AddSubscribedTrack", []interface{}{arg1, arg2}) + fake.recordInvocation("AddSubscribedTrack", []interface{}{arg1}) fake.addSubscribedTrackMutex.Unlock() if stub != nil { - fake.AddSubscribedTrackStub(arg1, arg2) + fake.AddSubscribedTrackStub(arg1) } } @@ -522,17 +561,17 @@ func (fake *FakeParticipant) AddSubscribedTrackCallCount() int { return len(fake.addSubscribedTrackArgsForCall) } -func (fake *FakeParticipant) AddSubscribedTrackCalls(stub func(string, types.SubscribedTrack)) { +func (fake *FakeParticipant) AddSubscribedTrackCalls(stub func(types.SubscribedTrack)) { fake.addSubscribedTrackMutex.Lock() defer fake.addSubscribedTrackMutex.Unlock() fake.AddSubscribedTrackStub = stub } -func (fake *FakeParticipant) AddSubscribedTrackArgsForCall(i int) (string, types.SubscribedTrack) { +func (fake *FakeParticipant) AddSubscribedTrackArgsForCall(i int) types.SubscribedTrack { fake.addSubscribedTrackMutex.RLock() defer fake.addSubscribedTrackMutex.RUnlock() argsForCall := fake.addSubscribedTrackArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 + return argsForCall.arg1 } func (fake *FakeParticipant) AddSubscriber(arg1 types.Participant) (int, error) { @@ -1005,6 +1044,59 @@ func (fake *FakeParticipant) GetAudioLevelReturnsOnCall(i int, result1 uint8, re }{result1, result2} } +func (fake *FakeParticipant) GetConnectionQuality() livekit.ConnectionQuality { + fake.getConnectionQualityMutex.Lock() + ret, specificReturn := fake.getConnectionQualityReturnsOnCall[len(fake.getConnectionQualityArgsForCall)] + fake.getConnectionQualityArgsForCall = append(fake.getConnectionQualityArgsForCall, struct { + }{}) + stub := fake.GetConnectionQualityStub + fakeReturns := fake.getConnectionQualityReturns + fake.recordInvocation("GetConnectionQuality", []interface{}{}) + fake.getConnectionQualityMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeParticipant) GetConnectionQualityCallCount() int { + fake.getConnectionQualityMutex.RLock() + defer fake.getConnectionQualityMutex.RUnlock() + return len(fake.getConnectionQualityArgsForCall) +} + +func (fake *FakeParticipant) GetConnectionQualityCalls(stub func() livekit.ConnectionQuality) { + fake.getConnectionQualityMutex.Lock() + defer fake.getConnectionQualityMutex.Unlock() + fake.GetConnectionQualityStub = stub +} + +func (fake *FakeParticipant) GetConnectionQualityReturns(result1 livekit.ConnectionQuality) { + fake.getConnectionQualityMutex.Lock() + defer fake.getConnectionQualityMutex.Unlock() + fake.GetConnectionQualityStub = nil + fake.getConnectionQualityReturns = struct { + result1 livekit.ConnectionQuality + }{result1} +} + +func (fake *FakeParticipant) GetConnectionQualityReturnsOnCall(i int, result1 livekit.ConnectionQuality) { + fake.getConnectionQualityMutex.Lock() + defer fake.getConnectionQualityMutex.Unlock() + fake.GetConnectionQualityStub = nil + if fake.getConnectionQualityReturnsOnCall == nil { + fake.getConnectionQualityReturnsOnCall = make(map[int]struct { + result1 livekit.ConnectionQuality + }) + } + fake.getConnectionQualityReturnsOnCall[i] = struct { + result1 livekit.ConnectionQuality + }{result1} +} + func (fake *FakeParticipant) GetPublishedTrack(arg1 string) types.PublishedTrack { fake.getPublishedTrackMutex.Lock() ret, specificReturn := fake.getPublishedTrackReturnsOnCall[len(fake.getPublishedTrackArgsForCall)] @@ -1172,6 +1264,59 @@ func (fake *FakeParticipant) GetResponseSinkReturnsOnCall(i int, result1 routing }{result1} } +func (fake *FakeParticipant) GetSubscribedParticipants() []string { + fake.getSubscribedParticipantsMutex.Lock() + ret, specificReturn := fake.getSubscribedParticipantsReturnsOnCall[len(fake.getSubscribedParticipantsArgsForCall)] + fake.getSubscribedParticipantsArgsForCall = append(fake.getSubscribedParticipantsArgsForCall, struct { + }{}) + stub := fake.GetSubscribedParticipantsStub + fakeReturns := fake.getSubscribedParticipantsReturns + fake.recordInvocation("GetSubscribedParticipants", []interface{}{}) + fake.getSubscribedParticipantsMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeParticipant) GetSubscribedParticipantsCallCount() int { + fake.getSubscribedParticipantsMutex.RLock() + defer fake.getSubscribedParticipantsMutex.RUnlock() + return len(fake.getSubscribedParticipantsArgsForCall) +} + +func (fake *FakeParticipant) GetSubscribedParticipantsCalls(stub func() []string) { + fake.getSubscribedParticipantsMutex.Lock() + defer fake.getSubscribedParticipantsMutex.Unlock() + fake.GetSubscribedParticipantsStub = stub +} + +func (fake *FakeParticipant) GetSubscribedParticipantsReturns(result1 []string) { + fake.getSubscribedParticipantsMutex.Lock() + defer fake.getSubscribedParticipantsMutex.Unlock() + fake.GetSubscribedParticipantsStub = nil + fake.getSubscribedParticipantsReturns = struct { + result1 []string + }{result1} +} + +func (fake *FakeParticipant) GetSubscribedParticipantsReturnsOnCall(i int, result1 []string) { + fake.getSubscribedParticipantsMutex.Lock() + defer fake.getSubscribedParticipantsMutex.Unlock() + fake.GetSubscribedParticipantsStub = nil + if fake.getSubscribedParticipantsReturnsOnCall == nil { + fake.getSubscribedParticipantsReturnsOnCall = make(map[int]struct { + result1 []string + }) + } + fake.getSubscribedParticipantsReturnsOnCall[i] = struct { + result1 []string + }{result1} +} + func (fake *FakeParticipant) GetSubscribedTrack(arg1 string) types.SubscribedTrack { fake.getSubscribedTrackMutex.Lock() ret, specificReturn := fake.getSubscribedTrackReturnsOnCall[len(fake.getSubscribedTrackArgsForCall)] @@ -1676,6 +1821,67 @@ func (fake *FakeParticipant) IsReadyReturnsOnCall(i int, result1 bool) { }{result1} } +func (fake *FakeParticipant) IsSubscribedTo(arg1 string) bool { + fake.isSubscribedToMutex.Lock() + ret, specificReturn := fake.isSubscribedToReturnsOnCall[len(fake.isSubscribedToArgsForCall)] + fake.isSubscribedToArgsForCall = append(fake.isSubscribedToArgsForCall, struct { + arg1 string + }{arg1}) + stub := fake.IsSubscribedToStub + fakeReturns := fake.isSubscribedToReturns + fake.recordInvocation("IsSubscribedTo", []interface{}{arg1}) + fake.isSubscribedToMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeParticipant) IsSubscribedToCallCount() int { + fake.isSubscribedToMutex.RLock() + defer fake.isSubscribedToMutex.RUnlock() + return len(fake.isSubscribedToArgsForCall) +} + +func (fake *FakeParticipant) IsSubscribedToCalls(stub func(string) bool) { + fake.isSubscribedToMutex.Lock() + defer fake.isSubscribedToMutex.Unlock() + fake.IsSubscribedToStub = stub +} + +func (fake *FakeParticipant) IsSubscribedToArgsForCall(i int) string { + fake.isSubscribedToMutex.RLock() + defer fake.isSubscribedToMutex.RUnlock() + argsForCall := fake.isSubscribedToArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeParticipant) IsSubscribedToReturns(result1 bool) { + fake.isSubscribedToMutex.Lock() + defer fake.isSubscribedToMutex.Unlock() + fake.IsSubscribedToStub = nil + fake.isSubscribedToReturns = struct { + result1 bool + }{result1} +} + +func (fake *FakeParticipant) IsSubscribedToReturnsOnCall(i int, result1 bool) { + fake.isSubscribedToMutex.Lock() + defer fake.isSubscribedToMutex.Unlock() + fake.IsSubscribedToStub = nil + if fake.isSubscribedToReturnsOnCall == nil { + fake.isSubscribedToReturnsOnCall = make(map[int]struct { + result1 bool + }) + } + fake.isSubscribedToReturnsOnCall[i] = struct { + result1 bool + }{result1} +} + func (fake *FakeParticipant) Negotiate() { fake.negotiateMutex.Lock() fake.negotiateArgsForCall = append(fake.negotiateArgsForCall, struct { @@ -1998,17 +2204,16 @@ func (fake *FakeParticipant) RTCPChanReturnsOnCall(i int, result1 chan []rtcp.Pa }{result1} } -func (fake *FakeParticipant) RemoveSubscribedTrack(arg1 string, arg2 types.SubscribedTrack) { +func (fake *FakeParticipant) RemoveSubscribedTrack(arg1 types.SubscribedTrack) { fake.removeSubscribedTrackMutex.Lock() fake.removeSubscribedTrackArgsForCall = append(fake.removeSubscribedTrackArgsForCall, struct { - arg1 string - arg2 types.SubscribedTrack - }{arg1, arg2}) + arg1 types.SubscribedTrack + }{arg1}) stub := fake.RemoveSubscribedTrackStub - fake.recordInvocation("RemoveSubscribedTrack", []interface{}{arg1, arg2}) + fake.recordInvocation("RemoveSubscribedTrack", []interface{}{arg1}) fake.removeSubscribedTrackMutex.Unlock() if stub != nil { - fake.RemoveSubscribedTrackStub(arg1, arg2) + fake.RemoveSubscribedTrackStub(arg1) } } @@ -2018,17 +2223,17 @@ func (fake *FakeParticipant) RemoveSubscribedTrackCallCount() int { return len(fake.removeSubscribedTrackArgsForCall) } -func (fake *FakeParticipant) RemoveSubscribedTrackCalls(stub func(string, types.SubscribedTrack)) { +func (fake *FakeParticipant) RemoveSubscribedTrackCalls(stub func(types.SubscribedTrack)) { fake.removeSubscribedTrackMutex.Lock() defer fake.removeSubscribedTrackMutex.Unlock() fake.RemoveSubscribedTrackStub = stub } -func (fake *FakeParticipant) RemoveSubscribedTrackArgsForCall(i int) (string, types.SubscribedTrack) { +func (fake *FakeParticipant) RemoveSubscribedTrackArgsForCall(i int) types.SubscribedTrack { fake.removeSubscribedTrackMutex.RLock() defer fake.removeSubscribedTrackMutex.RUnlock() argsForCall := fake.removeSubscribedTrackArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 + return argsForCall.arg1 } func (fake *FakeParticipant) RemoveSubscriber(arg1 string) { @@ -2063,6 +2268,67 @@ func (fake *FakeParticipant) RemoveSubscriberArgsForCall(i int) string { return argsForCall.arg1 } +func (fake *FakeParticipant) SendConnectionQualityUpdate(arg1 *livekit.ConnectionQualityUpdate) error { + fake.sendConnectionQualityUpdateMutex.Lock() + ret, specificReturn := fake.sendConnectionQualityUpdateReturnsOnCall[len(fake.sendConnectionQualityUpdateArgsForCall)] + fake.sendConnectionQualityUpdateArgsForCall = append(fake.sendConnectionQualityUpdateArgsForCall, struct { + arg1 *livekit.ConnectionQualityUpdate + }{arg1}) + stub := fake.SendConnectionQualityUpdateStub + fakeReturns := fake.sendConnectionQualityUpdateReturns + fake.recordInvocation("SendConnectionQualityUpdate", []interface{}{arg1}) + fake.sendConnectionQualityUpdateMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeParticipant) SendConnectionQualityUpdateCallCount() int { + fake.sendConnectionQualityUpdateMutex.RLock() + defer fake.sendConnectionQualityUpdateMutex.RUnlock() + return len(fake.sendConnectionQualityUpdateArgsForCall) +} + +func (fake *FakeParticipant) SendConnectionQualityUpdateCalls(stub func(*livekit.ConnectionQualityUpdate) error) { + fake.sendConnectionQualityUpdateMutex.Lock() + defer fake.sendConnectionQualityUpdateMutex.Unlock() + fake.SendConnectionQualityUpdateStub = stub +} + +func (fake *FakeParticipant) SendConnectionQualityUpdateArgsForCall(i int) *livekit.ConnectionQualityUpdate { + fake.sendConnectionQualityUpdateMutex.RLock() + defer fake.sendConnectionQualityUpdateMutex.RUnlock() + argsForCall := fake.sendConnectionQualityUpdateArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeParticipant) SendConnectionQualityUpdateReturns(result1 error) { + fake.sendConnectionQualityUpdateMutex.Lock() + defer fake.sendConnectionQualityUpdateMutex.Unlock() + fake.SendConnectionQualityUpdateStub = nil + fake.sendConnectionQualityUpdateReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeParticipant) SendConnectionQualityUpdateReturnsOnCall(i int, result1 error) { + fake.sendConnectionQualityUpdateMutex.Lock() + defer fake.sendConnectionQualityUpdateMutex.Unlock() + fake.SendConnectionQualityUpdateStub = nil + if fake.sendConnectionQualityUpdateReturnsOnCall == nil { + fake.sendConnectionQualityUpdateReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.sendConnectionQualityUpdateReturnsOnCall[i] = struct { + result1 error + }{result1} +} + func (fake *FakeParticipant) SendDataPacket(arg1 *livekit.DataPacket) error { fake.sendDataPacketMutex.Lock() ret, specificReturn := fake.sendDataPacketReturnsOnCall[len(fake.sendDataPacketArgsForCall)] @@ -2782,12 +3048,16 @@ func (fake *FakeParticipant) Invocations() map[string][][]interface{} { defer fake.debugInfoMutex.RUnlock() fake.getAudioLevelMutex.RLock() defer fake.getAudioLevelMutex.RUnlock() + fake.getConnectionQualityMutex.RLock() + defer fake.getConnectionQualityMutex.RUnlock() fake.getPublishedTrackMutex.RLock() defer fake.getPublishedTrackMutex.RUnlock() fake.getPublishedTracksMutex.RLock() defer fake.getPublishedTracksMutex.RUnlock() fake.getResponseSinkMutex.RLock() defer fake.getResponseSinkMutex.RUnlock() + fake.getSubscribedParticipantsMutex.RLock() + defer fake.getSubscribedParticipantsMutex.RUnlock() fake.getSubscribedTrackMutex.RLock() defer fake.getSubscribedTrackMutex.RUnlock() fake.getSubscribedTracksMutex.RLock() @@ -2806,6 +3076,8 @@ func (fake *FakeParticipant) Invocations() map[string][][]interface{} { defer fake.identityMutex.RUnlock() fake.isReadyMutex.RLock() defer fake.isReadyMutex.RUnlock() + fake.isSubscribedToMutex.RLock() + defer fake.isSubscribedToMutex.RUnlock() fake.negotiateMutex.RLock() defer fake.negotiateMutex.RUnlock() fake.onCloseMutex.RLock() @@ -2828,6 +3100,8 @@ func (fake *FakeParticipant) Invocations() map[string][][]interface{} { defer fake.removeSubscribedTrackMutex.RUnlock() fake.removeSubscriberMutex.RLock() defer fake.removeSubscriberMutex.RUnlock() + fake.sendConnectionQualityUpdateMutex.RLock() + defer fake.sendConnectionQualityUpdateMutex.RUnlock() fake.sendDataPacketMutex.RLock() defer fake.sendDataPacketMutex.RUnlock() fake.sendJoinResponseMutex.RLock() diff --git a/pkg/rtc/types/typesfakes/fake_published_track.go b/pkg/rtc/types/typesfakes/fake_published_track.go index 5ba749019..ca83b1b00 100644 --- a/pkg/rtc/types/typesfakes/fake_published_track.go +++ b/pkg/rtc/types/typesfakes/fake_published_track.go @@ -83,11 +83,33 @@ type FakePublishedTrack struct { nameReturnsOnCall map[int]struct { result1 string } + NumUpTracksStub func() (uint32, uint32) + numUpTracksMutex sync.RWMutex + numUpTracksArgsForCall []struct { + } + numUpTracksReturns struct { + result1 uint32 + result2 uint32 + } + numUpTracksReturnsOnCall map[int]struct { + result1 uint32 + result2 uint32 + } OnCloseStub func(func()) onCloseMutex sync.RWMutex onCloseArgsForCall []struct { arg1 func() } + PublishLossPercentageStub func() uint32 + publishLossPercentageMutex sync.RWMutex + publishLossPercentageArgsForCall []struct { + } + publishLossPercentageReturns struct { + result1 uint32 + } + publishLossPercentageReturnsOnCall map[int]struct { + result1 uint32 + } RemoveAllSubscribersStub func() removeAllSubscribersMutex sync.RWMutex removeAllSubscribersArgsForCall []struct { @@ -536,6 +558,62 @@ func (fake *FakePublishedTrack) NameReturnsOnCall(i int, result1 string) { }{result1} } +func (fake *FakePublishedTrack) NumUpTracks() (uint32, uint32) { + fake.numUpTracksMutex.Lock() + ret, specificReturn := fake.numUpTracksReturnsOnCall[len(fake.numUpTracksArgsForCall)] + fake.numUpTracksArgsForCall = append(fake.numUpTracksArgsForCall, struct { + }{}) + stub := fake.NumUpTracksStub + fakeReturns := fake.numUpTracksReturns + fake.recordInvocation("NumUpTracks", []interface{}{}) + fake.numUpTracksMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakePublishedTrack) NumUpTracksCallCount() int { + fake.numUpTracksMutex.RLock() + defer fake.numUpTracksMutex.RUnlock() + return len(fake.numUpTracksArgsForCall) +} + +func (fake *FakePublishedTrack) NumUpTracksCalls(stub func() (uint32, uint32)) { + fake.numUpTracksMutex.Lock() + defer fake.numUpTracksMutex.Unlock() + fake.NumUpTracksStub = stub +} + +func (fake *FakePublishedTrack) NumUpTracksReturns(result1 uint32, result2 uint32) { + fake.numUpTracksMutex.Lock() + defer fake.numUpTracksMutex.Unlock() + fake.NumUpTracksStub = nil + fake.numUpTracksReturns = struct { + result1 uint32 + result2 uint32 + }{result1, result2} +} + +func (fake *FakePublishedTrack) NumUpTracksReturnsOnCall(i int, result1 uint32, result2 uint32) { + fake.numUpTracksMutex.Lock() + defer fake.numUpTracksMutex.Unlock() + fake.NumUpTracksStub = nil + if fake.numUpTracksReturnsOnCall == nil { + fake.numUpTracksReturnsOnCall = make(map[int]struct { + result1 uint32 + result2 uint32 + }) + } + fake.numUpTracksReturnsOnCall[i] = struct { + result1 uint32 + result2 uint32 + }{result1, result2} +} + func (fake *FakePublishedTrack) OnClose(arg1 func()) { fake.onCloseMutex.Lock() fake.onCloseArgsForCall = append(fake.onCloseArgsForCall, struct { @@ -568,6 +646,59 @@ func (fake *FakePublishedTrack) OnCloseArgsForCall(i int) func() { return argsForCall.arg1 } +func (fake *FakePublishedTrack) PublishLossPercentage() uint32 { + fake.publishLossPercentageMutex.Lock() + ret, specificReturn := fake.publishLossPercentageReturnsOnCall[len(fake.publishLossPercentageArgsForCall)] + fake.publishLossPercentageArgsForCall = append(fake.publishLossPercentageArgsForCall, struct { + }{}) + stub := fake.PublishLossPercentageStub + fakeReturns := fake.publishLossPercentageReturns + fake.recordInvocation("PublishLossPercentage", []interface{}{}) + fake.publishLossPercentageMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakePublishedTrack) PublishLossPercentageCallCount() int { + fake.publishLossPercentageMutex.RLock() + defer fake.publishLossPercentageMutex.RUnlock() + return len(fake.publishLossPercentageArgsForCall) +} + +func (fake *FakePublishedTrack) PublishLossPercentageCalls(stub func() uint32) { + fake.publishLossPercentageMutex.Lock() + defer fake.publishLossPercentageMutex.Unlock() + fake.PublishLossPercentageStub = stub +} + +func (fake *FakePublishedTrack) PublishLossPercentageReturns(result1 uint32) { + fake.publishLossPercentageMutex.Lock() + defer fake.publishLossPercentageMutex.Unlock() + fake.PublishLossPercentageStub = nil + fake.publishLossPercentageReturns = struct { + result1 uint32 + }{result1} +} + +func (fake *FakePublishedTrack) PublishLossPercentageReturnsOnCall(i int, result1 uint32) { + fake.publishLossPercentageMutex.Lock() + defer fake.publishLossPercentageMutex.Unlock() + fake.PublishLossPercentageStub = nil + if fake.publishLossPercentageReturnsOnCall == nil { + fake.publishLossPercentageReturnsOnCall = make(map[int]struct { + result1 uint32 + }) + } + fake.publishLossPercentageReturnsOnCall[i] = struct { + result1 uint32 + }{result1} +} + func (fake *FakePublishedTrack) RemoveAllSubscribers() { fake.removeAllSubscribersMutex.Lock() fake.removeAllSubscribersArgsForCall = append(fake.removeAllSubscribersArgsForCall, struct { @@ -856,8 +987,12 @@ func (fake *FakePublishedTrack) Invocations() map[string][][]interface{} { defer fake.kindMutex.RUnlock() fake.nameMutex.RLock() defer fake.nameMutex.RUnlock() + fake.numUpTracksMutex.RLock() + defer fake.numUpTracksMutex.RUnlock() fake.onCloseMutex.RLock() defer fake.onCloseMutex.RUnlock() + fake.publishLossPercentageMutex.RLock() + defer fake.publishLossPercentageMutex.RUnlock() fake.removeAllSubscribersMutex.RLock() defer fake.removeAllSubscribersMutex.RUnlock() fake.removeSubscriberMutex.RLock() diff --git a/pkg/rtc/types/typesfakes/fake_subscribed_track.go b/pkg/rtc/types/typesfakes/fake_subscribed_track.go index a982a4af2..77ef8a2d4 100644 --- a/pkg/rtc/types/typesfakes/fake_subscribed_track.go +++ b/pkg/rtc/types/typesfakes/fake_subscribed_track.go @@ -55,6 +55,16 @@ type FakeSubscribedTrack struct { setPublisherMutedArgsForCall []struct { arg1 bool } + SubscribeLossPercentageStub func() uint32 + subscribeLossPercentageMutex sync.RWMutex + subscribeLossPercentageArgsForCall []struct { + } + subscribeLossPercentageReturns struct { + result1 uint32 + } + subscribeLossPercentageReturnsOnCall map[int]struct { + result1 uint32 + } UpdateSubscriberSettingsStub func(bool, livekit.VideoQuality) updateSubscriberSettingsMutex sync.RWMutex updateSubscriberSettingsArgsForCall []struct { @@ -309,6 +319,59 @@ func (fake *FakeSubscribedTrack) SetPublisherMutedArgsForCall(i int) bool { return argsForCall.arg1 } +func (fake *FakeSubscribedTrack) SubscribeLossPercentage() uint32 { + fake.subscribeLossPercentageMutex.Lock() + ret, specificReturn := fake.subscribeLossPercentageReturnsOnCall[len(fake.subscribeLossPercentageArgsForCall)] + fake.subscribeLossPercentageArgsForCall = append(fake.subscribeLossPercentageArgsForCall, struct { + }{}) + stub := fake.SubscribeLossPercentageStub + fakeReturns := fake.subscribeLossPercentageReturns + fake.recordInvocation("SubscribeLossPercentage", []interface{}{}) + fake.subscribeLossPercentageMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeSubscribedTrack) SubscribeLossPercentageCallCount() int { + fake.subscribeLossPercentageMutex.RLock() + defer fake.subscribeLossPercentageMutex.RUnlock() + return len(fake.subscribeLossPercentageArgsForCall) +} + +func (fake *FakeSubscribedTrack) SubscribeLossPercentageCalls(stub func() uint32) { + fake.subscribeLossPercentageMutex.Lock() + defer fake.subscribeLossPercentageMutex.Unlock() + fake.SubscribeLossPercentageStub = stub +} + +func (fake *FakeSubscribedTrack) SubscribeLossPercentageReturns(result1 uint32) { + fake.subscribeLossPercentageMutex.Lock() + defer fake.subscribeLossPercentageMutex.Unlock() + fake.SubscribeLossPercentageStub = nil + fake.subscribeLossPercentageReturns = struct { + result1 uint32 + }{result1} +} + +func (fake *FakeSubscribedTrack) SubscribeLossPercentageReturnsOnCall(i int, result1 uint32) { + fake.subscribeLossPercentageMutex.Lock() + defer fake.subscribeLossPercentageMutex.Unlock() + fake.SubscribeLossPercentageStub = nil + if fake.subscribeLossPercentageReturnsOnCall == nil { + fake.subscribeLossPercentageReturnsOnCall = make(map[int]struct { + result1 uint32 + }) + } + fake.subscribeLossPercentageReturnsOnCall[i] = struct { + result1 uint32 + }{result1} +} + func (fake *FakeSubscribedTrack) UpdateSubscriberSettings(arg1 bool, arg2 livekit.VideoQuality) { fake.updateSubscriberSettingsMutex.Lock() fake.updateSubscriberSettingsArgsForCall = append(fake.updateSubscriberSettingsArgsForCall, struct { @@ -355,6 +418,8 @@ func (fake *FakeSubscribedTrack) Invocations() map[string][][]interface{} { defer fake.publisherIdentityMutex.RUnlock() fake.setPublisherMutedMutex.RLock() defer fake.setPublisherMutedMutex.RUnlock() + fake.subscribeLossPercentageMutex.RLock() + defer fake.subscribeLossPercentageMutex.RUnlock() fake.updateSubscriberSettingsMutex.RLock() defer fake.updateSubscriberSettingsMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} diff --git a/pkg/rtc/utils.go b/pkg/rtc/utils.go index f9079df15..c5c17e262 100644 --- a/pkg/rtc/utils.go +++ b/pkg/rtc/utils.go @@ -44,6 +44,11 @@ func UnpackDataTrackLabel(packed string) (peerId string, trackId string, label s return } +// converts a fixed point number to the number part of % +func FixedPointToPercent(frac uint8) uint32 { + return (uint32(frac) * 100) >> 8 +} + func ToProtoParticipants(participants []types.Participant) []*livekit.ParticipantInfo { infos := make([]*livekit.ParticipantInfo, 0, len(participants)) for _, op := range participants { diff --git a/pkg/utils/stats/packetstats.go b/pkg/utils/stats/packetstats.go index ae4971c84..cdb45d5f8 100644 --- a/pkg/utils/stats/packetstats.go +++ b/pkg/utils/stats/packetstats.go @@ -53,7 +53,6 @@ func initPacketStats() { } type PacketStats struct { - roomName string direction string // incoming or outgoing PacketBytes uint64 `json:"packetBytes"` @@ -63,9 +62,8 @@ type PacketStats struct { FIRTotal uint64 `json:"firTotal"` } -func newPacketStats(room, direction string) *PacketStats { +func newPacketStats(direction string) *PacketStats { return &PacketStats{ - roomName: room, direction: direction, } } @@ -121,7 +119,6 @@ func (s *PacketStats) HandleRTCP(pkts []rtcp.Packet) { func (s PacketStats) Copy() *PacketStats { return &PacketStats{ - roomName: s.roomName, direction: s.direction, PacketBytes: atomic.LoadUint64(&s.PacketBytes), PacketTotal: atomic.LoadUint64(&s.PacketTotal), diff --git a/pkg/utils/stats/roomstatsreporter.go b/pkg/utils/stats/roomstatsreporter.go index a416a6193..7bf944d41 100644 --- a/pkg/utils/stats/roomstatsreporter.go +++ b/pkg/utils/stats/roomstatsreporter.go @@ -60,11 +60,10 @@ type RoomStatsReporter struct { Outgoing *PacketStats } -func NewRoomStatsReporter(roomName string) *RoomStatsReporter { +func NewRoomStatsReporter() *RoomStatsReporter { return &RoomStatsReporter{ - roomName: roomName, - Incoming: newPacketStats(roomName, "incoming"), - Outgoing: newPacketStats(roomName, "outgoing"), + Incoming: newPacketStats("incoming"), + Outgoing: newPacketStats("outgoing"), } }