Merge remote-tracking branch 'origin/master' into raja_fr

This commit is contained in:
boks1971
2023-10-18 22:05:50 +05:30
15 changed files with 160 additions and 71 deletions
+4 -4
View File
@@ -17,7 +17,7 @@ require (
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20231005043905-c137afffe71c
github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e
github.com/livekit/protocol v1.8.0
github.com/livekit/psrpc v0.3.3
github.com/mackerelio/go-osstat v0.2.4
@@ -27,14 +27,14 @@ require (
github.com/olekukonko/tablewriter v0.0.5
github.com/pion/dtls/v2 v2.2.7
github.com/pion/ice/v2 v2.3.11
github.com/pion/interceptor v0.1.19
github.com/pion/interceptor v0.1.23
github.com/pion/rtcp v1.2.10
github.com/pion/rtp v1.8.1
github.com/pion/rtp v1.8.2
github.com/pion/sctp v1.8.9
github.com/pion/sdp/v3 v3.0.6
github.com/pion/transport/v2 v2.2.4
github.com/pion/turn/v2 v2.1.4
github.com/pion/webrtc/v3 v3.2.20
github.com/pion/webrtc/v3 v3.2.21
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.17.0
github.com/redis/go-redis/v9 v9.2.1
+8 -7
View File
@@ -124,8 +124,8 @@ github.com/lithammer/shortuuid/v4 v4.0.0 h1:QRbbVkfgNippHOS8PXDkti4NaWeyYfcBTHtw
github.com/lithammer/shortuuid/v4 v4.0.0/go.mod h1:Zs8puNcrvf2rV9rTH51ZLLcj7ZXqQI3lv67aw4KiB1Y=
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkDaKb5iXdynYrzB84ErPPO4LbRASk58=
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20231005043905-c137afffe71c h1:eTghhsCfx2ltyzArXZ7wiNoFFzbfLXJ4uI/IsLXFZQc=
github.com/livekit/mediatransportutil v0.0.0-20231005043905-c137afffe71c/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY=
github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e h1:yNeIo7MSMUWgoLu7LkNKnBYnJBFPFH9Wq4S6h1kS44M=
github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY=
github.com/livekit/protocol v1.8.0 h1:0z2eRmEXFFXiJ7WPAxRLMNCyUu55w41iikbbeT8dvlQ=
github.com/livekit/protocol v1.8.0/go.mod h1:zbh0QPUcLGOeZeIO/VeigwWWbudz4Lv+Px94FnVfQH0=
github.com/livekit/psrpc v0.3.3 h1:+lltbuN39IdaynXhLLxRShgYqYsRMWeeXKzv60oqyWo=
@@ -189,8 +189,8 @@ github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ
github.com/pion/ice/v2 v2.3.11 h1:rZjVmUwyT55cmN8ySMpL7rsS8KYsJERsrxJLLxpKhdw=
github.com/pion/ice/v2 v2.3.11/go.mod h1:hPcLC3kxMa+JGRzMHqQzjoSj3xtE9F+eoncmXLlCL4E=
github.com/pion/interceptor v0.1.18/go.mod h1:tpvvF4cPM6NGxFA1DUMbhabzQBxdWMATDGEUYOR9x6I=
github.com/pion/interceptor v0.1.19 h1:tq0TGBzuZQqipyBhaC1mVUCfCh8XjDKUuibq9rIl5t4=
github.com/pion/interceptor v0.1.19/go.mod h1:VANhFxdJezB8mwToMMmrmyHyP9gym6xLqIUch31xryg=
github.com/pion/interceptor v0.1.23 h1:BZmayeasUYVDam891RtvE5rs6syqmSK3Wzy+xu+UNw0=
github.com/pion/interceptor v0.1.23/go.mod h1:wkbPYAak5zKsfpVDYMtEfWEy8D4zL+rpxCxPImLOg3Y=
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
github.com/pion/mdns v0.0.8 h1:HhicWIg7OX5PVilyBO6plhMetInbzkVJAhbdJiAeVaI=
@@ -199,8 +199,9 @@ github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
github.com/pion/rtcp v1.2.10 h1:nkr3uj+8Sp97zyItdN60tE/S6vk4al5CPRR6Gejsdjc=
github.com/pion/rtcp v1.2.10/go.mod h1:ztfEwXZNLGyF1oQDttz/ZKIBaeeg/oWbRYqzBM9TL1I=
github.com/pion/rtp v1.8.1 h1:26OxTc6lKg/qLSGir5agLyj0QKaOv8OP5wps2SFnVNQ=
github.com/pion/rtp v1.8.1/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU=
github.com/pion/rtp v1.8.2 h1:oKMM0K1/QYQ5b5qH+ikqDSZRipP5mIxPJcgcvw5sH0w=
github.com/pion/rtp v1.8.2/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU=
github.com/pion/sctp v1.8.5/go.mod h1:SUFFfDpViyKejTAdwD1d/HQsCu+V/40cCs2nZIvC3s0=
github.com/pion/sctp v1.8.8/go.mod h1:igF9nZBrjh5AtmKc7U30jXltsFHicFCXSmWA2GWRaWs=
github.com/pion/sctp v1.8.9 h1:TP5ZVxV5J7rz7uZmbyvnUvsn7EJ2x/5q9uhsTtXbI3g=
@@ -223,8 +224,8 @@ github.com/pion/transport/v3 v3.0.1/go.mod h1:UY7kiITrlMv7/IKgd5eTUcaahZx5oUN3l9
github.com/pion/turn/v2 v2.1.3/go.mod h1:huEpByKKHix2/b9kmTAM3YoX6MKP+/D//0ClgUYR2fY=
github.com/pion/turn/v2 v2.1.4 h1:2xn8rduI5W6sCZQkEnIUDAkrBQNl2eYIBCHMZ3QMmP8=
github.com/pion/turn/v2 v2.1.4/go.mod h1:huEpByKKHix2/b9kmTAM3YoX6MKP+/D//0ClgUYR2fY=
github.com/pion/webrtc/v3 v3.2.20 h1:BQJiXQsJq9LgLp3op7rLy1y8d2WD+LtiS9cpY0uQ22A=
github.com/pion/webrtc/v3 v3.2.20/go.mod h1:vVURQTBOG5BpWKOJz3nlr23NfTDeyKVmubRNqzQp+Tg=
github.com/pion/webrtc/v3 v3.2.21 h1:c8fy5JcqJkAQBwwy3Sk9huQLTBUSqaggyRlv9Lnh2zY=
github.com/pion/webrtc/v3 v3.2.21/go.mod h1:vVURQTBOG5BpWKOJz3nlr23NfTDeyKVmubRNqzQp+Tg=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+1 -1
View File
@@ -228,7 +228,7 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra
case *rtcp.SourceDescription:
// do nothing for now
case *rtcp.SenderReport:
buff.SetSenderReportData(pkt.RTPTime, pkt.NTPTime, pkt.PacketCount)
buff.SetSenderReportData(pkt.RTPTime, pkt.NTPTime)
}
}
})
+2 -2
View File
@@ -324,9 +324,9 @@ func (d *DummyReceiver) GetCalculatedClockRate(layer int32) uint32 {
return 0
}
func (d *DummyReceiver) GetReferenceLayerRTPTimestamp(ets uint64, layer int32, referenceLayer int32) (uint64, error) {
func (d *DummyReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) {
if r, ok := d.receiver.Load().(sfu.TrackReceiver); ok {
return r.GetReferenceLayerRTPTimestamp(ets, layer, referenceLayer)
return r.GetReferenceLayerRTPTimestamp(ts, layer, referenceLayer)
}
return 0, errors.New("receiver not available")
}
+2 -2
View File
@@ -84,8 +84,8 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq
if err != nil {
return nil, err
}
sink.Close()
source.Close()
defer sink.Close()
defer source.Close()
// ensure it's created correctly
err = s.confirmExecution(func() error {
+5 -9
View File
@@ -84,8 +84,7 @@ type Buffer struct {
closed atomic.Bool
mime string
snRangeMap *utils.RangeMap[uint64, uint64]
paddingOnlyDrops uint64
snRangeMap *utils.RangeMap[uint64, uint64]
latestTSForAudioLevelInitialized bool
latestTSForAudioLevel uint32
@@ -455,7 +454,6 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) {
if err := b.snRangeMap.ExcludeRange(flowState.ExtSequenceNumber, flowState.ExtSequenceNumber+1); err != nil {
b.logger.Errorw("could not exclude range", err, "sn", rtpPacket.SequenceNumber, "esn", flowState.ExtSequenceNumber)
}
b.paddingOnlyDrops++
}
return
}
@@ -726,14 +724,12 @@ func (b *Buffer) buildReceptionReport() *rtcp.ReceptionReport {
return b.rtpStats.GetRtcpReceptionReport(b.mediaSSRC, b.lastFractionLostToReport, b.rrSnapshotId)
}
func (b *Buffer) SetSenderReportData(rtpTime uint32, ntpTime uint64, packetCount uint32) {
func (b *Buffer) SetSenderReportData(rtpTime uint32, ntpTime uint64) {
b.RLock()
srData := &RTCPSenderReportData{
RTPTimestamp: rtpTime,
NTPTimestamp: mediatransportutil.NtpTime(ntpTime),
PacketCount: packetCount,
PaddingOnlyDrops: b.paddingOnlyDrops,
At: time.Now(),
RTPTimestamp: rtpTime,
NTPTimestamp: mediatransportutil.NtpTime(ntpTime),
At: time.Now(),
}
if b.rtpStats != nil {
+11 -14
View File
@@ -32,7 +32,7 @@ const (
cFirstSnapshotID = 1
cFirstPacketTimeAdjustWindow = 2 * time.Minute
cFirstPacketTimeAdjustThreshold = 5 * time.Second
cFirstPacketTimeAdjustThreshold = 5 * time.Minute
)
// -------------------------------------------------------
@@ -109,10 +109,7 @@ type RTCPSenderReportData struct {
RTPTimestamp uint32
RTPTimestampExt uint64
NTPTimestamp mediatransportutil.NtpTime
PacketCount uint32
// RAJA-REMOVE PacketCountExt uint64
PaddingOnlyDrops uint64
At time.Time
At time.Time
}
type RTPStatsParams struct {
@@ -456,7 +453,7 @@ func (r *rtpStatsBase) GetRtt() uint32 {
return r.rtt
}
func (r *rtpStatsBase) maybeAdjustFirstPacketTime(ets uint64, extStartTS uint64) {
func (r *rtpStatsBase) maybeAdjustFirstPacketTime(ts uint32, startTS uint32) {
if time.Since(r.startTime) > cFirstPacketTimeAdjustWindow {
return
}
@@ -467,7 +464,7 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(ets uint64, extStartTS uint64)
// abnormal delay (maybe due to pacing or maybe due to queuing
// in some network element along the way), push back first time
// to an earlier instance.
samplesDiff := int64(ets - extStartTS)
samplesDiff := int32(ts - startTS)
if samplesDiff < 0 {
// out-of-order, skip
return
@@ -484,9 +481,9 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(ets uint64, extStartTS uint64)
"nowTime", now.String(),
"before", r.firstTime.String(),
"after", firstTime.String(),
"adjustment", r.firstTime.Sub(firstTime),
"extNowTS", ets,
"extStartTS", extStartTS,
"adjustment", r.firstTime.Sub(firstTime).String(),
"nowTS", ts,
"startTS", startTS,
)
if r.firstTime.Sub(firstTime) > cFirstPacketTimeAdjustThreshold {
r.logger.Infow("first packet time adjustment too big, ignoring",
@@ -494,9 +491,9 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(ets uint64, extStartTS uint64)
"nowTime", now.String(),
"before", r.firstTime.String(),
"after", firstTime.String(),
"adjustment", r.firstTime.Sub(firstTime),
"extNowTS", ets,
"extStartTS", extStartTS,
"adjustment", r.firstTime.Sub(firstTime).String(),
"nowTS", ts,
"startTS", startTS,
)
} else {
r.firstTime = firstTime
@@ -537,7 +534,7 @@ func (r *rtpStatsBase) deltaInfo(snapshotID uint32, extStartSN uint64, extHighes
"packetsExpected", packetsExpected,
"startTime", startTime,
"endTime", endTime,
"duration", endTime.Sub(startTime),
"duration", endTime.Sub(startTime).String(),
)
return nil
}
+33 -4
View File
@@ -151,7 +151,19 @@ func (r *RTPStatsReceiver) Update(
}
}
if -gapSN >= cNumSequenceNumbers {
r.logger.Warnw("large sequence number gap negative", nil, "prev", resSN.PreExtendedHighest, "curr", resSN.ExtendedVal, "gap", gapSN)
r.logger.Warnw(
"large sequence number gap negative", nil,
"prev", resSN.PreExtendedHighest,
"curr", resSN.ExtendedVal,
"gap", gapSN,
"packetTime", packetTime.String(),
"sequenceNumber", sequenceNumber,
"timestamp", timestamp,
"marker", marker,
"hdrSize", hdrSize,
"payloadSize", payloadSize,
"paddingSize", paddingSize,
)
}
if gapSN != 0 {
@@ -205,7 +217,19 @@ func (r *RTPStatsReceiver) Update(
flowState.ExtTimestamp = resTS.ExtendedVal
} else { // in-order
if gapSN >= cNumSequenceNumbers {
r.logger.Warnw("large sequence number gap", nil, "prev", resSN.PreExtendedHighest, "curr", resSN.ExtendedVal, "gap", gapSN)
r.logger.Warnw(
"large sequence number gap", nil,
"prev", resSN.PreExtendedHighest,
"curr", resSN.ExtendedVal,
"gap", gapSN,
"packetTime", packetTime.String(),
"sequenceNumber", sequenceNumber,
"timestamp", timestamp,
"marker", marker,
"hdrSize", hdrSize,
"payloadSize", payloadSize,
"paddingSize", paddingSize,
)
}
// update gap histogram
@@ -284,7 +308,7 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData)
srDataCopy := *srData
srDataCopy.RTPTimestampExt = uint64(srDataCopy.RTPTimestamp) + tsCycles
r.maybeAdjustFirstPacketTime(srDataCopy.RTPTimestampExt, r.timestamp.GetExtendedStart())
r.maybeAdjustFirstPacketTime(srDataCopy.RTPTimestamp, r.timestamp.GetStart())
if r.srNewest != nil && srDataCopy.RTPTimestampExt < r.srNewest.RTPTimestampExt {
// This can happen when a track is replaced with a null and then restored -
@@ -359,6 +383,11 @@ func (r *RTPStatsReceiver) GetRtcpReceptionReport(ssrc uint32, proxyFracLost uin
fracLost = proxyFracLost
}
totalLost := r.packetsLost
if totalLost > 0xffffff { // 24-bits max
totalLost = 0xffffff
}
lastSR := uint32(0)
dlsr := uint32(0)
if r.srNewest != nil {
@@ -372,7 +401,7 @@ func (r *RTPStatsReceiver) GetRtcpReceptionReport(ssrc uint32, proxyFracLost uin
return &rtcp.ReceptionReport{
SSRC: ssrc,
FractionLost: fracLost,
TotalLost: uint32(r.packetsLost),
TotalLost: uint32(totalLost),
LastSequenceNumber: uint32(now.extStartSN),
Jitter: uint32(r.jitter),
LastSenderReport: lastSR,
+39 -13
View File
@@ -282,7 +282,19 @@ func (r *RTPStatsSender) Update(
return
}
if -gapSN >= cNumSequenceNumbers {
r.logger.Warnw("large sequence number gap negative", nil, "prev", r.extHighestSN, "curr", extSequenceNumber, "gap", gapSN)
r.logger.Warnw(
"large sequence number gap negative", nil,
"prev", r.extHighestSN,
"curr", extSequenceNumber,
"gap", gapSN,
"packetTime", packetTime.String(),
"sequenceNumber", extSequenceNumber,
"timestamp", extTimestamp,
"marker", marker,
"hdrSize", hdrSize,
"payloadSize", payloadSize,
"paddingSize", paddingSize,
)
}
if extSequenceNumber < r.extStartSN {
@@ -341,7 +353,19 @@ func (r *RTPStatsSender) Update(
}
} else { // in-order
if gapSN >= cNumSequenceNumbers {
r.logger.Warnw("large sequence number gap", nil, "prev", r.extHighestSN, "curr", extSequenceNumber, "gap", gapSN)
r.logger.Warnw(
"large sequence number gap", nil,
"prev", r.extHighestSN,
"curr", extSequenceNumber,
"gap", gapSN,
"packetTime", packetTime.String(),
"sequenceNumber", extSequenceNumber,
"timestamp", extTimestamp,
"marker", marker,
"hdrSize", hdrSize,
"payloadSize", payloadSize,
"paddingSize", paddingSize,
)
}
// update gap histogram
@@ -416,9 +440,9 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt
if !r.lastRRTime.IsZero() && r.extHighestSNFromRR > extHighestSNFromRR {
r.logger.Debugw(
fmt.Sprintf("receiver report potentially out of order, highestSN: existing: %d, received: %d", r.extHighestSNFromRR, extHighestSNFromRR),
"lastRRTime", r.lastRRTime,
"lastRRTime", r.lastRRTime.String(),
"lastRR", r.lastRR,
"sinceLastRR", time.Since(r.lastRRTime),
"sinceLastRR", time.Since(r.lastRRTime).String(),
"receivedRR", rr,
)
return
@@ -438,6 +462,7 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt
}
}
// This is 24-bit max in the protocol. So, technically doesn't need extended type. But, done for consistency.
packetsLostFromRR := r.packetsLostFromRR&0xFFFF_FFFF_0000_0000 + uint64(rr.TotalLost)
if (rr.TotalLost-r.lastRR.TotalLost) < (1<<31) && rr.TotalLost < r.lastRR.TotalLost {
packetsLostFromRR += (1 << 32)
@@ -482,9 +507,9 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt
if is.packetsNotFound != 0 {
r.logger.Warnw(
"potential sequence number de-sync", nil,
"lastRRTime", r.lastRRTime,
"lastRRTime", r.lastRRTime.String(),
"lastRR", r.lastRR,
"sinceLastRR", time.Since(r.lastRRTime),
"sinceLastRR", time.Since(r.lastRRTime).String(),
"receivedRR", rr,
"extStartSN", r.extStartSN,
"extHighestSN", r.extHighestSN,
@@ -512,11 +537,11 @@ func (r *RTPStatsSender) LastReceiverReportTime() time.Time {
return r.lastRRTime
}
func (r *RTPStatsSender) MaybeAdjustFirstPacketTime(ets uint64) {
func (r *RTPStatsSender) MaybeAdjustFirstPacketTime(ts uint32) {
r.lock.Lock()
defer r.lock.Unlock()
r.maybeAdjustFirstPacketTime(ets, r.extStartTS)
r.maybeAdjustFirstPacketTime(ts, uint32(r.extStartTS))
}
func (r *RTPStatsSender) GetExpectedRTPTimestamp(at time.Time) (expectedTSExt uint64, err error) {
@@ -581,14 +606,15 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, calculatedClockRate ui
"prevTSExt", r.srNewest.RTPTimestampExt,
"prevRTP", r.srNewest.RTPTimestamp,
"prevNTP", r.srNewest.NTPTimestamp.Time().String(),
"extHighestTS", r.extHighestTS,
"currTSExt", nowRTPExt,
"currRTP", nowRTP,
"currNTP", nowNTP.Time().String(),
"timeNow", time.Now().String(),
"firstTime", r.firstTime.String(),
"timeSinceFirst", timeSinceFirst,
"timeSinceFirst", timeSinceFirst.String(),
"highestTime", r.highestTime.String(),
"timeSinceHighest", timeSinceHighest,
"timeSinceHighest", timeSinceHighest.String(),
"nowRTPExtUsingTime", nowRTPExtUsingTime,
"calculatedClockRate", calculatedClockRate,
"nowRTPExtUsingRate", nowRTPExtUsingRate,
@@ -646,9 +672,9 @@ func (r *RTPStatsSender) DeltaInfoSender(senderSnapshotID uint32) *RTPDeltaInfo
"startSN", then.extStartSN,
"endSN", now.extStartSN,
"packetsExpected", packetsExpected,
"startTime", startTime,
"endTime", endTime,
"duration", endTime.Sub(startTime),
"startTime", startTime.String(),
"endTime", endTime.String(),
"duration", endTime.Sub(startTime).String(),
)
return nil
}
+1 -1
View File
@@ -1884,7 +1884,7 @@ func (d *DownTrack) sendSilentFrameOnMuteForOpus() {
func (d *DownTrack) HandleRTCPSenderReportData(_payloadType webrtc.PayloadType, layer int32, srData *buffer.RTCPSenderReportData) error {
if layer == d.forwarder.GetReferenceLayerSpatial() && srData != nil {
d.rtpStats.MaybeAdjustFirstPacketTime(srData.RTPTimestampExt + d.forwarder.GetReferenceTimestampOffset())
d.rtpStats.MaybeAdjustFirstPacketTime(srData.RTPTimestamp + uint32(d.forwarder.GetReferenceTimestampOffset()))
}
return nil
}
+25 -5
View File
@@ -42,6 +42,7 @@ const (
TransitionCostSpatial = 10
ResumeBehindThresholdSeconds = float64(0.2) // 200ms
ResumeBehindHighTresholdSeconds = float64(2.0) // 2 seconds
LayerSwitchBehindThresholdSeconds = float64(0.05) // 50ms
SwitchAheadThresholdSeconds = float64(0.025) // 25ms
)
@@ -187,7 +188,7 @@ type Forwarder struct {
codec webrtc.RTPCodecCapability
kind webrtc.RTPCodecType
logger logger.Logger
getReferenceLayerRTPTimestamp func(ets uint64, layer int32, referenceLayer int32) (uint64, error)
getReferenceLayerRTPTimestamp func(ts uint32, layer int32, referenceLayer int32) (uint32, error)
getExpectedRTPTimestamp func(at time.Time) (uint64, error)
muted bool
@@ -215,7 +216,7 @@ type Forwarder struct {
func NewForwarder(
kind webrtc.RTPCodecType,
logger logger.Logger,
getReferenceLayerRTPTimestamp func(ets uint64, layer int32, referenceLayer int32) (uint64, error),
getReferenceLayerRTPTimestamp func(ts uint32, layer int32, referenceLayer int32) (uint32, error),
getExpectedRTPTimestamp func(at time.Time) (uint64, error),
) *Forwarder {
f := &Forwarder{
@@ -1499,11 +1500,11 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
// But, cases like muting/unmuting, clock vagaries, pacing, etc. make them not satisfy those conditions always.
rtpMungerState := f.rtpMunger.GetLast()
extLastTS := rtpMungerState.ExtLastTS
extRefTS := extLastTS
extExpectedTS := extLastTS
extRefTS := extExpectedTS
switchingAt := time.Now()
if f.getReferenceLayerRTPTimestamp != nil {
ets, err := f.getReferenceLayerRTPTimestamp(extPkt.ExtTimestamp, layer, f.referenceLayerSpatial)
ts, err := f.getReferenceLayerRTPTimestamp(extPkt.Packet.Timestamp, layer, f.referenceLayerSpatial)
if err != nil {
// error out if extRefTS is not available. It can happen when there is no sender report
// for the layer being switched to. Can especially happen at the start of the track when layer switches are
@@ -1513,7 +1514,15 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
return err
}
extRefTS = ets
extRefTS = (extRefTS & 0xFFFF_FFFF_0000_0000) + uint64(ts)
expectedTS32 := uint32(extExpectedTS)
if (ts-expectedTS32) < 1<<31 && ts < expectedTS32 {
extRefTS += (1 << 32)
}
if (expectedTS32-ts) < 1<<31 && expectedTS32 < ts && extRefTS >= 1<<32 {
extRefTS -= (1 << 32)
}
}
if f.getExpectedRTPTimestamp != nil {
@@ -1569,6 +1578,17 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
if f.resumeBehindThreshold > 0 && diffSeconds > f.resumeBehindThreshold {
logTransition("resume, reference too far behind", extExpectedTS, extRefTS, extLastTS, diffSeconds)
extNextTS = extExpectedTS
} else if diffSeconds > ResumeBehindHighTresholdSeconds {
// could be due to incorrect reference calculation
f.logger.Infow(
"resume, reference very far behind",
"layer", layer,
"extExpectedTS", extExpectedTS,
"extRefTS", extRefTS,
"extLastTS", extLastTS,
"diffSeconds", diffSeconds,
)
extNextTS = extExpectedTS
} else {
extNextTS = extRefTS
}
+3 -3
View File
@@ -82,7 +82,7 @@ type TrackReceiver interface {
GetTemporalLayerFpsForSpatial(layer int32) (bool, []float32)
GetCalculatedClockRate(layer int32) uint32
GetReferenceLayerRTPTimestamp(ets uint64, layer int32, referenceLayer int32) (uint64, error)
GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error)
}
// WebRTCReceiver receives a media track
@@ -824,8 +824,8 @@ func (w *WebRTCReceiver) GetCalculatedClockRate(layer int32) uint32 {
return w.streamTrackerManager.GetCalculatedClockRate(layer)
}
func (w *WebRTCReceiver) GetReferenceLayerRTPTimestamp(ets uint64, layer int32, referenceLayer int32) (uint64, error) {
return w.streamTrackerManager.GetReferenceLayerRTPTimestamp(ets, layer, referenceLayer)
func (w *WebRTCReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) {
return w.streamTrackerManager.GetReferenceLayerRTPTimestamp(ts, layer, referenceLayer)
}
// closes all track senders in parallel, returns when all are closed
+17 -1
View File
@@ -202,9 +202,25 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara
}, ErrOutOfOrderSequenceNumberCacheMiss
}
extSequenceNumber := extPkt.ExtSequenceNumber - snOffset
if extSequenceNumber >= r.extLastSN {
// should not happen, just being paranoid
r.logger.Errorw(
"unexpected packet ordering", nil,
"extIncomingSN", extPkt.ExtSequenceNumber,
"extHighestIncominSN", r.extHighestIncomingSN,
"extLastSN", r.extLastSN,
"snOffsetIncoming", snOffset,
"snOffsetHighest", r.snOffset,
)
return &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingOutOfOrder,
}, ErrOutOfOrderSequenceNumberCacheMiss
}
return &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingOutOfOrder,
extSequenceNumber: extPkt.ExtSequenceNumber - snOffset,
extSequenceNumber: extSequenceNumber,
extTimestamp: extPkt.ExtTimestamp - r.tsOffset,
}, nil
}
@@ -192,6 +192,10 @@ func (s *StreamTrackerDependencyDescriptor) Observe(temporalLayer int32, pktSize
dtis := ddVal.Descriptor.FrameDependencies.DecodeTargetIndications
for _, dt := range ddVal.DecodeTargets {
if len(dtis) <= dt.Target {
s.params.Logger.Errorw("len(dtis) less than target", nil, "target", dt.Target, "dtls", dtis)
continue
}
// we are not dropping discardable frames now, so only ingore not present frames
if dtis[dt.Target] == dd.DecodeTargetNotPresent {
continue
+5 -5
View File
@@ -76,7 +76,7 @@ type StreamTrackerManager struct {
senderReportMu sync.RWMutex
senderReports [buffer.DefaultMaxLayerSpatial + 1]endsSenderReport
layerOffsets [buffer.DefaultMaxLayerSpatial + 1][buffer.DefaultMaxLayerSpatial + 1]uint64
layerOffsets [buffer.DefaultMaxLayerSpatial + 1][buffer.DefaultMaxLayerSpatial + 1]uint32
closed core.Fuse
@@ -563,10 +563,10 @@ func (s *StreamTrackerManager) updateLayerOffsetLocked(ref, other int32) {
rtpDiff := ntpDiff.Nanoseconds() * int64(s.clockRate) / 1e9
// calculate other layer's time stamp at the same time as ref layer's NTP time
normalizedOtherTS := srOther.RTPTimestampExt + uint64(rtpDiff)
normalizedOtherTS := srOther.RTPTimestamp + uint32(rtpDiff)
// now both layers' time stamp refer to the same NTP time and the diff is the offset between the layers
offset := srRef.RTPTimestampExt - normalizedOtherTS
offset := srRef.RTPTimestamp - normalizedOtherTS
// use minimal offset to indicate value availability in the extremely unlikely case of
// both layers using the same timestamp
@@ -643,7 +643,7 @@ func (s *StreamTrackerManager) GetCalculatedClockRate(layer int32) uint32 {
return uint32(float64(rdsf) / tsf.Seconds())
}
func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ets uint64, layer int32, referenceLayer int32) (uint64, error) {
func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) {
s.senderReportMu.RLock()
defer s.senderReportMu.RUnlock()
@@ -655,7 +655,7 @@ func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ets uint64, layer i
return 0, fmt.Errorf("offset unavailable, target: %d, reference: %d", layer, referenceLayer)
}
return ets + s.layerOffsets[referenceLayer][layer], nil
return ts + s.layerOffsets[referenceLayer][layer], nil
}
func (s *StreamTrackerManager) GetMaxTemporalLayerSeen() int32 {