diff --git a/go.mod b/go.mod index 0ac4665fe..d13f1b45b 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 - github.com/livekit/protocol v1.23.1-0.20241007110347-136dfa7a2532 + github.com/livekit/protocol v1.23.1-0.20241008082340-082848150f8f github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9 github.com/mackerelio/go-osstat v0.2.5 github.com/magefile/mage v1.15.0 diff --git a/go.sum b/go.sum index 32c1b6caa..198b0c6e1 100644 --- a/go.sum +++ b/go.sum @@ -165,8 +165,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 h1:yLlkHk2feSLHstD9n4VKg7YEBR4rLODTI4WE8gNBEnQ= github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= -github.com/livekit/protocol v1.23.1-0.20241007110347-136dfa7a2532 h1:U4wgaMOpgTsaEOuaW6DodFdVwvkfAIZj21F+sVFFxzw= -github.com/livekit/protocol v1.23.1-0.20241007110347-136dfa7a2532/go.mod h1:nxRzmQBKSYK64gqr7ABWwt78hvrgiO2wYuCojRYb7Gs= +github.com/livekit/protocol v1.23.1-0.20241008082340-082848150f8f h1:ZtT9U7Mfcfv+uI1uCZUUeBf/R0uin+KIHanyStuXr68= +github.com/livekit/protocol v1.23.1-0.20241008082340-082848150f8f/go.mod h1:nxRzmQBKSYK64gqr7ABWwt78hvrgiO2wYuCojRYb7Gs= github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9 h1:33oBjGpVD9tYkDXQU42tnHl8eCX9G6PVUToBVuCUyOs= github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o= diff --git a/pkg/rtc/dynacastmanager.go b/pkg/rtc/dynacast/dynacastmanager.go similarity index 97% rename from pkg/rtc/dynacastmanager.go rename to pkg/rtc/dynacast/dynacastmanager.go index ca6ea7af3..641fe1be2 100644 --- a/pkg/rtc/dynacastmanager.go +++ b/pkg/rtc/dynacast/dynacastmanager.go @@ -12,13 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -package rtc +package dynacast import ( + "strings" "sync" "time" "github.com/bep/debounce" + "golang.org/x/exp/maps" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" @@ -157,6 +159,7 @@ func (d *DynacastManager) getOrCreateDynacastQuality(mime string) *DynacastQuali return nil } + mime = strings.ToLower(mime) if dq := d.dynacastQuality[mime]; dq != nil { return dq } @@ -165,8 +168,8 @@ func (d *DynacastManager) getOrCreateDynacastQuality(mime string) *DynacastQuali MimeType: mime, Logger: d.params.Logger, }) - dq.OnSubscribedMaxQualityChange(func(maxQuality livekit.VideoQuality) { - d.updateMaxQualityForMime(mime, maxQuality) + dq.OnSubscribedMaxQualityChange(func(mimeType string, maxQuality livekit.VideoQuality) { + d.updateMaxQualityForMime(mimeType, maxQuality) }) dq.Start() @@ -176,12 +179,7 @@ func (d *DynacastManager) getOrCreateDynacastQuality(mime string) *DynacastQuali } func (d *DynacastManager) getDynacastQualitiesLocked() []*DynacastQuality { - dqs := make([]*DynacastQuality, 0, len(d.dynacastQuality)) - for _, dq := range d.dynacastQuality { - dqs = append(dqs, dq) - } - - return dqs + return maps.Values(d.dynacastQuality) } func (d *DynacastManager) updateMaxQualityForMime(mime string, maxQuality livekit.VideoQuality) { diff --git a/pkg/rtc/dynacastmanager_test.go b/pkg/rtc/dynacast/dynacastmanager_test.go similarity index 92% rename from pkg/rtc/dynacastmanager_test.go rename to pkg/rtc/dynacast/dynacastmanager_test.go index ee1c97c70..c38fd30ef 100644 --- a/pkg/rtc/dynacastmanager_test.go +++ b/pkg/rtc/dynacast/dynacastmanager_test.go @@ -12,10 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -package rtc +package dynacast import ( "sort" + "strings" "sync" "testing" "time" @@ -54,7 +55,7 @@ func TestSubscribedMaxQuality(t *testing.T) { expectedSubscribedQualities := []*livekit.SubscribedCodec{ { - Codec: webrtc.MimeTypeVP8, + Codec: strings.ToLower(webrtc.MimeTypeVP8), Qualities: []*livekit.SubscribedQuality{ {Quality: livekit.VideoQuality_LOW, Enabled: false}, {Quality: livekit.VideoQuality_MEDIUM, Enabled: false}, @@ -62,7 +63,7 @@ func TestSubscribedMaxQuality(t *testing.T) { }, }, { - Codec: webrtc.MimeTypeAV1, + Codec: strings.ToLower(webrtc.MimeTypeAV1), Qualities: []*livekit.SubscribedQuality{ {Quality: livekit.VideoQuality_LOW, Enabled: true}, {Quality: livekit.VideoQuality_MEDIUM, Enabled: true}, @@ -94,8 +95,8 @@ func TestSubscribedMaxQuality(t *testing.T) { }) dm.maxSubscribedQuality = map[string]livekit.VideoQuality{ - webrtc.MimeTypeVP8: livekit.VideoQuality_LOW, - webrtc.MimeTypeAV1: livekit.VideoQuality_LOW, + strings.ToLower(webrtc.MimeTypeVP8): livekit.VideoQuality_LOW, + strings.ToLower(webrtc.MimeTypeAV1): livekit.VideoQuality_LOW, } dm.NotifySubscriberMaxQuality("s1", webrtc.MimeTypeVP8, livekit.VideoQuality_HIGH) dm.NotifySubscriberMaxQuality("s2", webrtc.MimeTypeVP8, livekit.VideoQuality_MEDIUM) @@ -103,7 +104,7 @@ func TestSubscribedMaxQuality(t *testing.T) { expectedSubscribedQualities := []*livekit.SubscribedCodec{ { - Codec: webrtc.MimeTypeVP8, + Codec: strings.ToLower(webrtc.MimeTypeVP8), Qualities: []*livekit.SubscribedQuality{ {Quality: livekit.VideoQuality_LOW, Enabled: true}, {Quality: livekit.VideoQuality_MEDIUM, Enabled: true}, @@ -111,7 +112,7 @@ func TestSubscribedMaxQuality(t *testing.T) { }, }, { - Codec: webrtc.MimeTypeAV1, + Codec: strings.ToLower(webrtc.MimeTypeAV1), Qualities: []*livekit.SubscribedQuality{ {Quality: livekit.VideoQuality_LOW, Enabled: true}, {Quality: livekit.VideoQuality_MEDIUM, Enabled: true}, @@ -131,7 +132,7 @@ func TestSubscribedMaxQuality(t *testing.T) { expectedSubscribedQualities = []*livekit.SubscribedCodec{ { - Codec: webrtc.MimeTypeVP8, + Codec: strings.ToLower(webrtc.MimeTypeVP8), Qualities: []*livekit.SubscribedQuality{ {Quality: livekit.VideoQuality_LOW, Enabled: true}, {Quality: livekit.VideoQuality_MEDIUM, Enabled: true}, @@ -139,7 +140,7 @@ func TestSubscribedMaxQuality(t *testing.T) { }, }, { - Codec: webrtc.MimeTypeAV1, + Codec: strings.ToLower(webrtc.MimeTypeAV1), Qualities: []*livekit.SubscribedQuality{ {Quality: livekit.VideoQuality_LOW, Enabled: true}, {Quality: livekit.VideoQuality_MEDIUM, Enabled: true}, @@ -161,7 +162,7 @@ func TestSubscribedMaxQuality(t *testing.T) { expectedSubscribedQualities = []*livekit.SubscribedCodec{ { - Codec: webrtc.MimeTypeVP8, + Codec: strings.ToLower(webrtc.MimeTypeVP8), Qualities: []*livekit.SubscribedQuality{ {Quality: livekit.VideoQuality_LOW, Enabled: true}, {Quality: livekit.VideoQuality_MEDIUM, Enabled: false}, @@ -169,7 +170,7 @@ func TestSubscribedMaxQuality(t *testing.T) { }, }, { - Codec: webrtc.MimeTypeAV1, + Codec: strings.ToLower(webrtc.MimeTypeAV1), Qualities: []*livekit.SubscribedQuality{ {Quality: livekit.VideoQuality_LOW, Enabled: true}, {Quality: livekit.VideoQuality_MEDIUM, Enabled: false}, @@ -201,7 +202,7 @@ func TestSubscribedMaxQuality(t *testing.T) { expectedSubscribedQualities = []*livekit.SubscribedCodec{ { - Codec: webrtc.MimeTypeVP8, + Codec: strings.ToLower(webrtc.MimeTypeVP8), Qualities: []*livekit.SubscribedQuality{ {Quality: livekit.VideoQuality_LOW, Enabled: false}, {Quality: livekit.VideoQuality_MEDIUM, Enabled: false}, @@ -209,7 +210,7 @@ func TestSubscribedMaxQuality(t *testing.T) { }, }, { - Codec: webrtc.MimeTypeAV1, + Codec: strings.ToLower(webrtc.MimeTypeAV1), Qualities: []*livekit.SubscribedQuality{ {Quality: livekit.VideoQuality_LOW, Enabled: false}, {Quality: livekit.VideoQuality_MEDIUM, Enabled: false}, @@ -229,7 +230,7 @@ func TestSubscribedMaxQuality(t *testing.T) { expectedSubscribedQualities = []*livekit.SubscribedCodec{ { - Codec: webrtc.MimeTypeVP8, + Codec: strings.ToLower(webrtc.MimeTypeVP8), Qualities: []*livekit.SubscribedQuality{ {Quality: livekit.VideoQuality_LOW, Enabled: true}, {Quality: livekit.VideoQuality_MEDIUM, Enabled: false}, @@ -237,7 +238,7 @@ func TestSubscribedMaxQuality(t *testing.T) { }, }, { - Codec: webrtc.MimeTypeAV1, + Codec: strings.ToLower(webrtc.MimeTypeAV1), Qualities: []*livekit.SubscribedQuality{ {Quality: livekit.VideoQuality_LOW, Enabled: false}, {Quality: livekit.VideoQuality_MEDIUM, Enabled: false}, @@ -260,7 +261,7 @@ func TestSubscribedMaxQuality(t *testing.T) { expectedSubscribedQualities = []*livekit.SubscribedCodec{ { - Codec: webrtc.MimeTypeVP8, + Codec: strings.ToLower(webrtc.MimeTypeVP8), Qualities: []*livekit.SubscribedQuality{ {Quality: livekit.VideoQuality_LOW, Enabled: true}, {Quality: livekit.VideoQuality_MEDIUM, Enabled: true}, @@ -268,7 +269,7 @@ func TestSubscribedMaxQuality(t *testing.T) { }, }, { - Codec: webrtc.MimeTypeAV1, + Codec: strings.ToLower(webrtc.MimeTypeAV1), Qualities: []*livekit.SubscribedQuality{ {Quality: livekit.VideoQuality_LOW, Enabled: true}, {Quality: livekit.VideoQuality_MEDIUM, Enabled: true}, diff --git a/pkg/rtc/dynacastquality.go b/pkg/rtc/dynacast/dynacastquality.go similarity index 93% rename from pkg/rtc/dynacastquality.go rename to pkg/rtc/dynacast/dynacastquality.go index 7f0f90495..53e9f979d 100644 --- a/pkg/rtc/dynacastquality.go +++ b/pkg/rtc/dynacast/dynacastquality.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package rtc +package dynacast import ( "sync" @@ -43,7 +43,7 @@ type DynacastQuality struct { maxSubscribedQuality livekit.VideoQuality maxQualityTimer *time.Timer - onSubscribedMaxQualityChange func(maxSubscribedQuality livekit.VideoQuality) + onSubscribedMaxQualityChange func(mimeType string, maxSubscribedQuality livekit.VideoQuality) } func NewDynacastQuality(params DynacastQualityParams) *DynacastQuality { @@ -66,7 +66,7 @@ func (d *DynacastQuality) Stop() { d.stopMaxQualityTimer() } -func (d *DynacastQuality) OnSubscribedMaxQualityChange(f func(maxSubscribedQuality livekit.VideoQuality)) { +func (d *DynacastQuality) OnSubscribedMaxQualityChange(f func(mimeType string, maxSubscribedQuality livekit.VideoQuality)) { d.onSubscribedMaxQualityChange = f } @@ -148,7 +148,7 @@ func (d *DynacastQuality) updateQualityChange(force bool) { d.lock.Unlock() if onSubscribedMaxQualityChange != nil { - onSubscribedMaxQualityChange(maxSubscribedQuality) + onSubscribedMaxQualityChange(d.params.MimeType, maxSubscribedQuality) } } diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index 9b1e8c2ea..2471c92da 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -29,6 +29,7 @@ import ( "github.com/livekit/protocol/logger" "github.com/livekit/livekit-server/pkg/config" + "github.com/livekit/livekit-server/pkg/rtc/dynacast" "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/sfu" "github.com/livekit/livekit-server/pkg/sfu/buffer" @@ -48,7 +49,7 @@ type MediaTrack struct { *MediaTrackReceiver *MediaLossProxy - dynacastManager *DynacastManager + dynacastManager *dynacast.DynacastManager lock sync.RWMutex @@ -107,7 +108,7 @@ func NewMediaTrack(params MediaTrackParams, ti *livekit.TrackInfo) *MediaTrack { } if ti.Type == livekit.TrackType_VIDEO { - t.dynacastManager = NewDynacastManager(DynacastManagerParams{ + t.dynacastManager = dynacast.NewDynacastManager(dynacast.DynacastManagerParams{ DynacastPauseDelay: params.VideoConfig.DynacastPauseDelay, Logger: params.Logger, }) diff --git a/pkg/rtc/mediatrackreceiver.go b/pkg/rtc/mediatrackreceiver.go index a404c6852..6c52cb727 100644 --- a/pkg/rtc/mediatrackreceiver.go +++ b/pkg/rtc/mediatrackreceiver.go @@ -683,6 +683,7 @@ func (t *MediaTrackReceiver) UpdateAudioTrack(update *livekit.UpdateLocalAudioTr t.updateTrackInfoOfReceivers() t.params.Telemetry.TrackPublishedUpdate(context.Background(), t.PublisherID(), clonedInfo) + t.params.Logger.Debugw("updated audio track", "before", logger.Proto(trackInfo), "after", logger.Proto(clonedInfo)) } func (t *MediaTrackReceiver) UpdateVideoTrack(update *livekit.UpdateLocalVideoTrack) { @@ -706,6 +707,7 @@ func (t *MediaTrackReceiver) UpdateVideoTrack(update *livekit.UpdateLocalVideoTr t.updateTrackInfoOfReceivers() t.params.Telemetry.TrackPublishedUpdate(context.Background(), t.PublisherID(), clonedInfo) + t.params.Logger.Debugw("updated video track", "before", logger.Proto(trackInfo), "after", logger.Proto(clonedInfo)) } func (t *MediaTrackReceiver) TrackInfo() *livekit.TrackInfo { diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index aab706a78..db34de735 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -1696,7 +1696,6 @@ func (p *ParticipantImpl) onDataMessage(kind livekit.DataPacket_Kind, data []byt overrideSenderIdentity = false payload.ChatMessage.Generated = true } - shouldForwardData = true case *livekit.DataPacket_Metrics: if payload.Metrics == nil { return @@ -1712,6 +1711,19 @@ func (p *ParticipantImpl) onDataMessage(kind livekit.DataPacket_Kind, data []byt // and pushing it to all subscribers on some cadence and subscribers have their own cadence of // processing/batching and sending to edge clients. p.metricTimestamper.Process(payload.Metrics) + case *livekit.DataPacket_RpcRequest: + if payload.RpcRequest == nil { + return + } + p.pubLogger.Infow("received RPC request data packet", "method", payload.RpcRequest.Method, "rpc_request_id", payload.RpcRequest.Id) + case *livekit.DataPacket_RpcResponse: + if payload.RpcResponse == nil { + return + } + case *livekit.DataPacket_RpcAck: + if payload.RpcAck == nil { + return + } default: p.pubLogger.Warnw("received unsupported data packet", nil, "payload", payload) } @@ -1943,7 +1955,7 @@ func (p *ParticipantImpl) onSubscribedMaxQualityChange( // normalize the codec name for _, subscribedQuality := range subscribedQualities { - subscribedQuality.Codec = strings.ToLower(strings.TrimLeft(subscribedQuality.Codec, "video/")) + subscribedQuality.Codec = strings.ToLower(strings.TrimPrefix(subscribedQuality.Codec, "video/")) } subscribedQualityUpdate := &livekit.SubscribedQualityUpdate{ @@ -2219,12 +2231,24 @@ func (p *ParticipantImpl) mediaTrackReceived(track *webrtc.TrackRemote, rtpRecei if newTrack { go func() { - p.pubLogger.Debugw( - "track published", - "trackID", mt.ID(), - "track", logger.Proto(mt.ToProto()), - "cost", pubTime.Milliseconds(), - ) + // TODO: remove this after we know where the high delay is coming from + if pubTime > 3*time.Second { + p.pubLogger.Infow( + "track published with high delay", + "trackID", mt.ID(), + "track", logger.Proto(mt.ToProto()), + "cost", pubTime.Milliseconds(), + "rid", track.RID(), + "mime", track.Codec().MimeType, + ) + } else { + p.pubLogger.Debugw( + "track published", + "trackID", mt.ID(), + "track", logger.Proto(mt.ToProto()), + "cost", pubTime.Milliseconds(), + ) + } prometheus.RecordPublishTime(mt.Source(), mt.Kind(), pubTime, p.GetClientInfo().GetSdk(), p.Kind()) p.handleTrackPublished(mt) @@ -2826,6 +2850,8 @@ func (p *ParticipantImpl) UpdateAudioTrack(update *livekit.UpdateLocalAudioTrack ti.DisableDtx = true } } + + p.pubLogger.Debugw("updated pending track", "trackID", ti.Sid, "trackInfo", logger.Proto(ti)) } } } diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 488ffcca4..7c1918d2c 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -969,8 +969,8 @@ func (r *Room) GetAgentDispatches(dispatchID string) ([]*livekit.AgentDispatch, return ret, nil } -func (r *Room) AddAgentDispatch(agentName string, metadata string) (*livekit.AgentDispatch, error) { - ad, err := r.createAgentDispatchFromParams(agentName, metadata) +func (r *Room) AddAgentDispatch(dispatch *livekit.AgentDispatch) (*livekit.AgentDispatch, error) { + ad, err := r.createAgentDispatch(dispatch) if err != nil { return nil, err } @@ -1692,20 +1692,12 @@ func (r *Room) DebugInfo() map[string]interface{} { return info } -func (r *Room) createAgentDispatchFromParams(agentName string, metadata string) (*agentDispatch, error) { - now := time.Now() +func (r *Room) createAgentDispatch(dispatch *livekit.AgentDispatch) (*agentDispatch, error) { + dispatch.State = &livekit.AgentDispatchState{ + CreatedAt: time.Now().UnixNano(), + } + ad := newAgentDispatch(dispatch) - ad := newAgentDispatch( - &livekit.AgentDispatch{ - Id: guid.New(guid.AgentDispatchPrefix), - AgentName: agentName, - Metadata: metadata, - Room: r.protoRoom.Name, - State: &livekit.AgentDispatchState{ - CreatedAt: now.UnixNano(), - }, - }, - ) r.lock.RLock() r.agentDispatches[ad.Id] = ad r.lock.RUnlock() @@ -1719,6 +1711,15 @@ func (r *Room) createAgentDispatchFromParams(agentName string, metadata string) return ad, nil } +func (r *Room) createAgentDispatchFromParams(agentName string, metadata string) (*agentDispatch, error) { + return r.createAgentDispatch(&livekit.AgentDispatch{ + Id: guid.New(guid.AgentDispatchPrefix), + AgentName: agentName, + Metadata: metadata, + Room: r.protoRoom.Name, + }) +} + func (r *Room) createAgentDispatchesFromRoomAgent() { if r.internal == nil { return diff --git a/pkg/rtc/subscriptionmanager.go b/pkg/rtc/subscriptionmanager.go index 5c4584910..33585b4f1 100644 --- a/pkg/rtc/subscriptionmanager.go +++ b/pkg/rtc/subscriptionmanager.go @@ -721,6 +721,9 @@ func (m *SubscriptionManager) handleSubscribedTrackClose(s *trackSubscription, i } m.params.Participant.Negotiate(false) + } else { + t := time.Now() + s.subscribeAt.Store(&t) } if relieveFromLimits { m.queueReconcile(trackIDForReconcileSubscriptions) @@ -755,16 +758,20 @@ type trackSubscription struct { // this timestamp determines when failures are reported subStartedAt atomic.Pointer[time.Time] - createAt time.Time + // the timestamp when the subscription was started, will be reset when downtrack is closed with expected resume + subscribeAt atomic.Pointer[time.Time] + succRecordCounter atomic.Int32 } func newTrackSubscription(subscriberID livekit.ParticipantID, trackID livekit.TrackID, l logger.Logger) *trackSubscription { - return &trackSubscription{ + s := &trackSubscription{ subscriberID: subscriberID, trackID: trackID, logger: l, - createAt: time.Now(), } + t := time.Now() + s.subscribeAt.Store(&t) + return s } func (s *trackSubscription) setPublisher(publisherIdentity livekit.ParticipantIdentity, publisherID livekit.ParticipantID) { @@ -790,6 +797,7 @@ func (s *trackSubscription) setDesired(desired bool) bool { // we'll reset the timer so it has sufficient time to reconcile t := time.Now() s.subStartedAt.Store(&t) + s.subscribeAt.Store(&t) } if s.desired == desired { @@ -822,6 +830,7 @@ func (s *trackSubscription) setHasPermission(perm bool) bool { // when permission is granted, reset the timer so it has sufficient time to reconcile t := time.Now() s.subStartedAt.Store(&t) + s.subscribeAt.Store(&t) } return true } @@ -992,10 +1001,10 @@ func (s *trackSubscription) maybeRecordSuccess(ts telemetry.TelemetryService, pI return } - d := time.Since(s.createAt) + d := time.Since(*s.subscribeAt.Load()) s.logger.Debugw("track subscribed", "cost", d.Milliseconds()) subscriber := subTrack.Subscriber() - prometheus.RecordSubscribeTime(mediaTrack.Source(), mediaTrack.Kind(), d, subscriber.GetClientInfo().GetSdk(), subscriber.Kind()) + prometheus.RecordSubscribeTime(mediaTrack.Source(), mediaTrack.Kind(), d, subscriber.GetClientInfo().GetSdk(), subscriber.Kind(), int(s.succRecordCounter.Inc())) eventSent := s.eventSent.Swap(true) diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index 465c1828a..dd02dda16 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -22,7 +22,6 @@ import ( "sync" "time" - "github.com/bep/debounce" "github.com/pion/dtls/v2/pkg/crypto/elliptic" "github.com/pion/ice/v2" "github.com/pion/interceptor" @@ -60,6 +59,7 @@ const ( LossyDataChannel = "_lossy" ReliableDataChannel = "_reliable" + fastNegotiationFrequency = 10 * time.Millisecond negotiationFrequency = 150 * time.Millisecond negotiationFailedTimeout = 15 * time.Second dtlsRetransmissionInterval = 100 * time.Millisecond @@ -190,7 +190,7 @@ type PCTransport struct { resetShortConnOnICERestart atomic.Bool signalingRTT atomic.Uint32 // milliseconds - debouncedNegotiate func(func()) + debouncedNegotiate *sfuutils.Debouncer debouncePending bool lastNegotiate time.Time @@ -415,7 +415,7 @@ func NewPCTransport(params TransportParams) (*PCTransport, error) { } t := &PCTransport{ params: params, - debouncedNegotiate: debounce.New(negotiationFrequency), + debouncedNegotiate: sfuutils.NewDebouncer(negotiationFrequency), negotiationState: transport.NegotiationStateNone, eventsQueue: utils.NewTypedOpsQueue[event](utils.OpsQueueParams{ Name: "transport", @@ -1001,8 +1001,8 @@ func (t *PCTransport) Negotiate(force bool) { var postEvent bool t.lock.Lock() - if force || (!t.debouncePending && time.Since(t.lastNegotiate) > negotiationFrequency) { - t.debouncedNegotiate(func() { + if force { + t.debouncedNegotiate.Add(func() { // no op to cancel pending negotiation }) t.debouncePending = false @@ -1011,7 +1011,13 @@ func (t *PCTransport) Negotiate(force bool) { postEvent = true } else { if !t.debouncePending { - t.debouncedNegotiate(func() { + if time.Since(t.lastNegotiate) > negotiationFrequency { + t.debouncedNegotiate.SetDuration(fastNegotiationFrequency) + } else { + t.debouncedNegotiate.SetDuration(negotiationFrequency) + } + + t.debouncedNegotiate.Add(func() { t.lock.Lock() t.debouncePending = false t.updateLastNeogitateLocked() diff --git a/pkg/rtc/transportmanager.go b/pkg/rtc/transportmanager.go index e3c812e1b..f1a24b4fe 100644 --- a/pkg/rtc/transportmanager.go +++ b/pkg/rtc/transportmanager.go @@ -478,7 +478,7 @@ func (t *TransportManager) configureICE(iceConfig *livekit.ICEConfig, reset bool return } - t.params.Logger.Infow("setting ICE config", "iceConfig", iceConfig) + t.params.Logger.Infow("setting ICE config", "iceConfig", logger.Proto(iceConfig)) onICEConfigChanged := t.onICEConfigChanged t.iceConfig = iceConfig t.lock.Unlock() diff --git a/pkg/service/agent_dispatch_service.go b/pkg/service/agent_dispatch_service.go index 680c98268..c5cfe0bbf 100644 --- a/pkg/service/agent_dispatch_service.go +++ b/pkg/service/agent_dispatch_service.go @@ -19,6 +19,7 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/rpc" + "github.com/livekit/protocol/utils/guid" ) type AgentDispatchService struct { @@ -39,7 +40,13 @@ func (ag *AgentDispatchService) CreateDispatch(ctx context.Context, req *livekit return nil, twirpAuthError(err) } - return ag.agentDispatchClient.CreateDispatch(ctx, ag.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req) + dispatch := &livekit.AgentDispatch{ + Id: guid.New(guid.AgentDispatchPrefix), + AgentName: req.AgentName, + Room: req.Room, + Metadata: req.Metadata, + } + return ag.agentDispatchClient.CreateDispatch(ctx, ag.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), dispatch) } func (ag *AgentDispatchService) DeleteDispatch(ctx context.Context, req *livekit.DeleteAgentDispatchRequest) (*livekit.AgentDispatch, error) { diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 2d80833ba..d97c2994a 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -885,13 +885,13 @@ func (r *RoomManager) ListDispatch(ctx context.Context, req *livekit.ListAgentDi return ret, nil } -func (r *RoomManager) CreateDispatch(ctx context.Context, req *livekit.CreateAgentDispatchRequest) (*livekit.AgentDispatch, error) { +func (r *RoomManager) CreateDispatch(ctx context.Context, req *livekit.AgentDispatch) (*livekit.AgentDispatch, error) { room := r.GetRoom(ctx, livekit.RoomName(req.Room)) if room == nil { return nil, ErrRoomNotFound } - disp, err := room.AddAgentDispatch(req.AgentName, req.Metadata) + disp, err := room.AddAgentDispatch(req) if err != nil { return nil, err } diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index b2b3a5095..77b1a8d65 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -444,7 +444,8 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, if onBinding != nil { onBinding(err) } - return webrtc.RTPCodecParameters{}, err + // don't return error here, as pion will not start transports if Bind fails at first answer + return webrtc.RTPCodecParameters{}, nil } // if a downtrack is closed before bind, it already unsubscribed from client, don't do subsequent operation and return here. @@ -1265,6 +1266,7 @@ func (d *DownTrack) SeedState(state DownTrackState) { } func (d *DownTrack) StopWriteAndGetState() DownTrackState { + d.params.Logger.Debugw("stopping write") d.bindLock.Lock() d.writable.Store(false) d.writeStopped.Store(true) diff --git a/pkg/sfu/utils/debounce.go b/pkg/sfu/utils/debounce.go new file mode 100644 index 000000000..5fab42bc3 --- /dev/null +++ b/pkg/sfu/utils/debounce.go @@ -0,0 +1,34 @@ +package utils + +import ( + "sync" + "time" +) + +func NewDebouncer(after time.Duration) *Debouncer { + return &Debouncer{ + after: after, + } +} + +type Debouncer struct { + mu sync.Mutex + after time.Duration + timer *time.Timer +} + +func (d *Debouncer) Add(f func()) { + d.mu.Lock() + defer d.mu.Unlock() + + if d.timer != nil { + d.timer.Stop() + } + d.timer = time.AfterFunc(d.after, f) +} + +func (d *Debouncer) SetDuration(after time.Duration) { + d.mu.Lock() + d.after = after + d.mu.Unlock() +} diff --git a/pkg/sfu/videolayerselector/dependencydescriptor.go b/pkg/sfu/videolayerselector/dependencydescriptor.go index acfe5b85c..b5b4e9d4a 100644 --- a/pkg/sfu/videolayerselector/dependencydescriptor.go +++ b/pkg/sfu/videolayerselector/dependencydescriptor.go @@ -281,7 +281,7 @@ func (d *DependencyDescriptor) Select(extPkt *buffer.ExtPacket, _layer int32) (r d.logger.Debugw( "switch to target", "highestDecodeTarget", highestDecodeTarget, - "current", d.currentLayer, + "previous", d.previousLayer, "bitmask", *d.activeDecodeTargetsBitmask, "fn", dd.FrameNumber, "efn", extFrameNum, diff --git a/pkg/sfu/videolayerselector/framechain.go b/pkg/sfu/videolayerselector/framechain.go index 836783864..533fc4e7d 100644 --- a/pkg/sfu/videolayerselector/framechain.go +++ b/pkg/sfu/videolayerselector/framechain.go @@ -54,7 +54,7 @@ func (fc *FrameChain) OnFrame(extFrameNum uint64, fd *dd.FrameDependencyTemplate if fd.ChainDiffs[fc.chainIdx] == 0 { if fc.broken { fc.broken = false - // fc.logger.Debugw("frame chain intact", "chanIdx", fc.chainIdx, "frame", extFrameNum) + fc.logger.Debugw("frame chain intact", "chanIdx", fc.chainIdx, "frame", extFrameNum) } fc.expectFrames = fc.expectFrames[:0] return true @@ -86,7 +86,7 @@ func (fc *FrameChain) OnFrame(extFrameNum uint64, fd *dd.FrameDependencyTemplate if !intact { fc.broken = true - // fc.logger.Debugw("frame chain broken", "chanIdx", fc.chainIdx, "sd", sd, "frame", extFrameNum, "prevFrame", prevFrameInChain) + fc.logger.Debugw("frame chain broken", "chanIdx", fc.chainIdx, "sd", sd, "frame", extFrameNum, "prevFrame", prevFrameInChain) } return intact } @@ -100,7 +100,7 @@ func (fc *FrameChain) OnExpectFrameChanged(frameNum uint64, decision selectorDec if f == frameNum { if decision != selectorDecisionForwarded { fc.broken = true - // fc.logger.Debugw("frame chain broken", "chanIdx", fc.chainIdx, "sd", decision, "frame", frameNum) + fc.logger.Debugw("frame chain broken", "chanIdx", fc.chainIdx, "sd", decision, "frame", frameNum) } fc.expectFrames[i] = fc.expectFrames[len(fc.expectFrames)-1] fc.expectFrames = fc.expectFrames[:len(fc.expectFrames)-1] @@ -132,6 +132,7 @@ func (fc *FrameChain) EndUpdateActive() { // if the chain transit from inactive to active, reset broken to wait a decodable SWITCH frame if !fc.active { fc.broken = true + fc.logger.Debugw("frame chain broken by inactive", "chanIdx", fc.chainIdx) } fc.active = active diff --git a/pkg/telemetry/prometheus/rooms.go b/pkg/telemetry/prometheus/rooms.go index fa7126539..e0d48479d 100644 --- a/pkg/telemetry/prometheus/rooms.go +++ b/pkg/telemetry/prometheus/rooms.go @@ -115,7 +115,7 @@ func initRoomStats(nodeID string, nodeType livekit.NodeType) { Name: "ms", ConstLabels: prometheus.Labels{"node_id": nodeID, "node_type": nodeType.String()}, Buckets: []float64{100, 200, 500, 700, 1000, 5000, 10000}, - }, append(promStreamLabels, "sdk", "kind")) + }, append(promStreamLabels, "sdk", "kind", "count")) prometheus.MustRegister(promRoomCurrent) prometheus.MustRegister(promRoomDuration) @@ -173,19 +173,19 @@ func AddPublishSuccess(kind string) { } func RecordPublishTime(source livekit.TrackSource, trackType livekit.TrackType, d time.Duration, sdk livekit.ClientInfo_SDK, kind livekit.ParticipantInfo_Kind) { - recordPubSubTime(true, source, trackType, d, sdk, kind) + recordPubSubTime(true, source, trackType, d, sdk, kind, 1) } -func RecordSubscribeTime(source livekit.TrackSource, trackType livekit.TrackType, d time.Duration, sdk livekit.ClientInfo_SDK, kind livekit.ParticipantInfo_Kind) { - recordPubSubTime(false, source, trackType, d, sdk, kind) +func RecordSubscribeTime(source livekit.TrackSource, trackType livekit.TrackType, d time.Duration, sdk livekit.ClientInfo_SDK, kind livekit.ParticipantInfo_Kind, count int) { + recordPubSubTime(false, source, trackType, d, sdk, kind, count) } -func recordPubSubTime(isPublish bool, source livekit.TrackSource, trackType livekit.TrackType, d time.Duration, sdk livekit.ClientInfo_SDK, kind livekit.ParticipantInfo_Kind) { +func recordPubSubTime(isPublish bool, source livekit.TrackSource, trackType livekit.TrackType, d time.Duration, sdk livekit.ClientInfo_SDK, kind livekit.ParticipantInfo_Kind, count int) { direction := "subscribe" if isPublish { direction = "publish" } - promPubSubTime.WithLabelValues(direction, source.String(), trackType.String(), sdk.String(), kind.String()).Observe(float64(d.Milliseconds())) + promPubSubTime.WithLabelValues(direction, source.String(), trackType.String(), sdk.String(), kind.String(), strconv.Itoa(count)).Observe(float64(d.Milliseconds())) } func RecordTrackSubscribeSuccess(kind string) {