diff --git a/go.mod b/go.mod index 0cefc83e6..e17bd663d 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/google/wire v0.5.0 github.com/gorilla/websocket v1.5.0 github.com/hashicorp/go-version v1.6.0 - github.com/hashicorp/golang-lru/v2 v2.0.4 + github.com/hashicorp/golang-lru/v2 v2.0.5 github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20230716190407-fc4944cbc33a @@ -45,7 +45,7 @@ require ( github.com/urfave/cli/v2 v2.25.7 github.com/urfave/negroni/v3 v3.0.0 go.uber.org/atomic v1.11.0 - golang.org/x/exp v0.0.0-20230807204917-050eac23e9de + golang.org/x/exp v0.0.0-20230810033253-352e893a4cad golang.org/x/sync v0.3.0 google.golang.org/protobuf v1.31.0 gopkg.in/yaml.v3 v3.0.1 diff --git a/go.sum b/go.sum index 3988853e9..9adab1ff4 100644 --- a/go.sum +++ b/go.sum @@ -87,8 +87,8 @@ github.com/hashicorp/go-retryablehttp v0.7.4 h1:ZQgVdpTdAL7WpMIwLzCfbalOcSUdkDZn github.com/hashicorp/go-retryablehttp v0.7.4/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8= github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek= github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= -github.com/hashicorp/golang-lru/v2 v2.0.4 h1:7GHuZcgid37q8o5i3QI9KMT4nCWQQ3Kx3Ov6bb9MfK0= -github.com/hashicorp/golang-lru/v2 v2.0.4/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= +github.com/hashicorp/golang-lru/v2 v2.0.5 h1:wW7h1TG88eUIJ2i69gaE3uNVtEPIagzhGvHgwfx2Vm4= +github.com/hashicorp/golang-lru/v2 v2.0.5/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/josharian/native v0.0.0-20200817173448-b6b71def0850/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w= github.com/josharian/native v1.0.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w= @@ -293,8 +293,8 @@ golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE golang.org/x/crypto v0.10.0/go.mod h1:o4eNf7Ede1fv+hwOwZsTHl9EsPFO6q6ZvYR8vYfY45I= golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA= golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio= -golang.org/x/exp v0.0.0-20230807204917-050eac23e9de h1:l5Za6utMv/HsBWWqzt4S8X17j+kt1uVETUX5UFhn2rE= -golang.org/x/exp v0.0.0-20230807204917-050eac23e9de/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= +golang.org/x/exp v0.0.0-20230810033253-352e893a4cad h1:g0bG7Z4uG+OgH2QDODnjp6ggkk1bJDsINcuWmJN1iJU= +golang.org/x/exp v0.0.0-20230810033253-352e893a4cad/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index abfd9a4e3..74cb87de8 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -188,8 +188,9 @@ type Forwarder struct { getReferenceLayerRTPTimestamp func(ts uint32, layer int32, referenceLayer int32) (uint32, error) getExpectedRTPTimestamp func(at time.Time) (uint64, error) - muted bool - pubMuted bool + muted bool + pubMuted bool + resumeBehindThreshold float64 started bool preStartTime time.Time @@ -1360,6 +1361,9 @@ func (f *Forwarder) Resync() { func (f *Forwarder) resyncLocked() { f.vls.SetCurrent(buffer.InvalidLayer) f.lastSSRC = 0 + if f.pubMuted { + f.resumeBehindThreshold = ResumeBehindThresholdSeconds + } } func (f *Forwarder) CheckSync() (locked bool, layer int32) { @@ -1435,6 +1439,17 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e return nil } + logTransition := func(message string, expectedTS, refTS, lastTS uint32, diffSeconds float64) { + f.logger.Infow( + message, + "layer", layer, + "expectedTS", expectedTS, + "refTS", refTS, + "lastTS", lastTS, + "diffSeconds", math.Abs(diffSeconds), + ) + } + if f.referenceLayerSpatial == buffer.InvalidLayerSpatial { // on a resume, reference layer may not be set, so only set when it is invalid f.referenceLayerSpatial = layer @@ -1517,18 +1532,19 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e // for refTS > expectedTS diffSeconds := float64(int32(expectedTS-refTS)) / float64(f.codec.ClockRate) if diffSeconds >= 0.0 { - if diffSeconds > ResumeBehindThresholdSeconds { - f.logger.Infow("resume, reference too far behind", "expectedTS", expectedTS, "refTS", refTS, "diffSeconds", diffSeconds) + if f.resumeBehindThreshold > 0 && diffSeconds > f.resumeBehindThreshold { + logTransition("resume, reference too far behind", expectedTS, refTS, lastTS, diffSeconds) nextTS = expectedTS } else { nextTS = refTS } } else { if math.Abs(diffSeconds) > SwitchAheadThresholdSeconds { - f.logger.Infow("resume, reference too far ahead", "expectedTS", expectedTS, "refTS", refTS, "diffSeconds", math.Abs(diffSeconds)) + logTransition("resume, reference too far ahead", expectedTS, refTS, lastTS, diffSeconds) } nextTS = refTS } + f.resumeBehindThreshold = 0.0 } else { // switching between layers, check if refTS is too far behind the last sent diffSeconds := float64(int32(refTS-lastTS)) / float64(f.codec.ClockRate) @@ -1537,16 +1553,16 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e // this could be due to pacer trickling out this layer. Error out and wait for a more opportune time. // AVSYNC-TODO: Consider some forcing function to do the switch // (like "have waited for too long for layer switch, nothing available, switch to whatever is available" kind of condition). - f.logger.Infow("layer switch, reference too far behind", "expectedTS", expectedTS, "refTS", refTS, "lastTS", lastTS, "diffSeconds", math.Abs(diffSeconds)) + logTransition("layer switch, reference too far behind", expectedTS, refTS, lastTS, diffSeconds) return errors.New("switch point too far behind") } // use a nominal increase to ensure that timestamp is always moving forward - f.logger.Infow("layer switch, reference is slghtly behind", "expectedTS", expectedTS, "refTS", refTS, "lastTS", lastTS, "diffSeconds", math.Abs(diffSeconds)) + logTransition("layer switch, reference is slightly behind", expectedTS, refTS, lastTS, diffSeconds) nextTS = lastTS + 1 } else { diffSeconds = float64(int32(expectedTS-refTS)) / float64(f.codec.ClockRate) if diffSeconds < 0.0 && math.Abs(diffSeconds) > SwitchAheadThresholdSeconds { - f.logger.Infow("layer switch, reference too far ahead", "expectedTS", expectedTS, "refTS", refTS, "diffSeconds", math.Abs(diffSeconds)) + logTransition("layer switch, reference too far ahead", expectedTS, refTS, lastTS, diffSeconds) nextTS = expectedTS } else { nextTS = refTS @@ -1585,7 +1601,7 @@ func (f *Forwarder) getTranslationParamsCommon(extPkt *buffer.ExtPacket, layer i if f.lastSSRC != extPkt.Packet.SSRC { if err := f.processSourceSwitch(extPkt, layer); err != nil { tp.shouldDrop = true - return tp, err + return tp, nil } f.logger.Debugw("switching feed", "from", f.lastSSRC, "to", extPkt.Packet.SSRC) f.lastSSRC = extPkt.Packet.SSRC