From 0fd09d73c1eac96ce234afe8a63c1b4b8d4b517c Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Mon, 22 Jul 2024 08:44:13 -0700 Subject: [PATCH 01/17] Instantiate default agent dispatches in rtc for better backward compatibility (#2886) --- pkg/agent/client.go | 12 +++++++++++- pkg/rtc/room.go | 10 +++++++++- pkg/service/roomallocator.go | 10 ++-------- 3 files changed, 22 insertions(+), 10 deletions(-) diff --git a/pkg/agent/client.go b/pkg/agent/client.go index 13d4aba9b..df1ff7dfe 100644 --- a/pkg/agent/client.go +++ b/pkg/agent/client.go @@ -122,7 +122,11 @@ func (c *agentClient) LaunchJob(ctx context.Context, desc *JobRequest) *serverut dispatcher := c.getDispatcher(desc.AgentName, desc.JobType) if dispatcher == nil { - logger.Infow("not dispatching agent job since no worker is available", "agentName", desc.AgentName, "jobType", desc.JobType) + logger.Infow("not dispatching agent job since no worker is available", + "agentName", desc.AgentName, + "jobType", desc.JobType, + "room", desc.Room.Name, + "roomID", desc.Room.Sid) return ret } @@ -183,6 +187,12 @@ func (c *agentClient) getDispatcher(agName string, jobType livekit.JobType) *ser } c.mu.Unlock() + if agName == "" { + // if no agent name is given, we would need to dispatch backwards compatible mode + // which means dispatching to each of the namespaces + return target + } + done := make(chan *serverutils.IncrementalDispatcher[string], 1) c.workers.Submit(func() { agentNames.ForEach(func(ag string) { diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index b0c515df0..dde6974aa 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -1488,7 +1488,15 @@ func (r *Room) createAgentDispatchesFromRoomAgent() { return } - for _, ag := range r.internal.AgentDispatches { + roomDisp := r.internal.AgentDispatches + if len(roomDisp) == 0 { + // Backward compatibility: by default, start any agent in the empty JobName + roomDisp = []*livekit.RoomAgentDispatch{ + &livekit.RoomAgentDispatch{}, + } + } + + for _, ag := range roomDisp { ad := &livekit.AgentDispatch{ Id: guid.New(guid.AgentDispatchPrefix), AgentName: ag.AgentName, diff --git a/pkg/service/roomallocator.go b/pkg/service/roomallocator.go index 131e77c41..2670985e5 100644 --- a/pkg/service/roomallocator.go +++ b/pkg/service/roomallocator.go @@ -106,15 +106,9 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre internal.TrackEgress = req.Egress.Tracks } } - if req.Agent == nil { - // Backward compatibility: by default, start any agent in the empty JobName - req.Agent = &livekit.RoomAgent{ - Dispatches: []*livekit.RoomAgentDispatch{ - &livekit.RoomAgentDispatch{}, - }, - } + if req.Agent != nil { + internal.AgentDispatches = req.Agent.Dispatches } - internal.AgentDispatches = req.Agent.Dispatches if req.MinPlayoutDelay > 0 || req.MaxPlayoutDelay > 0 { internal.PlayoutDelay = &livekit.PlayoutDelay{ Enabled: true, From f6f6cca13354d5aae71395530dc9853059261893 Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Tue, 23 Jul 2024 17:58:04 +0800 Subject: [PATCH 02/17] don't push 0 ssrc probing packets to pending queue (#2888) --- pkg/sfu/buffer/buffer.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index 5e17e073b..023cade00 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -349,6 +349,13 @@ func (b *Buffer) Write(pkt []byte) (n int, err error) { } } + // libwebrtc will use 0 ssrc for probing, don't push the packet to pending queue to avoid memory increasing since + // the Bind will not be called to consume the pending packets. More details in https://github.com/pion/webrtc/pull/2816 + if rtpPacket.SSRC == 0 { + b.Unlock() + return + } + // handle RTX packet if pb := b.primaryBufferForRTX; pb != nil { b.Unlock() From d27ecc07328dccb646a3759a26e932d72eb5e26e Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Fri, 26 Jul 2024 10:50:51 +0530 Subject: [PATCH 03/17] Switch order so that last calculated clock rate does not get (#2890) divide-by-zero. --- pkg/sfu/buffer/rtpstats_receiver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sfu/buffer/rtpstats_receiver.go b/pkg/sfu/buffer/rtpstats_receiver.go index 7b2daa8d5..6a1c3bf41 100644 --- a/pkg/sfu/buffer/rtpstats_receiver.go +++ b/pkg/sfu/buffer/rtpstats_receiver.go @@ -585,8 +585,8 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData) return false } - r.updatePropagationDelayAndRecordSenderReport(srDataExt) r.checkRTPClockSkewForSenderReport(srDataExt) + r.updatePropagationDelayAndRecordSenderReport(srDataExt) r.checkRTPClockSkewAgainstMediaPathForSenderReport(srDataExt) if err, loggingFields := r.maybeAdjustFirstPacketTime(r.srNewest, 0, r.timestamp.GetExtendedStart()); err != nil { From 3e6e6e27322acc45e88e372867f2e3afc66b6e68 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Mon, 29 Jul 2024 10:51:01 +0530 Subject: [PATCH 04/17] Ignore really old packets. (#2891) * Ignore really old packets. There are cases where really old packets (time stamp is way back, but sequence number looks like it is moving forward) which cause the sequence number to update incorrectly. Drop those packets are they are very old. * test --- pkg/sfu/buffer/rtpstats_receiver.go | 17 ++++++++++++++++- pkg/sfu/buffer/rtpstats_receiver_test.go | 18 ++++++++++++++++++ pkg/sfu/utils/wraparound.go | 8 ++++++++ 3 files changed, 42 insertions(+), 1 deletion(-) diff --git a/pkg/sfu/buffer/rtpstats_receiver.go b/pkg/sfu/buffer/rtpstats_receiver.go index 6a1c3bf41..9c24d6b53 100644 --- a/pkg/sfu/buffer/rtpstats_receiver.go +++ b/pkg/sfu/buffer/rtpstats_receiver.go @@ -167,6 +167,7 @@ func (r *RTPStatsReceiver) Update( var resSN utils.WrapAroundUpdateResult[uint64] var gapSN int64 var resTS utils.WrapAroundUpdateResult[uint64] + var tsRolloverCount int getLoggingFields := func() []interface{} { return []interface{}{ @@ -174,6 +175,7 @@ func (r *RTPStatsReceiver) Update( "gapSN", gapSN, "resTS", resTS, "gapTS", int64(resTS.ExtendedVal - resTS.PreExtendedHighest), + "tsRolloverCount", tsRolloverCount, "packetTime", time.Unix(0, packetTime).String(), "sequenceNumber", sequenceNumber, "timestamp", timestamp, @@ -219,13 +221,26 @@ func (r *RTPStatsReceiver) Update( } gapSN = int64(resSN.ExtendedVal - resSN.PreExtendedHighest) - resTS = r.timestamp.Rollover(timestamp, r.getTSRolloverCount(packetTime-r.highestTime)) + tsRolloverCount = r.getTSRolloverCount(packetTime - r.highestTime) + resTS = r.timestamp.Rollover(timestamp, tsRolloverCount) if resTS.IsUnhandled { flowState.IsNotHandled = true return } gapTS := int64(resTS.ExtendedVal - resTS.PreExtendedHighest) + // it is possible to reecive old packets, + // as it is not possible to detect how far to roll back sequence number, ignore old packets + if gapTS < 0 && gapSN > 0 { + r.sequenceNumber.UndoUpdate(resSN) + r.logger.Warnw( + "dropping old packet", nil, + getLoggingFields()..., + ) + flowState.IsNotHandled = true + return + } + // it is possible that sequence number has rolled over too if gapSN < 0 && gapTS > 0 && payloadSize > 0 { // not possible to know how many cycles of sequence number roll over could have happened, diff --git a/pkg/sfu/buffer/rtpstats_receiver_test.go b/pkg/sfu/buffer/rtpstats_receiver_test.go index eb9ed72bf..b9005faf7 100644 --- a/pkg/sfu/buffer/rtpstats_receiver_test.go +++ b/pkg/sfu/buffer/rtpstats_receiver_test.go @@ -249,6 +249,7 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { // padding only sequenceNumber += 2 + timestamp += 3000 packet = getPacket(sequenceNumber, timestamp, 0) flowState = r.Update( time.Now().UnixNano(), @@ -266,5 +267,22 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { require.True(t, r.history.IsSet(uint64(sequenceNumber)-1)) require.True(t, r.history.IsSet(uint64(sequenceNumber)-2)) + // old packet, but simulating increasing sequence number after roll over + packet = getPacket(sequenceNumber+400, timestamp-6000, 300) + flowState = r.Update( + time.Now().UnixNano(), + packet.Header.SequenceNumber, + packet.Header.Timestamp, + packet.Header.Marker, + packet.Header.MarshalSize(), + len(packet.Payload), + 0, + ) + require.True(t, flowState.IsNotHandled) + require.Equal(t, sequenceNumber, r.sequenceNumber.GetHighest()) + require.Equal(t, sequenceNumber, uint16(r.sequenceNumber.GetExtendedHighest())) + require.Equal(t, timestamp, r.timestamp.GetHighest()) + require.Equal(t, timestamp, uint32(r.timestamp.GetExtendedHighest())) + r.Stop() } diff --git a/pkg/sfu/utils/wraparound.go b/pkg/sfu/utils/wraparound.go index d18d5a4ab..18e250f34 100644 --- a/pkg/sfu/utils/wraparound.go +++ b/pkg/sfu/utils/wraparound.go @@ -111,6 +111,14 @@ func (w *WrapAround[T, ET]) Update(val T) (result WrapAroundUpdateResult[ET]) { return } +func (w *WrapAround[T, ET]) UndoUpdate(result WrapAroundUpdateResult[ET]) { + if !w.initialized || result.PreExtendedHighest >= result.ExtendedVal { + return + } + + w.ResetHighest(result.PreExtendedHighest) +} + func (w *WrapAround[T, ET]) Rollover(val T, numCycles int) (result WrapAroundUpdateResult[ET]) { if !w.initialized || numCycles == 0 { return w.Update(val) From 4e29e18129d9b0c200f71c80978e49812431b0cc Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 30 Jul 2024 07:48:06 +0530 Subject: [PATCH 05/17] Set gapSN when initing. (#2893) --- pkg/sfu/buffer/rtpstats_receiver.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/pkg/sfu/buffer/rtpstats_receiver.go b/pkg/sfu/buffer/rtpstats_receiver.go index 9c24d6b53..54445c1c1 100644 --- a/pkg/sfu/buffer/rtpstats_receiver.go +++ b/pkg/sfu/buffer/rtpstats_receiver.go @@ -255,9 +255,9 @@ func (r *RTPStatsReceiver) Update( "forcing sequence number rollover", nil, getLoggingFields()..., ) - gapSN = int64(resSN.ExtendedVal - resSN.PreExtendedHighest) } } + gapSN = int64(resSN.ExtendedVal - resSN.PreExtendedHighest) pktSize := uint64(hdrSize + payloadSize + paddingSize) if gapSN <= 0 { // duplicate OR out-of-order @@ -278,8 +278,6 @@ func (r *RTPStatsReceiver) Update( } flowState.IsOutOfOrder = true - flowState.ExtSequenceNumber = resSN.ExtendedVal - flowState.ExtTimestamp = resTS.ExtendedVal if !flowState.IsDuplicate && -gapSN >= cSequenceNumberLargeJumpThreshold { r.largeJumpNegativeCount++ @@ -331,9 +329,9 @@ func (r *RTPStatsReceiver) Update( flowState.LossStartInclusive = resSN.PreExtendedHighest + 1 flowState.LossEndExclusive = resSN.ExtendedVal } - flowState.ExtSequenceNumber = resSN.ExtendedVal - flowState.ExtTimestamp = resTS.ExtendedVal } + flowState.ExtSequenceNumber = resSN.ExtendedVal + flowState.ExtTimestamp = resTS.ExtendedVal if !flowState.IsDuplicate { if payloadSize == 0 { From d68dd3033d45fd747926a0d55d4d9da8292c797b Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 30 Jul 2024 14:21:37 +0530 Subject: [PATCH 06/17] Use extended sequence number in bucket (#2895) --- go.mod | 2 +- go.sum | 4 ++-- pkg/rtc/wrappedreceiver.go | 4 ++-- pkg/sfu/buffer/buffer.go | 19 ++++++++++--------- pkg/sfu/buffer/rtpstats_sender.go | 7 +++++++ pkg/sfu/receiver.go | 6 +++--- pkg/sfu/redprimaryreceiver.go | 4 ++-- pkg/sfu/redreceiver.go | 2 +- pkg/sfu/sequencer.go | 8 ++++---- pkg/sfu/sequencer_test.go | 8 ++++---- 10 files changed, 36 insertions(+), 28 deletions(-) diff --git a/go.mod b/go.mod index add999cac..e9fb31d38 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,7 @@ require ( github.com/jellydator/ttlcache/v3 v3.2.0 github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 - github.com/livekit/mediatransportutil v0.0.0-20240625074155-301bb4a816b7 + github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 github.com/livekit/protocol v1.19.2-0.20240719172332-0df8e893874b github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a github.com/mackerelio/go-osstat v0.2.5 diff --git a/go.sum b/go.sum index 6b708f506..ad46d5297 100644 --- a/go.sum +++ b/go.sum @@ -165,8 +165,8 @@ github.com/lithammer/shortuuid/v4 v4.0.0 h1:QRbbVkfgNippHOS8PXDkti4NaWeyYfcBTHtw github.com/lithammer/shortuuid/v4 v4.0.0/go.mod h1:Zs8puNcrvf2rV9rTH51ZLLcj7ZXqQI3lv67aw4KiB1Y= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkDaKb5iXdynYrzB84ErPPO4LbRASk58= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= -github.com/livekit/mediatransportutil v0.0.0-20240625074155-301bb4a816b7 h1:F1L8inJoynwIAYpZENNYS+1xHJMF5RFRorsnAlcxfSY= -github.com/livekit/mediatransportutil v0.0.0-20240625074155-301bb4a816b7/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= +github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 h1:yLlkHk2feSLHstD9n4VKg7YEBR4rLODTI4WE8gNBEnQ= +github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= github.com/livekit/protocol v1.19.2-0.20240719172332-0df8e893874b h1:Wn6D+B5YbMe1tH7WCazLJz+msBQzR69dK2wTdgJsF5k= github.com/livekit/protocol v1.19.2-0.20240719172332-0df8e893874b/go.mod h1:bNjJi+8frdvC84xG0CJ/7VfVvqerLg2MzjOks0ucyC4= github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a h1:EQAHmcYEGlc6V517cQ3Iy0+jHgP6+tM/B4l2vGuLpQo= diff --git a/pkg/rtc/wrappedreceiver.go b/pkg/rtc/wrappedreceiver.go index 547c936e1..854d4f792 100644 --- a/pkg/rtc/wrappedreceiver.go +++ b/pkg/rtc/wrappedreceiver.go @@ -201,9 +201,9 @@ func (d *DummyReceiver) HeaderExtensions() []webrtc.RTPHeaderExtensionParameter return d.headerExtensions } -func (d *DummyReceiver) ReadRTP(buf []byte, layer uint8, sn uint16) (int, error) { +func (d *DummyReceiver) ReadRTP(buf []byte, layer uint8, esn uint64) (int, error) { if r, ok := d.receiver.Load().(sfu.TrackReceiver); ok { - return r.ReadRTP(buf, layer, sn) + return r.ReadRTP(buf, layer, esn) } return 0, errors.New("no receiver") } diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index 023cade00..1e38ef8f1 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -73,7 +73,7 @@ type ExtPacket struct { type Buffer struct { sync.RWMutex readCond *sync.Cond - bucket *bucket.Bucket + bucket *bucket.Bucket[uint64] nacker *nack.NackQueue maxVideoPkts int maxAudioPkts int @@ -252,10 +252,11 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapabili switch { case strings.HasPrefix(b.mime, "audio/"): b.codecType = webrtc.RTPCodecTypeAudio - b.bucket = bucket.NewBucket(InitPacketBufferSizeAudio) + b.bucket = bucket.NewBucket[uint64](InitPacketBufferSizeAudio) + case strings.HasPrefix(b.mime, "video/"): b.codecType = webrtc.RTPCodecTypeVideo - b.bucket = bucket.NewBucket(InitPacketBufferSizeVideo) + b.bucket = bucket.NewBucket[uint64](InitPacketBufferSizeVideo) if b.frameRateCalculator[0] == nil { if strings.EqualFold(codec.MimeType, webrtc.MimeTypeVP8) { b.frameRateCalculator[0] = NewFrameRateCalculatorVP8(b.clockRate, b.logger) @@ -625,7 +626,7 @@ func (b *Buffer) calc(rawPkt []byte, rtpPacket *rtp.Packet, arrivalTime int64, i } flowState.ExtSequenceNumber -= snAdjustment rtpPacket.Header.SequenceNumber = uint16(flowState.ExtSequenceNumber) - _, err = b.bucket.AddPacketWithSequenceNumber(rawPkt, rtpPacket.Header.SequenceNumber) + _, err = b.bucket.AddPacketWithSequenceNumber(rawPkt, flowState.ExtSequenceNumber) if err != nil { if !flowState.IsDuplicate { if errors.Is(err, bucket.ErrPacketTooOld) { @@ -671,7 +672,7 @@ func (b *Buffer) calc(rawPkt []byte, rtpPacket *rtp.Packet, arrivalTime int64, i } func (b *Buffer) patchExtPacket(ep *ExtPacket, buf []byte) *ExtPacket { - n, err := b.getPacket(buf, ep.Packet.SequenceNumber) + n, err := b.getPacket(buf, ep.ExtSequenceNumber) if err != nil { packetNotFoundCount := b.packetNotFoundCount.Inc() if (packetNotFoundCount-1)%20 == 0 { @@ -1025,18 +1026,18 @@ func (b *Buffer) getRTCP() []rtcp.Packet { return pkts } -func (b *Buffer) GetPacket(buff []byte, sn uint16) (int, error) { +func (b *Buffer) GetPacket(buff []byte, esn uint64) (int, error) { b.Lock() defer b.Unlock() - return b.getPacket(buff, sn) + return b.getPacket(buff, esn) } -func (b *Buffer) getPacket(buff []byte, sn uint16) (int, error) { +func (b *Buffer) getPacket(buff []byte, esn uint64) (int, error) { if b.closed.Load() { return 0, io.EOF } - return b.bucket.GetPacket(buff, sn) + return b.bucket.GetPacket(buff, esn) } func (b *Buffer) OnRtcpFeedback(fn func(fb []rtcp.Packet)) { diff --git a/pkg/sfu/buffer/rtpstats_sender.go b/pkg/sfu/buffer/rtpstats_sender.go index 6494013c6..43ac5eb4a 100644 --- a/pkg/sfu/buffer/rtpstats_sender.go +++ b/pkg/sfu/buffer/rtpstats_sender.go @@ -990,6 +990,13 @@ func (r *RTPStatsSender) getIntervalStats( return } +func (r *RTPStatsSender) ExtHighestSequenceNumber() uint64 { + r.lock.RLock() + defer r.lock.RUnlock() + + return r.extHighestSN +} + // ------------------------------------------------------------------- type lockedRTPStatsSenderLogEncoder struct { diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 7841a8308..cb55b19c4 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -56,7 +56,7 @@ type TrackReceiver interface { HeaderExtensions() []webrtc.RTPHeaderExtensionParameter IsClosed() bool - ReadRTP(buf []byte, layer uint8, sn uint16) (int, error) + ReadRTP(buf []byte, layer uint8, esn uint64) (int, error) GetLayeredBitrate() ([]int32, Bitrates) GetAudioLevel() (float64, bool) @@ -575,13 +575,13 @@ func (w *WebRTCReceiver) getBufferLocked(layer int32) *buffer.Buffer { return w.buffers[layer] } -func (w *WebRTCReceiver) ReadRTP(buf []byte, layer uint8, sn uint16) (int, error) { +func (w *WebRTCReceiver) ReadRTP(buf []byte, layer uint8, esn uint64) (int, error) { b := w.getBuffer(int32(layer)) if b == nil { return 0, ErrBufferNotFound } - return b.GetPacket(buf, sn) + return b.GetPacket(buf, esn) } func (w *WebRTCReceiver) GetTrackStats() *livekit.RTPStats { diff --git a/pkg/sfu/redprimaryreceiver.go b/pkg/sfu/redprimaryreceiver.go index 956ee7251..e04d56f82 100644 --- a/pkg/sfu/redprimaryreceiver.go +++ b/pkg/sfu/redprimaryreceiver.go @@ -136,8 +136,8 @@ func (r *RedPrimaryReceiver) Close() { closeTrackSenders(r.downTrackSpreader.ResetAndGetDownTracks()) } -func (r *RedPrimaryReceiver) ReadRTP(buf []byte, layer uint8, sn uint16) (int, error) { - n, err := r.TrackReceiver.ReadRTP(buf, layer, sn) +func (r *RedPrimaryReceiver) ReadRTP(buf []byte, layer uint8, esn uint64) (int, error) { + n, err := r.TrackReceiver.ReadRTP(buf, layer, esn) if err != nil { return n, err } diff --git a/pkg/sfu/redreceiver.go b/pkg/sfu/redreceiver.go index 9d24b997b..4af4fb5f5 100644 --- a/pkg/sfu/redreceiver.go +++ b/pkg/sfu/redreceiver.go @@ -127,7 +127,7 @@ func (r *RedReceiver) Close() { closeTrackSenders(r.downTrackSpreader.ResetAndGetDownTracks()) } -func (r *RedReceiver) ReadRTP(buf []byte, layer uint8, sn uint16) (int, error) { +func (r *RedReceiver) ReadRTP(buf []byte, layer uint8, esn uint64) (int, error) { // red encoding doesn't support nack return 0, bucket.ErrPacketMismatch } diff --git a/pkg/sfu/sequencer.go b/pkg/sfu/sequencer.go index 22750a037..294029cc5 100644 --- a/pkg/sfu/sequencer.go +++ b/pkg/sfu/sequencer.go @@ -42,10 +42,10 @@ func itob(i int) bool { } type packetMeta struct { - // Original sequence number from stream. - // The original sequence number is used to find the original + // Original extended sequence number from stream. + // The original extended sequence number is used to find the original // packet from publisher - sourceSeqNo uint16 + sourceSeqNo uint64 // Modified sequence number after offset. // This sequence number is used for the associated // down track, is modified according the offsets, and @@ -199,7 +199,7 @@ func (s *sequencer) push( slot := extModifiedSNAdjusted % uint64(s.size) s.meta[slot] = packetMeta{ - sourceSeqNo: uint16(extIncomingSN), + sourceSeqNo: extIncomingSN, targetSeqNo: uint16(extModifiedSN), timestamp: uint32(extModifiedTS), marker: marker, diff --git a/pkg/sfu/sequencer_test.go b/pkg/sfu/sequencer_test.go index 773c54309..862ac531b 100644 --- a/pkg/sfu/sequencer_test.go +++ b/pkg/sfu/sequencer_test.go @@ -45,7 +45,7 @@ func Test_sequencer(t *testing.T) { require.Equal(t, len(req), len(res)) for i, val := range res { require.Equal(t, val.targetSeqNo, req[i]) - require.Equal(t, val.sourceSeqNo, req[i]-off) + require.Equal(t, val.sourceSeqNo, uint64(req[i]-off)) require.Equal(t, val.layer, int8(2)) require.Equal(t, val.extSequenceNumber, uint64(req[i])) require.Equal(t, val.extTimestamp, uint64(123)) @@ -57,7 +57,7 @@ func Test_sequencer(t *testing.T) { require.Equal(t, len(req), len(res)) for i, val := range res { require.Equal(t, val.targetSeqNo, req[i]) - require.Equal(t, val.sourceSeqNo, req[i]-off) + require.Equal(t, val.sourceSeqNo, uint64(req[i]-off)) require.Equal(t, val.layer, int8(2)) require.Equal(t, val.extSequenceNumber, uint64(req[i])) require.Equal(t, val.extTimestamp, uint64(123)) @@ -204,7 +204,7 @@ func Test_sequencer_getNACKSeqNo_exclusion(t *testing.T) { g := n.getExtPacketMetas(tt.args.seqNo) var got []uint16 for _, sn := range g { - got = append(got, sn.sourceSeqNo) + got = append(got, uint16(sn.sourceSeqNo)) if sn.sourceSeqNo%5 == 0 { require.Equal(t, tt.fields.markerOdd, sn.marker) require.Equal(t, tt.fields.codecBytesOversized, sn.codecBytesSlice) @@ -343,7 +343,7 @@ func Test_sequencer_getNACKSeqNo_no_exclusion(t *testing.T) { g := n.getExtPacketMetas(tt.args.seqNo) var got []uint16 for _, sn := range g { - got = append(got, sn.sourceSeqNo) + got = append(got, uint16(sn.sourceSeqNo)) if sn.sourceSeqNo%2 == 0 { require.Equal(t, tt.fields.markerEven, sn.marker) require.Equal(t, tt.fields.codecBytesEven, sn.codecBytes[:sn.numCodecBytesOut]) From c2c187202f9a87a34afb19f931a6de0c263175c6 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Wed, 31 Jul 2024 12:45:19 +0530 Subject: [PATCH 07/17] Fix forced rollover of RTP time stamp. (#2896) * Fix forced rollover of RTP time stamp. Was erroneously forcing a rollover when the timestamp jump actually has room to accommodate large jumps. For example, before pause ts = 10, then eight hour pause, restart ts = 10 + (8 * 00 * 60 * 90000) = 2592000010 (at 90000 clock rate for video). In normal processing, it will look like out-of-order as the difference 2592000000 is more than half the 32-bit range. But, forcing a roll over is incorrect. Fix by calculating excess over the full range and then account for wrap around. * log potential ts rollover * clamp at min 0 --- pkg/sfu/buffer/rtpstats_receiver.go | 26 +++++++++++++++++++++----- pkg/sfu/utils/wraparound.go | 2 +- pkg/sfu/utils/wraparound_test.go | 7 ++++--- 3 files changed, 26 insertions(+), 9 deletions(-) diff --git a/pkg/sfu/buffer/rtpstats_receiver.go b/pkg/sfu/buffer/rtpstats_receiver.go index 54445c1c1..300c3f2c5 100644 --- a/pkg/sfu/buffer/rtpstats_receiver.go +++ b/pkg/sfu/buffer/rtpstats_receiver.go @@ -137,14 +137,21 @@ func (r *RTPStatsReceiver) NewSnapshotId() uint32 { return r.newSnapshotID(r.sequenceNumber.GetExtendedHighest()) } -func (r *RTPStatsReceiver) getTSRolloverCount(diffNano int64) int { +func (r *RTPStatsReceiver) getTSRolloverCount(diffNano int64, ts uint32) int { if diffNano < r.tsRolloverThreshold { // time not more than rollover threshold - return 0 + return -1 } - excess := int((diffNano - r.tsRolloverThreshold) * int64(r.params.ClockRate) / 1e9) - return excess/(1<<32) + 1 + excess := int((diffNano - r.tsRolloverThreshold*2) * int64(r.params.ClockRate) / 1e9) + roc := excess / (1 << 32) + if roc < 0 { + roc = 0 + } + if r.timestamp.GetHighest() > ts { + roc++ + } + return roc } func (r *RTPStatsReceiver) Update( @@ -167,6 +174,7 @@ func (r *RTPStatsReceiver) Update( var resSN utils.WrapAroundUpdateResult[uint64] var gapSN int64 var resTS utils.WrapAroundUpdateResult[uint64] + var timeSinceHighest int64 var tsRolloverCount int getLoggingFields := func() []interface{} { @@ -175,6 +183,7 @@ func (r *RTPStatsReceiver) Update( "gapSN", gapSN, "resTS", resTS, "gapTS", int64(resTS.ExtendedVal - resTS.PreExtendedHighest), + "timeSinceHighest", time.Duration(timeSinceHighest), "tsRolloverCount", tsRolloverCount, "packetTime", time.Unix(0, packetTime).String(), "sequenceNumber", sequenceNumber, @@ -221,7 +230,14 @@ func (r *RTPStatsReceiver) Update( } gapSN = int64(resSN.ExtendedVal - resSN.PreExtendedHighest) - tsRolloverCount = r.getTSRolloverCount(packetTime - r.highestTime) + timeSinceHighest = packetTime - r.highestTime + tsRolloverCount = r.getTSRolloverCount(timeSinceHighest, timestamp) + if tsRolloverCount >= 0 { + r.logger.Warnw( + "potential time stamp roll over", nil, + getLoggingFields()..., + ) + } resTS = r.timestamp.Rollover(timestamp, tsRolloverCount) if resTS.IsUnhandled { flowState.IsNotHandled = true diff --git a/pkg/sfu/utils/wraparound.go b/pkg/sfu/utils/wraparound.go index 18e250f34..2a30a6325 100644 --- a/pkg/sfu/utils/wraparound.go +++ b/pkg/sfu/utils/wraparound.go @@ -120,7 +120,7 @@ func (w *WrapAround[T, ET]) UndoUpdate(result WrapAroundUpdateResult[ET]) { } func (w *WrapAround[T, ET]) Rollover(val T, numCycles int) (result WrapAroundUpdateResult[ET]) { - if !w.initialized || numCycles == 0 { + if !w.initialized || numCycles < 0 { return w.Update(val) } diff --git a/pkg/sfu/utils/wraparound_test.go b/pkg/sfu/utils/wraparound_test.go index 76cb8fdb3..4f2e505d5 100644 --- a/pkg/sfu/utils/wraparound_test.go +++ b/pkg/sfu/utils/wraparound_test.go @@ -476,10 +476,11 @@ func TestWrapAroundUint16Rollover(t *testing.T) { highest: 10, extendedHighest: 10, }, - // zero cycles - should just do an update + // negative cycles - should just do an update { - name: "zero", - input: 8, + name: "zero", + input: 8, + numCycles: -1, updated: WrapAroundUpdateResult[uint32]{ IsUnhandled: true, // the following fields are not valid when `IsUnhandled = true`, but code fills it in From 18fd62280220da0d9e92592cfd1c472c1bb8bddf Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Thu, 1 Aug 2024 13:26:25 +0530 Subject: [PATCH 08/17] Add API to get highest time stamp from RTPStatsReceiver. (#2898) --- pkg/sfu/buffer/rtpstats_receiver.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pkg/sfu/buffer/rtpstats_receiver.go b/pkg/sfu/buffer/rtpstats_receiver.go index 300c3f2c5..46fa77331 100644 --- a/pkg/sfu/buffer/rtpstats_receiver.go +++ b/pkg/sfu/buffer/rtpstats_receiver.go @@ -760,6 +760,13 @@ func (r *RTPStatsReceiver) isInRange(esn uint64, ehsn uint64) bool { return diff >= 0 && diff < cHistorySize } +func (r *RTPStatsReceiver) HighestTimestamp() uint32 { + r.lock.RLock() + defer r.lock.RUnlock() + + return r.timestamp.GetHighest() +} + // ---------------------------------- type lockedRTPStatsReceiverLogEncoder struct { From 1993c87fd8b05f3df94c6ae48b1d4e411d5f9d79 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Thu, 1 Aug 2024 22:08:53 +0530 Subject: [PATCH 09/17] Do not force rollover if ts rollover is not active. (#2899) There are cases of small negative sequence number jump and small positive time stamp jump. Those should not force rollover. Maybe, they should be dropped, but just logging for now till we learn more. --- pkg/sfu/buffer/rtpstats_receiver.go | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/pkg/sfu/buffer/rtpstats_receiver.go b/pkg/sfu/buffer/rtpstats_receiver.go index 46fa77331..8944b96ba 100644 --- a/pkg/sfu/buffer/rtpstats_receiver.go +++ b/pkg/sfu/buffer/rtpstats_receiver.go @@ -259,18 +259,25 @@ func (r *RTPStatsReceiver) Update( // it is possible that sequence number has rolled over too if gapSN < 0 && gapTS > 0 && payloadSize > 0 { - // not possible to know how many cycles of sequence number roll over could have happened, - // use 1 to ensure that it at least does not go backwards - resSN = r.sequenceNumber.Rollover(sequenceNumber, 1) - if resSN.IsUnhandled { - flowState.IsNotHandled = true - return - } + if tsRolloverCount >= 0 { + // not possible to know how many cycles of sequence number roll over could have happened, + // use 1 to ensure that it at least does not go backwards + resSN = r.sequenceNumber.Rollover(sequenceNumber, 1) + if resSN.IsUnhandled { + flowState.IsNotHandled = true + return + } - r.logger.Warnw( - "forcing sequence number rollover", nil, - getLoggingFields()..., - ) + r.logger.Warnw( + "forcing sequence number rollover", nil, + getLoggingFields()..., + ) + } else { + r.logger.Warnw( + "forcing sequence number rollover skipped", nil, + getLoggingFields()..., + ) + } } } gapSN = int64(resSN.ExtendedVal - resSN.PreExtendedHighest) From b1fbca066f9e62c38f2052d8ae71bf389478e21b Mon Sep 17 00:00:00 2001 From: David Zhao Date: Thu, 1 Aug 2024 19:40:06 -0700 Subject: [PATCH 10/17] Send the correct room closed reason to clients (#2901) * Send the correct room closed reason to clients * update go mod --- go.mod | 8 ++++---- go.sum | 16 ++++++++-------- pkg/rtc/room.go | 2 +- pkg/rtc/types/interfaces.go | 5 +++++ 4 files changed, 18 insertions(+), 13 deletions(-) diff --git a/go.mod b/go.mod index e9fb31d38..b7be18497 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 - github.com/livekit/protocol v1.19.2-0.20240719172332-0df8e893874b + github.com/livekit/protocol v1.19.2-0.20240802021657-b92b88cd1e91 github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a github.com/mackerelio/go-osstat v0.2.5 github.com/magefile/mage v1.15.0 @@ -40,7 +40,7 @@ require ( github.com/pion/webrtc/v3 v3.2.47 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.19.1 - github.com/redis/go-redis/v9 v9.5.4 + github.com/redis/go-redis/v9 v9.6.1 github.com/rs/cors v1.11.0 github.com/stretchr/testify v1.9.0 github.com/thoas/go-funk v0.9.3 @@ -51,7 +51,7 @@ require ( go.uber.org/atomic v1.11.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 - golang.org/x/exp v0.0.0-20240716160929-1d5bc16f04a8 + golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 golang.org/x/sync v0.7.0 google.golang.org/protobuf v1.34.2 gopkg.in/yaml.v3 v3.0.1 @@ -135,7 +135,7 @@ require ( golang.org/x/text v0.16.0 // indirect golang.org/x/tools v0.23.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240711142825-46eb208f015d // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240725223205-93522f1f2a9f // indirect google.golang.org/grpc v1.65.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index ad46d5297..534bc7811 100644 --- a/go.sum +++ b/go.sum @@ -167,8 +167,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 h1:yLlkHk2feSLHstD9n4VKg7YEBR4rLODTI4WE8gNBEnQ= github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= -github.com/livekit/protocol v1.19.2-0.20240719172332-0df8e893874b h1:Wn6D+B5YbMe1tH7WCazLJz+msBQzR69dK2wTdgJsF5k= -github.com/livekit/protocol v1.19.2-0.20240719172332-0df8e893874b/go.mod h1:bNjJi+8frdvC84xG0CJ/7VfVvqerLg2MzjOks0ucyC4= +github.com/livekit/protocol v1.19.2-0.20240802021657-b92b88cd1e91 h1:GnSc5fJNO8V0tiBRrlwTP3kD3zZ50wdjYJSDwQN5Nzg= +github.com/livekit/protocol v1.19.2-0.20240802021657-b92b88cd1e91/go.mod h1:oU5XbEaQlywdgXcSQDzrI5CPnwuGn/HuRXuQaDxVryQ= github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a h1:EQAHmcYEGlc6V517cQ3Iy0+jHgP6+tM/B4l2vGuLpQo= github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o= @@ -288,8 +288,8 @@ github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= github.com/puzpuzpuz/xsync/v3 v3.1.0 h1:EewKT7/LNac5SLiEblJeUu8z5eERHrmRLnMQL2d7qX4= github.com/puzpuzpuz/xsync/v3 v3.1.0/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA= -github.com/redis/go-redis/v9 v9.5.4 h1:vOFYDKKVgrI5u++QvnMT7DksSMYg7Aw/Np4vLJLKLwY= -github.com/redis/go-redis/v9 v9.5.4/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= +github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4= +github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= @@ -365,8 +365,8 @@ golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1m golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30= golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M= -golang.org/x/exp v0.0.0-20240716160929-1d5bc16f04a8 h1:Z+vTUQyBb738QmIhbJx3z4htsxDeI+rd0EHvNm8jHkg= -golang.org/x/exp v0.0.0-20240716160929-1d5bc16f04a8/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY= +golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8= +golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= @@ -494,8 +494,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 h1:7whR9kGa5LUwFtpLm2ArCEejtnxlGeLbAyjFY8sGNFw= google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157/go.mod h1:99sLkeliLXfdj2J75X3Ho+rrVCaJze0uwN7zDDkjPVU= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240711142825-46eb208f015d h1:JU0iKnSg02Gmb5ZdV8nYsKEKsP6o/FGVWTrw4i1DA9A= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240711142825-46eb208f015d/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240725223205-93522f1f2a9f h1:RARaIm8pxYuxyNPbBQf5igT7XdOyCNtat1qAT2ZxjU4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240725223205-93522f1f2a9f/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc= google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index dde6974aa..74ddbe81d 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -791,7 +791,7 @@ func (r *Room) CloseIfEmpty() { r.lock.Unlock() if elapsed >= int64(timeout) { - r.Close(types.ParticipantCloseReasonNone) + r.Close(types.ParticipantCloseReasonRoomClosed) } } diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index afe74ec91..c87b7d16a 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -106,6 +106,7 @@ const ( ParticipantCloseReasonDataChannelError ParticipantCloseReasonMigrateCodecMismatch ParticipantCloseReasonSignalSourceClose + ParticipantCloseReasonRoomClosed ) func (p ParticipantCloseReason) String() string { @@ -158,6 +159,8 @@ func (p ParticipantCloseReason) String() string { return "MIGRATE_CODEC_MISMATCH" case ParticipantCloseReasonSignalSourceClose: return "SIGNAL_SOURCE_CLOSE" + case ParticipantCloseReasonRoomClosed: + return "ROOM_CLOSED" default: return fmt.Sprintf("%d", int(p)) } @@ -188,6 +191,8 @@ func (p ParticipantCloseReason) ToDisconnectReason() livekit.DisconnectReason { return livekit.DisconnectReason_STATE_MISMATCH case ParticipantCloseReasonSignalSourceClose: return livekit.DisconnectReason_SIGNAL_CLOSE + case ParticipantCloseReasonRoomClosed: + return livekit.DisconnectReason_ROOM_CLOSED default: // the other types will map to unknown reason return livekit.DisconnectReason_UNKNOWN_REASON From 0c842594e542d91bfbe400373d537e1b44beaf85 Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Fri, 2 Aug 2024 18:21:59 -0700 Subject: [PATCH 11/17] Update module github.com/livekit/protocol to v1.19.3 (#2733) Generated by renovateBot Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index b7be18497..05c5c572a 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 - github.com/livekit/protocol v1.19.2-0.20240802021657-b92b88cd1e91 + github.com/livekit/protocol v1.19.3 github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a github.com/mackerelio/go-osstat v0.2.5 github.com/magefile/mage v1.15.0 diff --git a/go.sum b/go.sum index 534bc7811..9a940e78f 100644 --- a/go.sum +++ b/go.sum @@ -167,8 +167,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 h1:yLlkHk2feSLHstD9n4VKg7YEBR4rLODTI4WE8gNBEnQ= github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= -github.com/livekit/protocol v1.19.2-0.20240802021657-b92b88cd1e91 h1:GnSc5fJNO8V0tiBRrlwTP3kD3zZ50wdjYJSDwQN5Nzg= -github.com/livekit/protocol v1.19.2-0.20240802021657-b92b88cd1e91/go.mod h1:oU5XbEaQlywdgXcSQDzrI5CPnwuGn/HuRXuQaDxVryQ= +github.com/livekit/protocol v1.19.3 h1:VOoQV21xm4nWVnGPdvCKUZAy08Q+njD167Pzo24uzBI= +github.com/livekit/protocol v1.19.3/go.mod h1:oU5XbEaQlywdgXcSQDzrI5CPnwuGn/HuRXuQaDxVryQ= github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a h1:EQAHmcYEGlc6V517cQ3Iy0+jHgP6+tM/B4l2vGuLpQo= github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o= From 815ba1933bcf3837acf5a568c73b5ad2bdd264ad Mon Sep 17 00:00:00 2001 From: David Zhao Date: Sat, 3 Aug 2024 18:12:24 -0700 Subject: [PATCH 12/17] update readme (#2905) --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 896cf9d4c..313e8fe21 100644 --- a/README.md +++ b/README.md @@ -305,11 +305,11 @@ LiveKit server is licensed under Apache License v2.0.
- + - - + +
LiveKit Ecosystem
Real-time SDKsReact Components · Browser · iOS/macOS · Android · Flutter · React Native · Rust · Node.js · Python · Unity (web) · Unity (beta)
Realtime SDKsReact Components · Browser · Swift Components · iOS/macOS/visionOS · Android · Flutter · React Native · Rust · Node.js · Python · Unity (web) · Unity (beta)
Server APIsNode.js · Golang · Ruby · Java/Kotlin · Python · Rust · PHP (community)
Agents FrameworksPython · Playground
ServicesLivekit server · Egress · Ingress · SIP
ResourcesDocs · Example apps · Cloud · Self-hosting · CLI
ServicesLiveKit server · Egress · Ingress · SIP
ResourcesDocs · Example apps · Cloud · Self-hosting · CLI
From 9ac8be8bc200b954512ad77960c16c5a98f0c35a Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Sun, 4 Aug 2024 16:52:41 -0700 Subject: [PATCH 13/17] Update go deps (#2873) Generated by renovateBot Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> --- go.mod | 8 ++++---- go.sum | 16 ++++++++-------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/go.mod b/go.mod index 05c5c572a..84485e790 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/d5/tengo/v2 v2.17.0 github.com/dustin/go-humanize v1.0.1 github.com/elliotchance/orderedmap/v2 v2.2.0 - github.com/florianl/go-tc v0.4.3 + github.com/florianl/go-tc v0.4.4 github.com/frostbyte73/core v0.0.10 github.com/gammazero/deque v0.2.1 github.com/gammazero/workerpool v1.1.3 @@ -46,13 +46,13 @@ require ( github.com/thoas/go-funk v0.9.3 github.com/twitchtv/twirp v8.1.3+incompatible github.com/ua-parser/uap-go v0.0.0-20240611065828-3a4781585db6 - github.com/urfave/cli/v2 v2.27.2 + github.com/urfave/cli/v2 v2.27.3 github.com/urfave/negroni/v3 v3.1.1 go.uber.org/atomic v1.11.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 - golang.org/x/sync v0.7.0 + golang.org/x/sync v0.8.0 google.golang.org/protobuf v1.34.2 gopkg.in/yaml.v3 v3.0.1 ) @@ -125,7 +125,7 @@ require ( github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect github.com/xeipuuv/gojsonschema v1.2.0 // indirect - github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 // indirect + github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect go.uber.org/zap/exp v0.2.0 // indirect golang.org/x/crypto v0.25.0 // indirect diff --git a/go.sum b/go.sum index 9a940e78f..eb17f3941 100644 --- a/go.sum +++ b/go.sum @@ -68,8 +68,8 @@ github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew= github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= -github.com/florianl/go-tc v0.4.3 h1:xpobG2gFNvEqbclU07zjddALSjqTQTWJkxg5/kRYDpw= -github.com/florianl/go-tc v0.4.3/go.mod h1:uvp6pIlOw7Z8hhfnT5M4+V1hHVgZWRZwwMS8Z0JsRxc= +github.com/florianl/go-tc v0.4.4 h1:q6lhEWEfyhGffRzdl3eIcNqX/yVIw0IJwXqa9Rdcctw= +github.com/florianl/go-tc v0.4.4/go.mod h1:uvp6pIlOw7Z8hhfnT5M4+V1hHVgZWRZwwMS8Z0JsRxc= github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= github.com/frankban/quicktest v1.14.0/go.mod h1:NeW+ay9A/U67EYXNFA1nPE8e/tnQv/09mUdL/ijj8og= github.com/frostbyte73/core v0.0.10 h1:D4DQXdPb8ICayz0n75rs4UYTXrUSdxzUfeleuNJORsU= @@ -323,8 +323,8 @@ github.com/twitchtv/twirp v8.1.3+incompatible h1:+F4TdErPgSUbMZMwp13Q/KgDVuI7HJX github.com/twitchtv/twirp v8.1.3+incompatible/go.mod h1:RRJoFSAmTEh2weEqWtpPE3vFK5YBhA6bqp2l1kfCC5A= github.com/ua-parser/uap-go v0.0.0-20240611065828-3a4781585db6 h1:SIKIoA4e/5Y9ZOl0DCe3eVMLPOQzJxgZpfdHHeauNTM= github.com/ua-parser/uap-go v0.0.0-20240611065828-3a4781585db6/go.mod h1:BUbeWZiieNxAuuADTBNb3/aeje6on3DhU3rpWsQSB1E= -github.com/urfave/cli/v2 v2.27.2 h1:6e0H+AkS+zDckwPCUrZkKX38mRaau4nL2uipkJpbkcI= -github.com/urfave/cli/v2 v2.27.2/go.mod h1:g0+79LmHHATl7DAcHO99smiR/T7uGLw84w8Y42x+4eM= +github.com/urfave/cli/v2 v2.27.3 h1:/POWahRmdh7uztQ3CYnaDddk0Rm90PyOgIxgW2rr41M= +github.com/urfave/cli/v2 v2.27.3/go.mod h1:m4QzxcD2qpra4z7WhzEGn74WZLViBnMpb1ToCAKdGRQ= github.com/urfave/negroni/v3 v3.1.1 h1:6MS4nG9Jk/UuCACaUlNXCbiKa0ywF9LXz5dGu09v8hw= github.com/urfave/negroni/v3 v3.1.1/go.mod h1:jWvnX03kcSjDBl/ShB0iHvx5uOs7mAzZXW+JvJ5XYAs= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= @@ -334,8 +334,8 @@ github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHo github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= github.com/xeipuuv/gojsonschema v1.2.0 h1:LhYJRs+L4fBtjZUfuSZIKGeVu0QRy8e5Xi7D17UxZ74= github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y= -github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 h1:+qGGcbkzsfDQNPPe9UDgpxAWQrhbbBXOYJFQDq/dtJw= -github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913/go.mod h1:4aEEwZQutDLsQv2Deui4iYQ6DWTxR14g6m8Wv88+Xqk= +github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 h1:gEOO8jv9F4OT7lGCjxCBTO/36wtF6j2nSip77qHd4x4= +github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= @@ -411,8 +411,8 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= -golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190411185658-b44545bcd369/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= From 5e4c6d46fbd4b59860816c1bdc4311bdde89a23e Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Mon, 5 Aug 2024 13:46:56 +0800 Subject: [PATCH 14/17] rename log (#2906) --- pkg/rtc/transport.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index 146861937..4d775698c 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -1409,7 +1409,7 @@ func (t *PCTransport) handleRemoteICECandidate(e event) error { t.params.Logger.Warnw("failed to add cached ICE candidate", err, "candidate", c) return errors.Wrap(err, "add ice candidate failed") } else { - t.params.Logger.Debugw("added cached ICE candidate", "candidate", c) + t.params.Logger.Debugw("added ICE candidate", "candidate", c) } return nil From 8c323330b6678e0224178338b64237f159017f5b Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Mon, 5 Aug 2024 21:13:07 +0530 Subject: [PATCH 15/17] Store subscriber forwarder state (#2907) * Forwarder state for migrating participant. * clean up * update protocol deps * cleanup debug --- go.mod | 8 +- go.sum | 16 ++-- pkg/rtc/mediatracksubscriptions.go | 8 +- pkg/rtc/participant.go | 7 +- pkg/rtc/subscriptionmanager.go | 20 +++++ pkg/rtc/types/interfaces.go | 8 +- .../typesfakes/fake_local_participant.go | 83 +++++++++++++++++-- pkg/sfu/codecmunger/vp8.go | 34 ++------ pkg/sfu/downtrack.go | 24 ++++-- pkg/sfu/forwarder.go | 81 +++++++----------- pkg/sfu/rtpmunger.go | 49 ++++------- 11 files changed, 193 insertions(+), 145 deletions(-) diff --git a/go.mod b/go.mod index 84485e790..3977bd586 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 - github.com/livekit/protocol v1.19.3 + github.com/livekit/protocol v1.19.4-0.20240805121416-5be7cb358ec1 github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a github.com/mackerelio/go-osstat v0.2.5 github.com/magefile/mage v1.15.0 @@ -129,13 +129,13 @@ require ( github.com/zeebo/xxh3 v1.0.2 // indirect go.uber.org/zap/exp v0.2.0 // indirect golang.org/x/crypto v0.25.0 // indirect - golang.org/x/mod v0.19.0 // indirect + golang.org/x/mod v0.20.0 // indirect golang.org/x/net v0.27.0 // indirect - golang.org/x/sys v0.22.0 // indirect + golang.org/x/sys v0.23.0 // indirect golang.org/x/text v0.16.0 // indirect golang.org/x/tools v0.23.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240725223205-93522f1f2a9f // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf // indirect google.golang.org/grpc v1.65.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index eb17f3941..f78e4d63f 100644 --- a/go.sum +++ b/go.sum @@ -167,8 +167,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 h1:yLlkHk2feSLHstD9n4VKg7YEBR4rLODTI4WE8gNBEnQ= github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= -github.com/livekit/protocol v1.19.3 h1:VOoQV21xm4nWVnGPdvCKUZAy08Q+njD167Pzo24uzBI= -github.com/livekit/protocol v1.19.3/go.mod h1:oU5XbEaQlywdgXcSQDzrI5CPnwuGn/HuRXuQaDxVryQ= +github.com/livekit/protocol v1.19.4-0.20240805121416-5be7cb358ec1 h1:GP4QtOjYE6zDdtIi8AyM6ukse55HXr0174uOYXxb/H8= +github.com/livekit/protocol v1.19.4-0.20240805121416-5be7cb358ec1/go.mod h1:oU5XbEaQlywdgXcSQDzrI5CPnwuGn/HuRXuQaDxVryQ= github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a h1:EQAHmcYEGlc6V517cQ3Iy0+jHgP6+tM/B4l2vGuLpQo= github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o= @@ -373,8 +373,8 @@ golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91 golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= -golang.org/x/mod v0.19.0 h1:fEdghXQSo20giMthA7cd28ZC+jts4amQ3YMXiP5oMQ8= -golang.org/x/mod v0.19.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/mod v0.20.0 h1:utOm6MM3R3dnawAiJgn0y+xvuYRsm1RKM/4giyfDgV0= +golang.org/x/mod v0.20.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -452,8 +452,8 @@ golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= -golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM= +golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -494,8 +494,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 h1:7whR9kGa5LUwFtpLm2ArCEejtnxlGeLbAyjFY8sGNFw= google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157/go.mod h1:99sLkeliLXfdj2J75X3Ho+rrVCaJze0uwN7zDDkjPVU= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240725223205-93522f1f2a9f h1:RARaIm8pxYuxyNPbBQf5igT7XdOyCNtat1qAT2ZxjU4= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240725223205-93522f1f2a9f/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf h1:liao9UHurZLtiEwBgT9LMOnKYsHze6eA6w1KQCMVN2Q= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc= google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= diff --git a/pkg/rtc/mediatracksubscriptions.go b/pkg/rtc/mediatracksubscriptions.go index acd85037d..c16b4bb7f 100644 --- a/pkg/rtc/mediatracksubscriptions.go +++ b/pkg/rtc/mediatracksubscriptions.go @@ -21,7 +21,6 @@ import ( "github.com/pion/rtcp" "github.com/pion/webrtc/v3" - "go.uber.org/atomic" sutils "github.com/livekit/livekit-server/pkg/utils" "github.com/livekit/protocol/livekit" @@ -162,7 +161,6 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr * }) // Bind callback can happen from replaceTrack, so set it up early - var reusingTransceiver atomic.Bool var dtState sfu.DownTrackState downTrack.OnBinding(func(err error) { if err != nil { @@ -170,9 +168,7 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr * return } wr.DetermineReceiver(downTrack.Codec()) - if reusingTransceiver.Load() { - downTrack.SeedState(dtState) - } + downTrack.SeedState(dtState) if err = wr.AddDownTrack(downTrack); err != nil && err != sfu.ErrReceiverClosed { sub.GetLogger().Errorw( "could not add down track", err, @@ -220,7 +216,6 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr * "publisherID", subTrack.PublisherID(), "trackID", trackID, ) - reusingTransceiver.Store(true) rtpSender := existingTransceiver.Sender() if rtpSender != nil { // replaced track will bind immediately without negotiation, SetTransceiver first before bind @@ -251,7 +246,6 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr * existingTransceiver.Stop() } } - reusingTransceiver.Store(false) // if cannot replace, find an unused transceiver or add new one if transceiver == nil { diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index dc0828c66..0bf290ceb 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -862,6 +862,7 @@ func (p *ParticipantImpl) SetMigrateInfo( previousOffer, previousAnswer *webrtc.SessionDescription, mediaTracks []*livekit.TrackPublishedResponse, dataChannels []*livekit.DataChannelInfo, + forwarderStates map[livekit.TrackID]*livekit.RTPForwarderState, ) { p.pendingTracksLock.Lock() for _, t := range mediaTracks { @@ -882,6 +883,10 @@ func (p *ParticipantImpl) SetMigrateInfo( } p.TransportManager.SetMigrateInfo(previousOffer, previousAnswer, dataChannels) + + for trackID, fs := range forwarderStates { + p.CacheDownTrack(trackID, nil, sfu.DownTrackState{ForwarderState: fs}) + } } func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseReason, isExpectedToResume bool) error { @@ -2459,7 +2464,7 @@ func (p *ParticipantImpl) setDowntracksConnected() { func (p *ParticipantImpl) CacheDownTrack(trackID livekit.TrackID, rtpTransceiver *webrtc.RTPTransceiver, downTrack sfu.DownTrackState) { p.lock.Lock() if existing := p.cachedDownTracks[trackID]; existing != nil && existing.transceiver != rtpTransceiver { - p.subLogger.Infow("cached transceiver changed", "trackID", trackID) + p.subLogger.Warnw("cached transceiver changed", nil, "trackID", trackID) } p.cachedDownTracks[trackID] = &downTrackState{transceiver: rtpTransceiver, downTrack: downTrack} p.subLogger.Debugw("caching downtrack", "trackID", trackID) diff --git a/pkg/rtc/subscriptionmanager.go b/pkg/rtc/subscriptionmanager.go index 4a49b11db..37ca19561 100644 --- a/pkg/rtc/subscriptionmanager.go +++ b/pkg/rtc/subscriptionmanager.go @@ -180,6 +180,26 @@ func (m *SubscriptionManager) GetSubscribedTracks() []types.SubscribedTrack { return tracks } +func (m *SubscriptionManager) StopAndGetSubscribedTracksForwarderState() map[livekit.TrackID]*livekit.RTPForwarderState { + m.lock.RLock() + defer m.lock.RUnlock() + + states := make(map[livekit.TrackID]*livekit.RTPForwarderState, len(m.subscriptions)) + for trackID, t := range m.subscriptions { + st := t.getSubscribedTrack() + if st != nil { + dt := st.DownTrack() + if dt != nil { + state := dt.StopWriteAndGetState() + if state.ForwarderState != nil { + states[trackID] = state.ForwarderState + } + } + } + } + return states +} + func (m *SubscriptionManager) HasSubscriptions() bool { m.lock.RLock() defer m.lock.RUnlock() diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index c87b7d16a..593c47e96 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -369,6 +369,7 @@ type LocalParticipant interface { // WaitUntilSubscribed waits until all subscriptions have been settled, or if the timeout // has been reached. If the timeout expires, it will return an error. WaitUntilSubscribed(timeout time.Duration) error + StopAndGetSubscribedTracksForwarderState() map[livekit.TrackID]*livekit.RTPForwarderState // returns list of participant identities that the current participant is subscribed to GetSubscribedParticipants() []livekit.ParticipantID @@ -412,7 +413,12 @@ type LocalParticipant interface { NotifyMigration() SetMigrateState(s MigrateState) MigrateState() MigrateState - SetMigrateInfo(previousOffer, previousAnswer *webrtc.SessionDescription, mediaTracks []*livekit.TrackPublishedResponse, dataChannels []*livekit.DataChannelInfo) + SetMigrateInfo( + previousOffer, previousAnswer *webrtc.SessionDescription, + mediaTracks []*livekit.TrackPublishedResponse, + dataChannels []*livekit.DataChannelInfo, + forwarderStates map[livekit.TrackID]*livekit.RTPForwarderState, + ) UpdateMediaRTT(rtt uint32) UpdateSignalingRTT(rtt uint32) diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index a21c7c04d..918a5f6a0 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -782,13 +782,14 @@ type FakeLocalParticipant struct { setMetadataArgsForCall []struct { arg1 string } - SetMigrateInfoStub func(*webrtc.SessionDescription, *webrtc.SessionDescription, []*livekit.TrackPublishedResponse, []*livekit.DataChannelInfo) + SetMigrateInfoStub func(*webrtc.SessionDescription, *webrtc.SessionDescription, []*livekit.TrackPublishedResponse, []*livekit.DataChannelInfo, map[livekit.TrackID]*livekit.RTPForwarderState) setMigrateInfoMutex sync.RWMutex setMigrateInfoArgsForCall []struct { arg1 *webrtc.SessionDescription arg2 *webrtc.SessionDescription arg3 []*livekit.TrackPublishedResponse arg4 []*livekit.DataChannelInfo + arg5 map[livekit.TrackID]*livekit.RTPForwarderState } SetMigrateStateStub func(types.MigrateState) setMigrateStateMutex sync.RWMutex @@ -854,6 +855,16 @@ type FakeLocalParticipant struct { stateReturnsOnCall map[int]struct { result1 livekit.ParticipantInfo_State } + StopAndGetSubscribedTracksForwarderStateStub func() map[livekit.TrackID]*livekit.RTPForwarderState + stopAndGetSubscribedTracksForwarderStateMutex sync.RWMutex + stopAndGetSubscribedTracksForwarderStateArgsForCall []struct { + } + stopAndGetSubscribedTracksForwarderStateReturns struct { + result1 map[livekit.TrackID]*livekit.RTPForwarderState + } + stopAndGetSubscribedTracksForwarderStateReturnsOnCall map[int]struct { + result1 map[livekit.TrackID]*livekit.RTPForwarderState + } SubscribeToTrackStub func(livekit.TrackID) subscribeToTrackMutex sync.RWMutex subscribeToTrackArgsForCall []struct { @@ -5193,7 +5204,7 @@ func (fake *FakeLocalParticipant) SetMetadataArgsForCall(i int) string { return argsForCall.arg1 } -func (fake *FakeLocalParticipant) SetMigrateInfo(arg1 *webrtc.SessionDescription, arg2 *webrtc.SessionDescription, arg3 []*livekit.TrackPublishedResponse, arg4 []*livekit.DataChannelInfo) { +func (fake *FakeLocalParticipant) SetMigrateInfo(arg1 *webrtc.SessionDescription, arg2 *webrtc.SessionDescription, arg3 []*livekit.TrackPublishedResponse, arg4 []*livekit.DataChannelInfo, arg5 map[livekit.TrackID]*livekit.RTPForwarderState) { var arg3Copy []*livekit.TrackPublishedResponse if arg3 != nil { arg3Copy = make([]*livekit.TrackPublishedResponse, len(arg3)) @@ -5210,12 +5221,13 @@ func (fake *FakeLocalParticipant) SetMigrateInfo(arg1 *webrtc.SessionDescription arg2 *webrtc.SessionDescription arg3 []*livekit.TrackPublishedResponse arg4 []*livekit.DataChannelInfo - }{arg1, arg2, arg3Copy, arg4Copy}) + arg5 map[livekit.TrackID]*livekit.RTPForwarderState + }{arg1, arg2, arg3Copy, arg4Copy, arg5}) stub := fake.SetMigrateInfoStub - fake.recordInvocation("SetMigrateInfo", []interface{}{arg1, arg2, arg3Copy, arg4Copy}) + fake.recordInvocation("SetMigrateInfo", []interface{}{arg1, arg2, arg3Copy, arg4Copy, arg5}) fake.setMigrateInfoMutex.Unlock() if stub != nil { - fake.SetMigrateInfoStub(arg1, arg2, arg3, arg4) + fake.SetMigrateInfoStub(arg1, arg2, arg3, arg4, arg5) } } @@ -5225,17 +5237,17 @@ func (fake *FakeLocalParticipant) SetMigrateInfoCallCount() int { return len(fake.setMigrateInfoArgsForCall) } -func (fake *FakeLocalParticipant) SetMigrateInfoCalls(stub func(*webrtc.SessionDescription, *webrtc.SessionDescription, []*livekit.TrackPublishedResponse, []*livekit.DataChannelInfo)) { +func (fake *FakeLocalParticipant) SetMigrateInfoCalls(stub func(*webrtc.SessionDescription, *webrtc.SessionDescription, []*livekit.TrackPublishedResponse, []*livekit.DataChannelInfo, map[livekit.TrackID]*livekit.RTPForwarderState)) { fake.setMigrateInfoMutex.Lock() defer fake.setMigrateInfoMutex.Unlock() fake.SetMigrateInfoStub = stub } -func (fake *FakeLocalParticipant) SetMigrateInfoArgsForCall(i int) (*webrtc.SessionDescription, *webrtc.SessionDescription, []*livekit.TrackPublishedResponse, []*livekit.DataChannelInfo) { +func (fake *FakeLocalParticipant) SetMigrateInfoArgsForCall(i int) (*webrtc.SessionDescription, *webrtc.SessionDescription, []*livekit.TrackPublishedResponse, []*livekit.DataChannelInfo, map[livekit.TrackID]*livekit.RTPForwarderState) { fake.setMigrateInfoMutex.RLock() defer fake.setMigrateInfoMutex.RUnlock() argsForCall := fake.setMigrateInfoArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4 + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4, argsForCall.arg5 } func (fake *FakeLocalParticipant) SetMigrateState(arg1 types.MigrateState) { @@ -5607,6 +5619,59 @@ func (fake *FakeLocalParticipant) StateReturnsOnCall(i int, result1 livekit.Part }{result1} } +func (fake *FakeLocalParticipant) StopAndGetSubscribedTracksForwarderState() map[livekit.TrackID]*livekit.RTPForwarderState { + fake.stopAndGetSubscribedTracksForwarderStateMutex.Lock() + ret, specificReturn := fake.stopAndGetSubscribedTracksForwarderStateReturnsOnCall[len(fake.stopAndGetSubscribedTracksForwarderStateArgsForCall)] + fake.stopAndGetSubscribedTracksForwarderStateArgsForCall = append(fake.stopAndGetSubscribedTracksForwarderStateArgsForCall, struct { + }{}) + stub := fake.StopAndGetSubscribedTracksForwarderStateStub + fakeReturns := fake.stopAndGetSubscribedTracksForwarderStateReturns + fake.recordInvocation("StopAndGetSubscribedTracksForwarderState", []interface{}{}) + fake.stopAndGetSubscribedTracksForwarderStateMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeLocalParticipant) StopAndGetSubscribedTracksForwarderStateCallCount() int { + fake.stopAndGetSubscribedTracksForwarderStateMutex.RLock() + defer fake.stopAndGetSubscribedTracksForwarderStateMutex.RUnlock() + return len(fake.stopAndGetSubscribedTracksForwarderStateArgsForCall) +} + +func (fake *FakeLocalParticipant) StopAndGetSubscribedTracksForwarderStateCalls(stub func() map[livekit.TrackID]*livekit.RTPForwarderState) { + fake.stopAndGetSubscribedTracksForwarderStateMutex.Lock() + defer fake.stopAndGetSubscribedTracksForwarderStateMutex.Unlock() + fake.StopAndGetSubscribedTracksForwarderStateStub = stub +} + +func (fake *FakeLocalParticipant) StopAndGetSubscribedTracksForwarderStateReturns(result1 map[livekit.TrackID]*livekit.RTPForwarderState) { + fake.stopAndGetSubscribedTracksForwarderStateMutex.Lock() + defer fake.stopAndGetSubscribedTracksForwarderStateMutex.Unlock() + fake.StopAndGetSubscribedTracksForwarderStateStub = nil + fake.stopAndGetSubscribedTracksForwarderStateReturns = struct { + result1 map[livekit.TrackID]*livekit.RTPForwarderState + }{result1} +} + +func (fake *FakeLocalParticipant) StopAndGetSubscribedTracksForwarderStateReturnsOnCall(i int, result1 map[livekit.TrackID]*livekit.RTPForwarderState) { + fake.stopAndGetSubscribedTracksForwarderStateMutex.Lock() + defer fake.stopAndGetSubscribedTracksForwarderStateMutex.Unlock() + fake.StopAndGetSubscribedTracksForwarderStateStub = nil + if fake.stopAndGetSubscribedTracksForwarderStateReturnsOnCall == nil { + fake.stopAndGetSubscribedTracksForwarderStateReturnsOnCall = make(map[int]struct { + result1 map[livekit.TrackID]*livekit.RTPForwarderState + }) + } + fake.stopAndGetSubscribedTracksForwarderStateReturnsOnCall[i] = struct { + result1 map[livekit.TrackID]*livekit.RTPForwarderState + }{result1} +} + func (fake *FakeLocalParticipant) SubscribeToTrack(arg1 livekit.TrackID) { fake.subscribeToTrackMutex.Lock() fake.subscribeToTrackArgsForCall = append(fake.subscribeToTrackArgsForCall, struct { @@ -6851,6 +6916,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.setTrackMutedMutex.RUnlock() fake.stateMutex.RLock() defer fake.stateMutex.RUnlock() + fake.stopAndGetSubscribedTracksForwarderStateMutex.RLock() + defer fake.stopAndGetSubscribedTracksForwarderStateMutex.RUnlock() fake.subscribeToTrackMutex.RLock() defer fake.subscribeToTrackMutex.RUnlock() fake.subscriberAsPrimaryMutex.RLock() diff --git a/pkg/sfu/codecmunger/vp8.go b/pkg/sfu/codecmunger/vp8.go index 97c37bc4c..3c89d2e1a 100644 --- a/pkg/sfu/codecmunger/vp8.go +++ b/pkg/sfu/codecmunger/vp8.go @@ -15,10 +15,9 @@ package codecmunger import ( - "fmt" - "github.com/elliotchance/orderedmap/v2" + "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/livekit-server/pkg/sfu/buffer" @@ -32,23 +31,6 @@ const ( // ----------------------------------------------------------- -type VP8State struct { - ExtLastPictureId int32 - PictureIdUsed bool - LastTl0PicIdx uint8 - Tl0PicIdxUsed bool - TidUsed bool - LastKeyIdx uint8 - KeyIdxUsed bool -} - -func (v VP8State) String() string { - return fmt.Sprintf("VP8State{extLastPictureId: %d, pictureIdUsed: %+v, lastTl0PicIdx: %d, tl0PicIdxUsed: %+v, tidUsed: %+v, lastKeyIdx: %d, keyIdxUsed: %+v)", - v.ExtLastPictureId, v.PictureIdUsed, v.LastTl0PicIdx, v.Tl0PicIdxUsed, v.TidUsed, v.LastKeyIdx, v.KeyIdxUsed) -} - -// ----------------------------------------------------------- - type VP8 struct { logger logger.Logger @@ -85,25 +67,27 @@ func NewVP8FromNull(cm CodecMunger, logger logger.Logger) *VP8 { } func (v *VP8) GetState() interface{} { - return VP8State{ + return &livekit.VP8MungerState{ ExtLastPictureId: v.extLastPictureId, PictureIdUsed: v.pictureIdUsed, - LastTl0PicIdx: v.lastTl0PicIdx, + LastTl0PicIdx: uint32(v.lastTl0PicIdx), Tl0PicIdxUsed: v.tl0PicIdxUsed, TidUsed: v.tidUsed, - LastKeyIdx: v.lastKeyIdx, + LastKeyIdx: uint32(v.lastKeyIdx), KeyIdxUsed: v.keyIdxUsed, } } func (v *VP8) SeedState(seed interface{}) { - if state, ok := seed.(VP8State); ok { + switch cm := seed.(type) { + case *livekit.RTPForwarderState_Vp8Munger: + state := cm.Vp8Munger v.extLastPictureId = state.ExtLastPictureId v.pictureIdUsed = state.PictureIdUsed - v.lastTl0PicIdx = state.LastTl0PicIdx + v.lastTl0PicIdx = uint8(state.LastTl0PicIdx) v.tl0PicIdxUsed = state.Tl0PicIdxUsed v.tidUsed = state.TidUsed - v.lastKeyIdx = state.LastKeyIdx + v.lastKeyIdx = uint8(state.LastKeyIdx) v.keyIdxUsed = state.KeyIdxUsed } } diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 9001805c6..54c56714c 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -137,7 +137,7 @@ var ( type DownTrackState struct { RTPStats *buffer.RTPStatsSender DeltaStatsSenderSnapshotId uint32 - ForwarderState ForwarderState + ForwarderState *livekit.RTPForwarderState } func (d DownTrackState) String() string { @@ -264,6 +264,7 @@ type DownTrack struct { connected atomic.Bool bindAndConnectedOnce atomic.Bool writable atomic.Bool + writeStopped atomic.Bool rtpStats *buffer.RTPStatsSender @@ -1146,20 +1147,30 @@ func (d *DownTrack) MaxLayer() buffer.VideoLayer { } func (d *DownTrack) GetState() DownTrackState { - dts := DownTrackState{ + return DownTrackState{ RTPStats: d.rtpStats, DeltaStatsSenderSnapshotId: d.deltaStatsSenderSnapshotId, ForwarderState: d.forwarder.GetState(), } - return dts } func (d *DownTrack) SeedState(state DownTrackState) { - d.rtpStats.Seed(state.RTPStats) - d.deltaStatsSenderSnapshotId = state.DeltaStatsSenderSnapshotId + if state.RTPStats != nil { + d.rtpStats.Seed(state.RTPStats) + d.deltaStatsSenderSnapshotId = state.DeltaStatsSenderSnapshotId + } d.forwarder.SeedState(state.ForwarderState) } +func (d *DownTrack) StopWriteAndGetState() DownTrackState { + d.bindLock.Lock() + d.writable.Store(false) + d.writeStopped.Store(true) + d.bindLock.Unlock() + + return d.GetState() +} + func (d *DownTrack) UpTrackLayersChange() { if sal := d.getStreamAllocatorListener(); sal != nil { sal.OnAvailableLayersChanged(d) @@ -1984,6 +1995,9 @@ func (d *DownTrack) GetAndResetBytesSent() (uint32, uint32) { */ func (d *DownTrack) onBindAndConnectedChange() { + if d.writeStopped.Load() { + return + } d.writable.Store(d.connected.Load() && d.bound.Load()) if d.connected.Load() && d.bound.Load() && !d.bindAndConnectedOnce.Swap(true) { if d.activePaddingOnMuteUpTrack.Load() { diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 8f0cb1af9..8aa926a49 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -27,6 +27,7 @@ import ( "github.com/pion/webrtc/v3" "go.uber.org/zap/zapcore" + "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/livekit-server/pkg/sfu/buffer" @@ -186,35 +187,6 @@ type TranslationParams struct { // ------------------------------------------------------------------- -type ForwarderState struct { - Started bool - ReferenceLayerSpatial int32 - PreStartTime time.Time - ExtFirstTS uint64 - DummyStartTSOffset uint64 - RTP RTPMungerState - Codec interface{} -} - -func (f ForwarderState) String() string { - codecString := "" - switch codecState := f.Codec.(type) { - case codecmunger.VP8State: - codecString = codecState.String() - } - return fmt.Sprintf("ForwarderState{started: %v, referenceLayerSpatial: %d, preStartTime: %s, extFirstTS: %d, dummyStartTSOffset: %d, rtp: %s, codec: %s}", - f.Started, - f.ReferenceLayerSpatial, - f.PreStartTime.String(), - f.ExtFirstTS, - f.DummyStartTSOffset, - f.RTP.String(), - codecString, - ) -} - -// ------------------------------------------------------------------- - type refInfo struct { senderReport *buffer.RTCPSenderReportData tsOffset uint64 @@ -402,41 +374,48 @@ func (f *Forwarder) DetermineCodec(codec webrtc.RTPCodecCapability, extensions [ } } -func (f *Forwarder) GetState() ForwarderState { +func (f *Forwarder) GetState() *livekit.RTPForwarderState { f.lock.RLock() defer f.lock.RUnlock() if !f.started { - return ForwarderState{} + return nil } - return ForwarderState{ - Started: f.started, - ReferenceLayerSpatial: f.referenceLayerSpatial, - PreStartTime: f.preStartTime, - ExtFirstTS: f.extFirstTS, - DummyStartTSOffset: f.dummyStartTSOffset, - RTP: f.rtpMunger.GetLast(), - Codec: f.codecMunger.GetState(), + state := &livekit.RTPForwarderState{ + Started: f.started, + ReferenceLayerSpatial: f.referenceLayerSpatial, + PreStartTime: f.preStartTime.UnixNano(), + ExtFirstTimestamp: f.extFirstTS, + DummyStartTimestampOffset: f.dummyStartTSOffset, + RtpMunger: f.rtpMunger.GetState(), } + + codecMungerState := f.codecMunger.GetState() + if vp8MungerState, ok := codecMungerState.(*livekit.VP8MungerState); ok { + state.CodecMunger = &livekit.RTPForwarderState_Vp8Munger{ + Vp8Munger: vp8MungerState, + } + } + return state } -func (f *Forwarder) SeedState(state ForwarderState) { - if !state.Started { +func (f *Forwarder) SeedState(state *livekit.RTPForwarderState) { + if state == nil || !state.Started { return } f.lock.Lock() defer f.lock.Unlock() - f.rtpMunger.SeedLast(state.RTP) - f.codecMunger.SeedState(state.Codec) + f.rtpMunger.SeedState(state.RtpMunger) + f.codecMunger.SeedState(state.CodecMunger) f.started = true f.referenceLayerSpatial = state.ReferenceLayerSpatial - f.preStartTime = state.PreStartTime - f.extFirstTS = state.ExtFirstTS - f.dummyStartTSOffset = state.DummyStartTSOffset + f.preStartTime = time.Unix(0, state.PreStartTime) + f.extFirstTS = state.ExtFirstTimestamp + f.dummyStartTSOffset = state.DummyStartTimestampOffset } func (f *Forwarder) Mute(muted bool, isSubscribeMutable bool) bool { @@ -1686,8 +1665,8 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e // 3. extExpectedTS -> expected timestamp of this packet calculated based on elapsed time since first packet // Ideally, extRefTS and extExpectedTS should be very close and extLastTS should be before both of those. // But, cases like muting/unmuting, clock vagaries, pacing, etc. make them not satisfy those conditions always. - rtpMungerState := f.rtpMunger.GetLast() - extLastTS := rtpMungerState.ExtLastTS + rtpMungerState := f.rtpMunger.GetState() + extLastTS := rtpMungerState.ExtLastTimestamp extExpectedTS := extLastTS extRefTS := extLastTS refTS := uint32(extRefTS) @@ -1838,7 +1817,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e "extExpectedTS", extExpectedTS, "extNextTS", extNextTS, "tsJump", extNextTS-extLastTS, - "nextSN", rtpMungerState.ExtLastSN+1, + "nextSN", rtpMungerState.ExtLastSequenceNumber+1, "extIncomingSN", extPkt.ExtSequenceNumber, "incomingTS", extPkt.Packet.Timestamp, "extIncomingTS", extPkt.ExtTimestamp, @@ -1856,7 +1835,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e "extExpectedTS", extExpectedTS, "extNextTS", extNextTS, "tsJump", extNextTS-extLastTS, - "nextSN", rtpMungerState.ExtLastSN+1, + "nextSN", rtpMungerState.ExtLastSequenceNumber+1, "extIncomingSN", extPkt.ExtSequenceNumber, "extIncomingTS", extPkt.ExtTimestamp, ) @@ -2052,7 +2031,7 @@ func (f *Forwarder) GetSnTsForBlankFrames(frameRate uint32, numPackets int) ([]S numPackets++ } - extLastTS := f.rtpMunger.GetLast().ExtLastTS + extLastTS := f.rtpMunger.GetState().ExtLastTimestamp extExpectedTS := extLastTS if f.getExpectedRTPTimestamp != nil { tsExt, err := f.getExpectedRTPTimestamp(time.Now()) diff --git a/pkg/sfu/rtpmunger.go b/pkg/sfu/rtpmunger.go index 0e8fa78ab..7eb41d776 100644 --- a/pkg/sfu/rtpmunger.go +++ b/pkg/sfu/rtpmunger.go @@ -15,8 +15,7 @@ package sfu import ( - "fmt" - + "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/livekit-server/pkg/sfu/buffer" @@ -50,26 +49,6 @@ type SnTs struct { // ---------------------------------------------------------------------- -type RTPMungerState struct { - ExtLastSN uint64 - ExtSecondLastSN uint64 - ExtLastTS uint64 - ExtSecondLastTS uint64 - LastMarker bool - SecondLastMarker bool -} - -func (r RTPMungerState) String() string { - return fmt.Sprintf( - "RTPMungerState{extLastSN: %d, extSecondLastSN: %d, extLastTS: %d, extSecondLastTS: %d, lastMarker: %v, secondLastMarker: %v)", - r.ExtLastSN, r.ExtSecondLastSN, - r.ExtLastTS, r.ExtSecondLastTS, - r.LastMarker, r.SecondLastMarker, - ) -} - -// ---------------------------------------------------------------------- - type RTPMunger struct { logger logger.Logger @@ -112,14 +91,14 @@ func (r *RTPMunger) DebugInfo() map[string]interface{} { } } -func (r *RTPMunger) GetLast() RTPMungerState { - return RTPMungerState{ - ExtLastSN: r.extLastSN, - ExtSecondLastSN: r.extSecondLastSN, - ExtLastTS: r.extLastTS, - ExtSecondLastTS: r.extSecondLastTS, - LastMarker: r.lastMarker, - SecondLastMarker: r.secondLastMarker, +func (r *RTPMunger) GetState() *livekit.RTPMungerState { + return &livekit.RTPMungerState{ + ExtLastSequenceNumber: r.extLastSN, + ExtSecondLastSequenceNumber: r.extSecondLastSN, + ExtLastTimestamp: r.extLastTS, + ExtSecondLastTimestamp: r.extSecondLastTS, + LastMarker: r.lastMarker, + SecondLastMarker: r.secondLastMarker, } } @@ -127,11 +106,11 @@ func (r *RTPMunger) GetTSOffset() uint64 { return r.tsOffset } -func (r *RTPMunger) SeedLast(state RTPMungerState) { - r.extLastSN = state.ExtLastSN - r.extSecondLastSN = state.ExtSecondLastSN - r.extLastTS = state.ExtLastTS - r.extSecondLastTS = state.ExtSecondLastTS +func (r *RTPMunger) SeedState(state *livekit.RTPMungerState) { + r.extLastSN = state.ExtLastSequenceNumber + r.extSecondLastSN = state.ExtSecondLastSequenceNumber + r.extLastTS = state.ExtLastTimestamp + r.extSecondLastTS = state.ExtSecondLastTimestamp r.lastMarker = state.LastMarker r.secondLastMarker = state.SecondLastMarker } From 13ee1aca285e316436f76d55f2b568cb9bb786ec Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 6 Aug 2024 12:45:46 +0530 Subject: [PATCH 16/17] Delay getting forwarder state till migration is complete. (#2909) --- pkg/rtc/mediatracksubscriptions.go | 8 +++- pkg/rtc/participant.go | 37 +++++++++++++++---- pkg/rtc/types/interfaces.go | 1 - .../typesfakes/fake_local_participant.go | 18 ++++----- pkg/sfu/forwarder.go | 12 +++++- 5 files changed, 54 insertions(+), 22 deletions(-) diff --git a/pkg/rtc/mediatracksubscriptions.go b/pkg/rtc/mediatracksubscriptions.go index c16b4bb7f..acd85037d 100644 --- a/pkg/rtc/mediatracksubscriptions.go +++ b/pkg/rtc/mediatracksubscriptions.go @@ -21,6 +21,7 @@ import ( "github.com/pion/rtcp" "github.com/pion/webrtc/v3" + "go.uber.org/atomic" sutils "github.com/livekit/livekit-server/pkg/utils" "github.com/livekit/protocol/livekit" @@ -161,6 +162,7 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr * }) // Bind callback can happen from replaceTrack, so set it up early + var reusingTransceiver atomic.Bool var dtState sfu.DownTrackState downTrack.OnBinding(func(err error) { if err != nil { @@ -168,7 +170,9 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr * return } wr.DetermineReceiver(downTrack.Codec()) - downTrack.SeedState(dtState) + if reusingTransceiver.Load() { + downTrack.SeedState(dtState) + } if err = wr.AddDownTrack(downTrack); err != nil && err != sfu.ErrReceiverClosed { sub.GetLogger().Errorw( "could not add down track", err, @@ -216,6 +220,7 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr * "publisherID", subTrack.PublisherID(), "trackID", trackID, ) + reusingTransceiver.Store(true) rtpSender := existingTransceiver.Sender() if rtpSender != nil { // replaced track will bind immediately without negotiation, SetTransceiver first before bind @@ -246,6 +251,7 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr * existingTransceiver.Stop() } } + reusingTransceiver.Store(false) // if cannot replace, find an unused transceiver or add new one if transceiver == nil { diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 0bf290ceb..853936955 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -129,6 +129,7 @@ type ParticipantParams struct { TURNSEnabled bool GetParticipantInfo func(pID livekit.ParticipantID) *livekit.ParticipantInfo GetRegionSettings func(ip string) *livekit.RegionSettings + GetSubscriberForwarderState func(p types.LocalParticipant) (map[livekit.TrackID]*livekit.RTPForwarderState, error) DisableSupervisor bool ReconnectOnPublicationError bool ReconnectOnSubscriptionError bool @@ -231,6 +232,7 @@ type ParticipantImpl struct { onICEConfigChanged func(participant types.LocalParticipant, iceConfig *livekit.ICEConfig) cachedDownTracks map[livekit.TrackID]*downTrackState + forwarderState map[livekit.TrackID]*livekit.RTPForwarderState supervisor *supervisor.ParticipantSupervisor @@ -862,7 +864,6 @@ func (p *ParticipantImpl) SetMigrateInfo( previousOffer, previousAnswer *webrtc.SessionDescription, mediaTracks []*livekit.TrackPublishedResponse, dataChannels []*livekit.DataChannelInfo, - forwarderStates map[livekit.TrackID]*livekit.RTPForwarderState, ) { p.pendingTracksLock.Lock() for _, t := range mediaTracks { @@ -883,10 +884,6 @@ func (p *ParticipantImpl) SetMigrateInfo( } p.TransportManager.SetMigrateInfo(previousOffer, previousAnswer, dataChannels) - - for trackID, fs := range forwarderStates { - p.CacheDownTrack(trackID, nil, sfu.DownTrackState{ForwarderState: fs}) - } } func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseReason, isExpectedToResume bool) error { @@ -1056,6 +1053,7 @@ func (p *ParticipantImpl) SetMigrateState(s types.MigrateState) { case types.MigrateStateComplete: p.TransportManager.ProcessPendingPublisherDataChannels() + p.cacheForwarderState() } if onMigrateStateChange := p.getOnMigrateStateChange(); onMigrateStateChange != nil { @@ -1214,7 +1212,9 @@ func (p *ParticipantImpl) onTrackSubscribed(subTrack types.SubscribedTrack) { return } if p.TransportManager.HasSubscriberEverConnected() { - subTrack.DownTrack().SetConnected() + dt := subTrack.DownTrack() + dt.SeedState(sfu.DownTrackState{ForwarderState: p.getAndDeleteForwarderState(subTrack.ID())}) + dt.SetConnected() } p.TransportManager.AddSubscribedTrack(subTrack) }) @@ -1641,7 +1641,7 @@ func (p *ParticipantImpl) onPublisherInitialConnected() { func (p *ParticipantImpl) onSubscriberInitialConnected() { go p.subscriberRTCPWorker() - p.setDowntracksConnected() + p.setDownTracksConnected() } func (p *ParticipantImpl) onPrimaryTransportInitialConnected() { @@ -2453,14 +2453,35 @@ func (p *ParticipantImpl) postRtcp(pkts []rtcp.Packet) { }, postRtcpOp{p, pkts}) } -func (p *ParticipantImpl) setDowntracksConnected() { +func (p *ParticipantImpl) setDownTracksConnected() { for _, t := range p.SubscriptionManager.GetSubscribedTracks() { if dt := t.DownTrack(); dt != nil { + dt.SeedState(sfu.DownTrackState{ForwarderState: p.getAndDeleteForwarderState(t.ID())}) dt.SetConnected() } } } +func (p *ParticipantImpl) cacheForwarderState() { + // if migrating in, get forwarder state from migrating out node to facilitate resume + if f := p.params.GetSubscriberForwarderState; f != nil { + if fs, err := f(p); err == nil { + p.lock.Lock() + p.forwarderState = fs + p.lock.Unlock() + } + } +} + +func (p *ParticipantImpl) getAndDeleteForwarderState(trackID livekit.TrackID) *livekit.RTPForwarderState { + p.lock.Lock() + fs := p.forwarderState[trackID] + delete(p.forwarderState, trackID) + p.lock.Unlock() + + return fs +} + func (p *ParticipantImpl) CacheDownTrack(trackID livekit.TrackID, rtpTransceiver *webrtc.RTPTransceiver, downTrack sfu.DownTrackState) { p.lock.Lock() if existing := p.cachedDownTracks[trackID]; existing != nil && existing.transceiver != rtpTransceiver { diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 593c47e96..e3d25a59d 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -417,7 +417,6 @@ type LocalParticipant interface { previousOffer, previousAnswer *webrtc.SessionDescription, mediaTracks []*livekit.TrackPublishedResponse, dataChannels []*livekit.DataChannelInfo, - forwarderStates map[livekit.TrackID]*livekit.RTPForwarderState, ) UpdateMediaRTT(rtt uint32) diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index 918a5f6a0..79f22b445 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -782,14 +782,13 @@ type FakeLocalParticipant struct { setMetadataArgsForCall []struct { arg1 string } - SetMigrateInfoStub func(*webrtc.SessionDescription, *webrtc.SessionDescription, []*livekit.TrackPublishedResponse, []*livekit.DataChannelInfo, map[livekit.TrackID]*livekit.RTPForwarderState) + SetMigrateInfoStub func(*webrtc.SessionDescription, *webrtc.SessionDescription, []*livekit.TrackPublishedResponse, []*livekit.DataChannelInfo) setMigrateInfoMutex sync.RWMutex setMigrateInfoArgsForCall []struct { arg1 *webrtc.SessionDescription arg2 *webrtc.SessionDescription arg3 []*livekit.TrackPublishedResponse arg4 []*livekit.DataChannelInfo - arg5 map[livekit.TrackID]*livekit.RTPForwarderState } SetMigrateStateStub func(types.MigrateState) setMigrateStateMutex sync.RWMutex @@ -5204,7 +5203,7 @@ func (fake *FakeLocalParticipant) SetMetadataArgsForCall(i int) string { return argsForCall.arg1 } -func (fake *FakeLocalParticipant) SetMigrateInfo(arg1 *webrtc.SessionDescription, arg2 *webrtc.SessionDescription, arg3 []*livekit.TrackPublishedResponse, arg4 []*livekit.DataChannelInfo, arg5 map[livekit.TrackID]*livekit.RTPForwarderState) { +func (fake *FakeLocalParticipant) SetMigrateInfo(arg1 *webrtc.SessionDescription, arg2 *webrtc.SessionDescription, arg3 []*livekit.TrackPublishedResponse, arg4 []*livekit.DataChannelInfo) { var arg3Copy []*livekit.TrackPublishedResponse if arg3 != nil { arg3Copy = make([]*livekit.TrackPublishedResponse, len(arg3)) @@ -5221,13 +5220,12 @@ func (fake *FakeLocalParticipant) SetMigrateInfo(arg1 *webrtc.SessionDescription arg2 *webrtc.SessionDescription arg3 []*livekit.TrackPublishedResponse arg4 []*livekit.DataChannelInfo - arg5 map[livekit.TrackID]*livekit.RTPForwarderState - }{arg1, arg2, arg3Copy, arg4Copy, arg5}) + }{arg1, arg2, arg3Copy, arg4Copy}) stub := fake.SetMigrateInfoStub - fake.recordInvocation("SetMigrateInfo", []interface{}{arg1, arg2, arg3Copy, arg4Copy, arg5}) + fake.recordInvocation("SetMigrateInfo", []interface{}{arg1, arg2, arg3Copy, arg4Copy}) fake.setMigrateInfoMutex.Unlock() if stub != nil { - fake.SetMigrateInfoStub(arg1, arg2, arg3, arg4, arg5) + fake.SetMigrateInfoStub(arg1, arg2, arg3, arg4) } } @@ -5237,17 +5235,17 @@ func (fake *FakeLocalParticipant) SetMigrateInfoCallCount() int { return len(fake.setMigrateInfoArgsForCall) } -func (fake *FakeLocalParticipant) SetMigrateInfoCalls(stub func(*webrtc.SessionDescription, *webrtc.SessionDescription, []*livekit.TrackPublishedResponse, []*livekit.DataChannelInfo, map[livekit.TrackID]*livekit.RTPForwarderState)) { +func (fake *FakeLocalParticipant) SetMigrateInfoCalls(stub func(*webrtc.SessionDescription, *webrtc.SessionDescription, []*livekit.TrackPublishedResponse, []*livekit.DataChannelInfo)) { fake.setMigrateInfoMutex.Lock() defer fake.setMigrateInfoMutex.Unlock() fake.SetMigrateInfoStub = stub } -func (fake *FakeLocalParticipant) SetMigrateInfoArgsForCall(i int) (*webrtc.SessionDescription, *webrtc.SessionDescription, []*livekit.TrackPublishedResponse, []*livekit.DataChannelInfo, map[livekit.TrackID]*livekit.RTPForwarderState) { +func (fake *FakeLocalParticipant) SetMigrateInfoArgsForCall(i int) (*webrtc.SessionDescription, *webrtc.SessionDescription, []*livekit.TrackPublishedResponse, []*livekit.DataChannelInfo) { fake.setMigrateInfoMutex.RLock() defer fake.setMigrateInfoMutex.RUnlock() argsForCall := fake.setMigrateInfoArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4, argsForCall.arg5 + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4 } func (fake *FakeLocalParticipant) SetMigrateState(arg1 types.MigrateState) { diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 8aa926a49..7f7f06d0f 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -385,11 +385,13 @@ func (f *Forwarder) GetState() *livekit.RTPForwarderState { state := &livekit.RTPForwarderState{ Started: f.started, ReferenceLayerSpatial: f.referenceLayerSpatial, - PreStartTime: f.preStartTime.UnixNano(), ExtFirstTimestamp: f.extFirstTS, DummyStartTimestampOffset: f.dummyStartTSOffset, RtpMunger: f.rtpMunger.GetState(), } + if !f.preStartTime.IsZero() { + state.PreStartTime = f.preStartTime.UnixNano() + } codecMungerState := f.codecMunger.GetState() if vp8MungerState, ok := codecMungerState.(*livekit.VP8MungerState); ok { @@ -413,7 +415,9 @@ func (f *Forwarder) SeedState(state *livekit.RTPForwarderState) { f.started = true f.referenceLayerSpatial = state.ReferenceLayerSpatial - f.preStartTime = time.Unix(0, state.PreStartTime) + if state.PreStartTime != 0 { + f.preStartTime = time.Unix(0, state.PreStartTime) + } f.extFirstTS = state.ExtFirstTimestamp f.dummyStartTSOffset = state.DummyStartTimestampOffset } @@ -1633,12 +1637,14 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e f.logger.Debugw( message, "layer", layer, + "referenceLayerSpatial", f.referenceLayerSpatial, "extExpectedTS", extExpectedTS, "incomingTS", extPkt.Packet.Timestamp, "extIncomingTS", extPkt.ExtTimestamp, "extRefTS", extRefTS, "extLastTS", extLastTS, "diffSeconds", math.Abs(diffSeconds), + "refInfos", wrappedRefInfoLogger{f}, ) } // TODO-REMOVE-AFTER-DATA-COLLECTION @@ -1646,12 +1652,14 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e f.logger.Infow( message, "layer", layer, + "referenceLayerSpatial", f.referenceLayerSpatial, "extExpectedTS", extExpectedTS, "incomingTS", extPkt.Packet.Timestamp, "extIncomingTS", extPkt.ExtTimestamp, "extRefTS", extRefTS, "extLastTS", extLastTS, "diffSeconds", math.Abs(diffSeconds), + "refInfos", wrappedRefInfoLogger{f}, ) } From 01100650f62a702f977758ac26aab79aacb8bd62 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 6 Aug 2024 14:30:08 +0530 Subject: [PATCH 17/17] Clean up packet checks. (#2910) Still leaving the utility `ValidateRTPPacket` in helpers as it could be useful. --- pkg/sfu/buffer/buffer.go | 49 ---------------------------------------- pkg/sfu/receiver.go | 16 ------------- 2 files changed, 65 deletions(-) diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index 1e38ef8f1..505ddd694 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -17,7 +17,6 @@ package buffer import ( "encoding/binary" "errors" - "fmt" "io" "strings" "sync" @@ -321,28 +320,6 @@ func (b *Buffer) Write(pkt []byte) (n int, err error) { return } - if err = utils.ValidateRTPPacket(&rtpPacket, b.payloadType, b.mediaSSRC); err != nil { - invalidPacketCount := b.invalidPacketCount.Inc() - if (invalidPacketCount-1)%100 == 0 { - b.logger.Warnw( - "validating RTP packet failed", err, - "version", rtpPacket.Version, - "padding", rtpPacket.Padding, - "marker", rtpPacket.Marker, - "expectedPayloadType", b.payloadType, - "payloadType", rtpPacket.PayloadType, - "sequenceNumber", rtpPacket.SequenceNumber, - "timestamp", rtpPacket.Timestamp, - "expectedSSRC", b.mediaSSRC, - "ssrc", rtpPacket.SSRC, - "numExtensions", len(rtpPacket.Extensions), - "payloadSize", len(rtpPacket.Payload), - "rtpStats", b.rtpStats, - "snRangeMap", b.snRangeMap, - ) - } - } - now := time.Now().UnixNano() if b.twcc != nil && b.twccExtID != 0 && !b.closed.Load() { if ext := rtpPacket.GetExtension(b.twccExtID); ext != nil { @@ -697,32 +674,6 @@ func (b *Buffer) patchExtPacket(ep *ExtPacket, buf []byte) *ExtPacket { b.logger.Warnw("unexpected marshal size", nil, "max", n, "need", payloadEnd) return nil } - // TODO-REMOVE-AFTER-DEBUG START - if payloadEnd != n { - paddingEnd := payloadStart + int(ep.Packet.PaddingSize) - if paddingEnd != n { - b.logger.Warnw("unexpected marshal size", nil, "max", n, "payloadEnd", payloadEnd, "paddingEnd", paddingEnd) - } - } - // check a few fields for validity - checkVersion := (buf[0] & 0xc0) >> 6 - checkPayloadType := buf[1] & 0x7f - checkSequenceNumber := binary.BigEndian.Uint16(buf[2:]) - checkSSRC := binary.BigEndian.Uint32(buf[8:]) - if checkVersion != pkt.Version || checkPayloadType != pkt.PayloadType || checkSequenceNumber != pkt.SequenceNumber || checkSSRC != pkt.SSRC { - b.logger.Warnw( - "rtp packet mismatch", nil, - "version", fmt.Sprintf("%d != %d", checkVersion, pkt.Version), - "payloadType", fmt.Sprintf("%d != %d", checkPayloadType, pkt.PayloadType), - "sequenceNumber", fmt.Sprintf("%d != %d", checkSequenceNumber, pkt.SequenceNumber), - "SSRC", fmt.Sprintf("%d != %d", checkSSRC, pkt.SSRC), - "bytes", buf[0:16], - "len", n, - "headerSize", payloadStart, - "payloadSize", payloadEnd-payloadStart, - ) - } - // TODO-REMOVE-AFTER-DEBUG END pkt.Payload = buf[payloadStart:payloadEnd] ep.Packet = &pkt diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index cb55b19c4..beb2fb06b 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -712,22 +712,6 @@ func (w *WebRTCReceiver) forwardRTP(layer int32) { spatialTracker = w.streamTrackerManager.AddTracker(pkt.Spatial) } } - if spatialLayer > buffer.DefaultMaxLayerSpatial { // TODO-REMOVE-AFTER-DEBUG - w.logger.Warnw( - "invalid spatial layer", nil, - "mime", w.codec.MimeType, - "layer", layer, - "spatialLayer", spatialLayer, - "sn", pkt.Packet.SequenceNumber, - "esn", pkt.ExtSequenceNumber, - "timestamp", pkt.Packet.Timestamp, - "ets", pkt.ExtTimestamp, - "payloadSize", len(pkt.Packet.Payload), - "rtpVersion", pkt.Packet.Version, - "payloadType", pkt.Packet.PayloadType, - "ssrc", pkt.Packet.SSRC, - ) - } writeCount := w.downTrackSpreader.Broadcast(func(dt TrackSender) { _ = dt.WriteRTP(pkt, spatialLayer)