diff --git a/go.mod b/go.mod index c223136fe..82a4aad42 100644 --- a/go.mod +++ b/go.mod @@ -53,7 +53,6 @@ require ( go.uber.org/atomic v1.11.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.1 - golang.org/x/exp v0.0.0-20260312153236-7ab1446f8b90 golang.org/x/mod v0.34.0 golang.org/x/sync v0.20.0 google.golang.org/protobuf v1.36.11 @@ -84,6 +83,7 @@ require ( go.opentelemetry.io/otel/trace v1.42.0 // indirect go.opentelemetry.io/proto/otlp v1.9.0 // indirect go.yaml.in/yaml/v2 v2.4.2 // indirect + golang.org/x/exp v0.0.0-20260312153236-7ab1446f8b90 // indirect golang.org/x/time v0.15.0 // indirect ) diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 94e66fac0..186edf578 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -2021,8 +2021,11 @@ func (l participantTelemetryListener) OnTrackUnsubscribed(pID livekit.Participan l.room.telemetry.TrackUnsubscribed(context.Background(), l.room.ID(), l.room.Name(), pID, ti, shouldSendEvent) } -func (l participantTelemetryListener) OnTrackSubscribeFailed(pID livekit.ParticipantID, ti livekit.TrackID, err error, isUserError bool) { - l.room.telemetry.TrackSubscribeFailed(context.Background(), l.room.ID(), l.room.Name(), pID, ti, err, isUserError) +func (l participantTelemetryListener) OnTrackSubscribeFailed(pID livekit.ParticipantID, trackID livekit.TrackID, err error, isUserError bool) { + l.room.telemetry.TrackSubscribeFailed(context.Background(), l.room.ID(), l.room.Name(), pID, trackID, err, isUserError) +} + +func (l participantTelemetryListener) OnTrackSubscribeStreamStarted(pID livekit.ParticipantID, ti *livekit.TrackInfo) { } func (l participantTelemetryListener) OnTrackMuted(pID livekit.ParticipantID, ti *livekit.TrackInfo) { diff --git a/pkg/rtc/subscribedtrack.go b/pkg/rtc/subscribedtrack.go index 9f6c6ebe1..feea229ff 100644 --- a/pkg/rtc/subscribedtrack.go +++ b/pkg/rtc/subscribedtrack.go @@ -480,3 +480,7 @@ func (t *SubscribedTrack) OnDownTrackClose(isExpectedToResume bool) { } t.Close(isExpectedToResume) } + +func (t *SubscribedTrack) OnStreamStarted() { + t.params.TelemetryListener.OnTrackSubscribeStreamStarted(t.params.Subscriber.ID(), t.params.MediaTrack.ToProto()) +} diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index cbb25346f..7da652eef 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -650,7 +650,8 @@ type ParticipantTelemetryListener interface { OnTrackSubscribeRequested(pID livekit.ParticipantID, ti *livekit.TrackInfo) OnTrackSubscribed(pID livekit.ParticipantID, ti *livekit.TrackInfo, publisherInfo *livekit.ParticipantInfo, shouldSendEvent bool) OnTrackUnsubscribed(pID livekit.ParticipantID, ti *livekit.TrackInfo, shouldSendEvent bool) - OnTrackSubscribeFailed(pID livekit.ParticipantID, ti livekit.TrackID, err error, isUserError bool) + OnTrackSubscribeFailed(pID livekit.ParticipantID, trackID livekit.TrackID, err error, isUserError bool) + OnTrackSubscribeStreamStarted(pID livekit.ParticipantID, ti *livekit.TrackInfo) OnTrackMuted(pID livekit.ParticipantID, ti *livekit.TrackInfo) OnTrackUnmuted(pID livekit.ParticipantID, ti *livekit.TrackInfo) OnTrackPublishedUpdate(pID livekit.ParticipantID, ti *livekit.TrackInfo) @@ -677,7 +678,9 @@ func (NullParticipantTelemetryListener) OnTrackSubscribed(pID livekit.Participan } func (NullParticipantTelemetryListener) OnTrackUnsubscribed(pID livekit.ParticipantID, ti *livekit.TrackInfo, shouldSendEvent bool) { } -func (NullParticipantTelemetryListener) OnTrackSubscribeFailed(pID livekit.ParticipantID, ti livekit.TrackID, err error, isUserError bool) { +func (NullParticipantTelemetryListener) OnTrackSubscribeFailed(pID livekit.ParticipantID, trackID livekit.TrackID, err error, isUserError bool) { +} +func (NullParticipantTelemetryListener) OnTrackSubscribeStreamStarted(pID livekit.ParticipantID, ti *livekit.TrackInfo) { } func (NullParticipantTelemetryListener) OnTrackMuted(pID livekit.ParticipantID, ti *livekit.TrackInfo) { } diff --git a/pkg/rtc/types/typesfakes/fake_participant_telemetry_listener.go b/pkg/rtc/types/typesfakes/fake_participant_telemetry_listener.go index dd00c817a..df4414eae 100644 --- a/pkg/rtc/types/typesfakes/fake_participant_telemetry_listener.go +++ b/pkg/rtc/types/typesfakes/fake_participant_telemetry_listener.go @@ -83,6 +83,12 @@ type FakeParticipantTelemetryListener struct { arg1 livekit.ParticipantID arg2 *livekit.TrackInfo } + OnTrackSubscribeStreamStartedStub func(livekit.ParticipantID, *livekit.TrackInfo) + onTrackSubscribeStreamStartedMutex sync.RWMutex + onTrackSubscribeStreamStartedArgsForCall []struct { + arg1 livekit.ParticipantID + arg2 *livekit.TrackInfo + } OnTrackSubscribedStub func(livekit.ParticipantID, *livekit.TrackInfo, *livekit.ParticipantInfo, bool) onTrackSubscribedMutex sync.RWMutex onTrackSubscribedArgsForCall []struct { @@ -458,6 +464,39 @@ func (fake *FakeParticipantTelemetryListener) OnTrackSubscribeRequestedArgsForCa return argsForCall.arg1, argsForCall.arg2 } +func (fake *FakeParticipantTelemetryListener) OnTrackSubscribeStreamStarted(arg1 livekit.ParticipantID, arg2 *livekit.TrackInfo) { + fake.onTrackSubscribeStreamStartedMutex.Lock() + fake.onTrackSubscribeStreamStartedArgsForCall = append(fake.onTrackSubscribeStreamStartedArgsForCall, struct { + arg1 livekit.ParticipantID + arg2 *livekit.TrackInfo + }{arg1, arg2}) + stub := fake.OnTrackSubscribeStreamStartedStub + fake.recordInvocation("OnTrackSubscribeStreamStarted", []interface{}{arg1, arg2}) + fake.onTrackSubscribeStreamStartedMutex.Unlock() + if stub != nil { + fake.OnTrackSubscribeStreamStartedStub(arg1, arg2) + } +} + +func (fake *FakeParticipantTelemetryListener) OnTrackSubscribeStreamStartedCallCount() int { + fake.onTrackSubscribeStreamStartedMutex.RLock() + defer fake.onTrackSubscribeStreamStartedMutex.RUnlock() + return len(fake.onTrackSubscribeStreamStartedArgsForCall) +} + +func (fake *FakeParticipantTelemetryListener) OnTrackSubscribeStreamStartedCalls(stub func(livekit.ParticipantID, *livekit.TrackInfo)) { + fake.onTrackSubscribeStreamStartedMutex.Lock() + defer fake.onTrackSubscribeStreamStartedMutex.Unlock() + fake.OnTrackSubscribeStreamStartedStub = stub +} + +func (fake *FakeParticipantTelemetryListener) OnTrackSubscribeStreamStartedArgsForCall(i int) (livekit.ParticipantID, *livekit.TrackInfo) { + fake.onTrackSubscribeStreamStartedMutex.RLock() + defer fake.onTrackSubscribeStreamStartedMutex.RUnlock() + argsForCall := fake.onTrackSubscribeStreamStartedArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + func (fake *FakeParticipantTelemetryListener) OnTrackSubscribed(arg1 livekit.ParticipantID, arg2 *livekit.TrackInfo, arg3 *livekit.ParticipantInfo, arg4 bool) { fake.onTrackSubscribedMutex.Lock() fake.onTrackSubscribedArgsForCall = append(fake.onTrackSubscribedArgsForCall, struct { diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index a238b8d11..e45ac0b88 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -262,6 +262,7 @@ type DownTrackListener interface { OnRttUpdate(rtt uint32) OnCodecNegotiated(webrtc.RTPCodecCapability) OnDownTrackClose(isExpectedToResume bool) + OnStreamStarted() } // ------------------------------------------------------------------- @@ -1212,6 +1213,10 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) int32 { sal.OnResume(d) } } + + if tp.isStarting { + d.params.Listener.OnStreamStarted() + } return 1 } diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 5779945ad..927b9075d 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -185,6 +185,7 @@ func (v *VideoTransition) MarshalLogObject(e zapcore.ObjectEncoder) error { type TranslationParams struct { shouldDrop bool + isStarting bool isResuming bool isSwitching bool rtp TranslationParamsRTP @@ -1741,10 +1742,11 @@ func (f *Forwarder) getRefLayerRTPTimestamp(ts uint32, refLayer, targetLayer int return ts + offset, nil } -func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) error { +func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) (bool, error) { + starting := false if !f.started { if extPkt.IsOutOfOrder { - return errSkipStartOnOutOfOrderPacket + return starting, errSkipStartOnOutOfOrderPacket } f.started = true @@ -1760,10 +1762,11 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e "layer", layer, "referenceLayerSpatial", f.referenceLayerSpatial, ) - return nil + starting = true + return starting, nil } else if f.referenceLayerSpatial == buffer.InvalidLayerSpatial { if extPkt.IsOutOfOrder { - return errSkipStartOnOutOfOrderPacket + return starting, errSkipStartOnOutOfOrderPacket } f.referenceLayerSpatial = layer @@ -1777,6 +1780,8 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e "layer", layer, "referenceLayerSpatial", f.referenceLayerSpatial, ) + + starting = true } logTransition := func(message string, extExpectedTS, extRefTS, extLastTS uint64, diffSeconds float64) { @@ -1827,7 +1832,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e "layer", layer, "error", err, ) - return err + return starting, err } } @@ -1925,7 +1930,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e // (like "have waited for too long for layer switch, nothing available, switch to whatever is available" kind of condition). logTransition("layer switch, reference too far behind", extExpectedTS, extRefTS, extLastTS, diffSeconds) - return errSwitchPointTooFarBehind + return starting, errSwitchPointTooFarBehind } // use a nominal increase to ensure that timestamp is always moving forward @@ -1966,13 +1971,17 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e f.rtpMunger.UpdateSnTsOffsets(extPkt, 1, extNextTS-extLastTS) f.codecMunger.UpdateOffsets(extPkt) - return nil + return starting, nil } // should be called with lock held -func (f *Forwarder) getTranslationParamsCommon(extPkt *buffer.ExtPacket, layer int32, tp *TranslationParams) error { +func (f *Forwarder) getTranslationParamsCommon(extPkt *buffer.ExtPacket, layer int32, tp *TranslationParams) (bool, error) { + var ( + starting bool + err error + ) if f.lastSSRC != extPkt.Packet.SSRC { - if err := f.processSourceSwitch(extPkt, layer); err != nil { + if starting, err = f.processSourceSwitch(extPkt, layer); err != nil { f.logger.Debugw( "could not switch feed", "error", err, @@ -1986,7 +1995,7 @@ func (f *Forwarder) getTranslationParamsCommon(extPkt *buffer.ExtPacket, layer i ) tp.shouldDrop = true f.vls.Rollback() - return nil + return starting, nil } f.logger.Debugw( "switching feed", @@ -2010,28 +2019,29 @@ func (f *Forwarder) getTranslationParamsCommon(extPkt *buffer.ExtPacket, layer i if err != nil { tp.shouldDrop = true if err == errPaddingOnlyPacket || err == errDuplicatePacket || err == errOutOfOrderSequenceNumberCacheMiss { - return nil + return starting, nil } - return err + return starting, err } tp.rtp = tpRTP if len(extPkt.Packet.Payload) > 0 { - return f.translateCodecHeader(extPkt, tp) + return starting, f.translateCodecHeader(extPkt, tp) } - return nil + return starting, nil } // should be called with lock held func (f *Forwarder) getTranslationParamsAudio(extPkt *buffer.ExtPacket, layer int32) (TranslationParams, error) { tp := TranslationParams{} - if err := f.getTranslationParamsCommon(extPkt, layer, &tp); err != nil { + starting, err := f.getTranslationParamsCommon(extPkt, layer, &tp) + tp.isStarting = starting + if err != nil { tp.shouldDrop = true - return tp, err } - return tp, nil + return tp, err } // should be called with lock held @@ -2079,7 +2089,8 @@ func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer in tp.ddBytes = result.DependencyDescriptorExtension tp.marker = result.RTPMarker - err := f.getTranslationParamsCommon(extPkt, layer, &tp) + starting, err := f.getTranslationParamsCommon(extPkt, layer, &tp) + tp.isStarting = starting if tp.shouldDrop { return tp, err } diff --git a/pkg/sfu/forwarder_test.go b/pkg/sfu/forwarder_test.go index 5c110f642..7f080c646 100644 --- a/pkg/sfu/forwarder_test.go +++ b/pkg/sfu/forwarder_test.go @@ -1316,6 +1316,7 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) { // should lock onto the first in-order packet expectedTP = TranslationParams{ + isStarting: true, rtp: TranslationParamsRTP{ snOrdering: SequenceNumberOrderingContiguous, extSequenceNumber: 23333, @@ -1583,6 +1584,7 @@ func TestForwarderGetTranslationParamsVideo(t *testing.T) { } marshalledVP8, _ := expectedVP8.Marshal() expectedTP = TranslationParams{ + isStarting: true, isSwitching: true, isResuming: true, rtp: TranslationParamsRTP{