Merge remote-tracking branch 'origin/master' into raja_1833

This commit is contained in:
boks1971
2023-07-18 14:14:05 +05:30
14 changed files with 246 additions and 222 deletions
+2 -2
View File
@@ -17,8 +17,8 @@ require (
github.com/hashicorp/golang-lru/v2 v2.0.4
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20230612070454-d5299b956135
github.com/livekit/protocol v1.5.10-0.20230714010226-3c53edc91962
github.com/livekit/mediatransportutil v0.0.0-20230716190407-fc4944cbc33a
github.com/livekit/protocol v1.5.10
github.com/livekit/psrpc v0.3.2
github.com/mackerelio/go-osstat v0.2.4
github.com/magefile/mage v1.15.0
+4 -4
View File
@@ -122,10 +122,10 @@ github.com/lithammer/shortuuid/v4 v4.0.0 h1:QRbbVkfgNippHOS8PXDkti4NaWeyYfcBTHtw
github.com/lithammer/shortuuid/v4 v4.0.0/go.mod h1:Zs8puNcrvf2rV9rTH51ZLLcj7ZXqQI3lv67aw4KiB1Y=
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkDaKb5iXdynYrzB84ErPPO4LbRASk58=
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20230612070454-d5299b956135 h1:lWYbsondvqG69czxoACDwaJ/BoyD57BahCo70ZH+m4U=
github.com/livekit/mediatransportutil v0.0.0-20230612070454-d5299b956135/go.mod h1:MRc0zSOSzXuFt0X218SgabzlaKevkvCckPgBEoHYc34=
github.com/livekit/protocol v1.5.10-0.20230714010226-3c53edc91962 h1:y+rtYNMGmvpEgQlNG/wOUO16S497ygh83wQSUBKHpG4=
github.com/livekit/protocol v1.5.10-0.20230714010226-3c53edc91962/go.mod h1:eRzojAYSPJuNgDHMlvLji/CPauj9hrgvb6rVPUj6MoU=
github.com/livekit/mediatransportutil v0.0.0-20230716190407-fc4944cbc33a h1:JWpPHcMFuw0fP4swE89CfMgeUXiSN5IKvCJL/5HLI3A=
github.com/livekit/mediatransportutil v0.0.0-20230716190407-fc4944cbc33a/go.mod h1:xirUXW8xnLGmfCwUeAv/nj1VGo1OO1BmgxrYP7jK/14=
github.com/livekit/protocol v1.5.10 h1:lnaHMa27cbRkHybi/jvOVuRSaLsho2wCLRjKiC6ce2Y=
github.com/livekit/protocol v1.5.10/go.mod h1:eRzojAYSPJuNgDHMlvLji/CPauj9hrgvb6rVPUj6MoU=
github.com/livekit/psrpc v0.3.2 h1:eAaJhASme33gtoBhCRLH9jsnWcdm1tHWf0WzaDk56ew=
github.com/livekit/psrpc v0.3.2/go.mod h1:n6JntEg+zT6Ji8InoyTpV7wusPNwGqqtxmHlkNhDN0U=
github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs=
+32 -29
View File
@@ -370,13 +370,14 @@ func (p *ParticipantImpl) SetPermission(permission *livekit.ParticipantPermissio
return false
}
p.GetLogger().Infow("updating participant permission", "permission", permission)
p.params.Logger.Infow("updating participant permission", "permission", permission)
video.UpdateFromPermission(permission)
p.dirty.Store(true)
canPublish := video.GetCanPublish()
canSubscribe := video.GetCanSubscribe()
onParticipantUpdate := p.onParticipantUpdate
onClaimsChanged := p.onClaimsChanged
@@ -387,13 +388,7 @@ func (p *ParticipantImpl) SetPermission(permission *livekit.ParticipantPermissio
// publish permission has been revoked then remove offending tracks
for _, track := range p.GetPublishedTracks() {
if !video.GetCanPublishSource(track.Source()) {
p.RemovePublishedTrack(track, false, false)
if p.ProtocolVersion().SupportsUnpublish() {
p.sendTrackUnpublished(track.ID())
} else {
// for older clients that don't support unpublish, mute to avoid them sending data
p.sendTrackMuted(track.ID(), true)
}
p.removePublishedTrack(track)
}
}
@@ -1190,22 +1185,24 @@ func (p *ParticipantImpl) updateState(state livekit.ParticipantInfo_State) {
}
func (p *ParticipantImpl) setIsPublisher(isPublisher bool) {
if p.isPublisher.Swap(isPublisher) != isPublisher {
p.lock.Lock()
p.requireBroadcast = true
p.lock.Unlock()
if p.isPublisher.Swap(isPublisher) == isPublisher {
return
}
p.dirty.Store(true)
p.lock.Lock()
p.requireBroadcast = true
p.lock.Unlock()
// trigger update as well if participant is already fully connected
if p.State() == livekit.ParticipantInfo_ACTIVE {
p.lock.RLock()
onParticipantUpdate := p.onParticipantUpdate
p.lock.RUnlock()
p.dirty.Store(true)
if onParticipantUpdate != nil {
onParticipantUpdate(p)
}
// trigger update as well if participant is already fully connected
if p.State() == livekit.ParticipantInfo_ACTIVE {
p.lock.RLock()
onParticipantUpdate := p.onParticipantUpdate
p.lock.RUnlock()
if onParticipantUpdate != nil {
onParticipantUpdate(p)
}
}
}
@@ -1220,6 +1217,16 @@ func (p *ParticipantImpl) onSubscriberOffer(offer webrtc.SessionDescription) err
})
}
func (p *ParticipantImpl) removePublishedTrack(track types.MediaTrack) {
p.RemovePublishedTrack(track, false, false)
if p.ProtocolVersion().SupportsUnpublish() {
p.sendTrackUnpublished(track.ID())
} else {
// for older clients that don't support unpublish, mute to avoid them sending data
p.sendTrackMuted(track.ID(), true)
}
}
// when a new remoteTrack is created, creates a Track and adds it to room
func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver) {
if p.IsDisconnected() {
@@ -1242,12 +1249,12 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w
p.params.Logger.Warnw("no permission to publish mediaTrack", nil,
"source", publishedTrack.Source(),
)
p.removePublishedTrack(publishedTrack)
return
}
if !p.IsPublisher() {
p.setIsPublisher(true)
}
p.setIsPublisher(true)
p.dirty.Store(true)
p.params.Logger.Infow("mediaTrack published",
"kind", track.Kind().String(),
@@ -1258,8 +1265,6 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w
"mime", track.Codec().MimeType,
)
p.dirty.Store(true)
if !isNewTrack && !publishedTrack.HasPendingCodec() && p.IsReady() {
p.lock.RLock()
onTrackUpdated := p.onTrackUpdated
@@ -1300,9 +1305,7 @@ func (p *ParticipantImpl) onDataMessage(kind livekit.DataPacket_Kind, data []byt
p.params.Logger.Warnw("received unsupported data packet", nil, "payload", payload)
}
if !p.IsPublisher() {
p.setIsPublisher(true)
}
p.setIsPublisher(true)
}
func (p *ParticipantImpl) onICECandidate(c *webrtc.ICECandidate, target livekit.SignalTarget) error {
+1 -2
View File
@@ -828,7 +828,6 @@ func (r *Room) createJoinResponseLocked(participant types.LocalParticipant, iceS
func (r *Room) onTrackPublished(participant types.LocalParticipant, track types.MediaTrack) {
// publish participant update, since track state is changed
r.broadcastParticipantState(participant, broadcastOptions{skipSource: true})
r.protoProxy.MarkDirty(false)
r.lock.RLock()
// subscribe all existing participants to this MediaTrack
@@ -888,7 +887,6 @@ func (r *Room) onTrackUpdated(p types.LocalParticipant, _ types.MediaTrack) {
func (r *Room) onTrackUnpublished(p types.LocalParticipant, track types.MediaTrack) {
r.trackManager.RemoveTrack(track)
r.protoProxy.MarkDirty(false)
if !p.IsClosed() {
r.broadcastParticipantState(p, broadcastOptions{skipSource: true})
}
@@ -898,6 +896,7 @@ func (r *Room) onTrackUnpublished(p types.LocalParticipant, track types.MediaTra
}
func (r *Room) onParticipantUpdate(p types.LocalParticipant) {
r.protoProxy.MarkDirty(false)
// immediately notify when permissions or metadata changed
r.broadcastParticipantState(p, broadcastOptions{immediate: true})
if r.onParticipantChanged != nil {
+19 -12
View File
@@ -112,12 +112,6 @@ type signalService struct {
}
func (r *signalService) RelaySignal(stream psrpc.ServerStream[*rpc.RelaySignalResponse, *rpc.RelaySignalRequest]) (err error) {
// copy the context to prevent a race between the session handler closing
// and the delivery of any parting messages from the client. take care to
// copy the incoming rpc headers to avoid dropping any session vars.
ctx, cancel := context.WithCancel(metadata.NewContextWithIncomingHeader(context.Background(), metadata.IncomingHeader(stream.Context())))
defer cancel()
req, ok := <-stream.Channel()
if !ok {
return nil
@@ -139,9 +133,6 @@ func (r *signalService) RelaySignal(stream psrpc.ServerStream[*rpc.RelaySignalRe
"connID", ss.ConnectionId,
)
reqChan := routing.NewDefaultMessageChannel(livekit.ConnectionID(ss.ConnectionId))
defer reqChan.Close()
sink := routing.NewSignalMessageSink(routing.SignalSinkParams[*rpc.RelaySignalResponse, *rpc.RelaySignalRequest]{
Logger: l,
Stream: stream,
@@ -149,6 +140,24 @@ func (r *signalService) RelaySignal(stream psrpc.ServerStream[*rpc.RelaySignalRe
Writer: signalResponseMessageWriter{},
ConnectionID: livekit.ConnectionID(ss.ConnectionId),
})
reqChan := routing.NewDefaultMessageChannel(livekit.ConnectionID(ss.ConnectionId))
go func() {
err := routing.CopySignalStreamToMessageChannel[*rpc.RelaySignalResponse, *rpc.RelaySignalRequest](
stream,
reqChan,
signalRequestMessageReader{},
r.config,
)
l.Infow("signal stream closed", "error", err)
reqChan.Close()
}()
// copy the context to prevent a race between the session handler closing
// and the delivery of any parting messages from the client. take care to
// copy the incoming rpc headers to avoid dropping any session vars.
ctx := metadata.NewContextWithIncomingHeader(context.Background(), metadata.IncomingHeader(stream.Context()))
err = r.sessionHandler(ctx, livekit.RoomName(ss.RoomName), *pi, livekit.ConnectionID(ss.ConnectionId), reqChan, sink)
if err != nil {
@@ -156,9 +165,7 @@ func (r *signalService) RelaySignal(stream psrpc.ServerStream[*rpc.RelaySignalRe
return
}
err = routing.CopySignalStreamToMessageChannel[*rpc.RelaySignalResponse, *rpc.RelaySignalRequest](stream, reqChan, signalRequestMessageReader{}, r.config)
l.Infow("signal stream closed", "error", err)
stream.Hijack()
return
}
+5 -1
View File
@@ -888,8 +888,9 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, calculatedClockRate uint32)
tsCycles++
}
nowRTPExt := getExtTS(nowRTP, tsCycles)
var nowRTPExtUsingRate uint64
if calculatedClockRate != 0 {
nowRTPExtUsingRate := r.extStartTS + uint64(float64(calculatedClockRate)*timeSinceFirst.Seconds())
nowRTPExtUsingRate = r.extStartTS + uint64(float64(calculatedClockRate)*timeSinceFirst.Seconds())
if nowRTPExtUsingRate > nowRTPExt {
nowRTPExt = nowRTPExtUsingRate
nowRTP = uint32(nowRTPExtUsingRate)
@@ -939,6 +940,9 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, calculatedClockRate uint32)
"reportDrift", reportDriftResult.String(),
"highestTS", r.highestTS,
"highestTime", r.highestTime.String(),
"calculatedClockRate", calculatedClockRate,
"nowRTPExt", nowRTPExt,
"nowRTPExtUsingRate", nowRTPExtUsingRate,
)
}
+124 -68
View File
@@ -42,6 +42,8 @@ type TrackSender interface {
HandleRTCPSenderReportData(payloadType webrtc.PayloadType, layer int32, srData *buffer.RTCPSenderReportData) error
}
// -------------------------------------------------------------------
const (
RTPPaddingMaxPayloadSize = 255
RTPPaddingEstimatedHeaderSize = 20
@@ -60,6 +62,8 @@ const (
maxPaddingOnMuteDuration = 5 * time.Second
)
// -------------------------------------------------------------------
var (
ErrUnknownKind = errors.New("unknown kind of codec")
ErrOutOfOrderSequenceNumberCacheMiss = errors.New("out-of-order sequence number not found in cache")
@@ -197,14 +201,13 @@ type DownTrack struct {
transceiver *webrtc.RTPTransceiver
writeStream webrtc.TrackLocalWriter
rtcpReader *buffer.RTCPReader
onCloseHandler func(willBeResumed bool)
onBinding func(error)
listenerLock sync.RWMutex
receiverReportListeners []ReceiverReportListener
bindLock sync.Mutex
bound atomic.Bool
bindLock sync.Mutex
bound atomic.Bool
onBinding func(error)
isClosed atomic.Bool
connected atomic.Bool
@@ -235,14 +238,13 @@ type DownTrack struct {
pacer pacer.Pacer
// update stats
onStatsUpdate func(dt *DownTrack, stat *livekit.AnalyticsStat)
maxLayerNotifierCh chan struct{}
// when max subscribed layer changes
cbMu sync.RWMutex
onStatsUpdate func(dt *DownTrack, stat *livekit.AnalyticsStat)
onMaxSubscribedLayerChanged func(dt *DownTrack, layer int32)
// update rtt
onRttUpdate func(dt *DownTrack, rtt uint32)
onRttUpdate func(dt *DownTrack, rtt uint32)
onCloseHandler func(willBeResumed bool)
}
// NewDownTrack returns a DownTrack.
@@ -266,17 +268,18 @@ func NewDownTrack(
}
d := &DownTrack{
logger: logger,
id: r.TrackID(),
subscriberID: subID,
maxTrack: mt,
streamID: r.StreamID(),
bufferFactory: bf,
receiver: r,
upstreamCodecs: codecs,
kind: kind,
codec: codecs[0].RTPCodecCapability,
pacer: pacer,
logger: logger,
id: r.TrackID(),
subscriberID: subID,
maxTrack: mt,
streamID: r.StreamID(),
bufferFactory: bf,
receiver: r,
upstreamCodecs: codecs,
kind: kind,
codec: codecs[0].RTPCodecCapability,
pacer: pacer,
maxLayerNotifierCh: make(chan struct{}, 20),
}
d.forwarder = NewForwarder(
d.kind,
@@ -307,11 +310,15 @@ func NewDownTrack(
Logger: d.logger.WithValues("direction", "down"),
})
d.connectionStats.OnStatsUpdate(func(_cs *connectionquality.ConnectionStats, stat *livekit.AnalyticsStat) {
if d.onStatsUpdate != nil {
d.onStatsUpdate(d, stat)
if onStatsUpdate := d.getOnStatsUpdate(); onStatsUpdate != nil {
onStatsUpdate(d, stat)
}
})
if d.kind == webrtc.RTPCodecTypeVideo {
go d.maxLayerNotifierWorker()
}
return d, nil
}
@@ -541,6 +548,7 @@ func (d *DownTrack) keyFrameRequester(generation uint32, layer int32) {
if d.IsClosed() || layer == buffer.InvalidLayerSpatial {
return
}
interval := 2 * d.rtpStats.GetRtt()
if interval < keyFrameIntervalMin {
interval = keyFrameIntervalMin
@@ -550,7 +558,13 @@ func (d *DownTrack) keyFrameRequester(generation uint32, layer int32) {
}
ticker := time.NewTicker(time.Duration(interval) * time.Millisecond)
defer ticker.Stop()
for {
locked, _ := d.forwarder.CheckSync()
if locked {
return
}
if d.connected.Load() {
d.logger.Debugw("sending PLI for layer lock", "generation", generation, "layer", layer)
d.receiver.SendPLI(layer, false)
@@ -565,6 +579,34 @@ func (d *DownTrack) keyFrameRequester(generation uint32, layer int32) {
}
}
func (d *DownTrack) postMaxLayerNotifierEvent() {
if d.IsClosed() {
return
}
select {
case d.maxLayerNotifierCh <- struct{}{}:
default:
d.logger.Warnw("max layer notifier event queue full", nil)
}
}
func (d *DownTrack) maxLayerNotifierWorker() {
more := true
for more {
_, more = <-d.maxLayerNotifierCh
maxLayerSpatial := buffer.InvalidLayerSpatial
if more {
maxLayerSpatial = d.forwarder.GetMaxSubscribedSpatial()
}
if onMaxSubscribedLayerChanged := d.getOnMaxLayerChanged(); onMaxSubscribedLayerChanged != nil {
d.logger.Infow("max subscribed layer changed", "maxLayerSpatial", maxLayerSpatial)
onMaxSubscribedLayerChanged(d, maxLayerSpatial)
}
}
}
// WriteRTP writes an RTP Packet to the DownTrack
func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error {
if !d.bound.Load() || !d.connected.Load() {
@@ -725,17 +767,17 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool, forceMa
// Mute enables or disables media forwarding - subscriber triggered
func (d *DownTrack) Mute(muted bool) {
changed, maxLayer := d.forwarder.Mute(muted)
d.handleMute(muted, false, changed, maxLayer)
changed := d.forwarder.Mute(muted)
d.handleMute(muted, changed)
}
// PubMute enables or disables media forwarding - publisher side
func (d *DownTrack) PubMute(pubMuted bool) {
changed, maxLayer := d.forwarder.PubMute(pubMuted)
d.handleMute(pubMuted, true, changed, maxLayer)
changed := d.forwarder.PubMute(pubMuted)
d.handleMute(pubMuted, changed)
}
func (d *DownTrack) handleMute(muted bool, isPub bool, changed bool, maxLayer buffer.VideoLayer) {
func (d *DownTrack) handleMute(muted bool, changed bool) {
if !changed {
return
}
@@ -762,18 +804,7 @@ func (d *DownTrack) handleMute(muted bool, isPub bool, changed bool, maxLayer bu
// Note that while publisher mute is active, subscriber changes can also happen
// and that could turn on/off layers on publisher side.
//
if !isPub && d.onMaxSubscribedLayerChanged != nil && d.kind == webrtc.RTPCodecTypeVideo {
notifyLayer := buffer.InvalidLayerSpatial
if !muted {
//
// When unmuting, don't wait for layer lock as
// client might need to be notified to start layers
// before locking can happen in the forwarder.
//
notifyLayer = maxLayer.Spatial
}
d.onMaxSubscribedLayerChanged(d, notifyLayer)
}
d.postMaxLayerNotifierEvent()
if sal := d.getStreamAllocatorListener(); sal != nil {
sal.OnSubscriptionChanged(d)
@@ -856,12 +887,10 @@ func (d *DownTrack) CloseWithFlush(flush bool) {
d.rtpStats.Stop()
d.logger.Infow("rtp stats", "direction", "downstream", "mime", d.mime, "ssrc", d.ssrc, "stats", d.rtpStats.ToString())
if d.onMaxSubscribedLayerChanged != nil && d.kind == webrtc.RTPCodecTypeVideo {
d.onMaxSubscribedLayerChanged(d, buffer.InvalidLayerSpatial)
}
close(d.maxLayerNotifierCh)
if d.onCloseHandler != nil {
d.onCloseHandler(!flush)
if onCloseHandler := d.getOnCloseHandler(); onCloseHandler != nil {
onCloseHandler(!flush)
}
d.stopKeyFrameRequester()
@@ -869,21 +898,12 @@ func (d *DownTrack) CloseWithFlush(flush bool) {
}
func (d *DownTrack) SetMaxSpatialLayer(spatialLayer int32) {
changed, maxLayer, currentLayer := d.forwarder.SetMaxSpatialLayer(spatialLayer)
changed, maxLayer := d.forwarder.SetMaxSpatialLayer(spatialLayer)
if !changed {
return
}
if d.onMaxSubscribedLayerChanged != nil && d.kind == webrtc.RTPCodecTypeVideo && maxLayer.SpatialGreaterThanOrEqual(currentLayer) {
//
// Notify when new max is
// 1. Equal to current -> already locked to the new max
// 2. Greater than current -> two scenarios
// a. is higher than previous max -> client may need to start higher layer before forwarder can lock
// b. is lower than previous max -> client can stop higher layer(s)
//
d.onMaxSubscribedLayerChanged(d, maxLayer.Spatial)
}
d.postMaxLayerNotifierEvent()
if sal := d.getStreamAllocatorListener(); sal != nil {
sal.OnSubscribedLayerChanged(d, maxLayer)
@@ -891,7 +911,7 @@ func (d *DownTrack) SetMaxSpatialLayer(spatialLayer int32) {
}
func (d *DownTrack) SetMaxTemporalLayer(temporalLayer int32) {
changed, maxLayer, _ := d.forwarder.SetMaxTemporalLayer(temporalLayer)
changed, maxLayer := d.forwarder.SetMaxTemporalLayer(temporalLayer)
if !changed {
return
}
@@ -973,10 +993,23 @@ func (d *DownTrack) UpTrackBitrateReport(availableLayers []int32, bitrates Bitra
// OnCloseHandler method to be called on remote tracked removed
func (d *DownTrack) OnCloseHandler(fn func(willBeResumed bool)) {
d.cbMu.Lock()
defer d.cbMu.Unlock()
d.onCloseHandler = fn
}
func (d *DownTrack) getOnCloseHandler() func(willBeResumed bool) {
d.cbMu.RLock()
defer d.cbMu.RUnlock()
return d.onCloseHandler
}
func (d *DownTrack) OnBinding(fn func(error)) {
d.bindLock.Lock()
defer d.bindLock.Unlock()
d.onBinding = fn
}
@@ -988,17 +1021,47 @@ func (d *DownTrack) AddReceiverReportListener(listener ReceiverReportListener) {
}
func (d *DownTrack) OnStatsUpdate(fn func(dt *DownTrack, stat *livekit.AnalyticsStat)) {
d.cbMu.Lock()
defer d.cbMu.Unlock()
d.onStatsUpdate = fn
}
func (d *DownTrack) getOnStatsUpdate() func(dt *DownTrack, stat *livekit.AnalyticsStat) {
d.cbMu.RLock()
defer d.cbMu.RUnlock()
return d.onStatsUpdate
}
func (d *DownTrack) OnRttUpdate(fn func(dt *DownTrack, rtt uint32)) {
d.cbMu.Lock()
defer d.cbMu.Unlock()
d.onRttUpdate = fn
}
func (d *DownTrack) getOnRttUpdate() func(dt *DownTrack, rtt uint32) {
d.cbMu.RLock()
defer d.cbMu.RUnlock()
return d.onRttUpdate
}
func (d *DownTrack) OnMaxLayerChanged(fn func(dt *DownTrack, layer int32)) {
d.cbMu.Lock()
defer d.cbMu.Unlock()
d.onMaxSubscribedLayerChanged = fn
}
func (d *DownTrack) getOnMaxLayerChanged() func(dt *DownTrack, layer int32) {
d.cbMu.RLock()
defer d.cbMu.RUnlock()
return d.onMaxSubscribedLayerChanged
}
func (d *DownTrack) IsDeficient() bool {
return d.forwarder.IsDeficient()
}
@@ -1355,8 +1418,8 @@ func (d *DownTrack) handleRTCP(bytes []byte) {
d.sequencer.setRTT(rttToReport)
}
if d.onRttUpdate != nil {
d.onRttUpdate(d, rttToReport)
if onRttUpdate := d.getOnRttUpdate(); onRttUpdate != nil {
onRttUpdate(d, rttToReport)
}
}
}
@@ -1744,15 +1807,8 @@ func (d *DownTrack) packetSent(md interface{}, hdr *rtp.Header, payloadSize int,
}
if spmd.tp != nil {
if spmd.tp.isSwitchingToMaxSpatial && d.onMaxSubscribedLayerChanged != nil && d.kind == webrtc.RTPCodecTypeVideo {
d.onMaxSubscribedLayerChanged(d, spmd.tp.maxSpatialLayer)
}
if spmd.tp.isSwitchingToRequestSpatial {
locked, _ := d.forwarder.CheckSync()
if locked {
d.stopKeyFrameRequester()
}
if spmd.tp.isSwitching {
d.postMaxLayerNotifierEvent()
}
if spmd.tp.isResuming {
+42 -27
View File
@@ -129,15 +129,13 @@ func (v VideoTransition) String() string {
// -------------------------------------------------------------------
type TranslationParams struct {
shouldDrop bool
isResuming bool
isSwitchingToRequestSpatial bool
isSwitchingToMaxSpatial bool
maxSpatialLayer int32
rtp *TranslationParamsRTP
codecBytes []byte
ddBytes []byte
marker bool
shouldDrop bool
isResuming bool
isSwitching bool
rtp *TranslationParamsRTP
codecBytes []byte
ddBytes []byte
marker bool
}
// -------------------------------------------------------------------
@@ -360,12 +358,12 @@ func (f *Forwarder) SeedState(state ForwarderState) {
f.refTSOffset = state.RefTSOffset
}
func (f *Forwarder) Mute(muted bool) (bool, buffer.VideoLayer) {
func (f *Forwarder) Mute(muted bool) bool {
f.lock.Lock()
defer f.lock.Unlock()
if f.muted == muted {
return false, f.vls.GetMax()
return false
}
// Do not mute when paused due to bandwidth limitation.
@@ -384,7 +382,7 @@ func (f *Forwarder) Mute(muted bool) (bool, buffer.VideoLayer) {
// the case of intentional mute.
if muted && f.isDeficientLocked() && f.lastAllocation.PauseReason == VideoPauseReasonBandwidth {
f.logger.Infow("ignoring forwarder mute, paused due to congestion")
return false, f.vls.GetMax()
return false
}
f.logger.Debugw("setting forwarder mute", "muted", muted)
@@ -395,7 +393,7 @@ func (f *Forwarder) Mute(muted bool) (bool, buffer.VideoLayer) {
f.resyncLocked()
}
return true, f.vls.GetMax()
return true
}
func (f *Forwarder) IsMuted() bool {
@@ -405,12 +403,12 @@ func (f *Forwarder) IsMuted() bool {
return f.muted
}
func (f *Forwarder) PubMute(pubMuted bool) (bool, buffer.VideoLayer) {
func (f *Forwarder) PubMute(pubMuted bool) bool {
f.lock.Lock()
defer f.lock.Unlock()
if f.pubMuted == pubMuted {
return false, f.vls.GetMax()
return false
}
f.logger.Debugw("setting forwarder pub mute", "pubMuted", pubMuted)
@@ -432,7 +430,7 @@ func (f *Forwarder) PubMute(pubMuted bool) (bool, buffer.VideoLayer) {
}
}
return true, f.vls.GetMax()
return true
}
func (f *Forwarder) IsPubMuted() bool {
@@ -449,17 +447,17 @@ func (f *Forwarder) IsAnyMuted() bool {
return f.muted || f.pubMuted
}
func (f *Forwarder) SetMaxSpatialLayer(spatialLayer int32) (bool, buffer.VideoLayer, buffer.VideoLayer) {
func (f *Forwarder) SetMaxSpatialLayer(spatialLayer int32) (bool, buffer.VideoLayer) {
f.lock.Lock()
defer f.lock.Unlock()
if f.kind == webrtc.RTPCodecTypeAudio {
return false, buffer.InvalidLayer, buffer.InvalidLayer
return false, buffer.InvalidLayer
}
existingMax := f.vls.GetMax()
if spatialLayer == existingMax.Spatial {
return false, existingMax, f.vls.GetCurrent()
return false, existingMax
}
f.logger.Debugw("setting max spatial layer", "layer", spatialLayer)
@@ -467,20 +465,20 @@ func (f *Forwarder) SetMaxSpatialLayer(spatialLayer int32) (bool, buffer.VideoLa
f.clearParkedLayer()
return true, f.vls.GetMax(), f.vls.GetCurrent()
return true, f.vls.GetMax()
}
func (f *Forwarder) SetMaxTemporalLayer(temporalLayer int32) (bool, buffer.VideoLayer, buffer.VideoLayer) {
func (f *Forwarder) SetMaxTemporalLayer(temporalLayer int32) (bool, buffer.VideoLayer) {
f.lock.Lock()
defer f.lock.Unlock()
if f.kind == webrtc.RTPCodecTypeAudio {
return false, buffer.InvalidLayer, buffer.InvalidLayer
return false, buffer.InvalidLayer
}
existingMax := f.vls.GetMax()
if temporalLayer == existingMax.Temporal {
return false, existingMax, f.vls.GetCurrent()
return false, existingMax
}
f.logger.Debugw("setting max temporal layer", "layer", temporalLayer)
@@ -488,7 +486,7 @@ func (f *Forwarder) SetMaxTemporalLayer(temporalLayer int32) (bool, buffer.Video
f.clearParkedLayer()
return true, f.vls.GetMax(), f.vls.GetCurrent()
return true, f.vls.GetMax()
}
func (f *Forwarder) MaxLayer() buffer.VideoLayer {
@@ -512,6 +510,25 @@ func (f *Forwarder) TargetLayer() buffer.VideoLayer {
return f.vls.GetTarget()
}
func (f *Forwarder) GetMaxSubscribedSpatial() int32 {
f.lock.RLock()
defer f.lock.RUnlock()
layer := buffer.InvalidLayerSpatial // covers muted case
if !f.muted {
layer = f.vls.GetMax().Spatial
// If current is higher, mark the current layer as max subscribed layer
// to prevent the current layer from stopping before forwarder switches
// to the new and lower max layer,
if layer < f.vls.GetCurrent().Spatial {
layer = f.vls.GetCurrent().Spatial
}
}
return layer
}
func (f *Forwarder) isDeficientLocked() bool {
return f.lastAllocation.IsDeficient
}
@@ -1712,9 +1729,7 @@ func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer in
return tp, nil
}
tp.isResuming = result.IsResuming
tp.isSwitchingToRequestSpatial = result.IsSwitchingToRequestSpatial
tp.isSwitchingToMaxSpatial = result.IsSwitchingToMaxSpatial
tp.maxSpatialLayer = result.MaxSpatialLayer
tp.isSwitching = result.IsSwitching
tp.ddBytes = result.DependencyDescriptorExtension
tp.marker = result.RTPMarker
+13 -21
View File
@@ -26,13 +26,13 @@ func newForwarder(codec webrtc.RTPCodecCapability, kind webrtc.RTPCodecType) *Fo
func TestForwarderMute(t *testing.T) {
f := newForwarder(testutils.TestOpusCodec, webrtc.RTPCodecTypeAudio)
require.False(t, f.IsMuted())
muted, _ := f.Mute(false)
muted := f.Mute(false)
require.False(t, muted) // no change in mute state
require.False(t, f.IsMuted())
muted, _ = f.Mute(true)
muted = f.Mute(true)
require.True(t, muted)
require.True(t, f.IsMuted())
muted, _ = f.Mute(false)
muted = f.Mute(false)
require.True(t, muted)
require.False(t, f.IsMuted())
}
@@ -45,15 +45,13 @@ func TestForwarderLayersAudio(t *testing.T) {
require.Equal(t, buffer.InvalidLayer, f.CurrentLayer())
require.Equal(t, buffer.InvalidLayer, f.TargetLayer())
changed, maxLayer, currentLayer := f.SetMaxSpatialLayer(1)
changed, maxLayer := f.SetMaxSpatialLayer(1)
require.False(t, changed)
require.Equal(t, buffer.InvalidLayer, maxLayer)
require.Equal(t, buffer.InvalidLayer, currentLayer)
changed, maxLayer, currentLayer = f.SetMaxTemporalLayer(1)
changed, maxLayer = f.SetMaxTemporalLayer(1)
require.False(t, changed)
require.Equal(t, buffer.InvalidLayer, maxLayer)
require.Equal(t, buffer.InvalidLayer, currentLayer)
require.Equal(t, buffer.InvalidLayer, f.MaxLayer())
}
@@ -72,12 +70,11 @@ func TestForwarderLayersVideo(t *testing.T) {
Spatial: buffer.DefaultMaxLayerSpatial,
Temporal: buffer.DefaultMaxLayerTemporal,
}
changed, maxLayer, currentLayer := f.SetMaxSpatialLayer(buffer.DefaultMaxLayerSpatial)
changed, maxLayer := f.SetMaxSpatialLayer(buffer.DefaultMaxLayerSpatial)
require.True(t, changed)
require.Equal(t, expectedLayers, maxLayer)
require.Equal(t, buffer.InvalidLayer, currentLayer)
changed, maxLayer, currentLayer = f.SetMaxSpatialLayer(buffer.DefaultMaxLayerSpatial - 1)
changed, maxLayer = f.SetMaxSpatialLayer(buffer.DefaultMaxLayerSpatial - 1)
require.True(t, changed)
expectedLayers = buffer.VideoLayer{
Spatial: buffer.DefaultMaxLayerSpatial - 1,
@@ -85,21 +82,18 @@ func TestForwarderLayersVideo(t *testing.T) {
}
require.Equal(t, expectedLayers, maxLayer)
require.Equal(t, expectedLayers, f.MaxLayer())
require.Equal(t, buffer.InvalidLayer, currentLayer)
f.vls.SetCurrent(buffer.VideoLayer{Spatial: 0, Temporal: 1})
changed, maxLayer, currentLayer = f.SetMaxSpatialLayer(buffer.DefaultMaxLayerSpatial - 1)
changed, maxLayer = f.SetMaxSpatialLayer(buffer.DefaultMaxLayerSpatial - 1)
require.False(t, changed)
require.Equal(t, expectedLayers, maxLayer)
require.Equal(t, expectedLayers, f.MaxLayer())
require.Equal(t, buffer.VideoLayer{Spatial: 0, Temporal: 1}, currentLayer)
changed, maxLayer, currentLayer = f.SetMaxTemporalLayer(buffer.DefaultMaxLayerTemporal)
changed, maxLayer = f.SetMaxTemporalLayer(buffer.DefaultMaxLayerTemporal)
require.False(t, changed)
require.Equal(t, expectedLayers, maxLayer)
require.Equal(t, buffer.VideoLayer{Spatial: 0, Temporal: 1}, currentLayer)
changed, maxLayer, currentLayer = f.SetMaxTemporalLayer(buffer.DefaultMaxLayerTemporal - 1)
changed, maxLayer = f.SetMaxTemporalLayer(buffer.DefaultMaxLayerTemporal - 1)
require.True(t, changed)
expectedLayers = buffer.VideoLayer{
Spatial: buffer.DefaultMaxLayerSpatial - 1,
@@ -107,7 +101,6 @@ func TestForwarderLayersVideo(t *testing.T) {
}
require.Equal(t, expectedLayers, maxLayer)
require.Equal(t, expectedLayers, f.MaxLayer())
require.Equal(t, buffer.VideoLayer{Spatial: 0, Temporal: 1}, currentLayer)
}
func TestForwarderAllocateOptimal(t *testing.T) {
@@ -1404,8 +1397,8 @@ func TestForwarderGetTranslationParamsVideo(t *testing.T) {
marshalledVP8, err := expectedVP8.Marshal()
require.NoError(t, err)
expectedTP = TranslationParams{
isSwitchingToMaxSpatial: true,
isResuming: true,
isSwitching: true,
isResuming: true,
rtp: &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingContiguous,
sequenceNumber: 23333,
@@ -1716,8 +1709,7 @@ func TestForwarderGetTranslationParamsVideo(t *testing.T) {
marshalledVP8, err = expectedVP8.Marshal()
require.NoError(t, err)
expectedTP = TranslationParams{
isSwitchingToMaxSpatial: true,
maxSpatialLayer: 1,
isSwitching: true,
rtp: &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingContiguous,
sequenceNumber: 23339,
@@ -548,6 +548,10 @@ func (s *StreamAllocator) postEvent(event Event) {
func (s *StreamAllocator) processEvents() {
for event := range s.eventCh {
if s.isStopped.Load() {
break
}
s.handleEvent(&event)
}
@@ -204,26 +204,6 @@ func (d *DependencyDescriptor) Select(extPkt *buffer.ExtPacket, _layer int32) (r
d.previousActiveDecodeTargetsBitmask = d.activeDecodeTargetsBitmask
d.activeDecodeTargetsBitmask = buffer.GetActiveDecodeTargetBitmask(d.currentLayer, ddwdt.DecodeTargets)
if d.currentLayer.Spatial == d.requestSpatial {
result.IsSwitchingToRequestSpatial = true
}
if d.currentLayer.Spatial == d.maxLayer.Spatial {
result.IsSwitchingToMaxSpatial = true
result.MaxSpatialLayer = d.currentLayer.Spatial
d.logger.Infow(
"reached max layer",
"previous", d.previousLayer,
"current", d.currentLayer,
"previousTarget", d.previousTargetLayer,
"target", d.targetLayer,
"max", d.maxLayer,
"layer", fd.SpatialId,
"req", d.requestSpatial,
"maxSeen", d.maxSeenLayer,
"feed", extPkt.Packet.SSRC,
)
}
}
ddExtension := &dede.DependencyDescriptorExtension{
-13
View File
@@ -35,19 +35,6 @@ func (s *Simulcast) Select(extPkt *buffer.ExtPacket, layer int32) (result VideoL
result.IsResuming = true
}
if s.currentLayer.Spatial == s.requestSpatial {
result.IsSwitchingToRequestSpatial = true
}
if s.currentLayer.Spatial >= s.maxLayer.Spatial {
result.IsSwitchingToMaxSpatial = true
result.MaxSpatialLayer = s.currentLayer.Spatial
if reason != "" {
reason += ", "
}
reason += "reached max layer"
}
if reason != "" {
s.logger.Infow(
reason,
@@ -10,9 +10,6 @@ type VideoLayerSelectorResult struct {
IsRelevant bool
IsSwitching bool
IsResuming bool
IsSwitchingToRequestSpatial bool
IsSwitchingToMaxSpatial bool
MaxSpatialLayer int32
RTPMarker bool
DependencyDescriptorExtension []byte
}
-20
View File
@@ -80,26 +80,6 @@ func (v *VP9) Select(extPkt *buffer.ExtPacket, _layer int32) (result VideoLayerS
result.IsResuming = true
}
if v.currentLayer.Spatial != v.requestSpatial && updatedLayer.Spatial == v.requestSpatial {
result.IsSwitchingToRequestSpatial = true
}
if v.currentLayer.Spatial != v.maxLayer.Spatial && updatedLayer.Spatial == v.maxLayer.Spatial {
result.IsSwitchingToMaxSpatial = true
result.MaxSpatialLayer = updatedLayer.Spatial
v.logger.Infow(
"reached max layer",
"current", v.currentLayer,
"updated", updatedLayer,
"target", v.targetLayer,
"max", v.maxLayer,
"layer", extPkt.VideoLayer.Spatial,
"req", v.requestSpatial,
"maxSeen", v.maxSeenLayer,
"feed", extPkt.Packet.SSRC,
)
}
v.previousLayer = v.currentLayer
v.currentLayer = updatedLayer
}