From 850fecf931c81ef044c5e6d8e8de8563245d7d43 Mon Sep 17 00:00:00 2001 From: David Zhao Date: Tue, 13 Jul 2021 21:35:08 -0700 Subject: [PATCH] Allow publishers to set layer availability (#51) * support client message to update active layers * update to match new protocol --- pkg/rtc/mediatrack.go | 12 +++++ pkg/rtc/types/interfaces.go | 1 + .../types/typesfakes/fake_published_track.go | 44 +++++++++++++++++++ pkg/service/roommanager.go | 10 +++++ 4 files changed, 67 insertions(+) diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index ca425f19e..ea5d2db0f 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -12,6 +12,7 @@ import ( "github.com/pion/rtcp" "github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3/pkg/rtcerr" + "github.com/thoas/go-funk" "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/logger" @@ -105,6 +106,17 @@ func (t *MediaTrack) SetMuted(muted bool) { t.lock.RUnlock() } +func (t *MediaTrack) SetSimulcastLayers(layers []livekit.VideoQuality) { + t.lock.RLock() + defer t.lock.RUnlock() + if t.receiver != nil { + layers16 := funk.Map(layers, func(l livekit.VideoQuality) uint16 { + return uint16(spatialLayerForQuality(l)) + }).([]uint16) + t.receiver.SetAvailableLayers(layers16) + } +} + func (t *MediaTrack) OnClose(f func()) { t.onClose = f } diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 698d1de94..226e732c6 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -92,6 +92,7 @@ type PublishedTrack interface { Name() string IsMuted() bool SetMuted(muted bool) + SetSimulcastLayers(layers []livekit.VideoQuality) AddSubscriber(participant Participant) error RemoveSubscriber(participantId string) IsSubscriber(subId string) bool diff --git a/pkg/rtc/types/typesfakes/fake_published_track.go b/pkg/rtc/types/typesfakes/fake_published_track.go index 0db086599..87860cabf 100644 --- a/pkg/rtc/types/typesfakes/fake_published_track.go +++ b/pkg/rtc/types/typesfakes/fake_published_track.go @@ -90,6 +90,11 @@ type FakePublishedTrack struct { setMutedArgsForCall []struct { arg1 bool } + SetSimulcastLayersStub func([]livekit.VideoQuality) + setSimulcastLayersMutex sync.RWMutex + setSimulcastLayersArgsForCall []struct { + arg1 []livekit.VideoQuality + } StartStub func() startMutex sync.RWMutex startArgsForCall []struct { @@ -562,6 +567,43 @@ func (fake *FakePublishedTrack) SetMutedArgsForCall(i int) bool { return argsForCall.arg1 } +func (fake *FakePublishedTrack) SetSimulcastLayers(arg1 []livekit.VideoQuality) { + var arg1Copy []livekit.VideoQuality + if arg1 != nil { + arg1Copy = make([]livekit.VideoQuality, len(arg1)) + copy(arg1Copy, arg1) + } + fake.setSimulcastLayersMutex.Lock() + fake.setSimulcastLayersArgsForCall = append(fake.setSimulcastLayersArgsForCall, struct { + arg1 []livekit.VideoQuality + }{arg1Copy}) + stub := fake.SetSimulcastLayersStub + fake.recordInvocation("SetSimulcastLayers", []interface{}{arg1Copy}) + fake.setSimulcastLayersMutex.Unlock() + if stub != nil { + fake.SetSimulcastLayersStub(arg1) + } +} + +func (fake *FakePublishedTrack) SetSimulcastLayersCallCount() int { + fake.setSimulcastLayersMutex.RLock() + defer fake.setSimulcastLayersMutex.RUnlock() + return len(fake.setSimulcastLayersArgsForCall) +} + +func (fake *FakePublishedTrack) SetSimulcastLayersCalls(stub func([]livekit.VideoQuality)) { + fake.setSimulcastLayersMutex.Lock() + defer fake.setSimulcastLayersMutex.Unlock() + fake.SetSimulcastLayersStub = stub +} + +func (fake *FakePublishedTrack) SetSimulcastLayersArgsForCall(i int) []livekit.VideoQuality { + fake.setSimulcastLayersMutex.RLock() + defer fake.setSimulcastLayersMutex.RUnlock() + argsForCall := fake.setSimulcastLayersArgsForCall[i] + return argsForCall.arg1 +} + func (fake *FakePublishedTrack) Start() { fake.startMutex.Lock() fake.startArgsForCall = append(fake.startArgsForCall, struct { @@ -662,6 +704,8 @@ func (fake *FakePublishedTrack) Invocations() map[string][][]interface{} { defer fake.removeSubscriberMutex.RUnlock() fake.setMutedMutex.RLock() defer fake.setMutedMutex.RUnlock() + fake.setSimulcastLayersMutex.RLock() + defer fake.setSimulcastLayersMutex.RUnlock() fake.startMutex.RLock() defer fake.startMutex.RUnlock() fake.toProtoMutex.RLock() diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 4e5e84ff2..27ad2a3d4 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -442,6 +442,16 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.Partici } case *livekit.SignalRequest_Leave: _ = participant.Close() + case *livekit.SignalRequest_Simulcast: + for _, track := range participant.GetPublishedTracks() { + if track.ID() == msg.Simulcast.TrackSid { + logger.Debugw("updating simulcast layers", + "participant", participant.Identity(), + "track", track.ID(), + "layers", msg.Simulcast.Layers) + track.SetSimulcastLayers(msg.Simulcast.Layers) + } + } } } }