diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 3c797d0ce..1bf80ecb5 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -14,11 +14,6 @@ import ( "go.uber.org/atomic" "google.golang.org/protobuf/proto" - "github.com/livekit/protocol/auth" - "github.com/livekit/protocol/livekit" - "github.com/livekit/protocol/logger" - "github.com/livekit/protocol/utils" - "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" "github.com/livekit/livekit-server/pkg/rtc/types" @@ -27,6 +22,10 @@ import ( "github.com/livekit/livekit-server/pkg/sfu/twcc" "github.com/livekit/livekit-server/pkg/telemetry" "github.com/livekit/livekit-server/pkg/telemetry/prometheus" + "github.com/livekit/protocol/auth" + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" + "github.com/livekit/protocol/utils" ) const ( @@ -110,7 +109,8 @@ type ParticipantImpl struct { // keeps track of disallowed tracks disallowedSubscriptions map[livekit.TrackID]livekit.ParticipantID // trackID -> publisherID // keep track of other publishers identities that we are subscribed to - subscribedTo sync.Map // livekit.ParticipantID => struct{} + subscribedTo sync.Map // livekit.ParticipantID => struct{} + unpublishedTracks []*livekit.TrackInfo rttUpdatedAt time.Time lastRTT uint32 @@ -548,7 +548,7 @@ func (p *ParticipantImpl) AddTrack(req *livekit.AddTrackRequest) { return } - ti := p.addPendingTrack(req) + ti := p.addPendingTrackLocked(req) if ti == nil { return } @@ -1407,7 +1407,7 @@ func (p *ParticipantImpl) onSubscribedMaxQualityChange(trackID livekit.TrackID, }) } -func (p *ParticipantImpl) addPendingTrack(req *livekit.AddTrackRequest) *livekit.TrackInfo { +func (p *ParticipantImpl) addPendingTrackLocked(req *livekit.AddTrackRequest) *livekit.TrackInfo { if p.getPublishedTrackBySignalCid(req.Cid) != nil || p.getPublishedTrackBySdpCid(req.Cid) != nil { return nil } @@ -1420,27 +1420,9 @@ func (p *ParticipantImpl) addPendingTrack(req *livekit.AddTrackRequest) *livekit return nil } - trackPrefix := utils.TrackPrefix - if req.Type == livekit.TrackType_VIDEO { - trackPrefix += "V" - } else if req.Type == livekit.TrackType_AUDIO { - trackPrefix += "A" - } - switch req.Source { - case livekit.TrackSource_CAMERA: - trackPrefix += "C" - case livekit.TrackSource_MICROPHONE: - trackPrefix += "M" - case livekit.TrackSource_SCREEN_SHARE: - trackPrefix += "S" - case livekit.TrackSource_SCREEN_SHARE_AUDIO: - trackPrefix += "s" - } - ti := &livekit.TrackInfo{ Type: req.Type, Name: req.Name, - Sid: utils.NewGuid(trackPrefix), Width: req.Width, Height: req.Height, Muted: req.Muted, @@ -1448,6 +1430,7 @@ func (p *ParticipantImpl) addPendingTrack(req *livekit.AddTrackRequest) *livekit Source: req.Source, Layers: req.Layers, } + p.setStableTrackID(ti) pendingInfo := &pendingTrackInfo{TrackInfo: ti} for _, codec := range req.SimulcastCodecs { mime := codec.Codec @@ -1457,7 +1440,7 @@ func (p *ParticipantImpl) addPendingTrack(req *livekit.AddTrackRequest) *livekit mime = "audio/" + mime } ti.Codecs = append(ti.Codecs, &livekit.SimulcastCodecInfo{ - MimeType: string(mime), + MimeType: mime, Cid: codec.Cid, }) } @@ -1584,6 +1567,13 @@ func (p *ParticipantImpl) mediaTrackReceived(track *webrtc.TrackRemote, rtpRecei p.UpTrackManager.AddPublishedTrack(mt) delete(p.pendingTracks, signalCid) + mt.AddOnClose(func() { + // re-use track + p.lock.Lock() + p.unpublishedTracks = append(p.unpublishedTracks, ti) + p.lock.Unlock() + }) + newTrack = true } @@ -1676,6 +1666,44 @@ func (p *ParticipantImpl) getPendingTrack(clientId string, kind livekit.TrackTyp return signalCid, trackInfo.TrackInfo } +// setStableTrackID either generates a new TrackID or reuses a previously used one +// for +func (p *ParticipantImpl) setStableTrackID(info *livekit.TrackInfo) { + var trackID string + for i, ti := range p.unpublishedTracks { + if ti.Type == info.Type && ti.Source == info.Source && ti.Name == info.Name { + trackID = ti.Sid + if i < len(p.unpublishedTracks)-1 { + p.unpublishedTracks = append(p.unpublishedTracks[:i], p.unpublishedTracks[i+1:]...) + } else { + p.unpublishedTracks = p.unpublishedTracks[:i] + } + break + } + } + // otherwise generate + if trackID == "" { + trackPrefix := utils.TrackPrefix + if info.Type == livekit.TrackType_VIDEO { + trackPrefix += "V" + } else if info.Type == livekit.TrackType_AUDIO { + trackPrefix += "A" + } + switch info.Source { + case livekit.TrackSource_CAMERA: + trackPrefix += "C" + case livekit.TrackSource_MICROPHONE: + trackPrefix += "M" + case livekit.TrackSource_SCREEN_SHARE: + trackPrefix += "S" + case livekit.TrackSource_SCREEN_SHARE_AUDIO: + trackPrefix += "s" + } + trackID = utils.NewGuid(trackPrefix) + } + info.Sid = trackID +} + func (p *ParticipantImpl) getPublishedTrackBySignalCid(clientId string) types.MediaTrack { for _, publishedTrack := range p.GetPublishedTracks() { if publishedTrack.(types.LocalMediaTrack).SignalCid() == clientId { diff --git a/pkg/rtc/participant_internal_test.go b/pkg/rtc/participant_internal_test.go index d18577ab3..e10ff2fb3 100644 --- a/pkg/rtc/participant_internal_test.go +++ b/pkg/rtc/participant_internal_test.go @@ -222,7 +222,6 @@ func TestMuteSetting(t *testing.T) { } func TestConnectionQuality(t *testing.T) { - videoScore := func(totalBytes int64, totalFrames int64, qualityParam *buffer.ConnectionQualityParams, codec string, expectedHeight int32, expectedWidth int32, actualHeight int32, actualWidth int32) float32 { return connectionquality.VideoConnectionScore(1*time.Second, totalBytes, totalFrames, qualityParam, codec, @@ -368,6 +367,75 @@ func TestSubscriberAsPrimary(t *testing.T) { }) } +func TestSetStableTrackID(t *testing.T) { + testCases := []struct { + name string + trackInfo *livekit.TrackInfo + unpublished []*livekit.TrackInfo + prefix string + remainingUnpublished int + }{ + { + name: "first track, generates new ID", + trackInfo: &livekit.TrackInfo{ + Type: livekit.TrackType_VIDEO, + Source: livekit.TrackSource_CAMERA, + }, + prefix: "TR_VC", + }, + { + name: "re-using existing ID", + trackInfo: &livekit.TrackInfo{ + Type: livekit.TrackType_VIDEO, + Source: livekit.TrackSource_CAMERA, + }, + unpublished: []*livekit.TrackInfo{ + { + Type: livekit.TrackType_VIDEO, + Source: livekit.TrackSource_SCREEN_SHARE, + Sid: "TR_VC1234", + }, + { + Type: livekit.TrackType_VIDEO, + Source: livekit.TrackSource_CAMERA, + Sid: "TR_VC1235", + }, + }, + prefix: "TR_VC1235", + remainingUnpublished: 1, + }, + { + name: "mismatch name for reuse", + trackInfo: &livekit.TrackInfo{ + Type: livekit.TrackType_VIDEO, + Source: livekit.TrackSource_CAMERA, + Name: "new_name", + }, + unpublished: []*livekit.TrackInfo{ + { + Type: livekit.TrackType_VIDEO, + Source: livekit.TrackSource_CAMERA, + Sid: "TR_NotUsed", + }, + }, + prefix: "TR_VC", + remainingUnpublished: 1, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + p := newParticipantForTest("test") + p.unpublishedTracks = tc.unpublished + + ti := tc.trackInfo + p.setStableTrackID(ti) + require.Contains(t, ti.Sid, tc.prefix) + require.Len(t, p.unpublishedTracks, tc.remainingUnpublished) + }) + } +} + type participantOpts struct { permissions *livekit.ParticipantPermission protocolVersion types.ProtocolVersion