RTCP sender reports every three seconds. (#1692)

* RTCP sender reports every three seconds.

Ideally, we should be sending this based on data rate.
But, increasing frequency a little as a lost sender report
means the client may not have sender report for 10 seconds
and that could affect sync. We do receiver reports once a second.
Thought of setting this to that level too, but not making a big change
from existing rate.

Also, simplifying the RTCP send loop. Don't need to hold and
do the processing after collecting all reports.

* consistent use of GetSubscribedTracks
This commit is contained in:
Raja Subramanian
2023-05-07 10:09:30 +05:30
committed by GitHub
parent 3fb93135f5
commit 14a2d06bcd
+18 -37
View File
@@ -34,7 +34,7 @@ import (
)
const (
sdBatchSize = 20
sdBatchSize = 30
rttUpdateInterval = 5 * time.Second
disconnectCleanupDuration = 15 * time.Second
@@ -391,7 +391,7 @@ func (p *ParticipantImpl) SetPermission(permission *livekit.ParticipantPermissio
p.SubscriptionManager.queueReconcile("")
} else {
// revoke all subscriptions
for _, st := range p.GetSubscribedTracks() {
for _, st := range p.SubscriptionManager.GetSubscribedTracks() {
st.MediaTrack().RemoveSubscriber(p.ID(), false)
}
}
@@ -1330,57 +1330,38 @@ func (p *ParticipantImpl) subscriberRTCPWorker() {
return
}
var srs []rtcp.Packet
var sd []rtcp.SourceDescriptionChunk
subscribedTracks := p.SubscriptionManager.GetSubscribedTracks()
p.lock.RLock()
// send in batches of sdBatchSize
batchSize := 0
var pkts []rtcp.Packet
var sd []rtcp.SourceDescriptionChunk
for _, subTrack := range subscribedTracks {
sr := subTrack.DownTrack().CreateSenderReport()
chunks := subTrack.DownTrack().CreateSourceDescriptionChunks()
if sr == nil || chunks == nil {
continue
}
srs = append(srs, sr)
pkts = append(pkts, sr)
sd = append(sd, chunks...)
}
p.lock.RUnlock()
// now send in batches of sdBatchSize
var batch []rtcp.SourceDescriptionChunk
var pkts []rtcp.Packet
batchSize := 0
for len(sd) > 0 || len(srs) > 0 {
numSRs := len(srs)
if numSRs > 0 {
if numSRs > sdBatchSize {
numSRs = sdBatchSize
}
pkts = append(pkts, srs[:numSRs]...)
srs = srs[numSRs:]
}
size := len(sd)
spaceRemain := sdBatchSize - batchSize
if spaceRemain > 0 && size > 0 {
if size > spaceRemain {
size = spaceRemain
}
batch = sd[:size]
sd = sd[size:]
pkts = append(pkts, &rtcp.SourceDescription{Chunks: batch})
batchSize = batchSize + 1 + len(chunks)
if batchSize >= sdBatchSize {
pkts = append(pkts, &rtcp.SourceDescription{Chunks: sd})
if err := p.TransportManager.WriteSubscriberRTCP(pkts); err != nil {
if err == io.EOF || err == io.ErrClosedPipe {
return
}
p.params.Logger.Errorw("could not send down track reports", err)
}
}
pkts = pkts[:0]
batchSize = 0
pkts = pkts[:0]
sd = sd[:0]
batchSize = 0
}
}
time.Sleep(5 * time.Second)
time.Sleep(3 * time.Second)
}
}
@@ -1987,7 +1968,7 @@ func (p *ParticipantImpl) postRtcp(pkts []rtcp.Packet) {
}
func (p *ParticipantImpl) setDowntracksConnected() {
for _, t := range p.GetSubscribedTracks() {
for _, t := range p.SubscriptionManager.GetSubscribedTracks() {
if dt := t.DownTrack(); dt != nil {
dt.SetConnected()
}