Use stable TrackID after unpublishing & republishing (#751)

Multiple parts of the system relies on a Track's ID (egress, telemetry, etc).
Track ID changes when a track was unpublished, then republished with the
exact same attributes. This PR would allow us to re-use a previously
unpublished Track ID
This commit is contained in:
David Zhao
2022-06-05 23:31:51 -07:00
committed by GitHub
parent 2927521b8b
commit c62def3fdf
2 changed files with 124 additions and 28 deletions
+55 -27
View File
@@ -14,11 +14,6 @@ import (
"go.uber.org/atomic"
"google.golang.org/protobuf/proto"
"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/rtc/types"
@@ -27,6 +22,10 @@ import (
"github.com/livekit/livekit-server/pkg/sfu/twcc"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
)
const (
@@ -110,7 +109,8 @@ type ParticipantImpl struct {
// keeps track of disallowed tracks
disallowedSubscriptions map[livekit.TrackID]livekit.ParticipantID // trackID -> publisherID
// keep track of other publishers identities that we are subscribed to
subscribedTo sync.Map // livekit.ParticipantID => struct{}
subscribedTo sync.Map // livekit.ParticipantID => struct{}
unpublishedTracks []*livekit.TrackInfo
rttUpdatedAt time.Time
lastRTT uint32
@@ -548,7 +548,7 @@ func (p *ParticipantImpl) AddTrack(req *livekit.AddTrackRequest) {
return
}
ti := p.addPendingTrack(req)
ti := p.addPendingTrackLocked(req)
if ti == nil {
return
}
@@ -1407,7 +1407,7 @@ func (p *ParticipantImpl) onSubscribedMaxQualityChange(trackID livekit.TrackID,
})
}
func (p *ParticipantImpl) addPendingTrack(req *livekit.AddTrackRequest) *livekit.TrackInfo {
func (p *ParticipantImpl) addPendingTrackLocked(req *livekit.AddTrackRequest) *livekit.TrackInfo {
if p.getPublishedTrackBySignalCid(req.Cid) != nil || p.getPublishedTrackBySdpCid(req.Cid) != nil {
return nil
}
@@ -1420,27 +1420,9 @@ func (p *ParticipantImpl) addPendingTrack(req *livekit.AddTrackRequest) *livekit
return nil
}
trackPrefix := utils.TrackPrefix
if req.Type == livekit.TrackType_VIDEO {
trackPrefix += "V"
} else if req.Type == livekit.TrackType_AUDIO {
trackPrefix += "A"
}
switch req.Source {
case livekit.TrackSource_CAMERA:
trackPrefix += "C"
case livekit.TrackSource_MICROPHONE:
trackPrefix += "M"
case livekit.TrackSource_SCREEN_SHARE:
trackPrefix += "S"
case livekit.TrackSource_SCREEN_SHARE_AUDIO:
trackPrefix += "s"
}
ti := &livekit.TrackInfo{
Type: req.Type,
Name: req.Name,
Sid: utils.NewGuid(trackPrefix),
Width: req.Width,
Height: req.Height,
Muted: req.Muted,
@@ -1448,6 +1430,7 @@ func (p *ParticipantImpl) addPendingTrack(req *livekit.AddTrackRequest) *livekit
Source: req.Source,
Layers: req.Layers,
}
p.setStableTrackID(ti)
pendingInfo := &pendingTrackInfo{TrackInfo: ti}
for _, codec := range req.SimulcastCodecs {
mime := codec.Codec
@@ -1457,7 +1440,7 @@ func (p *ParticipantImpl) addPendingTrack(req *livekit.AddTrackRequest) *livekit
mime = "audio/" + mime
}
ti.Codecs = append(ti.Codecs, &livekit.SimulcastCodecInfo{
MimeType: string(mime),
MimeType: mime,
Cid: codec.Cid,
})
}
@@ -1584,6 +1567,13 @@ func (p *ParticipantImpl) mediaTrackReceived(track *webrtc.TrackRemote, rtpRecei
p.UpTrackManager.AddPublishedTrack(mt)
delete(p.pendingTracks, signalCid)
mt.AddOnClose(func() {
// re-use track
p.lock.Lock()
p.unpublishedTracks = append(p.unpublishedTracks, ti)
p.lock.Unlock()
})
newTrack = true
}
@@ -1676,6 +1666,44 @@ func (p *ParticipantImpl) getPendingTrack(clientId string, kind livekit.TrackTyp
return signalCid, trackInfo.TrackInfo
}
// setStableTrackID either generates a new TrackID or reuses a previously used one
// for
func (p *ParticipantImpl) setStableTrackID(info *livekit.TrackInfo) {
var trackID string
for i, ti := range p.unpublishedTracks {
if ti.Type == info.Type && ti.Source == info.Source && ti.Name == info.Name {
trackID = ti.Sid
if i < len(p.unpublishedTracks)-1 {
p.unpublishedTracks = append(p.unpublishedTracks[:i], p.unpublishedTracks[i+1:]...)
} else {
p.unpublishedTracks = p.unpublishedTracks[:i]
}
break
}
}
// otherwise generate
if trackID == "" {
trackPrefix := utils.TrackPrefix
if info.Type == livekit.TrackType_VIDEO {
trackPrefix += "V"
} else if info.Type == livekit.TrackType_AUDIO {
trackPrefix += "A"
}
switch info.Source {
case livekit.TrackSource_CAMERA:
trackPrefix += "C"
case livekit.TrackSource_MICROPHONE:
trackPrefix += "M"
case livekit.TrackSource_SCREEN_SHARE:
trackPrefix += "S"
case livekit.TrackSource_SCREEN_SHARE_AUDIO:
trackPrefix += "s"
}
trackID = utils.NewGuid(trackPrefix)
}
info.Sid = trackID
}
func (p *ParticipantImpl) getPublishedTrackBySignalCid(clientId string) types.MediaTrack {
for _, publishedTrack := range p.GetPublishedTracks() {
if publishedTrack.(types.LocalMediaTrack).SignalCid() == clientId {
+69 -1
View File
@@ -222,7 +222,6 @@ func TestMuteSetting(t *testing.T) {
}
func TestConnectionQuality(t *testing.T) {
videoScore := func(totalBytes int64, totalFrames int64, qualityParam *buffer.ConnectionQualityParams,
codec string, expectedHeight int32, expectedWidth int32, actualHeight int32, actualWidth int32) float32 {
return connectionquality.VideoConnectionScore(1*time.Second, totalBytes, totalFrames, qualityParam, codec,
@@ -368,6 +367,75 @@ func TestSubscriberAsPrimary(t *testing.T) {
})
}
func TestSetStableTrackID(t *testing.T) {
testCases := []struct {
name string
trackInfo *livekit.TrackInfo
unpublished []*livekit.TrackInfo
prefix string
remainingUnpublished int
}{
{
name: "first track, generates new ID",
trackInfo: &livekit.TrackInfo{
Type: livekit.TrackType_VIDEO,
Source: livekit.TrackSource_CAMERA,
},
prefix: "TR_VC",
},
{
name: "re-using existing ID",
trackInfo: &livekit.TrackInfo{
Type: livekit.TrackType_VIDEO,
Source: livekit.TrackSource_CAMERA,
},
unpublished: []*livekit.TrackInfo{
{
Type: livekit.TrackType_VIDEO,
Source: livekit.TrackSource_SCREEN_SHARE,
Sid: "TR_VC1234",
},
{
Type: livekit.TrackType_VIDEO,
Source: livekit.TrackSource_CAMERA,
Sid: "TR_VC1235",
},
},
prefix: "TR_VC1235",
remainingUnpublished: 1,
},
{
name: "mismatch name for reuse",
trackInfo: &livekit.TrackInfo{
Type: livekit.TrackType_VIDEO,
Source: livekit.TrackSource_CAMERA,
Name: "new_name",
},
unpublished: []*livekit.TrackInfo{
{
Type: livekit.TrackType_VIDEO,
Source: livekit.TrackSource_CAMERA,
Sid: "TR_NotUsed",
},
},
prefix: "TR_VC",
remainingUnpublished: 1,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
p := newParticipantForTest("test")
p.unpublishedTracks = tc.unpublished
ti := tc.trackInfo
p.setStableTrackID(ti)
require.Contains(t, ti.Sid, tc.prefix)
require.Len(t, p.unpublishedTracks, tc.remainingUnpublished)
})
}
}
type participantOpts struct {
permissions *livekit.ParticipantPermission
protocolVersion types.ProtocolVersion