Remove stubs (#331)

* WIP commit

* Use LocalParticipant interface

* Remove no-op local participant

* Remove localparticipant

* Remove no-op stubs

* Consolidate PublishedTrack into MediaTrack
This commit is contained in:
Raja Subramanian
2022-01-12 00:11:48 +05:30
committed by GitHub
parent c2ba26eee6
commit e67db84b3b
24 changed files with 3304 additions and 5790 deletions
+6 -6
View File
@@ -8,8 +8,8 @@ import (
"github.com/livekit/livekit-server/pkg/rtc/types/typesfakes"
)
func newMockParticipant(identity livekit.ParticipantIdentity, protocol types.ProtocolVersion, hidden bool) *typesfakes.FakeParticipant {
p := &typesfakes.FakeParticipant{}
func newMockParticipant(identity livekit.ParticipantIdentity, protocol types.ProtocolVersion, hidden bool) *typesfakes.FakeLocalParticipant {
p := &typesfakes.FakeLocalParticipant{}
p.IDReturns(livekit.ParticipantID(utils.NewGuid(utils.ParticipantPrefix)))
p.IdentityReturns(identity)
p.StateReturns(livekit.ParticipantInfo_JOINED)
@@ -20,7 +20,7 @@ func newMockParticipant(identity livekit.ParticipantIdentity, protocol types.Pro
p.HiddenReturns(hidden)
p.SetMetadataStub = func(m string) {
var f func(participant types.Participant)
var f func(participant types.LocalParticipant)
if p.OnMetadataUpdateCallCount() > 0 {
f = p.OnMetadataUpdateArgsForCall(p.OnMetadataUpdateCallCount() - 1)
}
@@ -29,7 +29,7 @@ func newMockParticipant(identity livekit.ParticipantIdentity, protocol types.Pro
}
}
updateTrack := func() {
var f func(participant types.Participant, track types.PublishedTrack)
var f func(participant types.LocalParticipant, track types.MediaTrack)
if p.OnTrackUpdatedCallCount() > 0 {
f = p.OnTrackUpdatedArgsForCall(p.OnTrackUpdatedCallCount() - 1)
}
@@ -48,8 +48,8 @@ func newMockParticipant(identity livekit.ParticipantIdentity, protocol types.Pro
return p
}
func newMockTrack(kind livekit.TrackType, name string) *typesfakes.FakePublishedTrack {
t := &typesfakes.FakePublishedTrack{}
func newMockTrack(kind livekit.TrackType, name string) *typesfakes.FakeMediaTrack {
t := &typesfakes.FakeMediaTrack{}
t.IDReturns(livekit.TrackID(utils.NewGuid(utils.TrackPrefix)))
t.KindReturns(kind)
t.NameReturns(name)
-403
View File
@@ -1,403 +0,0 @@
package rtc
import (
"errors"
"sync"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
"github.com/pion/rtcp"
"github.com/pion/webrtc/v3"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/sfu"
"github.com/livekit/livekit-server/pkg/sfu/twcc"
"github.com/livekit/livekit-server/pkg/telemetry"
)
type pendingTrackInfo struct {
*livekit.TrackInfo
migrated bool
}
type LocalParticipantParams struct {
Identity livekit.ParticipantIdentity
SID livekit.ParticipantID
Config *WebRTCConfig
AudioConfig config.AudioConfig
Telemetry telemetry.TelemetryService
ThrottleConfig config.PLIThrottleConfig
Logger logger.Logger
SimTracks map[uint32]SimulcastTrackInfo
}
type LocalParticipant struct {
params LocalParticipantParams
rtcpCh chan []rtcp.Packet
pliThrottle *pliThrottle
// hold reference for MediaTrack
twcc *twcc.Responder
// client intended to publish, yet to be reconciled
pendingTracksLock sync.RWMutex
pendingTracks map[string]*pendingTrackInfo
*UptrackManager
// callbacks & handlers
onWriteRTCP func(pkts []rtcp.Packet)
onTrackPublished func(track types.PublishedTrack)
}
func NewLocalParticipant(params LocalParticipantParams) *LocalParticipant {
l := &LocalParticipant{
params: params,
rtcpCh: make(chan []rtcp.Packet, 50),
pliThrottle: newPLIThrottle(params.ThrottleConfig),
pendingTracks: make(map[string]*pendingTrackInfo),
}
l.setupUptrackManager()
return l
}
func (l *LocalParticipant) Start() {
l.UptrackManager.Start()
go l.rtcpSendWorker()
}
func (l *LocalParticipant) Close() {
l.UptrackManager.Close()
l.pendingTracksLock.Lock()
l.pendingTracks = make(map[string]*pendingTrackInfo)
l.pendingTracksLock.Unlock()
}
func (l *LocalParticipant) OnWriteRTCP(f func(pkts []rtcp.Packet)) {
l.onWriteRTCP = f
}
func (l *LocalParticipant) OnTrackPublished(f func(track types.PublishedTrack)) {
l.onTrackPublished = f
}
// AddTrack is called when client intends to publish track.
// records track details and lets client know it's ok to proceed
func (l *LocalParticipant) AddTrack(req *livekit.AddTrackRequest) *livekit.TrackInfo {
l.pendingTracksLock.Lock()
defer l.pendingTracksLock.Unlock()
// if track is already published, reject
if l.pendingTracks[req.Cid] != nil {
return nil
}
if l.UptrackManager.GetPublishedTrackBySignalCidOrSdpCid(req.Cid) != nil {
return nil
}
ti := &livekit.TrackInfo{
Type: req.Type,
Name: req.Name,
Sid: utils.NewGuid(utils.TrackPrefix),
Width: req.Width,
Height: req.Height,
Muted: req.Muted,
DisableDtx: req.DisableDtx,
Source: req.Source,
Layers: req.Layers,
}
l.pendingTracks[req.Cid] = &pendingTrackInfo{TrackInfo: ti}
return ti
}
func (l *LocalParticipant) AddMigratedTrack(cid string, ti *livekit.TrackInfo) {
l.pendingTracksLock.Lock()
defer l.pendingTracksLock.Unlock()
l.pendingTracks[cid] = &pendingTrackInfo{ti, true}
}
func (l *LocalParticipant) SetTrackMuted(trackID livekit.TrackID, muted bool) {
track := l.UptrackManager.SetPublishedTrackMuted(trackID, muted)
if track != nil {
// handled in UptrackManager for a published track, no need to update state of pending track
return
}
isPending := false
l.pendingTracksLock.RLock()
for _, ti := range l.pendingTracks {
if livekit.TrackID(ti.Sid) == trackID {
ti.Muted = muted
isPending = true
break
}
}
l.pendingTracksLock.RUnlock()
if !isPending {
l.params.Logger.Warnw("could not locate track", nil, "track", trackID)
}
}
func (l *LocalParticipant) GetAudioLevel() (level uint8, active bool) {
level = SilentAudioLevel
for _, pt := range l.UptrackManager.GetPublishedTracks() {
tl, ta := pt.GetAudioLevel()
if ta {
active = true
if tl < level {
level = tl
}
}
}
return
}
func (l *LocalParticipant) GetConnectionQuality() (scores float64, numTracks int) {
for _, pt := range l.UptrackManager.GetPublishedTracks() {
if pt.IsMuted() {
continue
}
scores += pt.GetConnectionScore()
numTracks++
}
return
}
func (l *LocalParticipant) GetDTX() bool {
l.pendingTracksLock.RLock()
defer l.pendingTracksLock.RUnlock()
//
// Although DTX is set per track, there are cases where
// pending track has to be looked up by kind. This happens
// when clients change track id between signalling and SDP.
// In that case, look at all pending tracks by kind and
// enable DTX even if one has it enabled.
//
// Most of the time in practice, there is going to be one
// audio kind track and hence this is fine.
//
for _, ti := range l.pendingTracks {
if ti.Type == livekit.TrackType_AUDIO {
if !ti.TrackInfo.DisableDtx {
return true
}
}
}
return false
}
func (l *LocalParticipant) MediaTrackReceived(track *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver, mid string) (types.PublishedTrack, bool) {
l.pendingTracksLock.Lock()
newTrack := false
// use existing mediatrack to handle simulcast
mt, ok := l.UptrackManager.GetPublishedTrackBySdpCid(track.ID()).(*MediaTrack)
if !ok {
signalCid, ti := l.getPendingTrack(track.ID(), ToProtoTrackKind(track.Kind()))
if ti == nil {
l.pendingTracksLock.Unlock()
return nil, false
}
ti.MimeType = track.Codec().MimeType
ti.Mid = mid
mt = NewMediaTrack(track, MediaTrackParams{
TrackInfo: ti,
SignalCid: signalCid,
SdpCid: track.ID(),
ParticipantID: l.params.SID,
ParticipantIdentity: l.params.Identity,
RTCPChan: l.rtcpCh,
BufferFactory: l.params.Config.BufferFactory,
ReceiverConfig: l.params.Config.Receiver,
AudioConfig: l.params.AudioConfig,
Telemetry: l.params.Telemetry,
Logger: l.params.Logger,
SubscriberConfig: l.params.Config.Subscriber,
})
for ssrc, info := range l.params.SimTracks {
if info.Mid == mid {
mt.TrySetSimulcastSSRC(uint8(sfu.RidToLayer(info.Rid)), ssrc)
}
}
// add to published and clean up pending
l.UptrackManager.AddPublishedTrack(mt)
delete(l.pendingTracks, signalCid)
newTrack = true
}
ssrc := uint32(track.SSRC())
l.pliThrottle.addTrack(ssrc, track.RID())
if l.twcc == nil {
l.twcc = twcc.NewTransportWideCCResponder(ssrc)
l.twcc.OnFeedback(func(pkt rtcp.RawPacket) {
if l.onWriteRTCP != nil {
l.onWriteRTCP([]rtcp.Packet{&pkt})
}
})
}
l.pendingTracksLock.Unlock()
mt.AddReceiver(rtpReceiver, track, l.twcc)
if newTrack {
l.handleTrackPublished(mt)
}
return mt, newTrack
}
func (l *LocalParticipant) handleTrackPublished(track types.PublishedTrack) {
if l.onTrackPublished != nil {
l.onTrackPublished(track)
}
}
func (l *LocalParticipant) UpdateSubscribedQuality(nodeID string, trackID livekit.TrackID, maxQuality livekit.VideoQuality) error {
track := l.UptrackManager.GetPublishedTrack(trackID)
if track == nil {
l.params.Logger.Warnw("could not find track", nil, "trackID", trackID)
return errors.New("could not find track")
}
if mt, ok := track.(*MediaTrack); ok {
mt.NotifySubscriberNodeMaxQuality(nodeID, maxQuality)
}
return nil
}
func (l *LocalParticipant) UpdateMediaLoss(nodeID string, trackID livekit.TrackID, fractionalLoss uint32) error {
track := l.UptrackManager.GetPublishedTrack(trackID)
if track == nil {
l.params.Logger.Warnw("could not find track", nil, "trackID", trackID)
return errors.New("could not find track")
}
if mt, ok := track.(*MediaTrack); ok {
mt.NotifySubscriberNodeMediaLoss(nodeID, uint8(fractionalLoss))
}
return nil
}
func (l *LocalParticipant) HasPendingMigratedTrack() bool {
l.pendingTracksLock.RLock()
defer l.pendingTracksLock.RUnlock()
for _, t := range l.pendingTracks {
if t.migrated {
return true
}
}
return false
}
func (l *LocalParticipant) DebugInfo() map[string]interface{} {
info := map[string]interface{}{}
pendingTrackInfo := make(map[string]interface{})
l.pendingTracksLock.RLock()
for clientID, ti := range l.pendingTracks {
pendingTrackInfo[clientID] = map[string]interface{}{
"Sid": ti.Sid,
"Type": ti.Type.String(),
"Simulcast": ti.Simulcast,
}
}
l.pendingTracksLock.RUnlock()
info["PendingTracks"] = pendingTrackInfo
info["UptrackManager"] = l.UptrackManager.DebugInfo()
return info
}
func (l *LocalParticipant) setupUptrackManager() {
l.UptrackManager = NewUptrackManager(UptrackManagerParams{
SID: l.params.SID,
Logger: l.params.Logger,
})
l.UptrackManager.OnUptrackManagerClose(l.onUptrackManagerClose)
}
func (l *LocalParticipant) onUptrackManagerClose() {
close(l.rtcpCh)
}
func (l *LocalParticipant) getPendingTrack(clientId string, kind livekit.TrackType) (string, *livekit.TrackInfo) {
signalCid := clientId
trackInfo := l.pendingTracks[clientId]
if trackInfo == nil {
//
// If no match on client id, find first one matching type
// as MediaStreamTrack can change client id when transceiver
// is added to peer connection.
//
for cid, ti := range l.pendingTracks {
if ti.Type == kind {
trackInfo = ti
signalCid = cid
break
}
}
}
// if still not found, we are done
if trackInfo == nil {
l.params.Logger.Errorw("track info not published prior to track", nil, "clientId", clientId)
}
return signalCid, trackInfo.TrackInfo
}
func (l *LocalParticipant) rtcpSendWorker() {
defer Recover()
// read from rtcpChan
for pkts := range l.rtcpCh {
if pkts == nil {
return
}
fwdPkts := make([]rtcp.Packet, 0, len(pkts))
for _, pkt := range pkts {
switch pkt.(type) {
case *rtcp.PictureLossIndication:
mediaSSRC := pkt.(*rtcp.PictureLossIndication).MediaSSRC
if l.pliThrottle.canSend(mediaSSRC) {
fwdPkts = append(fwdPkts, pkt)
}
case *rtcp.FullIntraRequest:
mediaSSRC := pkt.(*rtcp.FullIntraRequest).MediaSSRC
if l.pliThrottle.canSend(mediaSSRC) {
fwdPkts = append(fwdPkts, pkt)
}
default:
fwdPkts = append(fwdPkts, pkt)
}
}
if len(fwdPkts) > 0 && l.onWriteRTCP != nil {
l.onWriteRTCP(fwdPkts)
}
}
}
+1 -1
View File
@@ -162,7 +162,7 @@ func (t *MediaTrackReceiver) AddOnClose(f func()) {
}
// AddSubscriber subscribes sub to current mediaTrack
func (t *MediaTrackReceiver) AddSubscriber(sub types.Participant) error {
func (t *MediaTrackReceiver) AddSubscriber(sub types.LocalParticipant) error {
t.lock.Lock()
defer t.lock.Unlock()
+2 -2
View File
@@ -92,7 +92,7 @@ func (t *MediaTrackSubscriptions) IsSubscriber(subID livekit.ParticipantID) bool
}
// AddSubscriber subscribes sub to current mediaTrack
func (t *MediaTrackSubscriptions) AddSubscriber(sub types.Participant, codec webrtc.RTPCodecCapability, wr WrappedReceiver) (*sfu.DownTrack, error) {
func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, codec webrtc.RTPCodecCapability, wr WrappedReceiver) (*sfu.DownTrack, error) {
subscriberID := sub.ID()
t.subscribedTracksMu.Lock()
@@ -324,7 +324,7 @@ func (t *MediaTrackSubscriptions) getSubscribedTrack(subscriberID livekit.Partic
// TODO: send for all downtracks from the source participant
// https://tools.ietf.org/html/rfc7941
func (t *MediaTrackSubscriptions) sendDownTrackBindingReports(sub types.Participant) {
func (t *MediaTrackSubscriptions) sendDownTrackBindingReports(sub types.LocalParticipant) {
var sd []rtcp.SourceDescriptionChunk
subTrack := t.getSubscribedTrack(sub.ID())
-7
View File
@@ -1,7 +0,0 @@
package rtc
type NoOpLocalMediaTrack struct {
}
func (t *NoOpLocalMediaTrack) NotifySubscriberNodeMediaLoss(_nodeID string, _fractionalLoss uint8) {
}
-190
View File
@@ -1,190 +0,0 @@
package rtc
import (
"errors"
"time"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/protocol/livekit"
"github.com/pion/webrtc/v3"
)
var (
ErrUnimplemented = errors.New("unimplemented")
)
type NoOpLocalParticipant struct {
}
func (p *NoOpLocalParticipant) ProtocolVersion() types.ProtocolVersion {
return 0
}
func (p *NoOpLocalParticipant) ConnectedAt() time.Time {
return time.Time{}
}
func (p *NoOpLocalParticipant) State() livekit.ParticipantInfo_State {
return livekit.ParticipantInfo_DISCONNECTED
}
func (p *NoOpLocalParticipant) IsReady() bool {
return false
}
func (p *NoOpLocalParticipant) IsRecorder() bool {
return false
}
func (p *NoOpLocalParticipant) SubscriberAsPrimary() bool {
return false
}
func (p *NoOpLocalParticipant) GetResponseSink() routing.MessageSink {
return nil
}
func (p *NoOpLocalParticipant) SetResponseSink(_sink routing.MessageSink) {
}
func (p *NoOpLocalParticipant) SetPermission(_permission *livekit.ParticipantPermission) {
}
func (p *NoOpLocalParticipant) CanPublish() bool {
return false
}
func (p *NoOpLocalParticipant) CanSubscribe() bool {
return false
}
func (p *NoOpLocalParticipant) CanPublishData() bool {
return false
}
func (p *NoOpLocalParticipant) AddICECandidate(_candidate webrtc.ICECandidateInit, _target livekit.SignalTarget) error {
return ErrUnimplemented
}
func (p *NoOpLocalParticipant) HandleOffer(_sdp webrtc.SessionDescription) (webrtc.SessionDescription, error) {
return webrtc.SessionDescription{}, ErrUnimplemented
}
func (p *NoOpLocalParticipant) AddTrack(_req *livekit.AddTrackRequest) {
}
func (p *NoOpLocalParticipant) SetTrackMuted(_trackID livekit.TrackID, _muted bool, _fromAdmin bool) {
}
func (p *NoOpLocalParticipant) SubscriberMediaEngine() *webrtc.MediaEngine {
return nil
}
func (p *NoOpLocalParticipant) SubscriberPC() *webrtc.PeerConnection {
return nil
}
func (p *NoOpLocalParticipant) HandleAnswer(_sdp webrtc.SessionDescription) error {
return ErrUnimplemented
}
func (p *NoOpLocalParticipant) Negotiate() {
}
func (p *NoOpLocalParticipant) ICERestart() error {
return ErrUnimplemented
}
func (p *NoOpLocalParticipant) AddSubscribedTrack(_subTrack types.SubscribedTrack) {
}
func (p *NoOpLocalParticipant) RemoveSubscribedTrack(_subTrack types.SubscribedTrack) {
}
func (p *NoOpLocalParticipant) GetSubscribedTrack(_sid livekit.TrackID) types.SubscribedTrack {
return nil
}
func (p *NoOpLocalParticipant) GetSubscribedTracks() []types.SubscribedTrack {
return nil
}
func (p *NoOpLocalParticipant) GetSubscribedParticipants() []livekit.ParticipantID {
return nil
}
func (t *NoOpLocalParticipant) GetAudioLevel() (level uint8, active bool) {
return SilentAudioLevel, false
}
func (p *NoOpLocalParticipant) GetConnectionQuality() *livekit.ConnectionQualityInfo {
return &livekit.ConnectionQualityInfo{}
}
func (p *NoOpLocalParticipant) SendJoinResponse(
_roomInfo *livekit.Room,
_otherParticipants []*livekit.ParticipantInfo,
_iceServers []*livekit.ICEServer,
) error {
return ErrUnimplemented
}
func (p *NoOpLocalParticipant) SendParticipantUpdate(_participantsToUpdate []*livekit.ParticipantInfo, updatedAt time.Time) error {
return ErrUnimplemented
}
func (p *NoOpLocalParticipant) SendSpeakerUpdate(_speakers []*livekit.SpeakerInfo) error {
return ErrUnimplemented
}
func (p *NoOpLocalParticipant) SendDataPacket(_dp *livekit.DataPacket) error {
return ErrUnimplemented
}
func (p *NoOpLocalParticipant) SendRoomUpdate(_room *livekit.Room) error {
return ErrUnimplemented
}
func (p *NoOpLocalParticipant) SendConnectionQualityUpdate(_update *livekit.ConnectionQualityUpdate) error {
return ErrUnimplemented
}
func (p *NoOpLocalParticipant) SubscriptionPermissionUpdate(_publisherID livekit.ParticipantID, _trackID livekit.TrackID, _allowed bool) {
}
func (p *NoOpLocalParticipant) OnStateChange(_callback func(p types.Participant, oldState livekit.ParticipantInfo_State)) {
}
func (p *NoOpLocalParticipant) OnTrackUpdated(_callback func(types.Participant, types.PublishedTrack)) {
}
func (p *NoOpLocalParticipant) OnMetadataUpdate(_callback func(types.Participant)) {
}
func (p *NoOpLocalParticipant) OnDataPacket(_callback func(types.Participant, *livekit.DataPacket)) {
}
func (p *NoOpLocalParticipant) OnClose(_callback func(types.Participant, map[livekit.TrackID]livekit.ParticipantID)) {
}
func (p *NoOpLocalParticipant) UpdateSubscribedQuality(_nodeID string, _trackID livekit.TrackID, _maxQuality livekit.VideoQuality) error {
return ErrUnimplemented
}
func (p *NoOpLocalParticipant) UpdateMediaLoss(_nodeID string, _trackID livekit.TrackID, _fractionalLoss uint32) error {
return ErrUnimplemented
}
func (p *NoOpLocalParticipant) SetMigrateState(_s types.MigrateState) {
}
func (p *NoOpLocalParticipant) MigrateState() types.MigrateState {
return types.MigrateStateInit
}
func (p *NoOpLocalParticipant) AddMigratedTrack(cid string, ti *livekit.TrackInfo) {
}
func (p *NoOpLocalParticipant) SetPreviousAnswer(_previous *webrtc.SessionDescription) {
}
-20
View File
@@ -1,20 +0,0 @@
package rtc
type NoOpLocalPublishedTrack struct {
}
func (t *NoOpLocalPublishedTrack) GetAudioLevel() (level uint8, active bool) {
return SilentAudioLevel, false
}
func (t *NoOpLocalPublishedTrack) GetConnectionScore() float64 {
return 0
}
func (t *NoOpLocalPublishedTrack) SdpCid() (sdpCid string) {
return
}
func (t *NoOpLocalPublishedTrack) SignalCid() (signalCid string) {
return
}
+391 -70
View File
@@ -9,6 +9,7 @@ import (
"time"
"github.com/livekit/livekit-server/pkg/sfu/connectionquality"
"github.com/livekit/livekit-server/pkg/sfu/twcc"
lru "github.com/hashicorp/golang-lru"
"github.com/livekit/protocol/livekit"
@@ -34,6 +35,11 @@ const (
sdBatchSize = 20
)
type pendingTrackInfo struct {
*livekit.TrackInfo
migrated bool
}
type ParticipantParams struct {
Identity livekit.ParticipantIdentity
Name livekit.ParticipantName
@@ -73,7 +79,17 @@ type ParticipantImpl struct {
// JSON encoded metadata to pass to clients
metadata string
*LocalParticipant
rtcpCh chan []rtcp.Packet
pliThrottle *pliThrottle
// hold reference for MediaTrack
twcc *twcc.Responder
// client intended to publish, yet to be reconciled
pendingTracksLock sync.RWMutex
pendingTracks map[string]*pendingTrackInfo
*UptrackManager
// tracks the current participant is subscribed to, map of sid => DownTrack
subscribedTracks map[livekit.TrackID]types.SubscribedTrack
@@ -87,15 +103,15 @@ type ParticipantImpl struct {
updateLock sync.Mutex
// callbacks & handlers
onTrackPublished func(types.Participant, types.PublishedTrack)
onTrackUpdated func(types.Participant, types.PublishedTrack)
onStateChange func(p types.Participant, oldState livekit.ParticipantInfo_State)
onMetadataUpdate func(types.Participant)
onDataPacket func(types.Participant, *livekit.DataPacket)
onTrackPublished func(types.LocalParticipant, types.MediaTrack)
onTrackUpdated func(types.LocalParticipant, types.MediaTrack)
onStateChange func(p types.LocalParticipant, oldState livekit.ParticipantInfo_State)
onMetadataUpdate func(types.LocalParticipant)
onDataPacket func(types.LocalParticipant, *livekit.DataPacket)
migrateState atomic.Value // types.MigrateState
pendingOffer *webrtc.SessionDescription
onClose func(types.Participant, map[livekit.TrackID]livekit.ParticipantID)
onClose func(types.LocalParticipant, map[livekit.TrackID]livekit.ParticipantID)
}
func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
@@ -103,6 +119,9 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
p := &ParticipantImpl{
params: params,
rtcpCh: make(chan []rtcp.Packet, 50),
pliThrottle: newPLIThrottle(params.ThrottleConfig),
pendingTracks: make(map[string]*pendingTrackInfo),
subscribedTracks: make(map[livekit.TrackID]types.SubscribedTrack),
disallowedSubscriptions: make(map[livekit.TrackID]livekit.ParticipantID),
connectedAt: time.Now(),
@@ -185,7 +204,7 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
p.subscriber.OnStreamStateChange(p.onStreamStateChange)
p.setupLocalParticipant()
p.setupUptrackManager()
return p, nil
}
@@ -239,7 +258,7 @@ func (p *ParticipantImpl) ToProto() *livekit.ParticipantInfo {
Hidden: p.Hidden(),
Recorder: p.IsRecorder(),
}
info.Tracks = p.LocalParticipant.ToProto()
info.Tracks = p.UptrackManager.ToProto()
return info
}
@@ -258,27 +277,27 @@ func (p *ParticipantImpl) SubscriberMediaEngine() *webrtc.MediaEngine {
// callbacks for clients
func (p *ParticipantImpl) OnTrackPublished(callback func(types.Participant, types.PublishedTrack)) {
func (p *ParticipantImpl) OnTrackPublished(callback func(types.LocalParticipant, types.MediaTrack)) {
p.onTrackPublished = callback
}
func (p *ParticipantImpl) OnStateChange(callback func(p types.Participant, oldState livekit.ParticipantInfo_State)) {
func (p *ParticipantImpl) OnStateChange(callback func(p types.LocalParticipant, oldState livekit.ParticipantInfo_State)) {
p.onStateChange = callback
}
func (p *ParticipantImpl) OnTrackUpdated(callback func(types.Participant, types.PublishedTrack)) {
func (p *ParticipantImpl) OnTrackUpdated(callback func(types.LocalParticipant, types.MediaTrack)) {
p.onTrackUpdated = callback
}
func (p *ParticipantImpl) OnMetadataUpdate(callback func(types.Participant)) {
func (p *ParticipantImpl) OnMetadataUpdate(callback func(types.LocalParticipant)) {
p.onMetadataUpdate = callback
}
func (p *ParticipantImpl) OnDataPacket(callback func(types.Participant, *livekit.DataPacket)) {
func (p *ParticipantImpl) OnDataPacket(callback func(types.LocalParticipant, *livekit.DataPacket)) {
p.onDataPacket = callback
}
func (p *ParticipantImpl) OnClose(callback func(types.Participant, map[livekit.TrackID]livekit.ParticipantID)) {
func (p *ParticipantImpl) OnClose(callback func(types.LocalParticipant, map[livekit.TrackID]livekit.ParticipantID)) {
p.onClose = callback
}
@@ -336,10 +355,6 @@ func (p *ParticipantImpl) HandleOffer(sdp webrtc.SessionDescription) (answer web
return
}
func (p *ParticipantImpl) AddMigratedTrack(cid string, ti *livekit.TrackInfo) {
p.LocalParticipant.AddMigratedTrack(cid, ti)
}
// AddTrack is called when client intends to publish track.
// records track details and lets client know it's ok to proceed
func (p *ParticipantImpl) AddTrack(req *livekit.AddTrackRequest) {
@@ -351,7 +366,7 @@ func (p *ParticipantImpl) AddTrack(req *livekit.AddTrackRequest) {
return
}
ti := p.LocalParticipant.AddTrack(req)
ti := p.addPendingTrack(req)
if ti == nil {
return
}
@@ -366,6 +381,13 @@ func (p *ParticipantImpl) AddTrack(req *livekit.AddTrackRequest) {
})
}
func (p *ParticipantImpl) AddMigratedTrack(cid string, ti *livekit.TrackInfo) {
p.pendingTracksLock.Lock()
defer p.pendingTracksLock.Unlock()
p.pendingTracks[cid] = &pendingTrackInfo{ti, true}
}
// HandleAnswer handles a client answer response, with subscriber PC, server initiates the
// offer and client answers
func (p *ParticipantImpl) HandleAnswer(sdp webrtc.SessionDescription) error {
@@ -394,7 +416,8 @@ func (p *ParticipantImpl) AddICECandidate(candidate webrtc.ICECandidateInit, tar
func (p *ParticipantImpl) Start() {
p.once.Do(func() {
p.LocalParticipant.Start()
p.UptrackManager.Start()
go p.rtcpSendWorker()
go p.downTracksRTCPWorker()
})
}
@@ -414,7 +437,11 @@ func (p *ParticipantImpl) Close(sendLeave bool) error {
})
}
p.LocalParticipant.Close()
p.UptrackManager.Close()
p.pendingTracksLock.Lock()
p.pendingTracks = make(map[string]*pendingTrackInfo)
p.pendingTracksLock.Unlock()
p.lock.Lock()
disallowedSubscriptions := make(map[livekit.TrackID]livekit.ParticipantID)
@@ -469,7 +496,7 @@ func (p *ParticipantImpl) SetMigrateState(s types.MigrateState) {
var pendingOffer *webrtc.SessionDescription
p.migrateState.Store(s)
if s == types.MigrateStateSync {
if !p.LocalParticipant.HasPendingMigratedTrack() {
if !p.hasPendingMigratedTrack() {
p.migrateState.Store(types.MigrateStateComplete)
}
pendingOffer = p.pendingOffer
@@ -634,12 +661,26 @@ func (p *ParticipantImpl) SetTrackMuted(trackID livekit.TrackID, muted bool, fro
})
}
p.LocalParticipant.SetTrackMuted(trackID, muted)
p.setTrackMuted(trackID, muted)
}
func (p *ParticipantImpl) GetAudioLevel() (level uint8, active bool) {
level = SilentAudioLevel
for _, pt := range p.GetPublishedTracks() {
tl, ta := pt.(types.LocalMediaTrack).GetAudioLevel()
if ta {
active = true
if tl < level {
level = tl
}
}
}
return
}
func (p *ParticipantImpl) GetConnectionQuality() *livekit.ConnectionQualityInfo {
// avg loss across all tracks, weigh published the same as subscribed
scores, numTracks := p.LocalParticipant.GetConnectionQuality()
scores, numTracks := p.getPublisherConnectionQuality()
p.lock.RLock()
for _, subTrack := range p.subscribedTracks {
@@ -665,11 +706,6 @@ func (p *ParticipantImpl) GetConnectionQuality() *livekit.ConnectionQualityInfo
}
}
func (p *ParticipantImpl) isSubscribedTo(participantID livekit.ParticipantID) bool {
_, ok := p.subscribedTo.Load(participantID)
return ok
}
func (p *ParticipantImpl) GetSubscribedParticipants() []livekit.ParticipantID {
var participantIDs []livekit.ParticipantID
p.subscribedTo.Range(func(key, _ interface{}) bool {
@@ -810,29 +846,41 @@ func (p *ParticipantImpl) SubscriptionPermissionUpdate(publisherID livekit.Parti
}
}
func (p *ParticipantImpl) setupLocalParticipant() {
p.LocalParticipant = NewLocalParticipant(LocalParticipantParams{
Identity: p.params.Identity,
SID: p.params.SID,
Config: p.params.Config,
AudioConfig: p.params.AudioConfig,
Telemetry: p.params.Telemetry,
ThrottleConfig: p.params.ThrottleConfig,
Logger: p.params.Logger,
SimTracks: p.params.SimTracks,
func (p *ParticipantImpl) UpdateSubscribedQuality(nodeID string, trackID livekit.TrackID, maxQuality livekit.VideoQuality) error {
track := p.GetPublishedTrack(trackID)
if track == nil {
p.params.Logger.Warnw("could not find track", nil, "trackID", trackID)
return errors.New("could not find track")
}
if mt, ok := track.(*MediaTrack); ok {
mt.NotifySubscriberNodeMaxQuality(nodeID, maxQuality)
}
return nil
}
func (p *ParticipantImpl) UpdateMediaLoss(nodeID string, trackID livekit.TrackID, fractionalLoss uint32) error {
track := p.GetPublishedTrack(trackID)
if track == nil {
p.params.Logger.Warnw("could not find track", nil, "trackID", trackID)
return errors.New("could not find track")
}
if mt, ok := track.(*MediaTrack); ok {
mt.NotifySubscriberNodeMediaLoss(nodeID, uint8(fractionalLoss))
}
return nil
}
func (p *ParticipantImpl) setupUptrackManager() {
p.UptrackManager = NewUptrackManager(UptrackManagerParams{
SID: p.params.SID,
Logger: p.params.Logger,
})
p.LocalParticipant.OnTrackPublished(func(track types.PublishedTrack) {
if !p.LocalParticipant.HasPendingMigratedTrack() {
p.SetMigrateState(types.MigrateStateComplete)
}
if p.onTrackPublished != nil {
p.onTrackPublished(p, track)
}
})
p.LocalParticipant.OnPublishedTrackUpdated(func(track types.PublishedTrack, onlyIfReady bool) {
p.UptrackManager.OnPublishedTrackUpdated(func(track types.MediaTrack, onlyIfReady bool) {
if onlyIfReady && !p.IsReady() {
return
}
@@ -842,13 +890,9 @@ func (p *ParticipantImpl) setupLocalParticipant() {
}
})
p.LocalParticipant.OnWriteRTCP(func(pkts []rtcp.Packet) {
if err := p.publisher.pc.WriteRTCP(pkts); err != nil {
p.params.Logger.Errorw("could not write RTCP to participant", err)
}
})
p.UptrackManager.OnSubscribedMaxQualityChange(p.onSubscribedMaxQualityChange)
p.LocalParticipant.OnSubscribedMaxQualityChange(p.onSubscribedMaxQualityChange)
p.UptrackManager.OnUptrackManagerClose(p.onUptrackManagerClose)
}
func (p *ParticipantImpl) sendIceCandidate(c *webrtc.ICECandidate, target livekit.SignalTarget) {
@@ -939,15 +983,7 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w
return
}
var mid string
for _, tr := range p.publisher.pc.GetTransceivers() {
if tr.Receiver() == rtpReceiver {
mid = tr.Mid()
break
}
}
publishedTrack, isNewTrack := p.LocalParticipant.MediaTrackReceived(track, rtpReceiver, mid)
publishedTrack, isNewTrack := p.mediaTrackReceived(track, rtpReceiver)
if !isNewTrack && publishedTrack != nil && p.IsReady() && p.onTrackUpdated != nil {
p.onTrackUpdated(p, publishedTrack)
}
@@ -1108,7 +1144,7 @@ func (p *ParticipantImpl) configureReceiverDTX() {
// multiple audio tracks. At that point, there might be a need to
// rely on something like order of tracks. TODO
//
enableDTX := p.LocalParticipant.GetDTX()
enableDTX := p.getDTX()
transceivers := p.publisher.pc.GetTransceivers()
for _, transceiver := range transceivers {
if transceiver.Kind() != webrtc.RTPCodecTypeAudio {
@@ -1156,6 +1192,11 @@ func (p *ParticipantImpl) configureReceiverDTX() {
}
}
func (p *ParticipantImpl) isSubscribedTo(participantID livekit.ParticipantID) bool {
_, ok := p.subscribedTo.Load(participantID)
return ok
}
func (p *ParticipantImpl) onStreamStateChange(update *sfu.StreamStateUpdate) error {
if len(update.StreamStates) == 0 {
return nil
@@ -1198,13 +1239,295 @@ func (p *ParticipantImpl) onSubscribedMaxQualityChange(trackID livekit.TrackID,
})
}
func (p *ParticipantImpl) addPendingTrack(req *livekit.AddTrackRequest) *livekit.TrackInfo {
if p.getPublishedTrackBySignalCid(req.Cid) != nil || p.getPublishedTrackBySdpCid(req.Cid) != nil {
return nil
}
p.pendingTracksLock.Lock()
defer p.pendingTracksLock.Unlock()
// if track is already published, reject
if p.pendingTracks[req.Cid] != nil {
return nil
}
ti := &livekit.TrackInfo{
Type: req.Type,
Name: req.Name,
Sid: utils.NewGuid(utils.TrackPrefix),
Width: req.Width,
Height: req.Height,
Muted: req.Muted,
DisableDtx: req.DisableDtx,
Source: req.Source,
Layers: req.Layers,
}
p.pendingTracks[req.Cid] = &pendingTrackInfo{TrackInfo: ti}
return ti
}
func (p *ParticipantImpl) setTrackMuted(trackID livekit.TrackID, muted bool) {
track := p.UptrackManager.SetPublishedTrackMuted(trackID, muted)
if track != nil {
// handled in UptrackManager for a published track, no need to update state of pending track
return
}
isPending := false
p.pendingTracksLock.RLock()
for _, ti := range p.pendingTracks {
if livekit.TrackID(ti.Sid) == trackID {
ti.Muted = muted
isPending = true
break
}
}
p.pendingTracksLock.RUnlock()
if !isPending {
p.params.Logger.Warnw("could not locate track", nil, "track", trackID)
}
}
func (p *ParticipantImpl) getPublisherConnectionQuality() (scores float64, numTracks int) {
for _, pt := range p.GetPublishedTracks() {
if pt.IsMuted() {
continue
}
scores += pt.(types.LocalMediaTrack).GetConnectionScore()
numTracks++
}
return
}
func (p *ParticipantImpl) getDTX() bool {
p.pendingTracksLock.RLock()
defer p.pendingTracksLock.RUnlock()
//
// Although DTX is set per track, there are cases where
// pending track has to be looked up by kind. This happens
// when clients change track id between signalling and SDP.
// In that case, look at all pending tracks by kind and
// enable DTX even if one has it enabled.
//
// Most of the time in practice, there is going to be one
// audio kind track and hence this is fine.
//
for _, ti := range p.pendingTracks {
if ti.Type == livekit.TrackType_AUDIO {
if !ti.TrackInfo.DisableDtx {
return true
}
}
}
return false
}
func (p *ParticipantImpl) mediaTrackReceived(track *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver) (types.MediaTrack, bool) {
p.pendingTracksLock.Lock()
newTrack := false
// use existing mediatrack to handle simulcast
mt, ok := p.getPublishedTrackBySdpCid(track.ID()).(*MediaTrack)
if !ok {
signalCid, ti := p.getPendingTrack(track.ID(), ToProtoTrackKind(track.Kind()))
if ti == nil {
p.pendingTracksLock.Unlock()
return nil, false
}
ti.MimeType = track.Codec().MimeType
var mid string
for _, tr := range p.publisher.pc.GetTransceivers() {
if tr.Receiver() == rtpReceiver {
mid = tr.Mid()
break
}
}
ti.Mid = mid
mt = NewMediaTrack(track, MediaTrackParams{
TrackInfo: ti,
SignalCid: signalCid,
SdpCid: track.ID(),
ParticipantID: p.params.SID,
ParticipantIdentity: p.params.Identity,
RTCPChan: p.rtcpCh,
BufferFactory: p.params.Config.BufferFactory,
ReceiverConfig: p.params.Config.Receiver,
AudioConfig: p.params.AudioConfig,
Telemetry: p.params.Telemetry,
Logger: p.params.Logger,
SubscriberConfig: p.params.Config.Subscriber,
})
for ssrc, info := range p.params.SimTracks {
if info.Mid == mid {
mt.TrySetSimulcastSSRC(uint8(sfu.RidToLayer(info.Rid)), ssrc)
}
}
// add to published and clean up pending
p.UptrackManager.AddPublishedTrack(mt)
delete(p.pendingTracks, signalCid)
newTrack = true
}
ssrc := uint32(track.SSRC())
p.pliThrottle.addTrack(ssrc, track.RID())
if p.twcc == nil {
p.twcc = twcc.NewTransportWideCCResponder(ssrc)
p.twcc.OnFeedback(func(pkt rtcp.RawPacket) {
if err := p.publisher.pc.WriteRTCP([]rtcp.Packet{&pkt}); err != nil {
p.params.Logger.Errorw("could not write RTCP to participant", err)
}
})
}
p.pendingTracksLock.Unlock()
mt.AddReceiver(rtpReceiver, track, p.twcc)
if newTrack {
p.handleTrackPublished(mt)
}
return mt, newTrack
}
func (p *ParticipantImpl) handleTrackPublished(track types.MediaTrack) {
if !p.hasPendingMigratedTrack() {
p.SetMigrateState(types.MigrateStateComplete)
}
if p.onTrackPublished != nil {
p.onTrackPublished(p, track)
}
}
func (p *ParticipantImpl) hasPendingMigratedTrack() bool {
p.pendingTracksLock.RLock()
defer p.pendingTracksLock.RUnlock()
for _, t := range p.pendingTracks {
if t.migrated {
return true
}
}
return false
}
func (p *ParticipantImpl) onUptrackManagerClose() {
close(p.rtcpCh)
}
func (p *ParticipantImpl) getPendingTrack(clientId string, kind livekit.TrackType) (string, *livekit.TrackInfo) {
signalCid := clientId
trackInfo := p.pendingTracks[clientId]
if trackInfo == nil {
//
// If no match on client id, find first one matching type
// as MediaStreamTrack can change client id when transceiver
// is added to peer connection.
//
for cid, ti := range p.pendingTracks {
if ti.Type == kind {
trackInfo = ti
signalCid = cid
break
}
}
}
// if still not found, we are done
if trackInfo == nil {
p.params.Logger.Errorw("track info not published prior to track", nil, "clientId", clientId)
}
return signalCid, trackInfo.TrackInfo
}
func (p *ParticipantImpl) getPublishedTrackBySignalCid(clientId string) types.MediaTrack {
for _, publishedTrack := range p.GetPublishedTracks() {
if publishedTrack.(types.LocalMediaTrack).SignalCid() == clientId {
return publishedTrack
}
}
return nil
}
func (p *ParticipantImpl) getPublishedTrackBySdpCid(clientId string) types.MediaTrack {
for _, publishedTrack := range p.GetPublishedTracks() {
if publishedTrack.(types.LocalMediaTrack).SdpCid() == clientId {
return publishedTrack
}
}
return nil
}
func (p *ParticipantImpl) rtcpSendWorker() {
defer Recover()
// read from rtcpChan
for pkts := range p.rtcpCh {
if pkts == nil {
return
}
fwdPkts := make([]rtcp.Packet, 0, len(pkts))
for _, pkt := range pkts {
switch pkt.(type) {
case *rtcp.PictureLossIndication:
mediaSSRC := pkt.(*rtcp.PictureLossIndication).MediaSSRC
if p.pliThrottle.canSend(mediaSSRC) {
fwdPkts = append(fwdPkts, pkt)
}
case *rtcp.FullIntraRequest:
mediaSSRC := pkt.(*rtcp.FullIntraRequest).MediaSSRC
if p.pliThrottle.canSend(mediaSSRC) {
fwdPkts = append(fwdPkts, pkt)
}
default:
fwdPkts = append(fwdPkts, pkt)
}
}
if len(fwdPkts) > 0 {
if err := p.publisher.pc.WriteRTCP(fwdPkts); err != nil {
p.params.Logger.Errorw("could not write RTCP to participant", err)
}
}
}
}
func (p *ParticipantImpl) DebugInfo() map[string]interface{} {
info := map[string]interface{}{
"ID": p.params.SID,
"State": p.State().String(),
}
localParticipantInfo := p.LocalParticipant.DebugInfo()
pendingTrackInfo := make(map[string]interface{})
p.pendingTracksLock.RLock()
for clientID, ti := range p.pendingTracks {
pendingTrackInfo[clientID] = map[string]interface{}{
"Sid": ti.Sid,
"Type": ti.Type.String(),
"Simulcast": ti.Simulcast,
}
}
p.pendingTracksLock.RUnlock()
info["PendingTracks"] = pendingTrackInfo
info["UptrackManager"] = p.UptrackManager.DebugInfo()
subscribedTrackInfo := make(map[livekit.TrackID]interface{})
p.lock.RLock()
@@ -1214,8 +1537,6 @@ func (p *ParticipantImpl) DebugInfo() map[string]interface{} {
subscribedTrackInfo[track.ID()] = dt
}
p.lock.RUnlock()
info["LocalParticipant"] = localParticipantInfo
info["SubscribedTracks"] = subscribedTrackInfo
return info
+17 -17
View File
@@ -52,7 +52,7 @@ func TestICEStateChange(t *testing.T) {
t.Run("onClose gets called when ICE disconnected", func(t *testing.T) {
p := newParticipantForTest("test")
closeChan := make(chan struct{})
p.onClose = func(participant types.Participant, disallowedSubscriptions map[livekit.TrackID]livekit.ParticipantID) {
p.onClose = func(participant types.LocalParticipant, disallowedSubscriptions map[livekit.TrackID]livekit.ParticipantID) {
close(closeChan)
}
p.handlePrimaryStateChange(webrtc.PeerConnectionStateFailed)
@@ -70,18 +70,18 @@ func TestTrackPublishing(t *testing.T) {
t.Run("should send the correct events", func(t *testing.T) {
p := newParticipantForTest("test")
p.state.Store(livekit.ParticipantInfo_ACTIVE)
track := &typesfakes.FakePublishedTrack{}
track := &typesfakes.FakeMediaTrack{}
track.IDReturns("id")
published := false
updated := false
p.OnTrackUpdated(func(p types.Participant, track types.PublishedTrack) {
p.OnTrackUpdated(func(p types.LocalParticipant, track types.MediaTrack) {
updated = true
})
p.OnTrackPublished(func(p types.Participant, track types.PublishedTrack) {
p.OnTrackPublished(func(p types.LocalParticipant, track types.MediaTrack) {
published = true
})
p.LocalParticipant.AddPublishedTrack(track)
p.LocalParticipant.handleTrackPublished(track)
p.UptrackManager.AddPublishedTrack(track)
p.handleTrackPublished(track)
require.True(t, published)
require.False(t, updated)
@@ -134,7 +134,7 @@ func TestTrackPublishing(t *testing.T) {
p := newParticipantForTest("test")
sink := p.params.Sink.(*routingfakes.FakeMessageSink)
track := &typesfakes.FakePublishedTrack{}
track := &typesfakes.FakeLocalMediaTrack{}
track.SignalCidReturns("cid")
// directly add to publishedTracks without lock - for testing purpose only
p.UptrackManager.publishedTracks["cid"] = track
@@ -151,7 +151,7 @@ func TestTrackPublishing(t *testing.T) {
p := newParticipantForTest("test")
sink := p.params.Sink.(*routingfakes.FakeMessageSink)
track := &typesfakes.FakePublishedTrack{}
track := &typesfakes.FakeLocalMediaTrack{}
track.SdpCidReturns("cid")
// directly add to publishedTracks without lock - for testing purpose only
p.UptrackManager.publishedTracks["cid"] = track
@@ -202,9 +202,9 @@ func TestDisconnectTiming(t *testing.T) {
t.Log("received message from chan", msg)
}
}()
track := &typesfakes.FakePublishedTrack{}
p.LocalParticipant.AddPublishedTrack(track)
p.LocalParticipant.handleTrackPublished(track)
track := &typesfakes.FakeMediaTrack{}
p.UptrackManager.AddPublishedTrack(track)
p.handleTrackPublished(track)
// close channel and then try to Negotiate
msg.Close()
@@ -222,7 +222,7 @@ func TestMuteSetting(t *testing.T) {
t.Run("can set mute when track is pending", func(t *testing.T) {
p := newParticipantForTest("test")
ti := &livekit.TrackInfo{Sid: "testTrack"}
p.LocalParticipant.pendingTracks["cid"] = &pendingTrackInfo{TrackInfo: ti}
p.pendingTracks["cid"] = &pendingTrackInfo{TrackInfo: ti}
p.SetTrackMuted(livekit.TrackID(ti.Sid), true, false)
require.True(t, ti.Muted)
@@ -236,7 +236,7 @@ func TestMuteSetting(t *testing.T) {
Muted: true,
})
_, ti := p.LocalParticipant.getPendingTrack("cid", livekit.TrackType_AUDIO)
_, ti := p.getPendingTrack("cid", livekit.TrackType_AUDIO)
require.NotNil(t, ti)
require.True(t, ti.Muted)
})
@@ -253,16 +253,16 @@ func TestConnectionQuality(t *testing.T) {
return connectionquality.Loss2Score(loss, reducedQuality)
}
testPublishedVideoTrack := func(loss, numPublishing, numRegistered uint32) *typesfakes.FakePublishedTrack {
tr := &typesfakes.FakePublishedTrack{}
testPublishedVideoTrack := func(loss, numPublishing, numRegistered uint32) *typesfakes.FakeLocalMediaTrack {
tr := &typesfakes.FakeLocalMediaTrack{}
score := videoScore(loss, numPublishing, numRegistered)
t.Log("video score: ", score)
tr.GetConnectionScoreReturns(score)
return tr
}
testPublishedAudioTrack := func(totalPackets, packetsLost uint32) *typesfakes.FakePublishedTrack {
tr := &typesfakes.FakePublishedTrack{}
testPublishedAudioTrack := func(totalPackets, packetsLost uint32) *typesfakes.FakeLocalMediaTrack {
tr := &typesfakes.FakeLocalMediaTrack{}
stat := &connectionquality.ConnectionStat{
PacketsLost: packetsLost,
+24 -24
View File
@@ -37,7 +37,7 @@ type Room struct {
telemetry telemetry.TelemetryService
// map of identity -> Participant
participants map[livekit.ParticipantIdentity]types.Participant
participants map[livekit.ParticipantIdentity]types.LocalParticipant
participantOpts map[livekit.ParticipantIdentity]*ParticipantOptions
bufferFactory *buffer.Factory
@@ -48,7 +48,7 @@ type Room struct {
closed chan struct{}
closeOnce sync.Once
onParticipantChanged func(p types.Participant)
onParticipantChanged func(p types.LocalParticipant)
onMetadataUpdate func(metadata string)
onClose func()
}
@@ -64,7 +64,7 @@ func NewRoom(room *livekit.Room, config WebRTCConfig, audioConfig *config.AudioC
config: config,
audioConfig: audioConfig,
telemetry: telemetry,
participants: make(map[livekit.ParticipantIdentity]types.Participant),
participants: make(map[livekit.ParticipantIdentity]types.LocalParticipant),
participantOpts: make(map[livekit.ParticipantIdentity]*ParticipantOptions),
bufferFactory: buffer.NewBufferFactory(config.Receiver.PacketBufferSize, logr.Logger{}),
closed: make(chan struct{}),
@@ -86,13 +86,13 @@ func (r *Room) Name() livekit.RoomName {
return livekit.RoomName(r.Room.Name)
}
func (r *Room) GetParticipant(identity livekit.ParticipantIdentity) types.Participant {
func (r *Room) GetParticipant(identity livekit.ParticipantIdentity) types.LocalParticipant {
r.lock.RLock()
defer r.lock.RUnlock()
return r.participants[identity]
}
func (r *Room) GetParticipantBySid(participantID livekit.ParticipantID) types.Participant {
func (r *Room) GetParticipantBySid(participantID livekit.ParticipantID) types.LocalParticipant {
r.lock.RLock()
defer r.lock.RUnlock()
@@ -105,10 +105,10 @@ func (r *Room) GetParticipantBySid(participantID livekit.ParticipantID) types.Pa
return nil
}
func (r *Room) GetParticipants() []types.Participant {
func (r *Room) GetParticipants() []types.LocalParticipant {
r.lock.RLock()
defer r.lock.RUnlock()
participants := make([]types.Participant, 0, len(r.participants))
participants := make([]types.LocalParticipant, 0, len(r.participants))
for _, p := range r.participants {
participants = append(participants, p)
}
@@ -156,7 +156,7 @@ func (r *Room) LastLeftAt() int64 {
return 0
}
func (r *Room) Join(participant types.Participant, opts *ParticipantOptions, iceServers []*livekit.ICEServer) error {
func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions, iceServers []*livekit.ICEServer) error {
if r.IsClosed() {
prometheus.ServiceOperationCounter.WithLabelValues("participant_join", "error", "room_closed").Add(1)
return ErrRoomClosed
@@ -184,7 +184,7 @@ func (r *Room) Join(participant types.Participant, opts *ParticipantOptions, ice
// it's important to set this before connection, we don't want to miss out on any publishedTracks
participant.OnTrackPublished(r.onTrackPublished)
participant.OnStateChange(func(p types.Participant, oldState livekit.ParticipantInfo_State) {
participant.OnStateChange(func(p types.LocalParticipant, oldState livekit.ParticipantInfo_State) {
r.Logger.Debugw("participant state changed",
"state", p.State(),
"participant", p.Identity(),
@@ -262,7 +262,7 @@ func (r *Room) Join(participant types.Participant, opts *ParticipantOptions, ice
return nil
}
func (r *Room) ResumeParticipant(p types.Participant, responseSink routing.MessageSink) error {
func (r *Room) ResumeParticipant(p types.LocalParticipant, responseSink routing.MessageSink) error {
// close previous sink, and link to new one
if prevSink := p.GetResponseSink(); prevSink != nil {
prevSink.Close()
@@ -338,7 +338,7 @@ func (r *Room) RemoveParticipant(identity livekit.ParticipantIdentity) {
}
func (r *Room) UpdateSubscriptions(
participant types.Participant,
participant types.LocalParticipant,
trackIDs []livekit.TrackID,
participantTracks []*livekit.ParticipantTracks,
subscribe bool,
@@ -379,7 +379,7 @@ func (r *Room) UpdateSubscriptions(
return nil
}
func (r *Room) SyncState(participant types.Participant, state *livekit.SyncState) error {
func (r *Room) SyncState(participant types.LocalParticipant, state *livekit.SyncState) error {
return nil
}
@@ -387,7 +387,7 @@ func (r *Room) UpdateSubscriptionPermissions(participant types.Participant, perm
return participant.UpdateSubscriptionPermissions(permissions, r.GetParticipantBySid)
}
func (r *Room) RemoveDisallowedSubscriptions(sub types.Participant, disallowedSubscriptions map[livekit.TrackID]livekit.ParticipantID) {
func (r *Room) RemoveDisallowedSubscriptions(sub types.LocalParticipant, disallowedSubscriptions map[livekit.TrackID]livekit.ParticipantID) {
for trackID, publisherID := range disallowedSubscriptions {
pub := r.GetParticipantBySid(publisherID)
if pub == nil {
@@ -461,7 +461,7 @@ func (r *Room) OnClose(f func()) {
r.onClose = f
}
func (r *Room) OnParticipantChanged(f func(participant types.Participant)) {
func (r *Room) OnParticipantChanged(f func(participant types.LocalParticipant)) {
r.onParticipantChanged = f
}
@@ -505,7 +505,7 @@ func (r *Room) OnMetadataUpdate(f func(metadata string)) {
r.onMetadataUpdate = f
}
func (r *Room) SimulateScenario(participant types.Participant, simulateScenario *livekit.SimulateScenario) error {
func (r *Room) SimulateScenario(participant types.LocalParticipant, simulateScenario *livekit.SimulateScenario) error {
switch scenario := simulateScenario.Scenario.(type) {
case *livekit.SimulateScenario_SpeakerUpdate:
r.Logger.Infow("simulating speaker update", "participant", participant.Identity())
@@ -534,7 +534,7 @@ func (r *Room) SimulateScenario(participant types.Participant, simulateScenario
}
// checks if participant should be autosubscribed to new tracks, assumes lock is already acquired
func (r *Room) autoSubscribe(participant types.Participant) bool {
func (r *Room) autoSubscribe(participant types.LocalParticipant) bool {
if !participant.CanSubscribe() {
return false
}
@@ -548,14 +548,14 @@ func (r *Room) autoSubscribe(participant types.Participant) bool {
}
// a ParticipantImpl in the room added a new remoteTrack, subscribe other participants to it
func (r *Room) onTrackPublished(participant types.Participant, track types.PublishedTrack) {
func (r *Room) onTrackPublished(participant types.LocalParticipant, track types.MediaTrack) {
// publish participant update, since track state is changed
r.broadcastParticipantState(participant, true)
r.lock.RLock()
defer r.lock.RUnlock()
// subscribe all existing participants to this PublishedTrack
// subscribe all existing participants to this MediaTrack
for _, existingParticipant := range r.participants {
if existingParticipant == participant {
// skip publishing participant
@@ -586,7 +586,7 @@ func (r *Room) onTrackPublished(participant types.Participant, track types.Publi
}
}
func (r *Room) onTrackUpdated(p types.Participant, _ types.PublishedTrack) {
func (r *Room) onTrackUpdated(p types.LocalParticipant, _ types.MediaTrack) {
// send track updates to everyone, especially if track was updated by admin
r.broadcastParticipantState(p, false)
if r.onParticipantChanged != nil {
@@ -594,14 +594,14 @@ func (r *Room) onTrackUpdated(p types.Participant, _ types.PublishedTrack) {
}
}
func (r *Room) onParticipantMetadataUpdate(p types.Participant) {
func (r *Room) onParticipantMetadataUpdate(p types.LocalParticipant) {
r.broadcastParticipantState(p, false)
if r.onParticipantChanged != nil {
r.onParticipantChanged(p)
}
}
func (r *Room) onDataPacket(source types.Participant, dp *livekit.DataPacket) {
func (r *Room) onDataPacket(source types.LocalParticipant, dp *livekit.DataPacket) {
// don't forward if source isn't allowed to publish data
if source != nil && !source.CanPublishData() {
return
@@ -631,7 +631,7 @@ func (r *Room) onDataPacket(source types.Participant, dp *livekit.DataPacket) {
}
}
func (r *Room) subscribeToExistingTracks(p types.Participant) {
func (r *Room) subscribeToExistingTracks(p types.LocalParticipant) {
r.lock.RLock()
shouldSubscribe := r.autoSubscribe(p)
r.lock.RUnlock()
@@ -662,14 +662,14 @@ func (r *Room) subscribeToExistingTracks(p types.Participant) {
}
// broadcast an update about participant p
func (r *Room) broadcastParticipantState(p types.Participant, skipSource bool) {
func (r *Room) broadcastParticipantState(p types.LocalParticipant, skipSource bool) {
//
// This is a critical section to ensure that participant update time and
// the corresponding data are paired properly.
//
r.lock.Lock()
updatedAt := time.Now()
updates := ToProtoParticipants([]types.Participant{p})
updates := ToProtoParticipants([]types.LocalParticipant{p})
r.lock.Unlock()
if p.Hidden() {
+31 -31
View File
@@ -92,7 +92,7 @@ func TestRoomJoin(t *testing.T) {
if p == op {
continue
}
mockP := op.(*typesfakes.FakeParticipant)
mockP := op.(*typesfakes.FakeLocalParticipant)
require.NotZero(t, mockP.AddSubscriberCallCount())
// last call should be to add the newest participant
sub, params := mockP.AddSubscriberArgsForCall(mockP.AddSubscriberCallCount() - 1)
@@ -104,12 +104,12 @@ func TestRoomJoin(t *testing.T) {
t.Run("participant state change is broadcasted to others", func(t *testing.T) {
rm := newRoomWithParticipants(t, testRoomOpts{num: numParticipants})
var changedParticipant types.Participant
rm.OnParticipantChanged(func(participant types.Participant) {
rm.OnParticipantChanged(func(participant types.LocalParticipant) {
changedParticipant = participant
})
participants := rm.GetParticipants()
p := participants[0].(*typesfakes.FakeParticipant)
disconnectedParticipant := participants[1].(*typesfakes.FakeParticipant)
p := participants[0].(*typesfakes.FakeLocalParticipant)
disconnectedParticipant := participants[1].(*typesfakes.FakeLocalParticipant)
disconnectedParticipant.StateReturns(livekit.ParticipantInfo_DISCONNECTED)
rm.RemoveParticipant(p.Identity())
@@ -123,7 +123,7 @@ func TestRoomJoin(t *testing.T) {
require.Zero(t, p.SendParticipantUpdateCallCount())
continue
}
fakeP := op.(*typesfakes.FakeParticipant)
fakeP := op.(*typesfakes.FakeLocalParticipant)
require.Equal(t, 1, fakeP.SendParticipantUpdateCallCount())
numUpdates += 1
}
@@ -145,26 +145,26 @@ func TestParticipantUpdate(t *testing.T) {
tests := []struct {
name string
sendToSender bool // should sender receive it
action func(p types.Participant)
action func(p types.LocalParticipant)
}{
{
"track mutes are sent to everyone",
true,
func(p types.Participant) {
func(p types.LocalParticipant) {
p.SetTrackMuted("", true, false)
},
},
{
"track metadata updates are sent to everyone",
true,
func(p types.Participant) {
func(p types.LocalParticipant) {
p.SetMetadata("")
},
},
{
"track publishes are sent to existing participants",
true,
func(p types.Participant) {
func(p types.LocalParticipant) {
p.AddTrack(&livekit.AddTrackRequest{
Type: livekit.TrackType_VIDEO,
})
@@ -178,7 +178,7 @@ func TestParticipantUpdate(t *testing.T) {
// remember how many times send has been called for each
callCounts := make(map[livekit.ParticipantID]int)
for _, p := range rm.GetParticipants() {
fp := p.(*typesfakes.FakeParticipant)
fp := p.(*typesfakes.FakeLocalParticipant)
callCounts[p.ID()] = fp.SendParticipantUpdateCallCount()
}
@@ -191,7 +191,7 @@ func TestParticipantUpdate(t *testing.T) {
if p != sender || test.sendToSender {
expected += 1
}
fp := p.(*typesfakes.FakeParticipant)
fp := p.(*typesfakes.FakeLocalParticipant)
require.Equal(t, expected, fp.SendParticipantUpdateCallCount())
}
})
@@ -248,12 +248,12 @@ func TestNewTrack(t *testing.T) {
t.Run("new track should be added to ready participants", func(t *testing.T) {
rm := newRoomWithParticipants(t, testRoomOpts{num: 3})
participants := rm.GetParticipants()
p0 := participants[0].(*typesfakes.FakeParticipant)
p0 := participants[0].(*typesfakes.FakeLocalParticipant)
p0.StateReturns(livekit.ParticipantInfo_JOINED)
p1 := participants[1].(*typesfakes.FakeParticipant)
p1 := participants[1].(*typesfakes.FakeLocalParticipant)
p1.StateReturns(livekit.ParticipantInfo_ACTIVE)
pub := participants[2].(*typesfakes.FakeParticipant)
pub := participants[2].(*typesfakes.FakeLocalParticipant)
// pub adds track
track := newMockTrack(livekit.TrackType_VIDEO, "webcam")
@@ -270,7 +270,7 @@ func TestNewTrack(t *testing.T) {
func TestActiveSpeakers(t *testing.T) {
t.Parallel()
getActiveSpeakerUpdates := func(p *typesfakes.FakeParticipant) []*livekit.ActiveSpeakerUpdate {
getActiveSpeakerUpdates := func(p *typesfakes.FakeLocalParticipant) []*livekit.ActiveSpeakerUpdate {
var updates []*livekit.ActiveSpeakerUpdate
numCalls := p.SendDataPacketCallCount()
for i := 0; i < numCalls; i++ {
@@ -287,7 +287,7 @@ func TestActiveSpeakers(t *testing.T) {
t.Run("participant should not be getting audio updates (protocol 2)", func(t *testing.T) {
rm := newRoomWithParticipants(t, testRoomOpts{num: 1, protocol: types.DefaultProtocol})
defer rm.Close()
p := rm.GetParticipants()[0].(*typesfakes.FakeParticipant)
p := rm.GetParticipants()[0].(*typesfakes.FakeLocalParticipant)
require.Empty(t, rm.GetActiveSpeakers())
time.Sleep(audioUpdateDuration)
@@ -300,8 +300,8 @@ func TestActiveSpeakers(t *testing.T) {
rm := newRoomWithParticipants(t, testRoomOpts{num: 2})
defer rm.Close()
participants := rm.GetParticipants()
p := participants[0].(*typesfakes.FakeParticipant)
p2 := participants[1].(*typesfakes.FakeParticipant)
p := participants[0].(*typesfakes.FakeLocalParticipant)
p2 := participants[1].(*typesfakes.FakeLocalParticipant)
p.GetAudioLevelReturns(10, true)
p2.GetAudioLevelReturns(20, true)
@@ -315,7 +315,7 @@ func TestActiveSpeakers(t *testing.T) {
rm := newRoomWithParticipants(t, testRoomOpts{num: 2, protocol: types.DefaultProtocol})
defer rm.Close()
participants := rm.GetParticipants()
p := participants[0].(*typesfakes.FakeParticipant)
p := participants[0].(*typesfakes.FakeLocalParticipant)
time.Sleep(time.Millisecond) // let the first update cycle run
p.GetAudioLevelReturns(30, true)
@@ -325,7 +325,7 @@ func TestActiveSpeakers(t *testing.T) {
testutils.WithTimeout(t, "ensure everyone has gotten an audio update", func() bool {
for _, op := range participants {
op := op.(*typesfakes.FakeParticipant)
op := op.(*typesfakes.FakeLocalParticipant)
updates := getActiveSpeakerUpdates(op)
if len(updates) == 0 {
return false
@@ -348,8 +348,8 @@ func TestActiveSpeakers(t *testing.T) {
rm := newRoomWithParticipants(t, testRoomOpts{num: 2, protocol: types.DefaultProtocol, audioSmoothIntervals: 3})
defer rm.Close()
participants := rm.GetParticipants()
p := participants[0].(*typesfakes.FakeParticipant)
op := participants[1].(*typesfakes.FakeParticipant)
p := participants[0].(*typesfakes.FakeLocalParticipant)
op := participants[1].(*typesfakes.FakeLocalParticipant)
p.GetAudioLevelReturns(30, true)
convertedLevel := rtc.ConvertAudioLevel(30)
@@ -406,7 +406,7 @@ func TestDataChannel(t *testing.T) {
rm := newRoomWithParticipants(t, testRoomOpts{num: 3})
defer rm.Close()
participants := rm.GetParticipants()
p := participants[0].(*typesfakes.FakeParticipant)
p := participants[0].(*typesfakes.FakeLocalParticipant)
packet := livekit.DataPacket{
Kind: livekit.DataPacket_RELIABLE,
@@ -421,7 +421,7 @@ func TestDataChannel(t *testing.T) {
// ensure everyone has received the packet
for _, op := range participants {
fp := op.(*typesfakes.FakeParticipant)
fp := op.(*typesfakes.FakeLocalParticipant)
if fp == p {
require.Zero(t, fp.SendDataPacketCallCount())
continue
@@ -435,8 +435,8 @@ func TestDataChannel(t *testing.T) {
rm := newRoomWithParticipants(t, testRoomOpts{num: 4})
defer rm.Close()
participants := rm.GetParticipants()
p := participants[0].(*typesfakes.FakeParticipant)
p1 := participants[1].(*typesfakes.FakeParticipant)
p := participants[0].(*typesfakes.FakeLocalParticipant)
p1 := participants[1].(*typesfakes.FakeLocalParticipant)
packet := livekit.DataPacket{
Kind: livekit.DataPacket_RELIABLE,
@@ -452,7 +452,7 @@ func TestDataChannel(t *testing.T) {
// only p1 should receive the data
for _, op := range participants {
fp := op.(*typesfakes.FakeParticipant)
fp := op.(*typesfakes.FakeLocalParticipant)
if fp != p1 {
require.Zero(t, fp.SendDataPacketCallCount())
}
@@ -465,7 +465,7 @@ func TestDataChannel(t *testing.T) {
rm := newRoomWithParticipants(t, testRoomOpts{num: 2})
defer rm.Close()
participants := rm.GetParticipants()
p := participants[0].(*typesfakes.FakeParticipant)
p := participants[0].(*typesfakes.FakeLocalParticipant)
p.CanPublishDataReturns(false)
packet := livekit.DataPacket{
@@ -480,7 +480,7 @@ func TestDataChannel(t *testing.T) {
// no one should've been sent packet
for _, op := range participants {
fp := op.(*typesfakes.FakeParticipant)
fp := op.(*typesfakes.FakeLocalParticipant)
require.Zero(t, fp.SendDataPacketCallCount())
}
})
@@ -519,7 +519,7 @@ func TestHiddenParticipants(t *testing.T) {
if p == op {
continue
}
mockP := op.(*typesfakes.FakeParticipant)
mockP := op.(*typesfakes.FakeLocalParticipant)
require.NotZero(t, mockP.AddSubscriberCallCount())
// last call should be to add the newest participant
sub, params := mockP.AddSubscriberArgsForCall(mockP.AddSubscriberCallCount() - 1)
@@ -537,7 +537,7 @@ func TestRoomUpdate(t *testing.T) {
rm.SetMetadata("test metadata...")
for _, op := range rm.GetParticipants() {
fp := op.(*typesfakes.FakeParticipant)
fp := op.(*typesfakes.FakeLocalParticipant)
require.Equal(t, 1, fp.SendRoomUpdateCallCount())
}
})
+1 -1
View File
@@ -7,7 +7,7 @@ import (
"github.com/livekit/livekit-server/pkg/rtc/types"
)
func HandleParticipantSignal(room types.Room, participant types.Participant, req *livekit.SignalRequest, pLogger logger.Logger) error {
func HandleParticipantSignal(room types.Room, participant types.LocalParticipant, req *livekit.SignalRequest, pLogger logger.Logger) error {
switch msg := req.Message.(type) {
case *livekit.SignalRequest_Offer:
_, err := participant.HandleOffer(FromProtoSessionDescription(msg.Offer))
+60 -76
View File
@@ -32,8 +32,37 @@ const (
MigrateStateComplete
)
//counterfeiter:generate . Participant
type Participant interface {
ID() livekit.ParticipantID
Identity() livekit.ParticipantIdentity
ToProto() *livekit.ParticipantInfo
SetMetadata(metadata string)
GetPublishedTrack(sid livekit.TrackID) MediaTrack
GetPublishedTracks() []MediaTrack
AddSubscriber(op LocalParticipant, params AddSubscriberParams) (int, error)
RemoveSubscriber(op LocalParticipant, trackID livekit.TrackID)
// permissions
Hidden() bool
Start()
Close(sendLeave bool) error
UpdateSubscriptionPermissions(permissions *livekit.UpdateSubscriptionPermissions, resolver func(participantID livekit.ParticipantID) LocalParticipant) error
UpdateVideoLayers(updateVideoLayers *livekit.UpdateVideoLayers) error
DebugInfo() map[string]interface{}
}
//counterfeiter:generate . LocalParticipant
type LocalParticipant interface {
Participant
ProtocolVersion() ProtocolVersion
ConnectedAt() time.Time
@@ -87,12 +116,14 @@ type LocalParticipant interface {
SubscriptionPermissionUpdate(publisherID livekit.ParticipantID, trackID livekit.TrackID, allowed bool)
// callbacks
OnStateChange(func(p Participant, oldState livekit.ParticipantInfo_State))
OnStateChange(func(p LocalParticipant, oldState livekit.ParticipantInfo_State))
// OnTrackPublished - remote added a track
OnTrackPublished(func(LocalParticipant, MediaTrack))
// OnTrackUpdated - one of its publishedTracks changed in status
OnTrackUpdated(callback func(Participant, PublishedTrack))
OnMetadataUpdate(callback func(Participant))
OnDataPacket(callback func(Participant, *livekit.DataPacket))
OnClose(_callback func(Participant, map[livekit.TrackID]livekit.ParticipantID))
OnTrackUpdated(callback func(LocalParticipant, MediaTrack))
OnMetadataUpdate(callback func(LocalParticipant))
OnDataPacket(callback func(LocalParticipant, *livekit.DataPacket))
OnClose(_callback func(LocalParticipant, map[livekit.TrackID]livekit.ParticipantID))
// updates from remotes
UpdateSubscribedQuality(nodeID string, trackID livekit.TrackID, maxQuality livekit.VideoQuality) error
@@ -105,77 +136,45 @@ type LocalParticipant interface {
SetPreviousAnswer(previous *webrtc.SessionDescription)
}
//counterfeiter:generate . Participant
type Participant interface {
LocalParticipant
ID() livekit.ParticipantID
Identity() livekit.ParticipantIdentity
ToProto() *livekit.ParticipantInfo
SetMetadata(metadata string)
GetPublishedTrack(sid livekit.TrackID) PublishedTrack
GetPublishedTracks() []PublishedTrack
AddSubscriber(op Participant, params AddSubscriberParams) (int, error)
RemoveSubscriber(op Participant, trackID livekit.TrackID)
// permissions
Hidden() bool
Start()
Close(sendLeave bool) error
// callbacks
// OnTrackPublished - remote added a remoteTrack
OnTrackPublished(func(Participant, PublishedTrack))
UpdateSubscriptionPermissions(permissions *livekit.UpdateSubscriptionPermissions, resolver func(participantID livekit.ParticipantID) Participant) error
UpdateVideoLayers(updateVideoLayers *livekit.UpdateVideoLayers) error
DebugInfo() map[string]interface{}
}
// Room is a container of participants, and can provide room level actions
//counterfeiter:generate . Room
type Room interface {
Name() livekit.RoomName
UpdateSubscriptions(participant Participant, trackIDs []livekit.TrackID, participantTracks []*livekit.ParticipantTracks, subscribe bool) error
UpdateSubscriptions(participant LocalParticipant, trackIDs []livekit.TrackID, participantTracks []*livekit.ParticipantTracks, subscribe bool) error
UpdateSubscriptionPermissions(participant Participant, permissions *livekit.UpdateSubscriptionPermissions) error
SyncState(participant Participant, state *livekit.SyncState) error
SimulateScenario(participant Participant, scenario *livekit.SimulateScenario) error
SyncState(participant LocalParticipant, state *livekit.SyncState) error
SimulateScenario(participant LocalParticipant, scenario *livekit.SimulateScenario) error
UpdateVideoLayers(participant Participant, updateVideoLayers *livekit.UpdateVideoLayers) error
}
//counterfeiter:generate . LocalMediaTrack
type LocalMediaTrack interface {
NotifySubscriberNodeMediaLoss(nodeID string, fractionalLoss uint8)
}
// MediaTrack represents a media track
//counterfeiter:generate . MediaTrack
type MediaTrack interface {
LocalMediaTrack
ID() livekit.TrackID
Kind() livekit.TrackType
Name() string
IsMuted() bool
SetMuted(muted bool)
UpdateVideoLayers(layers []*livekit.VideoLayer)
Source() livekit.TrackSource
IsSimulcast() bool
ToProto() *livekit.TrackInfo
PublisherID() livekit.ParticipantID
PublisherIdentity() livekit.ParticipantIdentity
ToProto() *livekit.TrackInfo
IsMuted() bool
SetMuted(muted bool)
UpdateVideoLayers(layers []*livekit.VideoLayer)
IsSimulcast() bool
Receiver() sfu.TrackReceiver
// callbacks
OnSubscribedMaxQualityChange(f func(trackID livekit.TrackID, subscribedQualities []*livekit.SubscribedQuality, maxQuality livekit.VideoQuality) error)
AddOnClose(func())
// subscribers
AddSubscriber(participant Participant) error
AddSubscriber(participant LocalParticipant) error
RemoveSubscriber(participantID livekit.ParticipantID)
IsSubscriber(subID livekit.ParticipantID) bool
GetAllSubscriberIDs() []livekit.ParticipantID
@@ -189,35 +188,20 @@ type MediaTrack interface {
NotifySubscriberNodeMaxQuality(nodeID string, quality livekit.VideoQuality)
}
//counterfeiter:generate . LocalPublishedTrack
type LocalPublishedTrack interface {
//counterfeiter:generate . LocalMediaTrack
type LocalMediaTrack interface {
MediaTrack
SignalCid() string
SdpCid() string
GetAudioLevel() (level uint8, active bool)
GetConnectionScore() float64
NotifySubscriberNodeMediaLoss(nodeID string, fractionalLoss uint8)
}
// PublishedTrack is the main interface representing a track published to the room
// it's responsible for managing subscribers and forwarding data from the input track to all subscribers
//counterfeiter:generate . PublishedTrack
type PublishedTrack interface {
MediaTrack
ToProto() *livekit.TrackInfo
Receiver() sfu.TrackReceiver
UpdateVideoLayers(layers []*livekit.VideoLayer)
OnSubscribedMaxQualityChange(f func(trackID livekit.TrackID, subscribedQualities []*livekit.SubscribedQuality, maxQuality livekit.VideoQuality) error)
// callbacks
AddOnClose(func())
LocalPublishedTrack
}
// MediaTrack is the main interface representing a track published to the room
//counterfeiter:generate . SubscribedTrack
type SubscribedTrack interface {
OnBind(f func())
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
@@ -1,302 +0,0 @@
// Code generated by counterfeiter. DO NOT EDIT.
package typesfakes
import (
"sync"
"github.com/livekit/livekit-server/pkg/rtc/types"
)
type FakeLocalPublishedTrack struct {
GetAudioLevelStub func() (uint8, bool)
getAudioLevelMutex sync.RWMutex
getAudioLevelArgsForCall []struct {
}
getAudioLevelReturns struct {
result1 uint8
result2 bool
}
getAudioLevelReturnsOnCall map[int]struct {
result1 uint8
result2 bool
}
GetConnectionScoreStub func() float64
getConnectionScoreMutex sync.RWMutex
getConnectionScoreArgsForCall []struct {
}
getConnectionScoreReturns struct {
result1 float64
}
getConnectionScoreReturnsOnCall map[int]struct {
result1 float64
}
SdpCidStub func() string
sdpCidMutex sync.RWMutex
sdpCidArgsForCall []struct {
}
sdpCidReturns struct {
result1 string
}
sdpCidReturnsOnCall map[int]struct {
result1 string
}
SignalCidStub func() string
signalCidMutex sync.RWMutex
signalCidArgsForCall []struct {
}
signalCidReturns struct {
result1 string
}
signalCidReturnsOnCall map[int]struct {
result1 string
}
invocations map[string][][]interface{}
invocationsMutex sync.RWMutex
}
func (fake *FakeLocalPublishedTrack) GetAudioLevel() (uint8, bool) {
fake.getAudioLevelMutex.Lock()
ret, specificReturn := fake.getAudioLevelReturnsOnCall[len(fake.getAudioLevelArgsForCall)]
fake.getAudioLevelArgsForCall = append(fake.getAudioLevelArgsForCall, struct {
}{})
stub := fake.GetAudioLevelStub
fakeReturns := fake.getAudioLevelReturns
fake.recordInvocation("GetAudioLevel", []interface{}{})
fake.getAudioLevelMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1, ret.result2
}
return fakeReturns.result1, fakeReturns.result2
}
func (fake *FakeLocalPublishedTrack) GetAudioLevelCallCount() int {
fake.getAudioLevelMutex.RLock()
defer fake.getAudioLevelMutex.RUnlock()
return len(fake.getAudioLevelArgsForCall)
}
func (fake *FakeLocalPublishedTrack) GetAudioLevelCalls(stub func() (uint8, bool)) {
fake.getAudioLevelMutex.Lock()
defer fake.getAudioLevelMutex.Unlock()
fake.GetAudioLevelStub = stub
}
func (fake *FakeLocalPublishedTrack) GetAudioLevelReturns(result1 uint8, result2 bool) {
fake.getAudioLevelMutex.Lock()
defer fake.getAudioLevelMutex.Unlock()
fake.GetAudioLevelStub = nil
fake.getAudioLevelReturns = struct {
result1 uint8
result2 bool
}{result1, result2}
}
func (fake *FakeLocalPublishedTrack) GetAudioLevelReturnsOnCall(i int, result1 uint8, result2 bool) {
fake.getAudioLevelMutex.Lock()
defer fake.getAudioLevelMutex.Unlock()
fake.GetAudioLevelStub = nil
if fake.getAudioLevelReturnsOnCall == nil {
fake.getAudioLevelReturnsOnCall = make(map[int]struct {
result1 uint8
result2 bool
})
}
fake.getAudioLevelReturnsOnCall[i] = struct {
result1 uint8
result2 bool
}{result1, result2}
}
func (fake *FakeLocalPublishedTrack) GetConnectionScore() float64 {
fake.getConnectionScoreMutex.Lock()
ret, specificReturn := fake.getConnectionScoreReturnsOnCall[len(fake.getConnectionScoreArgsForCall)]
fake.getConnectionScoreArgsForCall = append(fake.getConnectionScoreArgsForCall, struct {
}{})
stub := fake.GetConnectionScoreStub
fakeReturns := fake.getConnectionScoreReturns
fake.recordInvocation("GetConnectionScore", []interface{}{})
fake.getConnectionScoreMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeLocalPublishedTrack) GetConnectionScoreCallCount() int {
fake.getConnectionScoreMutex.RLock()
defer fake.getConnectionScoreMutex.RUnlock()
return len(fake.getConnectionScoreArgsForCall)
}
func (fake *FakeLocalPublishedTrack) GetConnectionScoreCalls(stub func() float64) {
fake.getConnectionScoreMutex.Lock()
defer fake.getConnectionScoreMutex.Unlock()
fake.GetConnectionScoreStub = stub
}
func (fake *FakeLocalPublishedTrack) GetConnectionScoreReturns(result1 float64) {
fake.getConnectionScoreMutex.Lock()
defer fake.getConnectionScoreMutex.Unlock()
fake.GetConnectionScoreStub = nil
fake.getConnectionScoreReturns = struct {
result1 float64
}{result1}
}
func (fake *FakeLocalPublishedTrack) GetConnectionScoreReturnsOnCall(i int, result1 float64) {
fake.getConnectionScoreMutex.Lock()
defer fake.getConnectionScoreMutex.Unlock()
fake.GetConnectionScoreStub = nil
if fake.getConnectionScoreReturnsOnCall == nil {
fake.getConnectionScoreReturnsOnCall = make(map[int]struct {
result1 float64
})
}
fake.getConnectionScoreReturnsOnCall[i] = struct {
result1 float64
}{result1}
}
func (fake *FakeLocalPublishedTrack) SdpCid() string {
fake.sdpCidMutex.Lock()
ret, specificReturn := fake.sdpCidReturnsOnCall[len(fake.sdpCidArgsForCall)]
fake.sdpCidArgsForCall = append(fake.sdpCidArgsForCall, struct {
}{})
stub := fake.SdpCidStub
fakeReturns := fake.sdpCidReturns
fake.recordInvocation("SdpCid", []interface{}{})
fake.sdpCidMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeLocalPublishedTrack) SdpCidCallCount() int {
fake.sdpCidMutex.RLock()
defer fake.sdpCidMutex.RUnlock()
return len(fake.sdpCidArgsForCall)
}
func (fake *FakeLocalPublishedTrack) SdpCidCalls(stub func() string) {
fake.sdpCidMutex.Lock()
defer fake.sdpCidMutex.Unlock()
fake.SdpCidStub = stub
}
func (fake *FakeLocalPublishedTrack) SdpCidReturns(result1 string) {
fake.sdpCidMutex.Lock()
defer fake.sdpCidMutex.Unlock()
fake.SdpCidStub = nil
fake.sdpCidReturns = struct {
result1 string
}{result1}
}
func (fake *FakeLocalPublishedTrack) SdpCidReturnsOnCall(i int, result1 string) {
fake.sdpCidMutex.Lock()
defer fake.sdpCidMutex.Unlock()
fake.SdpCidStub = nil
if fake.sdpCidReturnsOnCall == nil {
fake.sdpCidReturnsOnCall = make(map[int]struct {
result1 string
})
}
fake.sdpCidReturnsOnCall[i] = struct {
result1 string
}{result1}
}
func (fake *FakeLocalPublishedTrack) SignalCid() string {
fake.signalCidMutex.Lock()
ret, specificReturn := fake.signalCidReturnsOnCall[len(fake.signalCidArgsForCall)]
fake.signalCidArgsForCall = append(fake.signalCidArgsForCall, struct {
}{})
stub := fake.SignalCidStub
fakeReturns := fake.signalCidReturns
fake.recordInvocation("SignalCid", []interface{}{})
fake.signalCidMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeLocalPublishedTrack) SignalCidCallCount() int {
fake.signalCidMutex.RLock()
defer fake.signalCidMutex.RUnlock()
return len(fake.signalCidArgsForCall)
}
func (fake *FakeLocalPublishedTrack) SignalCidCalls(stub func() string) {
fake.signalCidMutex.Lock()
defer fake.signalCidMutex.Unlock()
fake.SignalCidStub = stub
}
func (fake *FakeLocalPublishedTrack) SignalCidReturns(result1 string) {
fake.signalCidMutex.Lock()
defer fake.signalCidMutex.Unlock()
fake.SignalCidStub = nil
fake.signalCidReturns = struct {
result1 string
}{result1}
}
func (fake *FakeLocalPublishedTrack) SignalCidReturnsOnCall(i int, result1 string) {
fake.signalCidMutex.Lock()
defer fake.signalCidMutex.Unlock()
fake.SignalCidStub = nil
if fake.signalCidReturnsOnCall == nil {
fake.signalCidReturnsOnCall = make(map[int]struct {
result1 string
})
}
fake.signalCidReturnsOnCall[i] = struct {
result1 string
}{result1}
}
func (fake *FakeLocalPublishedTrack) Invocations() map[string][][]interface{} {
fake.invocationsMutex.RLock()
defer fake.invocationsMutex.RUnlock()
fake.getAudioLevelMutex.RLock()
defer fake.getAudioLevelMutex.RUnlock()
fake.getConnectionScoreMutex.RLock()
defer fake.getConnectionScoreMutex.RUnlock()
fake.sdpCidMutex.RLock()
defer fake.sdpCidMutex.RUnlock()
fake.signalCidMutex.RLock()
defer fake.signalCidMutex.RUnlock()
copiedInvocations := map[string][][]interface{}{}
for key, value := range fake.invocations {
copiedInvocations[key] = value
}
return copiedInvocations
}
func (fake *FakeLocalPublishedTrack) recordInvocation(key string, args []interface{}) {
fake.invocationsMutex.Lock()
defer fake.invocationsMutex.Unlock()
if fake.invocations == nil {
fake.invocations = map[string][][]interface{}{}
}
if fake.invocations[key] == nil {
fake.invocations[key] = [][]interface{}{}
}
fake.invocations[key] = append(fake.invocations[key], args)
}
var _ types.LocalPublishedTrack = new(FakeLocalPublishedTrack)
+139 -36
View File
@@ -5,14 +5,20 @@ import (
"sync"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/sfu"
"github.com/livekit/protocol/livekit"
)
type FakeMediaTrack struct {
AddSubscriberStub func(types.Participant) error
AddOnCloseStub func(func())
addOnCloseMutex sync.RWMutex
addOnCloseArgsForCall []struct {
arg1 func()
}
AddSubscriberStub func(types.LocalParticipant) error
addSubscriberMutex sync.RWMutex
addSubscriberArgsForCall []struct {
arg1 types.Participant
arg1 types.LocalParticipant
}
addSubscriberReturns struct {
result1 error
@@ -115,11 +121,10 @@ type FakeMediaTrack struct {
arg1 string
arg2 livekit.VideoQuality
}
NotifySubscriberNodeMediaLossStub func(string, uint8)
notifySubscriberNodeMediaLossMutex sync.RWMutex
notifySubscriberNodeMediaLossArgsForCall []struct {
arg1 string
arg2 uint8
OnSubscribedMaxQualityChangeStub func(func(trackID livekit.TrackID, subscribedQualities []*livekit.SubscribedQuality, maxQuality livekit.VideoQuality) error)
onSubscribedMaxQualityChangeMutex sync.RWMutex
onSubscribedMaxQualityChangeArgsForCall []struct {
arg1 func(trackID livekit.TrackID, subscribedQualities []*livekit.SubscribedQuality, maxQuality livekit.VideoQuality) error
}
PublisherIDStub func() livekit.ParticipantID
publisherIDMutex sync.RWMutex
@@ -141,6 +146,16 @@ type FakeMediaTrack struct {
publisherIdentityReturnsOnCall map[int]struct {
result1 livekit.ParticipantIdentity
}
ReceiverStub func() sfu.TrackReceiver
receiverMutex sync.RWMutex
receiverArgsForCall []struct {
}
receiverReturns struct {
result1 sfu.TrackReceiver
}
receiverReturnsOnCall map[int]struct {
result1 sfu.TrackReceiver
}
RemoveAllSubscribersStub func()
removeAllSubscribersMutex sync.RWMutex
removeAllSubscribersArgsForCall []struct {
@@ -195,11 +210,43 @@ type FakeMediaTrack struct {
invocationsMutex sync.RWMutex
}
func (fake *FakeMediaTrack) AddSubscriber(arg1 types.Participant) error {
func (fake *FakeMediaTrack) AddOnClose(arg1 func()) {
fake.addOnCloseMutex.Lock()
fake.addOnCloseArgsForCall = append(fake.addOnCloseArgsForCall, struct {
arg1 func()
}{arg1})
stub := fake.AddOnCloseStub
fake.recordInvocation("AddOnClose", []interface{}{arg1})
fake.addOnCloseMutex.Unlock()
if stub != nil {
fake.AddOnCloseStub(arg1)
}
}
func (fake *FakeMediaTrack) AddOnCloseCallCount() int {
fake.addOnCloseMutex.RLock()
defer fake.addOnCloseMutex.RUnlock()
return len(fake.addOnCloseArgsForCall)
}
func (fake *FakeMediaTrack) AddOnCloseCalls(stub func(func())) {
fake.addOnCloseMutex.Lock()
defer fake.addOnCloseMutex.Unlock()
fake.AddOnCloseStub = stub
}
func (fake *FakeMediaTrack) AddOnCloseArgsForCall(i int) func() {
fake.addOnCloseMutex.RLock()
defer fake.addOnCloseMutex.RUnlock()
argsForCall := fake.addOnCloseArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeMediaTrack) AddSubscriber(arg1 types.LocalParticipant) error {
fake.addSubscriberMutex.Lock()
ret, specificReturn := fake.addSubscriberReturnsOnCall[len(fake.addSubscriberArgsForCall)]
fake.addSubscriberArgsForCall = append(fake.addSubscriberArgsForCall, struct {
arg1 types.Participant
arg1 types.LocalParticipant
}{arg1})
stub := fake.AddSubscriberStub
fakeReturns := fake.addSubscriberReturns
@@ -220,13 +267,13 @@ func (fake *FakeMediaTrack) AddSubscriberCallCount() int {
return len(fake.addSubscriberArgsForCall)
}
func (fake *FakeMediaTrack) AddSubscriberCalls(stub func(types.Participant) error) {
func (fake *FakeMediaTrack) AddSubscriberCalls(stub func(types.LocalParticipant) error) {
fake.addSubscriberMutex.Lock()
defer fake.addSubscriberMutex.Unlock()
fake.AddSubscriberStub = stub
}
func (fake *FakeMediaTrack) AddSubscriberArgsForCall(i int) types.Participant {
func (fake *FakeMediaTrack) AddSubscriberArgsForCall(i int) types.LocalParticipant {
fake.addSubscriberMutex.RLock()
defer fake.addSubscriberMutex.RUnlock()
argsForCall := fake.addSubscriberArgsForCall[i]
@@ -763,37 +810,36 @@ func (fake *FakeMediaTrack) NotifySubscriberNodeMaxQualityArgsForCall(i int) (st
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeMediaTrack) NotifySubscriberNodeMediaLoss(arg1 string, arg2 uint8) {
fake.notifySubscriberNodeMediaLossMutex.Lock()
fake.notifySubscriberNodeMediaLossArgsForCall = append(fake.notifySubscriberNodeMediaLossArgsForCall, struct {
arg1 string
arg2 uint8
}{arg1, arg2})
stub := fake.NotifySubscriberNodeMediaLossStub
fake.recordInvocation("NotifySubscriberNodeMediaLoss", []interface{}{arg1, arg2})
fake.notifySubscriberNodeMediaLossMutex.Unlock()
func (fake *FakeMediaTrack) OnSubscribedMaxQualityChange(arg1 func(trackID livekit.TrackID, subscribedQualities []*livekit.SubscribedQuality, maxQuality livekit.VideoQuality) error) {
fake.onSubscribedMaxQualityChangeMutex.Lock()
fake.onSubscribedMaxQualityChangeArgsForCall = append(fake.onSubscribedMaxQualityChangeArgsForCall, struct {
arg1 func(trackID livekit.TrackID, subscribedQualities []*livekit.SubscribedQuality, maxQuality livekit.VideoQuality) error
}{arg1})
stub := fake.OnSubscribedMaxQualityChangeStub
fake.recordInvocation("OnSubscribedMaxQualityChange", []interface{}{arg1})
fake.onSubscribedMaxQualityChangeMutex.Unlock()
if stub != nil {
fake.NotifySubscriberNodeMediaLossStub(arg1, arg2)
fake.OnSubscribedMaxQualityChangeStub(arg1)
}
}
func (fake *FakeMediaTrack) NotifySubscriberNodeMediaLossCallCount() int {
fake.notifySubscriberNodeMediaLossMutex.RLock()
defer fake.notifySubscriberNodeMediaLossMutex.RUnlock()
return len(fake.notifySubscriberNodeMediaLossArgsForCall)
func (fake *FakeMediaTrack) OnSubscribedMaxQualityChangeCallCount() int {
fake.onSubscribedMaxQualityChangeMutex.RLock()
defer fake.onSubscribedMaxQualityChangeMutex.RUnlock()
return len(fake.onSubscribedMaxQualityChangeArgsForCall)
}
func (fake *FakeMediaTrack) NotifySubscriberNodeMediaLossCalls(stub func(string, uint8)) {
fake.notifySubscriberNodeMediaLossMutex.Lock()
defer fake.notifySubscriberNodeMediaLossMutex.Unlock()
fake.NotifySubscriberNodeMediaLossStub = stub
func (fake *FakeMediaTrack) OnSubscribedMaxQualityChangeCalls(stub func(func(trackID livekit.TrackID, subscribedQualities []*livekit.SubscribedQuality, maxQuality livekit.VideoQuality) error)) {
fake.onSubscribedMaxQualityChangeMutex.Lock()
defer fake.onSubscribedMaxQualityChangeMutex.Unlock()
fake.OnSubscribedMaxQualityChangeStub = stub
}
func (fake *FakeMediaTrack) NotifySubscriberNodeMediaLossArgsForCall(i int) (string, uint8) {
fake.notifySubscriberNodeMediaLossMutex.RLock()
defer fake.notifySubscriberNodeMediaLossMutex.RUnlock()
argsForCall := fake.notifySubscriberNodeMediaLossArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
func (fake *FakeMediaTrack) OnSubscribedMaxQualityChangeArgsForCall(i int) func(trackID livekit.TrackID, subscribedQualities []*livekit.SubscribedQuality, maxQuality livekit.VideoQuality) error {
fake.onSubscribedMaxQualityChangeMutex.RLock()
defer fake.onSubscribedMaxQualityChangeMutex.RUnlock()
argsForCall := fake.onSubscribedMaxQualityChangeArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeMediaTrack) PublisherID() livekit.ParticipantID {
@@ -902,6 +948,59 @@ func (fake *FakeMediaTrack) PublisherIdentityReturnsOnCall(i int, result1 liveki
}{result1}
}
func (fake *FakeMediaTrack) Receiver() sfu.TrackReceiver {
fake.receiverMutex.Lock()
ret, specificReturn := fake.receiverReturnsOnCall[len(fake.receiverArgsForCall)]
fake.receiverArgsForCall = append(fake.receiverArgsForCall, struct {
}{})
stub := fake.ReceiverStub
fakeReturns := fake.receiverReturns
fake.recordInvocation("Receiver", []interface{}{})
fake.receiverMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeMediaTrack) ReceiverCallCount() int {
fake.receiverMutex.RLock()
defer fake.receiverMutex.RUnlock()
return len(fake.receiverArgsForCall)
}
func (fake *FakeMediaTrack) ReceiverCalls(stub func() sfu.TrackReceiver) {
fake.receiverMutex.Lock()
defer fake.receiverMutex.Unlock()
fake.ReceiverStub = stub
}
func (fake *FakeMediaTrack) ReceiverReturns(result1 sfu.TrackReceiver) {
fake.receiverMutex.Lock()
defer fake.receiverMutex.Unlock()
fake.ReceiverStub = nil
fake.receiverReturns = struct {
result1 sfu.TrackReceiver
}{result1}
}
func (fake *FakeMediaTrack) ReceiverReturnsOnCall(i int, result1 sfu.TrackReceiver) {
fake.receiverMutex.Lock()
defer fake.receiverMutex.Unlock()
fake.ReceiverStub = nil
if fake.receiverReturnsOnCall == nil {
fake.receiverReturnsOnCall = make(map[int]struct {
result1 sfu.TrackReceiver
})
}
fake.receiverReturnsOnCall[i] = struct {
result1 sfu.TrackReceiver
}{result1}
}
func (fake *FakeMediaTrack) RemoveAllSubscribers() {
fake.removeAllSubscribersMutex.Lock()
fake.removeAllSubscribersArgsForCall = append(fake.removeAllSubscribersArgsForCall, struct {
@@ -1202,6 +1301,8 @@ func (fake *FakeMediaTrack) UpdateVideoLayersArgsForCall(i int) []*livekit.Video
func (fake *FakeMediaTrack) Invocations() map[string][][]interface{} {
fake.invocationsMutex.RLock()
defer fake.invocationsMutex.RUnlock()
fake.addOnCloseMutex.RLock()
defer fake.addOnCloseMutex.RUnlock()
fake.addSubscriberMutex.RLock()
defer fake.addSubscriberMutex.RUnlock()
fake.getAllSubscriberIDsMutex.RLock()
@@ -1224,12 +1325,14 @@ func (fake *FakeMediaTrack) Invocations() map[string][][]interface{} {
defer fake.notifySubscriberMaxQualityMutex.RUnlock()
fake.notifySubscriberNodeMaxQualityMutex.RLock()
defer fake.notifySubscriberNodeMaxQualityMutex.RUnlock()
fake.notifySubscriberNodeMediaLossMutex.RLock()
defer fake.notifySubscriberNodeMediaLossMutex.RUnlock()
fake.onSubscribedMaxQualityChangeMutex.RLock()
defer fake.onSubscribedMaxQualityChangeMutex.RUnlock()
fake.publisherIDMutex.RLock()
defer fake.publisherIDMutex.RUnlock()
fake.publisherIdentityMutex.RLock()
defer fake.publisherIdentityMutex.RUnlock()
fake.receiverMutex.RLock()
defer fake.receiverMutex.RUnlock()
fake.removeAllSubscribersMutex.RLock()
defer fake.removeAllSubscribersMutex.RUnlock()
fake.removeSubscriberMutex.RLock()
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
+18 -18
View File
@@ -19,10 +19,10 @@ type FakeRoom struct {
nameReturnsOnCall map[int]struct {
result1 livekit.RoomName
}
SimulateScenarioStub func(types.Participant, *livekit.SimulateScenario) error
SimulateScenarioStub func(types.LocalParticipant, *livekit.SimulateScenario) error
simulateScenarioMutex sync.RWMutex
simulateScenarioArgsForCall []struct {
arg1 types.Participant
arg1 types.LocalParticipant
arg2 *livekit.SimulateScenario
}
simulateScenarioReturns struct {
@@ -31,10 +31,10 @@ type FakeRoom struct {
simulateScenarioReturnsOnCall map[int]struct {
result1 error
}
SyncStateStub func(types.Participant, *livekit.SyncState) error
SyncStateStub func(types.LocalParticipant, *livekit.SyncState) error
syncStateMutex sync.RWMutex
syncStateArgsForCall []struct {
arg1 types.Participant
arg1 types.LocalParticipant
arg2 *livekit.SyncState
}
syncStateReturns struct {
@@ -55,10 +55,10 @@ type FakeRoom struct {
updateSubscriptionPermissionsReturnsOnCall map[int]struct {
result1 error
}
UpdateSubscriptionsStub func(types.Participant, []livekit.TrackID, []*livekit.ParticipantTracks, bool) error
UpdateSubscriptionsStub func(types.LocalParticipant, []livekit.TrackID, []*livekit.ParticipantTracks, bool) error
updateSubscriptionsMutex sync.RWMutex
updateSubscriptionsArgsForCall []struct {
arg1 types.Participant
arg1 types.LocalParticipant
arg2 []livekit.TrackID
arg3 []*livekit.ParticipantTracks
arg4 bool
@@ -138,11 +138,11 @@ func (fake *FakeRoom) NameReturnsOnCall(i int, result1 livekit.RoomName) {
}{result1}
}
func (fake *FakeRoom) SimulateScenario(arg1 types.Participant, arg2 *livekit.SimulateScenario) error {
func (fake *FakeRoom) SimulateScenario(arg1 types.LocalParticipant, arg2 *livekit.SimulateScenario) error {
fake.simulateScenarioMutex.Lock()
ret, specificReturn := fake.simulateScenarioReturnsOnCall[len(fake.simulateScenarioArgsForCall)]
fake.simulateScenarioArgsForCall = append(fake.simulateScenarioArgsForCall, struct {
arg1 types.Participant
arg1 types.LocalParticipant
arg2 *livekit.SimulateScenario
}{arg1, arg2})
stub := fake.SimulateScenarioStub
@@ -164,13 +164,13 @@ func (fake *FakeRoom) SimulateScenarioCallCount() int {
return len(fake.simulateScenarioArgsForCall)
}
func (fake *FakeRoom) SimulateScenarioCalls(stub func(types.Participant, *livekit.SimulateScenario) error) {
func (fake *FakeRoom) SimulateScenarioCalls(stub func(types.LocalParticipant, *livekit.SimulateScenario) error) {
fake.simulateScenarioMutex.Lock()
defer fake.simulateScenarioMutex.Unlock()
fake.SimulateScenarioStub = stub
}
func (fake *FakeRoom) SimulateScenarioArgsForCall(i int) (types.Participant, *livekit.SimulateScenario) {
func (fake *FakeRoom) SimulateScenarioArgsForCall(i int) (types.LocalParticipant, *livekit.SimulateScenario) {
fake.simulateScenarioMutex.RLock()
defer fake.simulateScenarioMutex.RUnlock()
argsForCall := fake.simulateScenarioArgsForCall[i]
@@ -200,11 +200,11 @@ func (fake *FakeRoom) SimulateScenarioReturnsOnCall(i int, result1 error) {
}{result1}
}
func (fake *FakeRoom) SyncState(arg1 types.Participant, arg2 *livekit.SyncState) error {
func (fake *FakeRoom) SyncState(arg1 types.LocalParticipant, arg2 *livekit.SyncState) error {
fake.syncStateMutex.Lock()
ret, specificReturn := fake.syncStateReturnsOnCall[len(fake.syncStateArgsForCall)]
fake.syncStateArgsForCall = append(fake.syncStateArgsForCall, struct {
arg1 types.Participant
arg1 types.LocalParticipant
arg2 *livekit.SyncState
}{arg1, arg2})
stub := fake.SyncStateStub
@@ -226,13 +226,13 @@ func (fake *FakeRoom) SyncStateCallCount() int {
return len(fake.syncStateArgsForCall)
}
func (fake *FakeRoom) SyncStateCalls(stub func(types.Participant, *livekit.SyncState) error) {
func (fake *FakeRoom) SyncStateCalls(stub func(types.LocalParticipant, *livekit.SyncState) error) {
fake.syncStateMutex.Lock()
defer fake.syncStateMutex.Unlock()
fake.SyncStateStub = stub
}
func (fake *FakeRoom) SyncStateArgsForCall(i int) (types.Participant, *livekit.SyncState) {
func (fake *FakeRoom) SyncStateArgsForCall(i int) (types.LocalParticipant, *livekit.SyncState) {
fake.syncStateMutex.RLock()
defer fake.syncStateMutex.RUnlock()
argsForCall := fake.syncStateArgsForCall[i]
@@ -324,7 +324,7 @@ func (fake *FakeRoom) UpdateSubscriptionPermissionsReturnsOnCall(i int, result1
}{result1}
}
func (fake *FakeRoom) UpdateSubscriptions(arg1 types.Participant, arg2 []livekit.TrackID, arg3 []*livekit.ParticipantTracks, arg4 bool) error {
func (fake *FakeRoom) UpdateSubscriptions(arg1 types.LocalParticipant, arg2 []livekit.TrackID, arg3 []*livekit.ParticipantTracks, arg4 bool) error {
var arg2Copy []livekit.TrackID
if arg2 != nil {
arg2Copy = make([]livekit.TrackID, len(arg2))
@@ -338,7 +338,7 @@ func (fake *FakeRoom) UpdateSubscriptions(arg1 types.Participant, arg2 []livekit
fake.updateSubscriptionsMutex.Lock()
ret, specificReturn := fake.updateSubscriptionsReturnsOnCall[len(fake.updateSubscriptionsArgsForCall)]
fake.updateSubscriptionsArgsForCall = append(fake.updateSubscriptionsArgsForCall, struct {
arg1 types.Participant
arg1 types.LocalParticipant
arg2 []livekit.TrackID
arg3 []*livekit.ParticipantTracks
arg4 bool
@@ -362,13 +362,13 @@ func (fake *FakeRoom) UpdateSubscriptionsCallCount() int {
return len(fake.updateSubscriptionsArgsForCall)
}
func (fake *FakeRoom) UpdateSubscriptionsCalls(stub func(types.Participant, []livekit.TrackID, []*livekit.ParticipantTracks, bool) error) {
func (fake *FakeRoom) UpdateSubscriptionsCalls(stub func(types.LocalParticipant, []livekit.TrackID, []*livekit.ParticipantTracks, bool) error) {
fake.updateSubscriptionsMutex.Lock()
defer fake.updateSubscriptionsMutex.Unlock()
fake.UpdateSubscriptionsStub = stub
}
func (fake *FakeRoom) UpdateSubscriptionsArgsForCall(i int) (types.Participant, []livekit.TrackID, []*livekit.ParticipantTracks, bool) {
func (fake *FakeRoom) UpdateSubscriptionsArgsForCall(i int) (types.LocalParticipant, []livekit.TrackID, []*livekit.ParticipantTracks, bool) {
fake.updateSubscriptionsMutex.RLock()
defer fake.updateSubscriptionsMutex.RUnlock()
argsForCall := fake.updateSubscriptionsArgsForCall[i]
+21 -62
View File
@@ -21,7 +21,7 @@ type UptrackManager struct {
closed bool
// publishedTracks that participant is publishing
publishedTracks map[livekit.TrackID]types.PublishedTrack
publishedTracks map[livekit.TrackID]types.MediaTrack
// keeps track of subscriptions that are awaiting permissions
subscriptionPermissions map[livekit.ParticipantID]*livekit.TrackPermission // subscriberID => *livekit.TrackPermission
// keeps tracks of track specific subscribers who are awaiting permission
@@ -31,14 +31,14 @@ type UptrackManager struct {
// callbacks & handlers
onClose func()
onTrackUpdated func(track types.PublishedTrack, onlyIfReady bool)
onTrackUpdated func(track types.MediaTrack, onlyIfReady bool)
onSubscribedMaxQualityChange func(trackID livekit.TrackID, subscribedQualities []*livekit.SubscribedQuality, maxSubscribedQuality livekit.VideoQuality) error
}
func NewUptrackManager(params UptrackManagerParams) *UptrackManager {
return &UptrackManager{
params: params,
publishedTracks: make(map[livekit.TrackID]types.PublishedTrack, 0),
publishedTracks: make(map[livekit.TrackID]types.MediaTrack, 0),
pendingSubscriptions: make(map[livekit.TrackID][]livekit.ParticipantID),
}
}
@@ -79,7 +79,7 @@ func (u *UptrackManager) ToProto() []*livekit.TrackInfo {
return trackInfos
}
func (u *UptrackManager) OnPublishedTrackUpdated(f func(track types.PublishedTrack, onlyIfReady bool)) {
func (u *UptrackManager) OnPublishedTrackUpdated(f func(track types.MediaTrack, onlyIfReady bool)) {
u.onTrackUpdated = f
}
@@ -88,8 +88,8 @@ func (u *UptrackManager) OnSubscribedMaxQualityChange(f func(trackID livekit.Tra
}
// AddSubscriber subscribes op to all publishedTracks
func (u *UptrackManager) AddSubscriber(sub types.Participant, params types.AddSubscriberParams) (int, error) {
var tracks []types.PublishedTrack
func (u *UptrackManager) AddSubscriber(sub types.LocalParticipant, params types.AddSubscriberParams) (int, error) {
var tracks []types.MediaTrack
if params.AllTracks {
tracks = u.GetPublishedTracks()
} else {
@@ -132,7 +132,7 @@ func (u *UptrackManager) AddSubscriber(sub types.Participant, params types.AddSu
return n, nil
}
func (u *UptrackManager) RemoveSubscriber(sub types.Participant, trackID livekit.TrackID) {
func (u *UptrackManager) RemoveSubscriber(sub types.LocalParticipant, trackID livekit.TrackID) {
track := u.GetPublishedTrack(trackID)
if track != nil {
track.RemoveSubscriber(sub.ID())
@@ -143,7 +143,7 @@ func (u *UptrackManager) RemoveSubscriber(sub types.Participant, trackID livekit
u.lock.Unlock()
}
func (u *UptrackManager) SetPublishedTrackMuted(trackID livekit.TrackID, muted bool) types.PublishedTrack {
func (u *UptrackManager) SetPublishedTrackMuted(trackID livekit.TrackID, muted bool) types.MediaTrack {
u.lock.RLock()
track := u.publishedTracks[trackID]
u.lock.RUnlock()
@@ -163,18 +163,18 @@ func (u *UptrackManager) SetPublishedTrackMuted(trackID livekit.TrackID, muted b
return track
}
func (u *UptrackManager) GetPublishedTrack(trackID livekit.TrackID) types.PublishedTrack {
func (u *UptrackManager) GetPublishedTrack(trackID livekit.TrackID) types.MediaTrack {
u.lock.RLock()
defer u.lock.RUnlock()
return u.getPublishedTrack(trackID)
}
func (u *UptrackManager) GetPublishedTracks() []types.PublishedTrack {
func (u *UptrackManager) GetPublishedTracks() []types.MediaTrack {
u.lock.RLock()
defer u.lock.RUnlock()
tracks := make([]types.PublishedTrack, 0, len(u.publishedTracks))
tracks := make([]types.MediaTrack, 0, len(u.publishedTracks))
for _, t := range u.publishedTracks {
tracks = append(tracks, t)
}
@@ -183,7 +183,7 @@ func (u *UptrackManager) GetPublishedTracks() []types.PublishedTrack {
func (u *UptrackManager) UpdateSubscriptionPermissions(
permissions *livekit.UpdateSubscriptionPermissions,
resolver func(participantID livekit.ParticipantID) types.Participant,
resolver func(participantID livekit.ParticipantID) types.LocalParticipant,
) error {
u.lock.Lock()
defer u.lock.Unlock()
@@ -207,7 +207,7 @@ func (u *UptrackManager) UpdateVideoLayers(updateVideoLayers *livekit.UpdateVide
return nil
}
func (u *UptrackManager) AddPublishedTrack(track types.PublishedTrack) {
func (u *UptrackManager) AddPublishedTrack(track types.MediaTrack) {
track.OnSubscribedMaxQualityChange(u.onSubscribedMaxQualityChange)
u.lock.Lock()
@@ -242,56 +242,15 @@ func (u *UptrackManager) AddPublishedTrack(track types.PublishedTrack) {
})
}
func (u *UptrackManager) RemovePublishedTrack(track types.PublishedTrack) {
func (u *UptrackManager) RemovePublishedTrack(track types.MediaTrack) {
track.RemoveAllSubscribers()
}
// should be called with lock held
func (u *UptrackManager) getPublishedTrack(trackID livekit.TrackID) types.PublishedTrack {
func (u *UptrackManager) getPublishedTrack(trackID livekit.TrackID) types.MediaTrack {
return u.publishedTracks[trackID]
}
func (u *UptrackManager) GetPublishedTrackBySignalCidOrSdpCid(clientId string) types.PublishedTrack {
u.lock.RLock()
defer u.lock.RUnlock()
track := u.getPublishedTrackBySignalCid(clientId)
if track == nil {
track = u.getPublishedTrackBySdpCid(clientId)
}
return track
}
// should be called with lock held
func (u *UptrackManager) getPublishedTrackBySignalCid(clientId string) types.PublishedTrack {
for _, publishedTrack := range u.publishedTracks {
if publishedTrack.SignalCid() == clientId {
return publishedTrack
}
}
return nil
}
func (u *UptrackManager) GetPublishedTrackBySdpCid(clientId string) types.PublishedTrack {
u.lock.RLock()
defer u.lock.RUnlock()
return u.getPublishedTrackBySdpCid(clientId)
}
// should be called with lock held
func (u *UptrackManager) getPublishedTrackBySdpCid(clientId string) types.PublishedTrack {
for _, publishedTrack := range u.publishedTracks {
if publishedTrack.SdpCid() == clientId {
return publishedTrack
}
}
return nil
}
func (u *UptrackManager) updateSubscriptionPermissions(permissions *livekit.UpdateSubscriptionPermissions) {
// every update overrides the existing
@@ -355,7 +314,7 @@ func (u *UptrackManager) getAllowedSubscribers(trackID livekit.TrackID) []liveki
return allowed
}
func (u *UptrackManager) maybeAddPendingSubscription(trackID livekit.TrackID, sub types.Participant) {
func (u *UptrackManager) maybeAddPendingSubscription(trackID livekit.TrackID, sub types.LocalParticipant) {
subscriberID := sub.ID()
pending := u.pendingSubscriptions[trackID]
@@ -370,7 +329,7 @@ func (u *UptrackManager) maybeAddPendingSubscription(trackID livekit.TrackID, su
go sub.SubscriptionPermissionUpdate(u.params.SID, trackID, false)
}
func (u *UptrackManager) maybeRemovePendingSubscription(trackID livekit.TrackID, sub types.Participant) {
func (u *UptrackManager) maybeRemovePendingSubscription(trackID livekit.TrackID, sub types.LocalParticipant) {
subscriberID := sub.ID()
pending := u.pendingSubscriptions[trackID]
@@ -387,7 +346,7 @@ func (u *UptrackManager) maybeRemovePendingSubscription(trackID livekit.TrackID,
}
}
func (u *UptrackManager) processPendingSubscriptions(resolver func(participantID livekit.ParticipantID) types.Participant) {
func (u *UptrackManager) processPendingSubscriptions(resolver func(participantID livekit.ParticipantID) types.LocalParticipant) {
updatedPendingSubscriptions := make(map[livekit.TrackID][]livekit.ParticipantID)
for trackID, pending := range u.pendingSubscriptions {
track := u.getPublishedTrack(trackID)
@@ -397,7 +356,7 @@ func (u *UptrackManager) processPendingSubscriptions(resolver func(participantID
var updatedPending []livekit.ParticipantID
for _, sid := range pending {
var sub types.Participant
var sub types.LocalParticipant
if resolver != nil {
sub = resolver(sid)
}
@@ -427,7 +386,7 @@ func (u *UptrackManager) processPendingSubscriptions(resolver func(participantID
u.pendingSubscriptions = updatedPendingSubscriptions
}
func (u *UptrackManager) maybeRevokeSubscriptions(resolver func(participantID livekit.ParticipantID) types.Participant) {
func (u *UptrackManager) maybeRevokeSubscriptions(resolver func(participantID livekit.ParticipantID) types.LocalParticipant) {
for _, track := range u.publishedTracks {
trackID := track.ID()
allowed := u.getAllowedSubscribers(trackID)
@@ -438,7 +397,7 @@ func (u *UptrackManager) maybeRevokeSubscriptions(resolver func(participantID li
revokedSubscribers := track.RevokeDisallowedSubscribers(allowed)
for _, subID := range revokedSubscribers {
var sub types.Participant
var sub types.LocalParticipant
if resolver != nil {
sub = resolver(subID)
}
+6 -6
View File
@@ -12,11 +12,11 @@ func TestUpdateSubscriptionPermissions(t *testing.T) {
t.Run("updates permissions", func(t *testing.T) {
um := NewUptrackManager(UptrackManagerParams{})
tra := &typesfakes.FakePublishedTrack{}
tra := &typesfakes.FakeMediaTrack{}
tra.IDReturns("audio")
um.publishedTracks["audio"] = tra
trv := &typesfakes.FakePublishedTrack{}
trv := &typesfakes.FakeMediaTrack{}
trv.IDReturns("video")
um.publishedTracks["video"] = trv
@@ -87,11 +87,11 @@ func TestPermissions(t *testing.T) {
t.Run("checks permissions", func(t *testing.T) {
um := NewUptrackManager(UptrackManagerParams{})
tra := &typesfakes.FakePublishedTrack{}
tra := &typesfakes.FakeMediaTrack{}
tra.IDReturns("audio")
um.publishedTracks["audio"] = tra
trv := &typesfakes.FakePublishedTrack{}
trv := &typesfakes.FakeMediaTrack{}
trv.IDReturns("video")
um.publishedTracks["video"] = trv
@@ -131,7 +131,7 @@ func TestPermissions(t *testing.T) {
require.True(t, um.hasPermission("video", "p2"))
// add a new track after permissions are set
trs := &typesfakes.FakePublishedTrack{}
trs := &typesfakes.FakeMediaTrack{}
trs.IDReturns("screen")
um.publishedTracks["screen"] = trs
@@ -173,7 +173,7 @@ func TestPermissions(t *testing.T) {
require.False(t, um.hasPermission("screen", "p3"))
// add a new track after restrictive permissions are set
trw := &typesfakes.FakePublishedTrack{}
trw := &typesfakes.FakeMediaTrack{}
trw.IDReturns("watch")
um.publishedTracks["watch"] = trw
+1 -1
View File
@@ -50,7 +50,7 @@ func FixedPointToPercent(frac uint8) uint32 {
return (uint32(frac) * 100) >> 8
}
func ToProtoParticipants(participants []types.Participant) []*livekit.ParticipantInfo {
func ToProtoParticipants(participants []types.LocalParticipant) []*livekit.ParticipantInfo {
infos := make([]*livekit.ParticipantInfo, 0, len(participants))
for _, op := range participants {
infos = append(infos, op.ToProto())
+3 -3
View File
@@ -277,7 +277,7 @@ func (r *RoomManager) StartSession(ctx context.Context, roomName livekit.RoomNam
}
r.telemetry.ParticipantJoined(ctx, room.Room, participant.ToProto(), pi.Client)
participant.OnClose(func(p types.Participant, disallowedSubscriptions map[livekit.TrackID]livekit.ParticipantID) {
participant.OnClose(func(p types.LocalParticipant, disallowedSubscriptions map[livekit.TrackID]livekit.ParticipantID) {
if err := r.roomStore.DeleteParticipant(ctx, roomName, p.Identity()); err != nil {
pLogger.Errorw("could not delete participant", err)
}
@@ -329,7 +329,7 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room
logger.Errorw("could not handle metadata update", err)
}
})
room.OnParticipantChanged(func(p types.Participant) {
room.OnParticipantChanged(func(p types.LocalParticipant) {
if p.State() != livekit.ParticipantInfo_DISCONNECTED {
if err := r.roomStore.StoreParticipant(ctx, roomName, p.ToProto()); err != nil {
logger.Errorw("could not handle participant change", err)
@@ -344,7 +344,7 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room
}
// manages an RTC session for a participant, runs on the RTC node
func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.Participant, requestSource routing.MessageSource) {
func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.LocalParticipant, requestSource routing.MessageSource) {
defer func() {
logger.Debugw("RTC session finishing",
"participant", participant.Identity(),