Add subscriber stream start event notification (#4449)

This commit is contained in:
Raja Subramanian
2026-04-14 22:08:31 +05:30
committed by GitHub
parent ce1bf47b5c
commit 6c81f67858
8 changed files with 90 additions and 23 deletions
+1 -1
View File
@@ -53,7 +53,6 @@ require (
go.uber.org/atomic v1.11.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.1
golang.org/x/exp v0.0.0-20260312153236-7ab1446f8b90
golang.org/x/mod v0.34.0
golang.org/x/sync v0.20.0
google.golang.org/protobuf v1.36.11
@@ -84,6 +83,7 @@ require (
go.opentelemetry.io/otel/trace v1.42.0 // indirect
go.opentelemetry.io/proto/otlp v1.9.0 // indirect
go.yaml.in/yaml/v2 v2.4.2 // indirect
golang.org/x/exp v0.0.0-20260312153236-7ab1446f8b90 // indirect
golang.org/x/time v0.15.0 // indirect
)
+5 -2
View File
@@ -2021,8 +2021,11 @@ func (l participantTelemetryListener) OnTrackUnsubscribed(pID livekit.Participan
l.room.telemetry.TrackUnsubscribed(context.Background(), l.room.ID(), l.room.Name(), pID, ti, shouldSendEvent)
}
func (l participantTelemetryListener) OnTrackSubscribeFailed(pID livekit.ParticipantID, ti livekit.TrackID, err error, isUserError bool) {
l.room.telemetry.TrackSubscribeFailed(context.Background(), l.room.ID(), l.room.Name(), pID, ti, err, isUserError)
func (l participantTelemetryListener) OnTrackSubscribeFailed(pID livekit.ParticipantID, trackID livekit.TrackID, err error, isUserError bool) {
l.room.telemetry.TrackSubscribeFailed(context.Background(), l.room.ID(), l.room.Name(), pID, trackID, err, isUserError)
}
func (l participantTelemetryListener) OnTrackSubscribeStreamStarted(pID livekit.ParticipantID, ti *livekit.TrackInfo) {
}
func (l participantTelemetryListener) OnTrackMuted(pID livekit.ParticipantID, ti *livekit.TrackInfo) {
+4
View File
@@ -480,3 +480,7 @@ func (t *SubscribedTrack) OnDownTrackClose(isExpectedToResume bool) {
}
t.Close(isExpectedToResume)
}
func (t *SubscribedTrack) OnStreamStarted() {
t.params.TelemetryListener.OnTrackSubscribeStreamStarted(t.params.Subscriber.ID(), t.params.MediaTrack.ToProto())
}
+5 -2
View File
@@ -650,7 +650,8 @@ type ParticipantTelemetryListener interface {
OnTrackSubscribeRequested(pID livekit.ParticipantID, ti *livekit.TrackInfo)
OnTrackSubscribed(pID livekit.ParticipantID, ti *livekit.TrackInfo, publisherInfo *livekit.ParticipantInfo, shouldSendEvent bool)
OnTrackUnsubscribed(pID livekit.ParticipantID, ti *livekit.TrackInfo, shouldSendEvent bool)
OnTrackSubscribeFailed(pID livekit.ParticipantID, ti livekit.TrackID, err error, isUserError bool)
OnTrackSubscribeFailed(pID livekit.ParticipantID, trackID livekit.TrackID, err error, isUserError bool)
OnTrackSubscribeStreamStarted(pID livekit.ParticipantID, ti *livekit.TrackInfo)
OnTrackMuted(pID livekit.ParticipantID, ti *livekit.TrackInfo)
OnTrackUnmuted(pID livekit.ParticipantID, ti *livekit.TrackInfo)
OnTrackPublishedUpdate(pID livekit.ParticipantID, ti *livekit.TrackInfo)
@@ -677,7 +678,9 @@ func (NullParticipantTelemetryListener) OnTrackSubscribed(pID livekit.Participan
}
func (NullParticipantTelemetryListener) OnTrackUnsubscribed(pID livekit.ParticipantID, ti *livekit.TrackInfo, shouldSendEvent bool) {
}
func (NullParticipantTelemetryListener) OnTrackSubscribeFailed(pID livekit.ParticipantID, ti livekit.TrackID, err error, isUserError bool) {
func (NullParticipantTelemetryListener) OnTrackSubscribeFailed(pID livekit.ParticipantID, trackID livekit.TrackID, err error, isUserError bool) {
}
func (NullParticipantTelemetryListener) OnTrackSubscribeStreamStarted(pID livekit.ParticipantID, ti *livekit.TrackInfo) {
}
func (NullParticipantTelemetryListener) OnTrackMuted(pID livekit.ParticipantID, ti *livekit.TrackInfo) {
}
@@ -83,6 +83,12 @@ type FakeParticipantTelemetryListener struct {
arg1 livekit.ParticipantID
arg2 *livekit.TrackInfo
}
OnTrackSubscribeStreamStartedStub func(livekit.ParticipantID, *livekit.TrackInfo)
onTrackSubscribeStreamStartedMutex sync.RWMutex
onTrackSubscribeStreamStartedArgsForCall []struct {
arg1 livekit.ParticipantID
arg2 *livekit.TrackInfo
}
OnTrackSubscribedStub func(livekit.ParticipantID, *livekit.TrackInfo, *livekit.ParticipantInfo, bool)
onTrackSubscribedMutex sync.RWMutex
onTrackSubscribedArgsForCall []struct {
@@ -458,6 +464,39 @@ func (fake *FakeParticipantTelemetryListener) OnTrackSubscribeRequestedArgsForCa
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeParticipantTelemetryListener) OnTrackSubscribeStreamStarted(arg1 livekit.ParticipantID, arg2 *livekit.TrackInfo) {
fake.onTrackSubscribeStreamStartedMutex.Lock()
fake.onTrackSubscribeStreamStartedArgsForCall = append(fake.onTrackSubscribeStreamStartedArgsForCall, struct {
arg1 livekit.ParticipantID
arg2 *livekit.TrackInfo
}{arg1, arg2})
stub := fake.OnTrackSubscribeStreamStartedStub
fake.recordInvocation("OnTrackSubscribeStreamStarted", []interface{}{arg1, arg2})
fake.onTrackSubscribeStreamStartedMutex.Unlock()
if stub != nil {
fake.OnTrackSubscribeStreamStartedStub(arg1, arg2)
}
}
func (fake *FakeParticipantTelemetryListener) OnTrackSubscribeStreamStartedCallCount() int {
fake.onTrackSubscribeStreamStartedMutex.RLock()
defer fake.onTrackSubscribeStreamStartedMutex.RUnlock()
return len(fake.onTrackSubscribeStreamStartedArgsForCall)
}
func (fake *FakeParticipantTelemetryListener) OnTrackSubscribeStreamStartedCalls(stub func(livekit.ParticipantID, *livekit.TrackInfo)) {
fake.onTrackSubscribeStreamStartedMutex.Lock()
defer fake.onTrackSubscribeStreamStartedMutex.Unlock()
fake.OnTrackSubscribeStreamStartedStub = stub
}
func (fake *FakeParticipantTelemetryListener) OnTrackSubscribeStreamStartedArgsForCall(i int) (livekit.ParticipantID, *livekit.TrackInfo) {
fake.onTrackSubscribeStreamStartedMutex.RLock()
defer fake.onTrackSubscribeStreamStartedMutex.RUnlock()
argsForCall := fake.onTrackSubscribeStreamStartedArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeParticipantTelemetryListener) OnTrackSubscribed(arg1 livekit.ParticipantID, arg2 *livekit.TrackInfo, arg3 *livekit.ParticipantInfo, arg4 bool) {
fake.onTrackSubscribedMutex.Lock()
fake.onTrackSubscribedArgsForCall = append(fake.onTrackSubscribedArgsForCall, struct {
+5
View File
@@ -262,6 +262,7 @@ type DownTrackListener interface {
OnRttUpdate(rtt uint32)
OnCodecNegotiated(webrtc.RTPCodecCapability)
OnDownTrackClose(isExpectedToResume bool)
OnStreamStarted()
}
// -------------------------------------------------------------------
@@ -1212,6 +1213,10 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) int32 {
sal.OnResume(d)
}
}
if tp.isStarting {
d.params.Listener.OnStreamStarted()
}
return 1
}
+29 -18
View File
@@ -185,6 +185,7 @@ func (v *VideoTransition) MarshalLogObject(e zapcore.ObjectEncoder) error {
type TranslationParams struct {
shouldDrop bool
isStarting bool
isResuming bool
isSwitching bool
rtp TranslationParamsRTP
@@ -1741,10 +1742,11 @@ func (f *Forwarder) getRefLayerRTPTimestamp(ts uint32, refLayer, targetLayer int
return ts + offset, nil
}
func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) error {
func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) (bool, error) {
starting := false
if !f.started {
if extPkt.IsOutOfOrder {
return errSkipStartOnOutOfOrderPacket
return starting, errSkipStartOnOutOfOrderPacket
}
f.started = true
@@ -1760,10 +1762,11 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
"layer", layer,
"referenceLayerSpatial", f.referenceLayerSpatial,
)
return nil
starting = true
return starting, nil
} else if f.referenceLayerSpatial == buffer.InvalidLayerSpatial {
if extPkt.IsOutOfOrder {
return errSkipStartOnOutOfOrderPacket
return starting, errSkipStartOnOutOfOrderPacket
}
f.referenceLayerSpatial = layer
@@ -1777,6 +1780,8 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
"layer", layer,
"referenceLayerSpatial", f.referenceLayerSpatial,
)
starting = true
}
logTransition := func(message string, extExpectedTS, extRefTS, extLastTS uint64, diffSeconds float64) {
@@ -1827,7 +1832,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
"layer", layer,
"error", err,
)
return err
return starting, err
}
}
@@ -1925,7 +1930,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
// (like "have waited for too long for layer switch, nothing available, switch to whatever is available" kind of condition).
logTransition("layer switch, reference too far behind", extExpectedTS, extRefTS, extLastTS, diffSeconds)
return errSwitchPointTooFarBehind
return starting, errSwitchPointTooFarBehind
}
// use a nominal increase to ensure that timestamp is always moving forward
@@ -1966,13 +1971,17 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
f.rtpMunger.UpdateSnTsOffsets(extPkt, 1, extNextTS-extLastTS)
f.codecMunger.UpdateOffsets(extPkt)
return nil
return starting, nil
}
// should be called with lock held
func (f *Forwarder) getTranslationParamsCommon(extPkt *buffer.ExtPacket, layer int32, tp *TranslationParams) error {
func (f *Forwarder) getTranslationParamsCommon(extPkt *buffer.ExtPacket, layer int32, tp *TranslationParams) (bool, error) {
var (
starting bool
err error
)
if f.lastSSRC != extPkt.Packet.SSRC {
if err := f.processSourceSwitch(extPkt, layer); err != nil {
if starting, err = f.processSourceSwitch(extPkt, layer); err != nil {
f.logger.Debugw(
"could not switch feed",
"error", err,
@@ -1986,7 +1995,7 @@ func (f *Forwarder) getTranslationParamsCommon(extPkt *buffer.ExtPacket, layer i
)
tp.shouldDrop = true
f.vls.Rollback()
return nil
return starting, nil
}
f.logger.Debugw(
"switching feed",
@@ -2010,28 +2019,29 @@ func (f *Forwarder) getTranslationParamsCommon(extPkt *buffer.ExtPacket, layer i
if err != nil {
tp.shouldDrop = true
if err == errPaddingOnlyPacket || err == errDuplicatePacket || err == errOutOfOrderSequenceNumberCacheMiss {
return nil
return starting, nil
}
return err
return starting, err
}
tp.rtp = tpRTP
if len(extPkt.Packet.Payload) > 0 {
return f.translateCodecHeader(extPkt, tp)
return starting, f.translateCodecHeader(extPkt, tp)
}
return nil
return starting, nil
}
// should be called with lock held
func (f *Forwarder) getTranslationParamsAudio(extPkt *buffer.ExtPacket, layer int32) (TranslationParams, error) {
tp := TranslationParams{}
if err := f.getTranslationParamsCommon(extPkt, layer, &tp); err != nil {
starting, err := f.getTranslationParamsCommon(extPkt, layer, &tp)
tp.isStarting = starting
if err != nil {
tp.shouldDrop = true
return tp, err
}
return tp, nil
return tp, err
}
// should be called with lock held
@@ -2079,7 +2089,8 @@ func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer in
tp.ddBytes = result.DependencyDescriptorExtension
tp.marker = result.RTPMarker
err := f.getTranslationParamsCommon(extPkt, layer, &tp)
starting, err := f.getTranslationParamsCommon(extPkt, layer, &tp)
tp.isStarting = starting
if tp.shouldDrop {
return tp, err
}
+2
View File
@@ -1316,6 +1316,7 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) {
// should lock onto the first in-order packet
expectedTP = TranslationParams{
isStarting: true,
rtp: TranslationParamsRTP{
snOrdering: SequenceNumberOrderingContiguous,
extSequenceNumber: 23333,
@@ -1583,6 +1584,7 @@ func TestForwarderGetTranslationParamsVideo(t *testing.T) {
}
marshalledVP8, _ := expectedVP8.Marshal()
expectedTP = TranslationParams{
isStarting: true,
isSwitching: true,
isResuming: true,
rtp: TranslationParamsRTP{