From ca4b56d2d568f179557999465c5bb1fa44d0bb61 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Fri, 12 Dec 2025 00:29:15 +0530 Subject: [PATCH] Handle case of sequence number jump just after start. (#4150) It is possible that the stream stops just after start and restarts much later introducing a large gap in sequence number. That could look like an unhandled case because the wrap back handler does not have enough packets yet. Let other checks based on time stamp gap take effect and only if that also leaves the sequence number unhandled, drop the packet. --- pkg/service/wire_gen.go | 14 +++++++------- pkg/sfu/buffer/buffer.go | 3 ++- pkg/sfu/rtpstats/rtpstats_receiver.go | 12 +++++++----- 3 files changed, 16 insertions(+), 13 deletions(-) diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 22b1e0c1c..b33cc1744 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -89,23 +89,23 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live } rtcEgressLauncher := NewEgressLauncher(egressClient, ioInfoService, objectStore) topicFormatter := rpc.NewTopicFormatter() - v, err := rpc.NewTypedRoomClient(clientParams) + roomClient, err := rpc.NewTypedRoomClient(clientParams) if err != nil { return nil, err } - v2, err := rpc.NewTypedParticipantClient(clientParams) + participantClient, err := rpc.NewTypedParticipantClient(clientParams) if err != nil { return nil, err } - roomService, err := NewRoomService(limitConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, v, v2) + roomService, err := NewRoomService(limitConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, roomClient, participantClient) if err != nil { return nil, err } - v3, err := rpc.NewTypedAgentDispatchInternalClient(clientParams) + agentDispatchInternalClient, err := rpc.NewTypedAgentDispatchInternalClient(clientParams) if err != nil { return nil, err } - agentDispatchService := NewAgentDispatchService(v3, topicFormatter, roomAllocator, router) + agentDispatchService := NewAgentDispatchService(agentDispatchInternalClient, topicFormatter, roomAllocator, router) egressService := NewEgressService(egressClient, rtcEgressLauncher, ioInfoService, roomService) ingressConfig := getIngressConfig(conf) ingressClient, err := rpc.NewIngressClient(clientParams) @@ -120,11 +120,11 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live } sipService := NewSIPService(sipConfig, nodeID, messageBus, sipClient, sipStore, roomService, telemetryService) rtcService := NewRTCService(conf, roomAllocator, router, telemetryService) - v4, err := rpc.NewTypedWHIPParticipantClient(clientParams) + whipParticipantClient, err := rpc.NewTypedWHIPParticipantClient(clientParams) if err != nil { return nil, err } - serviceWHIPService, err := NewWHIPService(conf, router, roomAllocator, clientParams, topicFormatter, v4) + serviceWHIPService, err := NewWHIPService(conf, router, roomAllocator, clientParams, topicFormatter, whipParticipantClient) if err != nil { return nil, err } diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index ddb74c093..7c3fb7c8f 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -1409,12 +1409,13 @@ func (b *Buffer) seedKeyFrame(keyFrameSeederGeneration int32) { for { if b.closed.Load() || b.keyFrameSeederGeneration.Load() != keyFrameSeederGeneration { + b.logger.Debugw("stopping key frame seeder: stopped") return } select { case <-timer.C: - b.logger.Infow("stopping key frame seeder: timeout") + b.logger.Debugw("stopping key frame seeder: timeout") return case <-ticker.C: diff --git a/pkg/sfu/rtpstats/rtpstats_receiver.go b/pkg/sfu/rtpstats/rtpstats_receiver.go index 778e5cd93..14fab8026 100644 --- a/pkg/sfu/rtpstats/rtpstats_receiver.go +++ b/pkg/sfu/rtpstats/rtpstats_receiver.go @@ -199,10 +199,6 @@ func (r *RTPStatsReceiver) Update( ) } else { resSN = r.sequenceNumber.Update(sequenceNumber) - if resSN.IsUnhandled { - flowState.IsNotHandled = true - return - } gapSN = int64(resSN.ExtendedVal - resSN.PreExtendedHighest) timeSinceHighest = packetTime - r.highestTime @@ -249,7 +245,7 @@ func (r *RTPStatsReceiver) Update( } // it is possible that sequence number has rolled over too - if gapSN < 0 && gapTS > 0 && payloadSize > 0 { + if (gapSN < 0 || gapSN > (1<<15)) && gapTS > 0 && payloadSize > 0 { // not possible to know how many cycles of sequence number roll over could have happened, // ensure that it at least does not go backwards snRolloverCount = 0 @@ -264,6 +260,12 @@ func (r *RTPStatsReceiver) Update( logger().Warnw("forcing sequence number rollover", nil) } + + if resSN.IsUnhandled { + flowState.IsNotHandled = true + logger().Warnw("dropping packet, cannot find sequence", nil) + return + } } gapSN = int64(resSN.ExtendedVal - resSN.PreExtendedHighest)