Merge remote-tracking branch 'origin/master' into benjamin/telemetry

This commit is contained in:
Benjamin Pracht
2024-10-11 16:02:10 -07:00
19 changed files with 172 additions and 84 deletions
+1 -1
View File
@@ -21,7 +21,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598
github.com/livekit/protocol v1.23.1-0.20241007110347-136dfa7a2532
github.com/livekit/protocol v1.23.1-0.20241008082340-082848150f8f
github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9
github.com/mackerelio/go-osstat v0.2.5
github.com/magefile/mage v1.15.0
+2 -2
View File
@@ -165,8 +165,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 h1:yLlkHk2feSLHstD9n4VKg7YEBR4rLODTI4WE8gNBEnQ=
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE=
github.com/livekit/protocol v1.23.1-0.20241007110347-136dfa7a2532 h1:U4wgaMOpgTsaEOuaW6DodFdVwvkfAIZj21F+sVFFxzw=
github.com/livekit/protocol v1.23.1-0.20241007110347-136dfa7a2532/go.mod h1:nxRzmQBKSYK64gqr7ABWwt78hvrgiO2wYuCojRYb7Gs=
github.com/livekit/protocol v1.23.1-0.20241008082340-082848150f8f h1:ZtT9U7Mfcfv+uI1uCZUUeBf/R0uin+KIHanyStuXr68=
github.com/livekit/protocol v1.23.1-0.20241008082340-082848150f8f/go.mod h1:nxRzmQBKSYK64gqr7ABWwt78hvrgiO2wYuCojRYb7Gs=
github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9 h1:33oBjGpVD9tYkDXQU42tnHl8eCX9G6PVUToBVuCUyOs=
github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o=
@@ -12,13 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package rtc
package dynacast
import (
"strings"
"sync"
"time"
"github.com/bep/debounce"
"golang.org/x/exp/maps"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
@@ -157,6 +159,7 @@ func (d *DynacastManager) getOrCreateDynacastQuality(mime string) *DynacastQuali
return nil
}
mime = strings.ToLower(mime)
if dq := d.dynacastQuality[mime]; dq != nil {
return dq
}
@@ -165,8 +168,8 @@ func (d *DynacastManager) getOrCreateDynacastQuality(mime string) *DynacastQuali
MimeType: mime,
Logger: d.params.Logger,
})
dq.OnSubscribedMaxQualityChange(func(maxQuality livekit.VideoQuality) {
d.updateMaxQualityForMime(mime, maxQuality)
dq.OnSubscribedMaxQualityChange(func(mimeType string, maxQuality livekit.VideoQuality) {
d.updateMaxQualityForMime(mimeType, maxQuality)
})
dq.Start()
@@ -176,12 +179,7 @@ func (d *DynacastManager) getOrCreateDynacastQuality(mime string) *DynacastQuali
}
func (d *DynacastManager) getDynacastQualitiesLocked() []*DynacastQuality {
dqs := make([]*DynacastQuality, 0, len(d.dynacastQuality))
for _, dq := range d.dynacastQuality {
dqs = append(dqs, dq)
}
return dqs
return maps.Values(d.dynacastQuality)
}
func (d *DynacastManager) updateMaxQualityForMime(mime string, maxQuality livekit.VideoQuality) {
@@ -12,10 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package rtc
package dynacast
import (
"sort"
"strings"
"sync"
"testing"
"time"
@@ -54,7 +55,7 @@ func TestSubscribedMaxQuality(t *testing.T) {
expectedSubscribedQualities := []*livekit.SubscribedCodec{
{
Codec: webrtc.MimeTypeVP8,
Codec: strings.ToLower(webrtc.MimeTypeVP8),
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: false},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: false},
@@ -62,7 +63,7 @@ func TestSubscribedMaxQuality(t *testing.T) {
},
},
{
Codec: webrtc.MimeTypeAV1,
Codec: strings.ToLower(webrtc.MimeTypeAV1),
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: true},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: true},
@@ -94,8 +95,8 @@ func TestSubscribedMaxQuality(t *testing.T) {
})
dm.maxSubscribedQuality = map[string]livekit.VideoQuality{
webrtc.MimeTypeVP8: livekit.VideoQuality_LOW,
webrtc.MimeTypeAV1: livekit.VideoQuality_LOW,
strings.ToLower(webrtc.MimeTypeVP8): livekit.VideoQuality_LOW,
strings.ToLower(webrtc.MimeTypeAV1): livekit.VideoQuality_LOW,
}
dm.NotifySubscriberMaxQuality("s1", webrtc.MimeTypeVP8, livekit.VideoQuality_HIGH)
dm.NotifySubscriberMaxQuality("s2", webrtc.MimeTypeVP8, livekit.VideoQuality_MEDIUM)
@@ -103,7 +104,7 @@ func TestSubscribedMaxQuality(t *testing.T) {
expectedSubscribedQualities := []*livekit.SubscribedCodec{
{
Codec: webrtc.MimeTypeVP8,
Codec: strings.ToLower(webrtc.MimeTypeVP8),
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: true},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: true},
@@ -111,7 +112,7 @@ func TestSubscribedMaxQuality(t *testing.T) {
},
},
{
Codec: webrtc.MimeTypeAV1,
Codec: strings.ToLower(webrtc.MimeTypeAV1),
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: true},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: true},
@@ -131,7 +132,7 @@ func TestSubscribedMaxQuality(t *testing.T) {
expectedSubscribedQualities = []*livekit.SubscribedCodec{
{
Codec: webrtc.MimeTypeVP8,
Codec: strings.ToLower(webrtc.MimeTypeVP8),
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: true},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: true},
@@ -139,7 +140,7 @@ func TestSubscribedMaxQuality(t *testing.T) {
},
},
{
Codec: webrtc.MimeTypeAV1,
Codec: strings.ToLower(webrtc.MimeTypeAV1),
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: true},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: true},
@@ -161,7 +162,7 @@ func TestSubscribedMaxQuality(t *testing.T) {
expectedSubscribedQualities = []*livekit.SubscribedCodec{
{
Codec: webrtc.MimeTypeVP8,
Codec: strings.ToLower(webrtc.MimeTypeVP8),
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: true},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: false},
@@ -169,7 +170,7 @@ func TestSubscribedMaxQuality(t *testing.T) {
},
},
{
Codec: webrtc.MimeTypeAV1,
Codec: strings.ToLower(webrtc.MimeTypeAV1),
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: true},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: false},
@@ -201,7 +202,7 @@ func TestSubscribedMaxQuality(t *testing.T) {
expectedSubscribedQualities = []*livekit.SubscribedCodec{
{
Codec: webrtc.MimeTypeVP8,
Codec: strings.ToLower(webrtc.MimeTypeVP8),
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: false},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: false},
@@ -209,7 +210,7 @@ func TestSubscribedMaxQuality(t *testing.T) {
},
},
{
Codec: webrtc.MimeTypeAV1,
Codec: strings.ToLower(webrtc.MimeTypeAV1),
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: false},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: false},
@@ -229,7 +230,7 @@ func TestSubscribedMaxQuality(t *testing.T) {
expectedSubscribedQualities = []*livekit.SubscribedCodec{
{
Codec: webrtc.MimeTypeVP8,
Codec: strings.ToLower(webrtc.MimeTypeVP8),
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: true},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: false},
@@ -237,7 +238,7 @@ func TestSubscribedMaxQuality(t *testing.T) {
},
},
{
Codec: webrtc.MimeTypeAV1,
Codec: strings.ToLower(webrtc.MimeTypeAV1),
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: false},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: false},
@@ -260,7 +261,7 @@ func TestSubscribedMaxQuality(t *testing.T) {
expectedSubscribedQualities = []*livekit.SubscribedCodec{
{
Codec: webrtc.MimeTypeVP8,
Codec: strings.ToLower(webrtc.MimeTypeVP8),
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: true},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: true},
@@ -268,7 +269,7 @@ func TestSubscribedMaxQuality(t *testing.T) {
},
},
{
Codec: webrtc.MimeTypeAV1,
Codec: strings.ToLower(webrtc.MimeTypeAV1),
Qualities: []*livekit.SubscribedQuality{
{Quality: livekit.VideoQuality_LOW, Enabled: true},
{Quality: livekit.VideoQuality_MEDIUM, Enabled: true},
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package rtc
package dynacast
import (
"sync"
@@ -43,7 +43,7 @@ type DynacastQuality struct {
maxSubscribedQuality livekit.VideoQuality
maxQualityTimer *time.Timer
onSubscribedMaxQualityChange func(maxSubscribedQuality livekit.VideoQuality)
onSubscribedMaxQualityChange func(mimeType string, maxSubscribedQuality livekit.VideoQuality)
}
func NewDynacastQuality(params DynacastQualityParams) *DynacastQuality {
@@ -66,7 +66,7 @@ func (d *DynacastQuality) Stop() {
d.stopMaxQualityTimer()
}
func (d *DynacastQuality) OnSubscribedMaxQualityChange(f func(maxSubscribedQuality livekit.VideoQuality)) {
func (d *DynacastQuality) OnSubscribedMaxQualityChange(f func(mimeType string, maxSubscribedQuality livekit.VideoQuality)) {
d.onSubscribedMaxQualityChange = f
}
@@ -148,7 +148,7 @@ func (d *DynacastQuality) updateQualityChange(force bool) {
d.lock.Unlock()
if onSubscribedMaxQualityChange != nil {
onSubscribedMaxQualityChange(maxSubscribedQuality)
onSubscribedMaxQualityChange(d.params.MimeType, maxSubscribedQuality)
}
}
+3 -2
View File
@@ -29,6 +29,7 @@ import (
"github.com/livekit/protocol/logger"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/rtc/dynacast"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/sfu"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
@@ -48,7 +49,7 @@ type MediaTrack struct {
*MediaTrackReceiver
*MediaLossProxy
dynacastManager *DynacastManager
dynacastManager *dynacast.DynacastManager
lock sync.RWMutex
@@ -107,7 +108,7 @@ func NewMediaTrack(params MediaTrackParams, ti *livekit.TrackInfo) *MediaTrack {
}
if ti.Type == livekit.TrackType_VIDEO {
t.dynacastManager = NewDynacastManager(DynacastManagerParams{
t.dynacastManager = dynacast.NewDynacastManager(dynacast.DynacastManagerParams{
DynacastPauseDelay: params.VideoConfig.DynacastPauseDelay,
Logger: params.Logger,
})
+2
View File
@@ -683,6 +683,7 @@ func (t *MediaTrackReceiver) UpdateAudioTrack(update *livekit.UpdateLocalAudioTr
t.updateTrackInfoOfReceivers()
t.params.Telemetry.TrackPublishedUpdate(context.Background(), t.PublisherID(), clonedInfo)
t.params.Logger.Debugw("updated audio track", "before", logger.Proto(trackInfo), "after", logger.Proto(clonedInfo))
}
func (t *MediaTrackReceiver) UpdateVideoTrack(update *livekit.UpdateLocalVideoTrack) {
@@ -706,6 +707,7 @@ func (t *MediaTrackReceiver) UpdateVideoTrack(update *livekit.UpdateLocalVideoTr
t.updateTrackInfoOfReceivers()
t.params.Telemetry.TrackPublishedUpdate(context.Background(), t.PublisherID(), clonedInfo)
t.params.Logger.Debugw("updated video track", "before", logger.Proto(trackInfo), "after", logger.Proto(clonedInfo))
}
func (t *MediaTrackReceiver) TrackInfo() *livekit.TrackInfo {
+34 -8
View File
@@ -1696,7 +1696,6 @@ func (p *ParticipantImpl) onDataMessage(kind livekit.DataPacket_Kind, data []byt
overrideSenderIdentity = false
payload.ChatMessage.Generated = true
}
shouldForwardData = true
case *livekit.DataPacket_Metrics:
if payload.Metrics == nil {
return
@@ -1712,6 +1711,19 @@ func (p *ParticipantImpl) onDataMessage(kind livekit.DataPacket_Kind, data []byt
// and pushing it to all subscribers on some cadence and subscribers have their own cadence of
// processing/batching and sending to edge clients.
p.metricTimestamper.Process(payload.Metrics)
case *livekit.DataPacket_RpcRequest:
if payload.RpcRequest == nil {
return
}
p.pubLogger.Infow("received RPC request data packet", "method", payload.RpcRequest.Method, "rpc_request_id", payload.RpcRequest.Id)
case *livekit.DataPacket_RpcResponse:
if payload.RpcResponse == nil {
return
}
case *livekit.DataPacket_RpcAck:
if payload.RpcAck == nil {
return
}
default:
p.pubLogger.Warnw("received unsupported data packet", nil, "payload", payload)
}
@@ -1943,7 +1955,7 @@ func (p *ParticipantImpl) onSubscribedMaxQualityChange(
// normalize the codec name
for _, subscribedQuality := range subscribedQualities {
subscribedQuality.Codec = strings.ToLower(strings.TrimLeft(subscribedQuality.Codec, "video/"))
subscribedQuality.Codec = strings.ToLower(strings.TrimPrefix(subscribedQuality.Codec, "video/"))
}
subscribedQualityUpdate := &livekit.SubscribedQualityUpdate{
@@ -2219,12 +2231,24 @@ func (p *ParticipantImpl) mediaTrackReceived(track *webrtc.TrackRemote, rtpRecei
if newTrack {
go func() {
p.pubLogger.Debugw(
"track published",
"trackID", mt.ID(),
"track", logger.Proto(mt.ToProto()),
"cost", pubTime.Milliseconds(),
)
// TODO: remove this after we know where the high delay is coming from
if pubTime > 3*time.Second {
p.pubLogger.Infow(
"track published with high delay",
"trackID", mt.ID(),
"track", logger.Proto(mt.ToProto()),
"cost", pubTime.Milliseconds(),
"rid", track.RID(),
"mime", track.Codec().MimeType,
)
} else {
p.pubLogger.Debugw(
"track published",
"trackID", mt.ID(),
"track", logger.Proto(mt.ToProto()),
"cost", pubTime.Milliseconds(),
)
}
prometheus.RecordPublishTime(mt.Source(), mt.Kind(), pubTime, p.GetClientInfo().GetSdk(), p.Kind())
p.handleTrackPublished(mt)
@@ -2826,6 +2850,8 @@ func (p *ParticipantImpl) UpdateAudioTrack(update *livekit.UpdateLocalAudioTrack
ti.DisableDtx = true
}
}
p.pubLogger.Debugw("updated pending track", "trackID", ti.Sid, "trackInfo", logger.Proto(ti))
}
}
}
+16 -15
View File
@@ -969,8 +969,8 @@ func (r *Room) GetAgentDispatches(dispatchID string) ([]*livekit.AgentDispatch,
return ret, nil
}
func (r *Room) AddAgentDispatch(agentName string, metadata string) (*livekit.AgentDispatch, error) {
ad, err := r.createAgentDispatchFromParams(agentName, metadata)
func (r *Room) AddAgentDispatch(dispatch *livekit.AgentDispatch) (*livekit.AgentDispatch, error) {
ad, err := r.createAgentDispatch(dispatch)
if err != nil {
return nil, err
}
@@ -1692,20 +1692,12 @@ func (r *Room) DebugInfo() map[string]interface{} {
return info
}
func (r *Room) createAgentDispatchFromParams(agentName string, metadata string) (*agentDispatch, error) {
now := time.Now()
func (r *Room) createAgentDispatch(dispatch *livekit.AgentDispatch) (*agentDispatch, error) {
dispatch.State = &livekit.AgentDispatchState{
CreatedAt: time.Now().UnixNano(),
}
ad := newAgentDispatch(dispatch)
ad := newAgentDispatch(
&livekit.AgentDispatch{
Id: guid.New(guid.AgentDispatchPrefix),
AgentName: agentName,
Metadata: metadata,
Room: r.protoRoom.Name,
State: &livekit.AgentDispatchState{
CreatedAt: now.UnixNano(),
},
},
)
r.lock.RLock()
r.agentDispatches[ad.Id] = ad
r.lock.RUnlock()
@@ -1719,6 +1711,15 @@ func (r *Room) createAgentDispatchFromParams(agentName string, metadata string)
return ad, nil
}
func (r *Room) createAgentDispatchFromParams(agentName string, metadata string) (*agentDispatch, error) {
return r.createAgentDispatch(&livekit.AgentDispatch{
Id: guid.New(guid.AgentDispatchPrefix),
AgentName: agentName,
Metadata: metadata,
Room: r.protoRoom.Name,
})
}
func (r *Room) createAgentDispatchesFromRoomAgent() {
if r.internal == nil {
return
+14 -5
View File
@@ -721,6 +721,9 @@ func (m *SubscriptionManager) handleSubscribedTrackClose(s *trackSubscription, i
}
m.params.Participant.Negotiate(false)
} else {
t := time.Now()
s.subscribeAt.Store(&t)
}
if relieveFromLimits {
m.queueReconcile(trackIDForReconcileSubscriptions)
@@ -755,16 +758,20 @@ type trackSubscription struct {
// this timestamp determines when failures are reported
subStartedAt atomic.Pointer[time.Time]
createAt time.Time
// the timestamp when the subscription was started, will be reset when downtrack is closed with expected resume
subscribeAt atomic.Pointer[time.Time]
succRecordCounter atomic.Int32
}
func newTrackSubscription(subscriberID livekit.ParticipantID, trackID livekit.TrackID, l logger.Logger) *trackSubscription {
return &trackSubscription{
s := &trackSubscription{
subscriberID: subscriberID,
trackID: trackID,
logger: l,
createAt: time.Now(),
}
t := time.Now()
s.subscribeAt.Store(&t)
return s
}
func (s *trackSubscription) setPublisher(publisherIdentity livekit.ParticipantIdentity, publisherID livekit.ParticipantID) {
@@ -790,6 +797,7 @@ func (s *trackSubscription) setDesired(desired bool) bool {
// we'll reset the timer so it has sufficient time to reconcile
t := time.Now()
s.subStartedAt.Store(&t)
s.subscribeAt.Store(&t)
}
if s.desired == desired {
@@ -822,6 +830,7 @@ func (s *trackSubscription) setHasPermission(perm bool) bool {
// when permission is granted, reset the timer so it has sufficient time to reconcile
t := time.Now()
s.subStartedAt.Store(&t)
s.subscribeAt.Store(&t)
}
return true
}
@@ -992,10 +1001,10 @@ func (s *trackSubscription) maybeRecordSuccess(ts telemetry.TelemetryService, pI
return
}
d := time.Since(s.createAt)
d := time.Since(*s.subscribeAt.Load())
s.logger.Debugw("track subscribed", "cost", d.Milliseconds())
subscriber := subTrack.Subscriber()
prometheus.RecordSubscribeTime(mediaTrack.Source(), mediaTrack.Kind(), d, subscriber.GetClientInfo().GetSdk(), subscriber.Kind())
prometheus.RecordSubscribeTime(mediaTrack.Source(), mediaTrack.Kind(), d, subscriber.GetClientInfo().GetSdk(), subscriber.Kind(), int(s.succRecordCounter.Inc()))
eventSent := s.eventSent.Swap(true)
+12 -6
View File
@@ -22,7 +22,6 @@ import (
"sync"
"time"
"github.com/bep/debounce"
"github.com/pion/dtls/v2/pkg/crypto/elliptic"
"github.com/pion/ice/v2"
"github.com/pion/interceptor"
@@ -60,6 +59,7 @@ const (
LossyDataChannel = "_lossy"
ReliableDataChannel = "_reliable"
fastNegotiationFrequency = 10 * time.Millisecond
negotiationFrequency = 150 * time.Millisecond
negotiationFailedTimeout = 15 * time.Second
dtlsRetransmissionInterval = 100 * time.Millisecond
@@ -190,7 +190,7 @@ type PCTransport struct {
resetShortConnOnICERestart atomic.Bool
signalingRTT atomic.Uint32 // milliseconds
debouncedNegotiate func(func())
debouncedNegotiate *sfuutils.Debouncer
debouncePending bool
lastNegotiate time.Time
@@ -415,7 +415,7 @@ func NewPCTransport(params TransportParams) (*PCTransport, error) {
}
t := &PCTransport{
params: params,
debouncedNegotiate: debounce.New(negotiationFrequency),
debouncedNegotiate: sfuutils.NewDebouncer(negotiationFrequency),
negotiationState: transport.NegotiationStateNone,
eventsQueue: utils.NewTypedOpsQueue[event](utils.OpsQueueParams{
Name: "transport",
@@ -1001,8 +1001,8 @@ func (t *PCTransport) Negotiate(force bool) {
var postEvent bool
t.lock.Lock()
if force || (!t.debouncePending && time.Since(t.lastNegotiate) > negotiationFrequency) {
t.debouncedNegotiate(func() {
if force {
t.debouncedNegotiate.Add(func() {
// no op to cancel pending negotiation
})
t.debouncePending = false
@@ -1011,7 +1011,13 @@ func (t *PCTransport) Negotiate(force bool) {
postEvent = true
} else {
if !t.debouncePending {
t.debouncedNegotiate(func() {
if time.Since(t.lastNegotiate) > negotiationFrequency {
t.debouncedNegotiate.SetDuration(fastNegotiationFrequency)
} else {
t.debouncedNegotiate.SetDuration(negotiationFrequency)
}
t.debouncedNegotiate.Add(func() {
t.lock.Lock()
t.debouncePending = false
t.updateLastNeogitateLocked()
+1 -1
View File
@@ -478,7 +478,7 @@ func (t *TransportManager) configureICE(iceConfig *livekit.ICEConfig, reset bool
return
}
t.params.Logger.Infow("setting ICE config", "iceConfig", iceConfig)
t.params.Logger.Infow("setting ICE config", "iceConfig", logger.Proto(iceConfig))
onICEConfigChanged := t.onICEConfigChanged
t.iceConfig = iceConfig
t.lock.Unlock()
+8 -1
View File
@@ -19,6 +19,7 @@ import (
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/rpc"
"github.com/livekit/protocol/utils/guid"
)
type AgentDispatchService struct {
@@ -39,7 +40,13 @@ func (ag *AgentDispatchService) CreateDispatch(ctx context.Context, req *livekit
return nil, twirpAuthError(err)
}
return ag.agentDispatchClient.CreateDispatch(ctx, ag.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req)
dispatch := &livekit.AgentDispatch{
Id: guid.New(guid.AgentDispatchPrefix),
AgentName: req.AgentName,
Room: req.Room,
Metadata: req.Metadata,
}
return ag.agentDispatchClient.CreateDispatch(ctx, ag.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), dispatch)
}
func (ag *AgentDispatchService) DeleteDispatch(ctx context.Context, req *livekit.DeleteAgentDispatchRequest) (*livekit.AgentDispatch, error) {
+2 -2
View File
@@ -885,13 +885,13 @@ func (r *RoomManager) ListDispatch(ctx context.Context, req *livekit.ListAgentDi
return ret, nil
}
func (r *RoomManager) CreateDispatch(ctx context.Context, req *livekit.CreateAgentDispatchRequest) (*livekit.AgentDispatch, error) {
func (r *RoomManager) CreateDispatch(ctx context.Context, req *livekit.AgentDispatch) (*livekit.AgentDispatch, error) {
room := r.GetRoom(ctx, livekit.RoomName(req.Room))
if room == nil {
return nil, ErrRoomNotFound
}
disp, err := room.AddAgentDispatch(req.AgentName, req.Metadata)
disp, err := room.AddAgentDispatch(req)
if err != nil {
return nil, err
}
+3 -1
View File
@@ -444,7 +444,8 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters,
if onBinding != nil {
onBinding(err)
}
return webrtc.RTPCodecParameters{}, err
// don't return error here, as pion will not start transports if Bind fails at first answer
return webrtc.RTPCodecParameters{}, nil
}
// if a downtrack is closed before bind, it already unsubscribed from client, don't do subsequent operation and return here.
@@ -1265,6 +1266,7 @@ func (d *DownTrack) SeedState(state DownTrackState) {
}
func (d *DownTrack) StopWriteAndGetState() DownTrackState {
d.params.Logger.Debugw("stopping write")
d.bindLock.Lock()
d.writable.Store(false)
d.writeStopped.Store(true)
+34
View File
@@ -0,0 +1,34 @@
package utils
import (
"sync"
"time"
)
func NewDebouncer(after time.Duration) *Debouncer {
return &Debouncer{
after: after,
}
}
type Debouncer struct {
mu sync.Mutex
after time.Duration
timer *time.Timer
}
func (d *Debouncer) Add(f func()) {
d.mu.Lock()
defer d.mu.Unlock()
if d.timer != nil {
d.timer.Stop()
}
d.timer = time.AfterFunc(d.after, f)
}
func (d *Debouncer) SetDuration(after time.Duration) {
d.mu.Lock()
d.after = after
d.mu.Unlock()
}
@@ -281,7 +281,7 @@ func (d *DependencyDescriptor) Select(extPkt *buffer.ExtPacket, _layer int32) (r
d.logger.Debugw(
"switch to target",
"highestDecodeTarget", highestDecodeTarget,
"current", d.currentLayer,
"previous", d.previousLayer,
"bitmask", *d.activeDecodeTargetsBitmask,
"fn", dd.FrameNumber,
"efn", extFrameNum,
+4 -3
View File
@@ -54,7 +54,7 @@ func (fc *FrameChain) OnFrame(extFrameNum uint64, fd *dd.FrameDependencyTemplate
if fd.ChainDiffs[fc.chainIdx] == 0 {
if fc.broken {
fc.broken = false
// fc.logger.Debugw("frame chain intact", "chanIdx", fc.chainIdx, "frame", extFrameNum)
fc.logger.Debugw("frame chain intact", "chanIdx", fc.chainIdx, "frame", extFrameNum)
}
fc.expectFrames = fc.expectFrames[:0]
return true
@@ -86,7 +86,7 @@ func (fc *FrameChain) OnFrame(extFrameNum uint64, fd *dd.FrameDependencyTemplate
if !intact {
fc.broken = true
// fc.logger.Debugw("frame chain broken", "chanIdx", fc.chainIdx, "sd", sd, "frame", extFrameNum, "prevFrame", prevFrameInChain)
fc.logger.Debugw("frame chain broken", "chanIdx", fc.chainIdx, "sd", sd, "frame", extFrameNum, "prevFrame", prevFrameInChain)
}
return intact
}
@@ -100,7 +100,7 @@ func (fc *FrameChain) OnExpectFrameChanged(frameNum uint64, decision selectorDec
if f == frameNum {
if decision != selectorDecisionForwarded {
fc.broken = true
// fc.logger.Debugw("frame chain broken", "chanIdx", fc.chainIdx, "sd", decision, "frame", frameNum)
fc.logger.Debugw("frame chain broken", "chanIdx", fc.chainIdx, "sd", decision, "frame", frameNum)
}
fc.expectFrames[i] = fc.expectFrames[len(fc.expectFrames)-1]
fc.expectFrames = fc.expectFrames[:len(fc.expectFrames)-1]
@@ -132,6 +132,7 @@ func (fc *FrameChain) EndUpdateActive() {
// if the chain transit from inactive to active, reset broken to wait a decodable SWITCH frame
if !fc.active {
fc.broken = true
fc.logger.Debugw("frame chain broken by inactive", "chanIdx", fc.chainIdx)
}
fc.active = active
+6 -6
View File
@@ -115,7 +115,7 @@ func initRoomStats(nodeID string, nodeType livekit.NodeType) {
Name: "ms",
ConstLabels: prometheus.Labels{"node_id": nodeID, "node_type": nodeType.String()},
Buckets: []float64{100, 200, 500, 700, 1000, 5000, 10000},
}, append(promStreamLabels, "sdk", "kind"))
}, append(promStreamLabels, "sdk", "kind", "count"))
prometheus.MustRegister(promRoomCurrent)
prometheus.MustRegister(promRoomDuration)
@@ -173,19 +173,19 @@ func AddPublishSuccess(kind string) {
}
func RecordPublishTime(source livekit.TrackSource, trackType livekit.TrackType, d time.Duration, sdk livekit.ClientInfo_SDK, kind livekit.ParticipantInfo_Kind) {
recordPubSubTime(true, source, trackType, d, sdk, kind)
recordPubSubTime(true, source, trackType, d, sdk, kind, 1)
}
func RecordSubscribeTime(source livekit.TrackSource, trackType livekit.TrackType, d time.Duration, sdk livekit.ClientInfo_SDK, kind livekit.ParticipantInfo_Kind) {
recordPubSubTime(false, source, trackType, d, sdk, kind)
func RecordSubscribeTime(source livekit.TrackSource, trackType livekit.TrackType, d time.Duration, sdk livekit.ClientInfo_SDK, kind livekit.ParticipantInfo_Kind, count int) {
recordPubSubTime(false, source, trackType, d, sdk, kind, count)
}
func recordPubSubTime(isPublish bool, source livekit.TrackSource, trackType livekit.TrackType, d time.Duration, sdk livekit.ClientInfo_SDK, kind livekit.ParticipantInfo_Kind) {
func recordPubSubTime(isPublish bool, source livekit.TrackSource, trackType livekit.TrackType, d time.Duration, sdk livekit.ClientInfo_SDK, kind livekit.ParticipantInfo_Kind, count int) {
direction := "subscribe"
if isPublish {
direction = "publish"
}
promPubSubTime.WithLabelValues(direction, source.String(), trackType.String(), sdk.String(), kind.String()).Observe(float64(d.Milliseconds()))
promPubSubTime.WithLabelValues(direction, source.String(), trackType.String(), sdk.String(), kind.String(), strconv.Itoa(count)).Observe(float64(d.Milliseconds()))
}
func RecordTrackSubscribeSuccess(kind string) {