Initial commit of signal deduper. (#1243)

* Initial commit of signal deduper.

Idea is protect against signal storm from misbehaving clients.

Design:
- SignalDeduper interface with one method to handle a SignalRequest and
  return if dupe or not.
- Signal specific deduper. Could have made a single de-duper which could
  handle all signal message types, but making it per type so that the
  code is cleaner.
- Some module (like the router) can instantiate whatever signal types
  it wants to de-dupe. When a signal message is received, that module
  can run the signal message through the list of de-dupers and
  potentially drop the message if any of the de-dupers declare that the
  message is a dupe. Making it a list makes things a little bit
  inefficient, but keeps things cleaner. Hopefully, not many de-dupers
  will be needed so that the inefficiency is not pronounced.

* re-arrange comments

* helper function

* add ParticipantClosed
This commit is contained in:
Raja Subramanian
2022-12-21 09:29:56 +05:30
committed by GitHub
parent c1d7dbd4fc
commit f24c1b95c2
3 changed files with 319 additions and 0 deletions
@@ -0,0 +1,175 @@
package signaldeduper
import (
"sync"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/livekit-server/pkg/rtc/types"
)
// --------------------------------------------------
type subscriptionSetting struct {
isEnabled bool
quality livekit.VideoQuality
width uint32
height uint32
fps uint32
}
func subscriptionSettingFromUpdateSubscription(us *livekit.UpdateSubscription) *subscriptionSetting {
return &subscriptionSetting{
isEnabled: us.Subscribe,
}
}
func subscriptionSettingPatchFromUpdateSubscription(us *livekit.UpdateSubscription, from *subscriptionSetting) *subscriptionSetting {
return &subscriptionSetting{
isEnabled: us.Subscribe,
quality: from.quality,
width: from.width,
height: from.height,
fps: from.fps,
}
}
func subscriptionSettingFromUpdateTrackSettings(uts *livekit.UpdateTrackSettings) *subscriptionSetting {
return &subscriptionSetting{
isEnabled: !uts.Disabled,
quality: uts.Quality,
width: uts.Width,
height: uts.Height,
fps: uts.Fps,
}
}
func (s *subscriptionSetting) Equal(other *subscriptionSetting) bool {
return s.isEnabled == other.isEnabled &&
s.quality == other.quality &&
s.width == other.width &&
s.height == other.height &&
s.fps == other.fps
}
// --------------------------------------------------
type SubscriptionDeduper struct {
logger logger.Logger
lock sync.RWMutex
participantsSubscriptions map[livekit.ParticipantKey]map[livekit.TrackID]*subscriptionSetting
}
func NewSubscriptionDeduper(logger logger.Logger) types.SignalDeduper {
return &SubscriptionDeduper{
logger: logger,
participantsSubscriptions: make(map[livekit.ParticipantKey]map[livekit.TrackID]*subscriptionSetting),
}
}
func (s *SubscriptionDeduper) Dedupe(participantKey livekit.ParticipantKey, req *livekit.SignalRequest) bool {
isDupe := false
switch msg := req.Message.(type) {
case *livekit.SignalRequest_Subscription:
isDupe = s.updateSubscriptionsFromUpdateSubscription(participantKey, msg.Subscription)
case *livekit.SignalRequest_TrackSetting:
isDupe = s.updateSubscriptionsFromUpdateTrackSettings(participantKey, msg.TrackSetting)
}
return isDupe
}
func (s *SubscriptionDeduper) ParticipantClosed(participantKey livekit.ParticipantKey) {
s.lock.Lock()
defer s.lock.Unlock()
delete(s.participantsSubscriptions, participantKey)
}
func (s *SubscriptionDeduper) updateSubscriptionsFromUpdateSubscription(
participantKey livekit.ParticipantKey,
us *livekit.UpdateSubscription,
) bool {
isDupe := true
s.lock.Lock()
defer s.lock.Unlock()
participantSubscriptions := s.getOrCreateParticipantSubscriptions(participantKey)
numTracks := len(us.TrackSids)
for _, pt := range us.ParticipantTracks {
numTracks += len(pt.TrackSids)
}
trackIDs := make(map[livekit.TrackID]bool, numTracks)
for _, trackSid := range us.TrackSids {
trackIDs[livekit.TrackID(trackSid)] = true
}
for _, pt := range us.ParticipantTracks {
for _, trackSid := range pt.TrackSids {
trackIDs[livekit.TrackID(trackSid)] = true
}
}
for trackID := range trackIDs {
subscriptionSetting := participantSubscriptions[trackID]
if subscriptionSetting == nil {
// new track seen
subscriptionSetting := subscriptionSettingFromUpdateSubscription(us)
participantSubscriptions[trackID] = subscriptionSetting
isDupe = false
} else {
newSubscriptionSetting := subscriptionSettingPatchFromUpdateSubscription(us, subscriptionSetting)
if !subscriptionSetting.Equal(newSubscriptionSetting) {
// subscription setting change
participantSubscriptions[trackID] = newSubscriptionSetting
isDupe = false
}
}
}
return isDupe
}
func (s *SubscriptionDeduper) updateSubscriptionsFromUpdateTrackSettings(
participantKey livekit.ParticipantKey,
uts *livekit.UpdateTrackSettings,
) bool {
isDupe := true
s.lock.Lock()
defer s.lock.Unlock()
participantSubscriptions := s.getOrCreateParticipantSubscriptions(participantKey)
for _, trackSid := range uts.TrackSids {
subscriptionSetting := participantSubscriptions[livekit.TrackID(trackSid)]
if subscriptionSetting == nil {
// new track seen
subscriptionSetting := subscriptionSettingFromUpdateTrackSettings(uts)
participantSubscriptions[livekit.TrackID(trackSid)] = subscriptionSetting
isDupe = false
} else {
newSubscriptionSetting := subscriptionSettingFromUpdateTrackSettings(uts)
if !subscriptionSetting.Equal(newSubscriptionSetting) {
// subscription setting change
participantSubscriptions[livekit.TrackID(trackSid)] = newSubscriptionSetting
isDupe = false
}
}
}
return isDupe
}
func (s *SubscriptionDeduper) getOrCreateParticipantSubscriptions(participantKey livekit.ParticipantKey) map[livekit.TrackID]*subscriptionSetting {
participantSubscriptions := s.participantsSubscriptions[participantKey]
if participantSubscriptions == nil {
participantSubscriptions = make(map[livekit.TrackID]*subscriptionSetting)
s.participantsSubscriptions[participantKey] = participantSubscriptions
}
return participantSubscriptions
}
@@ -0,0 +1,136 @@
package signaldeduper
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
)
func TestSubscriptionDeduper(t *testing.T) {
t.Run("dedupes subscription", func(t *testing.T) {
sd := NewSubscriptionDeduper(logger.GetLogger())
// new track using UpdateSubscription
us := &livekit.SignalRequest{
Message: &livekit.SignalRequest_Subscription{
Subscription: &livekit.UpdateSubscription{
TrackSids: []string{
"p1.track1",
"p1.track2",
},
Subscribe: true,
},
},
}
require.False(t, sd.Dedupe("p0", us))
// new track using UpdateSubscription - using ParticipantTracks
us = &livekit.SignalRequest{
Message: &livekit.SignalRequest_Subscription{
Subscription: &livekit.UpdateSubscription{
ParticipantTracks: []*livekit.ParticipantTracks{
&livekit.ParticipantTracks{
ParticipantSid: "p2",
TrackSids: []string{
"p2.track1",
"p2.track2",
},
},
&livekit.ParticipantTracks{
ParticipantSid: "p3",
TrackSids: []string{
"p3.track1",
"p3.track2",
},
},
},
Subscribe: true,
},
},
}
require.False(t, sd.Dedupe("p0", us))
// some tracks re-subscribing, should be a dupe
us = &livekit.SignalRequest{
Message: &livekit.SignalRequest_Subscription{
Subscription: &livekit.UpdateSubscription{
TrackSids: []string{
"p1.track1",
},
ParticipantTracks: []*livekit.ParticipantTracks{
&livekit.ParticipantTracks{
ParticipantSid: "p2",
TrackSids: []string{
"p2.track1",
},
},
},
Subscribe: true,
},
},
}
require.True(t, sd.Dedupe("p0", us))
// update track settings, should not be a dupe
uts := &livekit.SignalRequest{
Message: &livekit.SignalRequest_TrackSetting{
TrackSetting: &livekit.UpdateTrackSettings{
TrackSids: []string{
"p1.track1",
},
Width: 1280,
},
},
}
require.False(t, sd.Dedupe("p0", uts))
// same message again will be a dupe
require.True(t, sd.Dedupe("p0", uts))
// unsubscribe a track, should not be a dupe
uts = &livekit.SignalRequest{
Message: &livekit.SignalRequest_TrackSetting{
TrackSetting: &livekit.UpdateTrackSettings{
TrackSids: []string{
"p2.track1",
},
Disabled: true,
},
},
}
require.False(t, sd.Dedupe("p0", uts))
// use UpdateSubscription and unsubscribe, although different protocol message, effect is the same, hence should be dupe
us = &livekit.SignalRequest{
Message: &livekit.SignalRequest_Subscription{
Subscription: &livekit.UpdateSubscription{
TrackSids: []string{
"p2.track1",
},
},
},
}
require.True(t, sd.Dedupe("p0", us))
//
// Although unsubscribed, updating track setting with some other value populated should return not a dupe.
// Although track is still unsubscribed, deduper does not extrapolate functionality and does only a equality comparison
// to be on the safe side.
//
uts = &livekit.SignalRequest{
Message: &livekit.SignalRequest_TrackSetting{
TrackSetting: &livekit.UpdateTrackSettings{
TrackSids: []string{
"p2.track1",
},
Disabled: true,
Quality: livekit.VideoQuality_HIGH,
},
},
}
require.False(t, sd.Dedupe("p0", uts))
})
}
+8
View File
@@ -473,3 +473,11 @@ type OperationMonitor interface {
Check() error
IsIdle() bool
}
//
// SignalDeduper related definitions
//
type SignalDeduper interface {
Dedupe(participantKey livekit.ParticipantKey, req *livekit.SignalRequest) bool
ParticipantClosed(participantKey livekit.ParticipantKey)
}