From dc385f5d244bfd19ccaa672d79377e5193c81b37 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Wed, 29 Dec 2021 14:46:32 +0530 Subject: [PATCH] Beginnings of typing for various ids. (#287) * Beginnings of typing for various ids. * trackSid/TrackSid -> trackID/TrackID * update protocol * Initial livekit.ParticipantID use --- go.mod | 2 +- go.sum | 4 +- pkg/rtc/uptrackmanager.go | 112 +++++++++++++++++++------------------- 3 files changed, 59 insertions(+), 59 deletions(-) diff --git a/go.mod b/go.mod index 205bcc43f..8db73ce30 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/google/wire v0.5.0 github.com/gorilla/websocket v1.4.2 github.com/hashicorp/golang-lru v0.5.4 - github.com/livekit/protocol v0.11.5 + github.com/livekit/protocol v0.11.6 github.com/magefile/mage v1.11.0 github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0 github.com/mitchellh/go-homedir v1.1.0 diff --git a/go.sum b/go.sum index a4fe769d0..9241a1e87 100644 --- a/go.sum +++ b/go.sum @@ -132,8 +132,8 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY/mKrcvA= github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= -github.com/livekit/protocol v0.11.5 h1:1wArim3zmGgfvawXAs7Mo6Vu+xkW6z4q2EL2Gbr0A/c= -github.com/livekit/protocol v0.11.5/go.mod h1:YoHW9YbWbPnuVsgwBB4hAINKT+V68jmfh9zXBSSn6Wg= +github.com/livekit/protocol v0.11.6 h1:F6WGwK/sDm7SfcDUln+6YlwmmWJT7gm6bXS8u6RmcyM= +github.com/livekit/protocol v0.11.6/go.mod h1:YoHW9YbWbPnuVsgwBB4hAINKT+V68jmfh9zXBSSn6Wg= github.com/magefile/mage v1.11.0 h1:C/55Ywp9BpgVVclD3lRnSYCwXTYxmSppIgLeDYlNuls= github.com/magefile/mage v1.11.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0= diff --git a/pkg/rtc/uptrackmanager.go b/pkg/rtc/uptrackmanager.go index ef848e5da..68350273f 100644 --- a/pkg/rtc/uptrackmanager.go +++ b/pkg/rtc/uptrackmanager.go @@ -36,13 +36,13 @@ type UptrackManager struct { twcc *twcc.Responder // publishedTracks that participant is publishing - publishedTracks map[string]types.PublishedTrack + publishedTracks map[livekit.TrackID]types.PublishedTrack // client intended to publish, yet to be reconciled pendingTracks map[string]*livekit.TrackInfo // keeps track of subscriptions that are awaiting permissions - subscriptionPermissions map[string]*livekit.TrackPermission // subscriberID => *livekit.TrackPermission + subscriptionPermissions map[livekit.ParticipantID]*livekit.TrackPermission // subscriberID => *livekit.TrackPermission // keeps tracks of track specific subscribers who are awaiting permission - pendingSubscriptions map[string][]string // trackSid => []subscriberID + pendingSubscriptions map[livekit.TrackID][]livekit.ParticipantID // trackID => []subscriberID lock sync.RWMutex @@ -50,7 +50,7 @@ type UptrackManager struct { onTrackPublished func(track types.PublishedTrack) onTrackUpdated func(track types.PublishedTrack, onlyIfReady bool) onWriteRTCP func(pkts []rtcp.Packet) - onSubscribedMaxQualityChange func(trackSid string, subscribedQualities []*livekit.SubscribedQuality) error + onSubscribedMaxQualityChange func(trackID livekit.TrackID, subscribedQualities []*livekit.SubscribedQuality) error } func NewUptrackManager(params UptrackManagerParams) *UptrackManager { @@ -58,9 +58,9 @@ func NewUptrackManager(params UptrackManagerParams) *UptrackManager { params: params, rtcpCh: make(chan []rtcp.Packet, 50), pliThrottle: newPLIThrottle(params.ThrottleConfig), - publishedTracks: make(map[string]types.PublishedTrack, 0), + publishedTracks: make(map[livekit.TrackID]types.PublishedTrack, 0), pendingTracks: make(map[string]*livekit.TrackInfo), - pendingSubscriptions: make(map[string][]string), + pendingSubscriptions: make(map[livekit.TrackID][]livekit.ParticipantID), } } @@ -108,7 +108,7 @@ func (u *UptrackManager) OnWriteRTCP(f func(pkts []rtcp.Packet)) { u.onWriteRTCP = f } -func (u *UptrackManager) OnSubscribedMaxQualityChange(f func(trackSid string, subscribedQualities []*livekit.SubscribedQuality) error) { +func (u *UptrackManager) OnSubscribedMaxQualityChange(f func(trackID livekit.TrackID, subscribedQualities []*livekit.SubscribedQuality) error) { u.onSubscribedMaxQualityChange = f } @@ -149,8 +149,8 @@ func (u *UptrackManager) AddSubscriber(sub types.Participant, params types.AddSu if params.AllTracks { tracks = u.GetPublishedTracks() } else { - for _, trackSid := range params.TrackSids { - track := u.getPublishedTrack(trackSid) + for _, trackID := range params.TrackSids { + track := u.getPublishedTrack(trackID) if track == nil { continue } @@ -169,10 +169,10 @@ func (u *UptrackManager) AddSubscriber(sub types.Participant, params types.AddSu n := 0 for _, track := range tracks { - trackSid := track.ID() + trackID := track.ID() subscriberID := sub.ID() - if !u.hasPermission(trackSid, subscriberID) { - u.maybeAddPendingSubscription(trackSid, sub) + if !u.hasPermission(trackID, subscriberID) { + u.maybeAddPendingSubscription(trackID, sub) continue } @@ -184,33 +184,33 @@ func (u *UptrackManager) AddSubscriber(sub types.Participant, params types.AddSu return n, nil } -func (u *UptrackManager) RemoveSubscriber(sub types.Participant, trackSid string) { +func (u *UptrackManager) RemoveSubscriber(sub types.Participant, trackID livekit.TrackID) { u.lock.Lock() defer u.lock.Unlock() - track := u.getPublishedTrack(trackSid) + track := u.getPublishedTrack(trackID) if track != nil { track.RemoveSubscriber(sub.ID()) } - u.maybeRemovePendingSubscription(trackSid, sub) + u.maybeRemovePendingSubscription(trackID, sub) } -func (u *UptrackManager) SetTrackMuted(trackSid string, muted bool) { +func (u *UptrackManager) SetTrackMuted(trackID livekit.TrackID, muted bool) { isPending := false u.lock.RLock() for _, ti := range u.pendingTracks { - if ti.Sid == trackSid { + if ti.Sid == trackID { ti.Muted = muted isPending = true } } - track := u.publishedTracks[trackSid] + track := u.publishedTracks[trackID] u.lock.RUnlock() if track == nil { if !isPending { - u.params.Logger.Warnw("could not locate track", nil, "track", trackSid) + u.params.Logger.Warnw("could not locate track", nil, "track", trackID) } return } @@ -219,7 +219,7 @@ func (u *UptrackManager) SetTrackMuted(trackSid string, muted bool) { if currentMuted != track.IsMuted() && u.onTrackUpdated != nil { u.params.Logger.Debugw("mute status changed", - "track", trackSid, + "track", trackID, "muted", track.IsMuted()) u.onTrackUpdated(track, false) } @@ -261,7 +261,7 @@ func (u *UptrackManager) GetConnectionQuality() (scores float64, numTracks int) return } -func (u *UptrackManager) GetPublishedTrack(sid string) types.PublishedTrack { +func (u *UptrackManager) GetPublishedTrack(sid livekit.TrackID) types.PublishedTrack { u.lock.RLock() defer u.lock.RUnlock() @@ -300,7 +300,7 @@ func (u *UptrackManager) GetDTX() bool { func (u *UptrackManager) UpdateSubscriptionPermissions( permissions *livekit.UpdateSubscriptionPermissions, - resolver func(participantSid string) types.Participant, + resolver func(participantID livekit.ParticipantID) types.Participant, ) error { u.lock.Lock() defer u.lock.Unlock() @@ -372,7 +372,7 @@ func (u *UptrackManager) MediaTrackReceived(track *webrtc.TrackRemote, rtpReceiv } // should be called with lock held -func (u *UptrackManager) getPublishedTrack(sid string) types.PublishedTrack { +func (u *UptrackManager) getPublishedTrack(sid livekit.TrackID) types.PublishedTrack { return u.publishedTracks[sid] } @@ -435,9 +435,9 @@ func (u *UptrackManager) handleTrackPublished(track types.PublishedTrack) { track.AddOnClose(func() { // cleanup u.lock.Lock() - trackSid := track.ID() - delete(u.publishedTracks, trackSid) - delete(u.pendingSubscriptions, trackSid) + trackID := track.ID() + delete(u.publishedTracks, trackID) + delete(u.pendingSubscriptions, trackID) // not modifying subscription permissions, will get reset on next update from participant // as rtcpCh handles RTCP for all published tracks, close only after all published tracks are closed @@ -467,13 +467,13 @@ func (u *UptrackManager) updateSubscriptionPermissions(permissions *livekit.Upda } // per participant permissions - u.subscriptionPermissions = make(map[string]*livekit.TrackPermission) + u.subscriptionPermissions = make(map[livekit.ParticipantID]*livekit.TrackPermission) for _, trackPerms := range permissions.TrackPermissions { u.subscriptionPermissions[trackPerms.ParticipantSid] = trackPerms } } -func (u *UptrackManager) hasPermission(trackSid string, subscriberID string) bool { +func (u *UptrackManager) hasPermission(trackID livekit.TrackID, subscriberID livekit.ParticipantID) bool { if u.subscriptionPermissions == nil { return true } @@ -488,7 +488,7 @@ func (u *UptrackManager) hasPermission(trackSid string, subscriberID string) boo } for _, sid := range perms.TrackSids { - if sid == trackSid { + if sid == trackID { return true } } @@ -496,7 +496,7 @@ func (u *UptrackManager) hasPermission(trackSid string, subscriberID string) boo return false } -func (u *UptrackManager) getAllowedSubscribers(trackSid string) []string { +func (u *UptrackManager) getAllowedSubscribers(trackID livekit.TrackID) []string { if u.subscriptionPermissions == nil { return nil } @@ -509,7 +509,7 @@ func (u *UptrackManager) getAllowedSubscribers(trackSid string) []string { } for _, sid := range perms.TrackSids { - if sid == trackSid { + if sid == trackID { allowed = append(allowed, subscriberID) break } @@ -519,10 +519,10 @@ func (u *UptrackManager) getAllowedSubscribers(trackSid string) []string { return allowed } -func (u *UptrackManager) maybeAddPendingSubscription(trackSid string, sub types.Participant) { +func (u *UptrackManager) maybeAddPendingSubscription(trackID livekit.TrackID, sub types.Participant) { subscriberID := sub.ID() - pending := u.pendingSubscriptions[trackSid] + pending := u.pendingSubscriptions[trackID] for _, sid := range pending { if sid == subscriberID { // already pending @@ -530,36 +530,36 @@ func (u *UptrackManager) maybeAddPendingSubscription(trackSid string, sub types. } } - u.pendingSubscriptions[trackSid] = append(u.pendingSubscriptions[trackSid], subscriberID) - go sub.SubscriptionPermissionUpdate(u.params.SID, trackSid, false) + u.pendingSubscriptions[trackID] = append(u.pendingSubscriptions[trackID], subscriberID) + go sub.SubscriptionPermissionUpdate(u.params.SID, trackID, false) } -func (u *UptrackManager) maybeRemovePendingSubscription(trackSid string, sub types.Participant) { +func (u *UptrackManager) maybeRemovePendingSubscription(trackID livekit.TrackID, sub types.Participant) { subscriberID := sub.ID() - pending := u.pendingSubscriptions[trackSid] + pending := u.pendingSubscriptions[trackID] n := len(pending) for idx, sid := range pending { if sid == subscriberID { - u.pendingSubscriptions[trackSid][idx] = u.pendingSubscriptions[trackSid][n-1] - u.pendingSubscriptions[trackSid] = u.pendingSubscriptions[trackSid][:n-1] + u.pendingSubscriptions[trackID][idx] = u.pendingSubscriptions[trackID][n-1] + u.pendingSubscriptions[trackID] = u.pendingSubscriptions[trackID][:n-1] break } } - if len(u.pendingSubscriptions[trackSid]) == 0 { - delete(u.pendingSubscriptions, trackSid) + if len(u.pendingSubscriptions[trackID]) == 0 { + delete(u.pendingSubscriptions, trackID) } } -func (u *UptrackManager) processPendingSubscriptions(resolver func(participantSid string) types.Participant) { - updatedPendingSubscriptions := make(map[string][]string) - for trackSid, pending := range u.pendingSubscriptions { - track := u.getPublishedTrack(trackSid) +func (u *UptrackManager) processPendingSubscriptions(resolver func(participantID livekit.ParticipantID) types.Participant) { + updatedPendingSubscriptions := make(map[livekit.TrackID][]livekit.ParticipantID) + for trackID, pending := range u.pendingSubscriptions { + track := u.getPublishedTrack(trackID) if track == nil { continue } - var updatedPending []string + var updatedPending []livekit.ParticipantID for _, sid := range pending { var sub types.Participant if resolver != nil { @@ -570,7 +570,7 @@ func (u *UptrackManager) processPendingSubscriptions(resolver func(participantSi continue } - if !u.hasPermission(trackSid, sid) { + if !u.hasPermission(trackID, sid) { updatedPending = append(updatedPending, sid) continue } @@ -582,19 +582,19 @@ func (u *UptrackManager) processPendingSubscriptions(resolver func(participantSi continue } - go sub.SubscriptionPermissionUpdate(u.params.SID, trackSid, true) + go sub.SubscriptionPermissionUpdate(u.params.SID, trackID, true) } - updatedPendingSubscriptions[trackSid] = updatedPending + updatedPendingSubscriptions[trackID] = updatedPending } u.pendingSubscriptions = updatedPendingSubscriptions } -func (u *UptrackManager) maybeRevokeSubscriptions(resolver func(participantSid string) types.Participant) { +func (u *UptrackManager) maybeRevokeSubscriptions(resolver func(participantID livekit.ParticipantID) types.Participant) { for _, track := range u.publishedTracks { - trackSid := track.ID() - allowed := u.getAllowedSubscribers(trackSid) + trackID := track.ID() + allowed := u.getAllowedSubscribers(trackID) if allowed == nil { // no restrictions continue @@ -610,7 +610,7 @@ func (u *UptrackManager) maybeRevokeSubscriptions(resolver func(participantSid s continue } - u.maybeAddPendingSubscription(trackSid, sub) + u.maybeAddPendingSubscription(trackID, sub) } } } @@ -654,11 +654,11 @@ func (u *UptrackManager) DebugInfo() map[string]interface{} { pendingTrackInfo := make(map[string]interface{}) u.lock.RLock() - for trackSid, track := range u.publishedTracks { + for trackID, track := range u.publishedTracks { if mt, ok := track.(*MediaTrack); ok { - publishedTrackInfo[trackSid] = mt.DebugInfo() + publishedTrackInfo[trackID] = mt.DebugInfo() } else { - publishedTrackInfo[trackSid] = map[string]interface{}{ + publishedTrackInfo[trackID] = map[string]interface{}{ "ID": track.ID(), "Kind": track.Kind().String(), "PubMuted": track.IsMuted(),