From f24c1b95c24f4e3b80d602bb8b67a4f48cb078ce Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Wed, 21 Dec 2022 09:29:56 +0530 Subject: [PATCH] Initial commit of signal deduper. (#1243) * Initial commit of signal deduper. Idea is protect against signal storm from misbehaving clients. Design: - SignalDeduper interface with one method to handle a SignalRequest and return if dupe or not. - Signal specific deduper. Could have made a single de-duper which could handle all signal message types, but making it per type so that the code is cleaner. - Some module (like the router) can instantiate whatever signal types it wants to de-dupe. When a signal message is received, that module can run the signal message through the list of de-dupers and potentially drop the message if any of the de-dupers declare that the message is a dupe. Making it a list makes things a little bit inefficient, but keeps things cleaner. Hopefully, not many de-dupers will be needed so that the inefficiency is not pronounced. * re-arrange comments * helper function * add ParticipantClosed --- pkg/rtc/signaldeduper/subscriptiondeduper.go | 175 ++++++++++++++++++ .../signaldeduper/subscriptiondeduper_test.go | 136 ++++++++++++++ pkg/rtc/types/interfaces.go | 8 + 3 files changed, 319 insertions(+) create mode 100644 pkg/rtc/signaldeduper/subscriptiondeduper.go create mode 100644 pkg/rtc/signaldeduper/subscriptiondeduper_test.go diff --git a/pkg/rtc/signaldeduper/subscriptiondeduper.go b/pkg/rtc/signaldeduper/subscriptiondeduper.go new file mode 100644 index 000000000..79e146c95 --- /dev/null +++ b/pkg/rtc/signaldeduper/subscriptiondeduper.go @@ -0,0 +1,175 @@ +package signaldeduper + +import ( + "sync" + + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" + + "github.com/livekit/livekit-server/pkg/rtc/types" +) + +// -------------------------------------------------- + +type subscriptionSetting struct { + isEnabled bool + quality livekit.VideoQuality + width uint32 + height uint32 + fps uint32 +} + +func subscriptionSettingFromUpdateSubscription(us *livekit.UpdateSubscription) *subscriptionSetting { + return &subscriptionSetting{ + isEnabled: us.Subscribe, + } +} + +func subscriptionSettingPatchFromUpdateSubscription(us *livekit.UpdateSubscription, from *subscriptionSetting) *subscriptionSetting { + return &subscriptionSetting{ + isEnabled: us.Subscribe, + quality: from.quality, + width: from.width, + height: from.height, + fps: from.fps, + } +} + +func subscriptionSettingFromUpdateTrackSettings(uts *livekit.UpdateTrackSettings) *subscriptionSetting { + return &subscriptionSetting{ + isEnabled: !uts.Disabled, + quality: uts.Quality, + width: uts.Width, + height: uts.Height, + fps: uts.Fps, + } +} + +func (s *subscriptionSetting) Equal(other *subscriptionSetting) bool { + return s.isEnabled == other.isEnabled && + s.quality == other.quality && + s.width == other.width && + s.height == other.height && + s.fps == other.fps +} + +// -------------------------------------------------- + +type SubscriptionDeduper struct { + logger logger.Logger + + lock sync.RWMutex + participantsSubscriptions map[livekit.ParticipantKey]map[livekit.TrackID]*subscriptionSetting +} + +func NewSubscriptionDeduper(logger logger.Logger) types.SignalDeduper { + return &SubscriptionDeduper{ + logger: logger, + participantsSubscriptions: make(map[livekit.ParticipantKey]map[livekit.TrackID]*subscriptionSetting), + } +} + +func (s *SubscriptionDeduper) Dedupe(participantKey livekit.ParticipantKey, req *livekit.SignalRequest) bool { + isDupe := false + switch msg := req.Message.(type) { + case *livekit.SignalRequest_Subscription: + isDupe = s.updateSubscriptionsFromUpdateSubscription(participantKey, msg.Subscription) + case *livekit.SignalRequest_TrackSetting: + isDupe = s.updateSubscriptionsFromUpdateTrackSettings(participantKey, msg.TrackSetting) + } + + return isDupe +} + +func (s *SubscriptionDeduper) ParticipantClosed(participantKey livekit.ParticipantKey) { + s.lock.Lock() + defer s.lock.Unlock() + + delete(s.participantsSubscriptions, participantKey) +} + +func (s *SubscriptionDeduper) updateSubscriptionsFromUpdateSubscription( + participantKey livekit.ParticipantKey, + us *livekit.UpdateSubscription, +) bool { + isDupe := true + + s.lock.Lock() + defer s.lock.Unlock() + + participantSubscriptions := s.getOrCreateParticipantSubscriptions(participantKey) + + numTracks := len(us.TrackSids) + for _, pt := range us.ParticipantTracks { + numTracks += len(pt.TrackSids) + } + trackIDs := make(map[livekit.TrackID]bool, numTracks) + for _, trackSid := range us.TrackSids { + trackIDs[livekit.TrackID(trackSid)] = true + } + for _, pt := range us.ParticipantTracks { + for _, trackSid := range pt.TrackSids { + trackIDs[livekit.TrackID(trackSid)] = true + } + } + + for trackID := range trackIDs { + subscriptionSetting := participantSubscriptions[trackID] + if subscriptionSetting == nil { + // new track seen + subscriptionSetting := subscriptionSettingFromUpdateSubscription(us) + participantSubscriptions[trackID] = subscriptionSetting + isDupe = false + } else { + newSubscriptionSetting := subscriptionSettingPatchFromUpdateSubscription(us, subscriptionSetting) + if !subscriptionSetting.Equal(newSubscriptionSetting) { + // subscription setting change + participantSubscriptions[trackID] = newSubscriptionSetting + isDupe = false + } + } + } + + return isDupe +} + +func (s *SubscriptionDeduper) updateSubscriptionsFromUpdateTrackSettings( + participantKey livekit.ParticipantKey, + uts *livekit.UpdateTrackSettings, +) bool { + isDupe := true + + s.lock.Lock() + defer s.lock.Unlock() + + participantSubscriptions := s.getOrCreateParticipantSubscriptions(participantKey) + + for _, trackSid := range uts.TrackSids { + subscriptionSetting := participantSubscriptions[livekit.TrackID(trackSid)] + if subscriptionSetting == nil { + // new track seen + subscriptionSetting := subscriptionSettingFromUpdateTrackSettings(uts) + participantSubscriptions[livekit.TrackID(trackSid)] = subscriptionSetting + isDupe = false + } else { + newSubscriptionSetting := subscriptionSettingFromUpdateTrackSettings(uts) + if !subscriptionSetting.Equal(newSubscriptionSetting) { + // subscription setting change + participantSubscriptions[livekit.TrackID(trackSid)] = newSubscriptionSetting + isDupe = false + } + } + } + + return isDupe +} + +func (s *SubscriptionDeduper) getOrCreateParticipantSubscriptions(participantKey livekit.ParticipantKey) map[livekit.TrackID]*subscriptionSetting { + participantSubscriptions := s.participantsSubscriptions[participantKey] + if participantSubscriptions == nil { + participantSubscriptions = make(map[livekit.TrackID]*subscriptionSetting) + s.participantsSubscriptions[participantKey] = participantSubscriptions + } + + return participantSubscriptions +} diff --git a/pkg/rtc/signaldeduper/subscriptiondeduper_test.go b/pkg/rtc/signaldeduper/subscriptiondeduper_test.go new file mode 100644 index 000000000..eaf52f9ad --- /dev/null +++ b/pkg/rtc/signaldeduper/subscriptiondeduper_test.go @@ -0,0 +1,136 @@ +package signaldeduper + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" +) + +func TestSubscriptionDeduper(t *testing.T) { + t.Run("dedupes subscription", func(t *testing.T) { + sd := NewSubscriptionDeduper(logger.GetLogger()) + + // new track using UpdateSubscription + us := &livekit.SignalRequest{ + Message: &livekit.SignalRequest_Subscription{ + Subscription: &livekit.UpdateSubscription{ + TrackSids: []string{ + "p1.track1", + "p1.track2", + }, + Subscribe: true, + }, + }, + } + require.False(t, sd.Dedupe("p0", us)) + + // new track using UpdateSubscription - using ParticipantTracks + us = &livekit.SignalRequest{ + Message: &livekit.SignalRequest_Subscription{ + Subscription: &livekit.UpdateSubscription{ + ParticipantTracks: []*livekit.ParticipantTracks{ + &livekit.ParticipantTracks{ + ParticipantSid: "p2", + TrackSids: []string{ + "p2.track1", + "p2.track2", + }, + }, + &livekit.ParticipantTracks{ + ParticipantSid: "p3", + TrackSids: []string{ + "p3.track1", + "p3.track2", + }, + }, + }, + Subscribe: true, + }, + }, + } + require.False(t, sd.Dedupe("p0", us)) + + // some tracks re-subscribing, should be a dupe + us = &livekit.SignalRequest{ + Message: &livekit.SignalRequest_Subscription{ + Subscription: &livekit.UpdateSubscription{ + TrackSids: []string{ + "p1.track1", + }, + ParticipantTracks: []*livekit.ParticipantTracks{ + &livekit.ParticipantTracks{ + ParticipantSid: "p2", + TrackSids: []string{ + "p2.track1", + }, + }, + }, + Subscribe: true, + }, + }, + } + require.True(t, sd.Dedupe("p0", us)) + + // update track settings, should not be a dupe + uts := &livekit.SignalRequest{ + Message: &livekit.SignalRequest_TrackSetting{ + TrackSetting: &livekit.UpdateTrackSettings{ + TrackSids: []string{ + "p1.track1", + }, + Width: 1280, + }, + }, + } + require.False(t, sd.Dedupe("p0", uts)) + + // same message again will be a dupe + require.True(t, sd.Dedupe("p0", uts)) + + // unsubscribe a track, should not be a dupe + uts = &livekit.SignalRequest{ + Message: &livekit.SignalRequest_TrackSetting{ + TrackSetting: &livekit.UpdateTrackSettings{ + TrackSids: []string{ + "p2.track1", + }, + Disabled: true, + }, + }, + } + require.False(t, sd.Dedupe("p0", uts)) + + // use UpdateSubscription and unsubscribe, although different protocol message, effect is the same, hence should be dupe + us = &livekit.SignalRequest{ + Message: &livekit.SignalRequest_Subscription{ + Subscription: &livekit.UpdateSubscription{ + TrackSids: []string{ + "p2.track1", + }, + }, + }, + } + require.True(t, sd.Dedupe("p0", us)) + + // + // Although unsubscribed, updating track setting with some other value populated should return not a dupe. + // Although track is still unsubscribed, deduper does not extrapolate functionality and does only a equality comparison + // to be on the safe side. + // + uts = &livekit.SignalRequest{ + Message: &livekit.SignalRequest_TrackSetting{ + TrackSetting: &livekit.UpdateTrackSettings{ + TrackSids: []string{ + "p2.track1", + }, + Disabled: true, + Quality: livekit.VideoQuality_HIGH, + }, + }, + } + require.False(t, sd.Dedupe("p0", uts)) + }) +} diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 2f7ccf323..f498718d3 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -473,3 +473,11 @@ type OperationMonitor interface { Check() error IsIdle() bool } + +// +// SignalDeduper related definitions +// +type SignalDeduper interface { + Dedupe(participantKey livekit.ParticipantKey, req *livekit.SignalRequest) bool + ParticipantClosed(participantKey livekit.ParticipantKey) +}