mirror of
https://github.com/livekit/livekit.git
synced 2026-05-14 07:35:17 +00:00
Merge remote-tracking branch 'origin/master' into raja_fr
This commit is contained in:
@@ -14,7 +14,7 @@ require (
|
||||
github.com/google/wire v0.5.0
|
||||
github.com/gorilla/websocket v1.5.0
|
||||
github.com/hashicorp/go-version v1.6.0
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.4
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.5
|
||||
github.com/jxskiss/base62 v1.1.0
|
||||
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
|
||||
github.com/livekit/mediatransportutil v0.0.0-20230716190407-fc4944cbc33a
|
||||
@@ -45,7 +45,7 @@ require (
|
||||
github.com/urfave/cli/v2 v2.25.7
|
||||
github.com/urfave/negroni/v3 v3.0.0
|
||||
go.uber.org/atomic v1.11.0
|
||||
golang.org/x/exp v0.0.0-20230807204917-050eac23e9de
|
||||
golang.org/x/exp v0.0.0-20230810033253-352e893a4cad
|
||||
golang.org/x/sync v0.3.0
|
||||
google.golang.org/protobuf v1.31.0
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
|
||||
@@ -87,8 +87,8 @@ github.com/hashicorp/go-retryablehttp v0.7.4 h1:ZQgVdpTdAL7WpMIwLzCfbalOcSUdkDZn
|
||||
github.com/hashicorp/go-retryablehttp v0.7.4/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8=
|
||||
github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek=
|
||||
github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.4 h1:7GHuZcgid37q8o5i3QI9KMT4nCWQQ3Kx3Ov6bb9MfK0=
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.4/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.5 h1:wW7h1TG88eUIJ2i69gaE3uNVtEPIagzhGvHgwfx2Vm4=
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.5/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
|
||||
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
|
||||
github.com/josharian/native v0.0.0-20200817173448-b6b71def0850/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w=
|
||||
github.com/josharian/native v1.0.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w=
|
||||
@@ -293,8 +293,8 @@ golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE
|
||||
golang.org/x/crypto v0.10.0/go.mod h1:o4eNf7Ede1fv+hwOwZsTHl9EsPFO6q6ZvYR8vYfY45I=
|
||||
golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA=
|
||||
golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio=
|
||||
golang.org/x/exp v0.0.0-20230807204917-050eac23e9de h1:l5Za6utMv/HsBWWqzt4S8X17j+kt1uVETUX5UFhn2rE=
|
||||
golang.org/x/exp v0.0.0-20230807204917-050eac23e9de/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc=
|
||||
golang.org/x/exp v0.0.0-20230810033253-352e893a4cad h1:g0bG7Z4uG+OgH2QDODnjp6ggkk1bJDsINcuWmJN1iJU=
|
||||
golang.org/x/exp v0.0.0-20230810033253-352e893a4cad/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc=
|
||||
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
|
||||
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
|
||||
|
||||
+25
-9
@@ -188,8 +188,9 @@ type Forwarder struct {
|
||||
getReferenceLayerRTPTimestamp func(ts uint32, layer int32, referenceLayer int32) (uint32, error)
|
||||
getExpectedRTPTimestamp func(at time.Time) (uint64, error)
|
||||
|
||||
muted bool
|
||||
pubMuted bool
|
||||
muted bool
|
||||
pubMuted bool
|
||||
resumeBehindThreshold float64
|
||||
|
||||
started bool
|
||||
preStartTime time.Time
|
||||
@@ -1360,6 +1361,9 @@ func (f *Forwarder) Resync() {
|
||||
func (f *Forwarder) resyncLocked() {
|
||||
f.vls.SetCurrent(buffer.InvalidLayer)
|
||||
f.lastSSRC = 0
|
||||
if f.pubMuted {
|
||||
f.resumeBehindThreshold = ResumeBehindThresholdSeconds
|
||||
}
|
||||
}
|
||||
|
||||
func (f *Forwarder) CheckSync() (locked bool, layer int32) {
|
||||
@@ -1435,6 +1439,17 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
|
||||
return nil
|
||||
}
|
||||
|
||||
logTransition := func(message string, expectedTS, refTS, lastTS uint32, diffSeconds float64) {
|
||||
f.logger.Infow(
|
||||
message,
|
||||
"layer", layer,
|
||||
"expectedTS", expectedTS,
|
||||
"refTS", refTS,
|
||||
"lastTS", lastTS,
|
||||
"diffSeconds", math.Abs(diffSeconds),
|
||||
)
|
||||
}
|
||||
|
||||
if f.referenceLayerSpatial == buffer.InvalidLayerSpatial {
|
||||
// on a resume, reference layer may not be set, so only set when it is invalid
|
||||
f.referenceLayerSpatial = layer
|
||||
@@ -1517,18 +1532,19 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
|
||||
// for refTS > expectedTS
|
||||
diffSeconds := float64(int32(expectedTS-refTS)) / float64(f.codec.ClockRate)
|
||||
if diffSeconds >= 0.0 {
|
||||
if diffSeconds > ResumeBehindThresholdSeconds {
|
||||
f.logger.Infow("resume, reference too far behind", "expectedTS", expectedTS, "refTS", refTS, "diffSeconds", diffSeconds)
|
||||
if f.resumeBehindThreshold > 0 && diffSeconds > f.resumeBehindThreshold {
|
||||
logTransition("resume, reference too far behind", expectedTS, refTS, lastTS, diffSeconds)
|
||||
nextTS = expectedTS
|
||||
} else {
|
||||
nextTS = refTS
|
||||
}
|
||||
} else {
|
||||
if math.Abs(diffSeconds) > SwitchAheadThresholdSeconds {
|
||||
f.logger.Infow("resume, reference too far ahead", "expectedTS", expectedTS, "refTS", refTS, "diffSeconds", math.Abs(diffSeconds))
|
||||
logTransition("resume, reference too far ahead", expectedTS, refTS, lastTS, diffSeconds)
|
||||
}
|
||||
nextTS = refTS
|
||||
}
|
||||
f.resumeBehindThreshold = 0.0
|
||||
} else {
|
||||
// switching between layers, check if refTS is too far behind the last sent
|
||||
diffSeconds := float64(int32(refTS-lastTS)) / float64(f.codec.ClockRate)
|
||||
@@ -1537,16 +1553,16 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
|
||||
// this could be due to pacer trickling out this layer. Error out and wait for a more opportune time.
|
||||
// AVSYNC-TODO: Consider some forcing function to do the switch
|
||||
// (like "have waited for too long for layer switch, nothing available, switch to whatever is available" kind of condition).
|
||||
f.logger.Infow("layer switch, reference too far behind", "expectedTS", expectedTS, "refTS", refTS, "lastTS", lastTS, "diffSeconds", math.Abs(diffSeconds))
|
||||
logTransition("layer switch, reference too far behind", expectedTS, refTS, lastTS, diffSeconds)
|
||||
return errors.New("switch point too far behind")
|
||||
}
|
||||
// use a nominal increase to ensure that timestamp is always moving forward
|
||||
f.logger.Infow("layer switch, reference is slghtly behind", "expectedTS", expectedTS, "refTS", refTS, "lastTS", lastTS, "diffSeconds", math.Abs(diffSeconds))
|
||||
logTransition("layer switch, reference is slightly behind", expectedTS, refTS, lastTS, diffSeconds)
|
||||
nextTS = lastTS + 1
|
||||
} else {
|
||||
diffSeconds = float64(int32(expectedTS-refTS)) / float64(f.codec.ClockRate)
|
||||
if diffSeconds < 0.0 && math.Abs(diffSeconds) > SwitchAheadThresholdSeconds {
|
||||
f.logger.Infow("layer switch, reference too far ahead", "expectedTS", expectedTS, "refTS", refTS, "diffSeconds", math.Abs(diffSeconds))
|
||||
logTransition("layer switch, reference too far ahead", expectedTS, refTS, lastTS, diffSeconds)
|
||||
nextTS = expectedTS
|
||||
} else {
|
||||
nextTS = refTS
|
||||
@@ -1585,7 +1601,7 @@ func (f *Forwarder) getTranslationParamsCommon(extPkt *buffer.ExtPacket, layer i
|
||||
if f.lastSSRC != extPkt.Packet.SSRC {
|
||||
if err := f.processSourceSwitch(extPkt, layer); err != nil {
|
||||
tp.shouldDrop = true
|
||||
return tp, err
|
||||
return tp, nil
|
||||
}
|
||||
f.logger.Debugw("switching feed", "from", f.lastSSRC, "to", extPkt.Packet.SSRC)
|
||||
f.lastSSRC = extPkt.Packet.SSRC
|
||||
|
||||
Reference in New Issue
Block a user