From 14a2d06bcd594bb7d2db5f9aeed99c4415f8e527 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Sun, 7 May 2023 10:09:30 +0530 Subject: [PATCH] RTCP sender reports every three seconds. (#1692) * RTCP sender reports every three seconds. Ideally, we should be sending this based on data rate. But, increasing frequency a little as a lost sender report means the client may not have sender report for 10 seconds and that could affect sync. We do receiver reports once a second. Thought of setting this to that level too, but not making a big change from existing rate. Also, simplifying the RTCP send loop. Don't need to hold and do the processing after collecting all reports. * consistent use of GetSubscribedTracks --- pkg/rtc/participant.go | 55 ++++++++++++++---------------------------- 1 file changed, 18 insertions(+), 37 deletions(-) diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 91008754c..ebe987428 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -34,7 +34,7 @@ import ( ) const ( - sdBatchSize = 20 + sdBatchSize = 30 rttUpdateInterval = 5 * time.Second disconnectCleanupDuration = 15 * time.Second @@ -391,7 +391,7 @@ func (p *ParticipantImpl) SetPermission(permission *livekit.ParticipantPermissio p.SubscriptionManager.queueReconcile("") } else { // revoke all subscriptions - for _, st := range p.GetSubscribedTracks() { + for _, st := range p.SubscriptionManager.GetSubscribedTracks() { st.MediaTrack().RemoveSubscriber(p.ID(), false) } } @@ -1330,57 +1330,38 @@ func (p *ParticipantImpl) subscriberRTCPWorker() { return } - var srs []rtcp.Packet - var sd []rtcp.SourceDescriptionChunk subscribedTracks := p.SubscriptionManager.GetSubscribedTracks() - p.lock.RLock() + + // send in batches of sdBatchSize + batchSize := 0 + var pkts []rtcp.Packet + var sd []rtcp.SourceDescriptionChunk for _, subTrack := range subscribedTracks { sr := subTrack.DownTrack().CreateSenderReport() chunks := subTrack.DownTrack().CreateSourceDescriptionChunks() if sr == nil || chunks == nil { continue } - srs = append(srs, sr) + + pkts = append(pkts, sr) sd = append(sd, chunks...) - } - p.lock.RUnlock() - - // now send in batches of sdBatchSize - var batch []rtcp.SourceDescriptionChunk - var pkts []rtcp.Packet - batchSize := 0 - for len(sd) > 0 || len(srs) > 0 { - numSRs := len(srs) - if numSRs > 0 { - if numSRs > sdBatchSize { - numSRs = sdBatchSize - } - pkts = append(pkts, srs[:numSRs]...) - srs = srs[numSRs:] - } - - size := len(sd) - spaceRemain := sdBatchSize - batchSize - if spaceRemain > 0 && size > 0 { - if size > spaceRemain { - size = spaceRemain - } - batch = sd[:size] - sd = sd[size:] - pkts = append(pkts, &rtcp.SourceDescription{Chunks: batch}) + batchSize = batchSize + 1 + len(chunks) + if batchSize >= sdBatchSize { + pkts = append(pkts, &rtcp.SourceDescription{Chunks: sd}) if err := p.TransportManager.WriteSubscriberRTCP(pkts); err != nil { if err == io.EOF || err == io.ErrClosedPipe { return } p.params.Logger.Errorw("could not send down track reports", err) } - } - pkts = pkts[:0] - batchSize = 0 + pkts = pkts[:0] + sd = sd[:0] + batchSize = 0 + } } - time.Sleep(5 * time.Second) + time.Sleep(3 * time.Second) } } @@ -1987,7 +1968,7 @@ func (p *ParticipantImpl) postRtcp(pkts []rtcp.Packet) { } func (p *ParticipantImpl) setDowntracksConnected() { - for _, t := range p.GetSubscribedTracks() { + for _, t := range p.SubscriptionManager.GetSubscribedTracks() { if dt := t.DownTrack(); dt != nil { dt.SetConnected() }