diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 91008754c..ebe987428 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -34,7 +34,7 @@ import ( ) const ( - sdBatchSize = 20 + sdBatchSize = 30 rttUpdateInterval = 5 * time.Second disconnectCleanupDuration = 15 * time.Second @@ -391,7 +391,7 @@ func (p *ParticipantImpl) SetPermission(permission *livekit.ParticipantPermissio p.SubscriptionManager.queueReconcile("") } else { // revoke all subscriptions - for _, st := range p.GetSubscribedTracks() { + for _, st := range p.SubscriptionManager.GetSubscribedTracks() { st.MediaTrack().RemoveSubscriber(p.ID(), false) } } @@ -1330,57 +1330,38 @@ func (p *ParticipantImpl) subscriberRTCPWorker() { return } - var srs []rtcp.Packet - var sd []rtcp.SourceDescriptionChunk subscribedTracks := p.SubscriptionManager.GetSubscribedTracks() - p.lock.RLock() + + // send in batches of sdBatchSize + batchSize := 0 + var pkts []rtcp.Packet + var sd []rtcp.SourceDescriptionChunk for _, subTrack := range subscribedTracks { sr := subTrack.DownTrack().CreateSenderReport() chunks := subTrack.DownTrack().CreateSourceDescriptionChunks() if sr == nil || chunks == nil { continue } - srs = append(srs, sr) + + pkts = append(pkts, sr) sd = append(sd, chunks...) - } - p.lock.RUnlock() - - // now send in batches of sdBatchSize - var batch []rtcp.SourceDescriptionChunk - var pkts []rtcp.Packet - batchSize := 0 - for len(sd) > 0 || len(srs) > 0 { - numSRs := len(srs) - if numSRs > 0 { - if numSRs > sdBatchSize { - numSRs = sdBatchSize - } - pkts = append(pkts, srs[:numSRs]...) - srs = srs[numSRs:] - } - - size := len(sd) - spaceRemain := sdBatchSize - batchSize - if spaceRemain > 0 && size > 0 { - if size > spaceRemain { - size = spaceRemain - } - batch = sd[:size] - sd = sd[size:] - pkts = append(pkts, &rtcp.SourceDescription{Chunks: batch}) + batchSize = batchSize + 1 + len(chunks) + if batchSize >= sdBatchSize { + pkts = append(pkts, &rtcp.SourceDescription{Chunks: sd}) if err := p.TransportManager.WriteSubscriberRTCP(pkts); err != nil { if err == io.EOF || err == io.ErrClosedPipe { return } p.params.Logger.Errorw("could not send down track reports", err) } - } - pkts = pkts[:0] - batchSize = 0 + pkts = pkts[:0] + sd = sd[:0] + batchSize = 0 + } } - time.Sleep(5 * time.Second) + time.Sleep(3 * time.Second) } } @@ -1987,7 +1968,7 @@ func (p *ParticipantImpl) postRtcp(pkts []rtcp.Packet) { } func (p *ParticipantImpl) setDowntracksConnected() { - for _, t := range p.GetSubscribedTracks() { + for _, t := range p.SubscriptionManager.GetSubscribedTracks() { if dt := t.DownTrack(); dt != nil { dt.SetConnected() }