Beginnings of typing for various ids. (#287)

* Beginnings of typing for various ids.

* trackSid/TrackSid -> trackID/TrackID

* update protocol

* Initial livekit.ParticipantID use
This commit is contained in:
Raja Subramanian
2021-12-29 14:46:32 +05:30
committed by GitHub
parent 2209edce20
commit dc385f5d24
3 changed files with 59 additions and 59 deletions
+1 -1
View File
@@ -14,7 +14,7 @@ require (
github.com/google/wire v0.5.0
github.com/gorilla/websocket v1.4.2
github.com/hashicorp/golang-lru v0.5.4
github.com/livekit/protocol v0.11.5
github.com/livekit/protocol v0.11.6
github.com/magefile/mage v1.11.0
github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0
github.com/mitchellh/go-homedir v1.1.0
+2 -2
View File
@@ -132,8 +132,8 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY/mKrcvA=
github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/livekit/protocol v0.11.5 h1:1wArim3zmGgfvawXAs7Mo6Vu+xkW6z4q2EL2Gbr0A/c=
github.com/livekit/protocol v0.11.5/go.mod h1:YoHW9YbWbPnuVsgwBB4hAINKT+V68jmfh9zXBSSn6Wg=
github.com/livekit/protocol v0.11.6 h1:F6WGwK/sDm7SfcDUln+6YlwmmWJT7gm6bXS8u6RmcyM=
github.com/livekit/protocol v0.11.6/go.mod h1:YoHW9YbWbPnuVsgwBB4hAINKT+V68jmfh9zXBSSn6Wg=
github.com/magefile/mage v1.11.0 h1:C/55Ywp9BpgVVclD3lRnSYCwXTYxmSppIgLeDYlNuls=
github.com/magefile/mage v1.11.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A=
github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0=
+56 -56
View File
@@ -36,13 +36,13 @@ type UptrackManager struct {
twcc *twcc.Responder
// publishedTracks that participant is publishing
publishedTracks map[string]types.PublishedTrack
publishedTracks map[livekit.TrackID]types.PublishedTrack
// client intended to publish, yet to be reconciled
pendingTracks map[string]*livekit.TrackInfo
// keeps track of subscriptions that are awaiting permissions
subscriptionPermissions map[string]*livekit.TrackPermission // subscriberID => *livekit.TrackPermission
subscriptionPermissions map[livekit.ParticipantID]*livekit.TrackPermission // subscriberID => *livekit.TrackPermission
// keeps tracks of track specific subscribers who are awaiting permission
pendingSubscriptions map[string][]string // trackSid => []subscriberID
pendingSubscriptions map[livekit.TrackID][]livekit.ParticipantID // trackID => []subscriberID
lock sync.RWMutex
@@ -50,7 +50,7 @@ type UptrackManager struct {
onTrackPublished func(track types.PublishedTrack)
onTrackUpdated func(track types.PublishedTrack, onlyIfReady bool)
onWriteRTCP func(pkts []rtcp.Packet)
onSubscribedMaxQualityChange func(trackSid string, subscribedQualities []*livekit.SubscribedQuality) error
onSubscribedMaxQualityChange func(trackID livekit.TrackID, subscribedQualities []*livekit.SubscribedQuality) error
}
func NewUptrackManager(params UptrackManagerParams) *UptrackManager {
@@ -58,9 +58,9 @@ func NewUptrackManager(params UptrackManagerParams) *UptrackManager {
params: params,
rtcpCh: make(chan []rtcp.Packet, 50),
pliThrottle: newPLIThrottle(params.ThrottleConfig),
publishedTracks: make(map[string]types.PublishedTrack, 0),
publishedTracks: make(map[livekit.TrackID]types.PublishedTrack, 0),
pendingTracks: make(map[string]*livekit.TrackInfo),
pendingSubscriptions: make(map[string][]string),
pendingSubscriptions: make(map[livekit.TrackID][]livekit.ParticipantID),
}
}
@@ -108,7 +108,7 @@ func (u *UptrackManager) OnWriteRTCP(f func(pkts []rtcp.Packet)) {
u.onWriteRTCP = f
}
func (u *UptrackManager) OnSubscribedMaxQualityChange(f func(trackSid string, subscribedQualities []*livekit.SubscribedQuality) error) {
func (u *UptrackManager) OnSubscribedMaxQualityChange(f func(trackID livekit.TrackID, subscribedQualities []*livekit.SubscribedQuality) error) {
u.onSubscribedMaxQualityChange = f
}
@@ -149,8 +149,8 @@ func (u *UptrackManager) AddSubscriber(sub types.Participant, params types.AddSu
if params.AllTracks {
tracks = u.GetPublishedTracks()
} else {
for _, trackSid := range params.TrackSids {
track := u.getPublishedTrack(trackSid)
for _, trackID := range params.TrackSids {
track := u.getPublishedTrack(trackID)
if track == nil {
continue
}
@@ -169,10 +169,10 @@ func (u *UptrackManager) AddSubscriber(sub types.Participant, params types.AddSu
n := 0
for _, track := range tracks {
trackSid := track.ID()
trackID := track.ID()
subscriberID := sub.ID()
if !u.hasPermission(trackSid, subscriberID) {
u.maybeAddPendingSubscription(trackSid, sub)
if !u.hasPermission(trackID, subscriberID) {
u.maybeAddPendingSubscription(trackID, sub)
continue
}
@@ -184,33 +184,33 @@ func (u *UptrackManager) AddSubscriber(sub types.Participant, params types.AddSu
return n, nil
}
func (u *UptrackManager) RemoveSubscriber(sub types.Participant, trackSid string) {
func (u *UptrackManager) RemoveSubscriber(sub types.Participant, trackID livekit.TrackID) {
u.lock.Lock()
defer u.lock.Unlock()
track := u.getPublishedTrack(trackSid)
track := u.getPublishedTrack(trackID)
if track != nil {
track.RemoveSubscriber(sub.ID())
}
u.maybeRemovePendingSubscription(trackSid, sub)
u.maybeRemovePendingSubscription(trackID, sub)
}
func (u *UptrackManager) SetTrackMuted(trackSid string, muted bool) {
func (u *UptrackManager) SetTrackMuted(trackID livekit.TrackID, muted bool) {
isPending := false
u.lock.RLock()
for _, ti := range u.pendingTracks {
if ti.Sid == trackSid {
if ti.Sid == trackID {
ti.Muted = muted
isPending = true
}
}
track := u.publishedTracks[trackSid]
track := u.publishedTracks[trackID]
u.lock.RUnlock()
if track == nil {
if !isPending {
u.params.Logger.Warnw("could not locate track", nil, "track", trackSid)
u.params.Logger.Warnw("could not locate track", nil, "track", trackID)
}
return
}
@@ -219,7 +219,7 @@ func (u *UptrackManager) SetTrackMuted(trackSid string, muted bool) {
if currentMuted != track.IsMuted() && u.onTrackUpdated != nil {
u.params.Logger.Debugw("mute status changed",
"track", trackSid,
"track", trackID,
"muted", track.IsMuted())
u.onTrackUpdated(track, false)
}
@@ -261,7 +261,7 @@ func (u *UptrackManager) GetConnectionQuality() (scores float64, numTracks int)
return
}
func (u *UptrackManager) GetPublishedTrack(sid string) types.PublishedTrack {
func (u *UptrackManager) GetPublishedTrack(sid livekit.TrackID) types.PublishedTrack {
u.lock.RLock()
defer u.lock.RUnlock()
@@ -300,7 +300,7 @@ func (u *UptrackManager) GetDTX() bool {
func (u *UptrackManager) UpdateSubscriptionPermissions(
permissions *livekit.UpdateSubscriptionPermissions,
resolver func(participantSid string) types.Participant,
resolver func(participantID livekit.ParticipantID) types.Participant,
) error {
u.lock.Lock()
defer u.lock.Unlock()
@@ -372,7 +372,7 @@ func (u *UptrackManager) MediaTrackReceived(track *webrtc.TrackRemote, rtpReceiv
}
// should be called with lock held
func (u *UptrackManager) getPublishedTrack(sid string) types.PublishedTrack {
func (u *UptrackManager) getPublishedTrack(sid livekit.TrackID) types.PublishedTrack {
return u.publishedTracks[sid]
}
@@ -435,9 +435,9 @@ func (u *UptrackManager) handleTrackPublished(track types.PublishedTrack) {
track.AddOnClose(func() {
// cleanup
u.lock.Lock()
trackSid := track.ID()
delete(u.publishedTracks, trackSid)
delete(u.pendingSubscriptions, trackSid)
trackID := track.ID()
delete(u.publishedTracks, trackID)
delete(u.pendingSubscriptions, trackID)
// not modifying subscription permissions, will get reset on next update from participant
// as rtcpCh handles RTCP for all published tracks, close only after all published tracks are closed
@@ -467,13 +467,13 @@ func (u *UptrackManager) updateSubscriptionPermissions(permissions *livekit.Upda
}
// per participant permissions
u.subscriptionPermissions = make(map[string]*livekit.TrackPermission)
u.subscriptionPermissions = make(map[livekit.ParticipantID]*livekit.TrackPermission)
for _, trackPerms := range permissions.TrackPermissions {
u.subscriptionPermissions[trackPerms.ParticipantSid] = trackPerms
}
}
func (u *UptrackManager) hasPermission(trackSid string, subscriberID string) bool {
func (u *UptrackManager) hasPermission(trackID livekit.TrackID, subscriberID livekit.ParticipantID) bool {
if u.subscriptionPermissions == nil {
return true
}
@@ -488,7 +488,7 @@ func (u *UptrackManager) hasPermission(trackSid string, subscriberID string) boo
}
for _, sid := range perms.TrackSids {
if sid == trackSid {
if sid == trackID {
return true
}
}
@@ -496,7 +496,7 @@ func (u *UptrackManager) hasPermission(trackSid string, subscriberID string) boo
return false
}
func (u *UptrackManager) getAllowedSubscribers(trackSid string) []string {
func (u *UptrackManager) getAllowedSubscribers(trackID livekit.TrackID) []string {
if u.subscriptionPermissions == nil {
return nil
}
@@ -509,7 +509,7 @@ func (u *UptrackManager) getAllowedSubscribers(trackSid string) []string {
}
for _, sid := range perms.TrackSids {
if sid == trackSid {
if sid == trackID {
allowed = append(allowed, subscriberID)
break
}
@@ -519,10 +519,10 @@ func (u *UptrackManager) getAllowedSubscribers(trackSid string) []string {
return allowed
}
func (u *UptrackManager) maybeAddPendingSubscription(trackSid string, sub types.Participant) {
func (u *UptrackManager) maybeAddPendingSubscription(trackID livekit.TrackID, sub types.Participant) {
subscriberID := sub.ID()
pending := u.pendingSubscriptions[trackSid]
pending := u.pendingSubscriptions[trackID]
for _, sid := range pending {
if sid == subscriberID {
// already pending
@@ -530,36 +530,36 @@ func (u *UptrackManager) maybeAddPendingSubscription(trackSid string, sub types.
}
}
u.pendingSubscriptions[trackSid] = append(u.pendingSubscriptions[trackSid], subscriberID)
go sub.SubscriptionPermissionUpdate(u.params.SID, trackSid, false)
u.pendingSubscriptions[trackID] = append(u.pendingSubscriptions[trackID], subscriberID)
go sub.SubscriptionPermissionUpdate(u.params.SID, trackID, false)
}
func (u *UptrackManager) maybeRemovePendingSubscription(trackSid string, sub types.Participant) {
func (u *UptrackManager) maybeRemovePendingSubscription(trackID livekit.TrackID, sub types.Participant) {
subscriberID := sub.ID()
pending := u.pendingSubscriptions[trackSid]
pending := u.pendingSubscriptions[trackID]
n := len(pending)
for idx, sid := range pending {
if sid == subscriberID {
u.pendingSubscriptions[trackSid][idx] = u.pendingSubscriptions[trackSid][n-1]
u.pendingSubscriptions[trackSid] = u.pendingSubscriptions[trackSid][:n-1]
u.pendingSubscriptions[trackID][idx] = u.pendingSubscriptions[trackID][n-1]
u.pendingSubscriptions[trackID] = u.pendingSubscriptions[trackID][:n-1]
break
}
}
if len(u.pendingSubscriptions[trackSid]) == 0 {
delete(u.pendingSubscriptions, trackSid)
if len(u.pendingSubscriptions[trackID]) == 0 {
delete(u.pendingSubscriptions, trackID)
}
}
func (u *UptrackManager) processPendingSubscriptions(resolver func(participantSid string) types.Participant) {
updatedPendingSubscriptions := make(map[string][]string)
for trackSid, pending := range u.pendingSubscriptions {
track := u.getPublishedTrack(trackSid)
func (u *UptrackManager) processPendingSubscriptions(resolver func(participantID livekit.ParticipantID) types.Participant) {
updatedPendingSubscriptions := make(map[livekit.TrackID][]livekit.ParticipantID)
for trackID, pending := range u.pendingSubscriptions {
track := u.getPublishedTrack(trackID)
if track == nil {
continue
}
var updatedPending []string
var updatedPending []livekit.ParticipantID
for _, sid := range pending {
var sub types.Participant
if resolver != nil {
@@ -570,7 +570,7 @@ func (u *UptrackManager) processPendingSubscriptions(resolver func(participantSi
continue
}
if !u.hasPermission(trackSid, sid) {
if !u.hasPermission(trackID, sid) {
updatedPending = append(updatedPending, sid)
continue
}
@@ -582,19 +582,19 @@ func (u *UptrackManager) processPendingSubscriptions(resolver func(participantSi
continue
}
go sub.SubscriptionPermissionUpdate(u.params.SID, trackSid, true)
go sub.SubscriptionPermissionUpdate(u.params.SID, trackID, true)
}
updatedPendingSubscriptions[trackSid] = updatedPending
updatedPendingSubscriptions[trackID] = updatedPending
}
u.pendingSubscriptions = updatedPendingSubscriptions
}
func (u *UptrackManager) maybeRevokeSubscriptions(resolver func(participantSid string) types.Participant) {
func (u *UptrackManager) maybeRevokeSubscriptions(resolver func(participantID livekit.ParticipantID) types.Participant) {
for _, track := range u.publishedTracks {
trackSid := track.ID()
allowed := u.getAllowedSubscribers(trackSid)
trackID := track.ID()
allowed := u.getAllowedSubscribers(trackID)
if allowed == nil {
// no restrictions
continue
@@ -610,7 +610,7 @@ func (u *UptrackManager) maybeRevokeSubscriptions(resolver func(participantSid s
continue
}
u.maybeAddPendingSubscription(trackSid, sub)
u.maybeAddPendingSubscription(trackID, sub)
}
}
}
@@ -654,11 +654,11 @@ func (u *UptrackManager) DebugInfo() map[string]interface{} {
pendingTrackInfo := make(map[string]interface{})
u.lock.RLock()
for trackSid, track := range u.publishedTracks {
for trackID, track := range u.publishedTracks {
if mt, ok := track.(*MediaTrack); ok {
publishedTrackInfo[trackSid] = mt.DebugInfo()
publishedTrackInfo[trackID] = mt.DebugInfo()
} else {
publishedTrackInfo[trackSid] = map[string]interface{}{
publishedTrackInfo[trackID] = map[string]interface{}{
"ID": track.ID(),
"Kind": track.Kind().String(),
"PubMuted": track.IsMuted(),