Files
livekit/pkg/rtc/signalhandler.go
Raja Subramanian 07c43e0972 Supervisor beginnings (#1005)
* Remove VP9 from media engine set up.

* Remove vp9 from config sample

* Supervisor beginnings

Eventual goal is to have a reconciler which moves state from
actual -> desired. First step along the way is to observe/monitor.
The first step even in that is an initial implementation to get
feedback on the direction.

This PR is a start in that direction
- Concept of a supervisor at local participant level
- This supervisor will be responsible for periodically monitor
  actual vs desired (this is the one which will eventually trigger
  other things to reconcile, but for now it just logs on error)
- A new interface `OperationMonitor` which requires two methods
  o Check() returns an error based on actual vs desired state.
  o IsIdle() returns bool. Returns true if the monitor is idle.
- The supervisor maintains a list of monitors and does periodic check.

In the above framework, starting with list of
subscriptions/unsubscriptions. There is a new module
`SubscriptionMonitor` which checks subscription transitions.
A subscription transition is queued on subscribe/unsubscribe.
The transition can be satisfied when a subscribedTrack is added OR
removed. Error condition is when a transition is not satisfied for
10 seconds. Idle is when the transition queue is empty and
subscribedTrack is nil, i. e. the last transition would have been
unsubscribe and subscribed track removed (unsubscribe satisfied).

The idea is individual monitors can check on different things.
Some more things that I am thinking about are
- PublishedTrackMonitor - started when an add track happens,
  satisfied when OnTrack happens, error if `OnTrack` does not
  fire for a while and track is not muted, idle when there is
  nothing pending.
- PublishedTrackStreamingMonitor - to ensure that a published track
  is receiving media at the server (accounting for dynacast, mute, etc)
- SubscribedTrackStreamingMonitor - to ensure down track is sending
  data unless muted.

* Remove debug

* Protect against early casting errors

* Adding PublicationMonitor
2022-09-15 11:16:37 +05:30

94 lines
3.2 KiB
Go

package rtc
import (
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/livekit-server/pkg/rtc/types"
)
func HandleParticipantSignal(room types.Room, participant types.LocalParticipant, req *livekit.SignalRequest, pLogger logger.Logger) error {
switch msg := req.Message.(type) {
case *livekit.SignalRequest_Offer:
participant.HandleOffer(FromProtoSessionDescription(msg.Offer))
case *livekit.SignalRequest_Answer:
participant.HandleAnswer(FromProtoSessionDescription(msg.Answer))
case *livekit.SignalRequest_Trickle:
candidateInit, err := FromProtoTrickle(msg.Trickle)
if err != nil {
pLogger.Warnw("could not decode trickle", err)
return nil
}
participant.AddICECandidate(candidateInit, msg.Trickle.Target)
case *livekit.SignalRequest_AddTrack:
pLogger.Debugw("add track request", "trackID", msg.AddTrack.Cid)
participant.AddTrack(msg.AddTrack)
case *livekit.SignalRequest_Mute:
participant.SetTrackMuted(livekit.TrackID(msg.Mute.Sid), msg.Mute.Muted, false)
case *livekit.SignalRequest_Subscription:
var err error
// always allow unsubscribe
if participant.CanSubscribe() || !msg.Subscription.Subscribe {
updateErr := room.UpdateSubscriptions(
participant,
livekit.StringsAsTrackIDs(msg.Subscription.TrackSids),
msg.Subscription.ParticipantTracks,
msg.Subscription.Subscribe,
)
if updateErr != nil {
err = updateErr
}
} else {
err = ErrCannotSubscribe
}
if err != nil {
pLogger.Warnw("could not update subscription", err,
"tracks", msg.Subscription.TrackSids,
"subscribe", msg.Subscription.Subscribe)
} else {
pLogger.Infow("updated subscription",
"tracks", msg.Subscription.TrackSids,
"subscribe", msg.Subscription.Subscribe)
}
case *livekit.SignalRequest_TrackSetting:
for _, sid := range livekit.StringsAsTrackIDs(msg.TrackSetting.TrackSids) {
err := participant.UpdateSubscribedTrackSettings(sid, msg.TrackSetting)
if err != nil {
pLogger.Errorw("failed to update subscribed track settings", err, "trackID", sid)
continue
}
pLogger.Infow("updated subscribed track settings", "trackID", sid, "settings", msg.TrackSetting)
}
case *livekit.SignalRequest_Leave:
pLogger.Infow("client leaving room")
room.RemoveParticipant(participant.Identity(), types.ParticipantCloseReasonClientRequestLeave)
case *livekit.SignalRequest_UpdateLayers:
err := room.UpdateVideoLayers(participant, msg.UpdateLayers)
if err != nil {
pLogger.Warnw("could not update video layers", err,
"update", msg.UpdateLayers)
return nil
}
case *livekit.SignalRequest_SubscriptionPermission:
err := room.UpdateSubscriptionPermission(participant, msg.SubscriptionPermission)
if err != nil {
pLogger.Warnw("could not update subscription permission", err,
"permissions", msg.SubscriptionPermission)
}
case *livekit.SignalRequest_SyncState:
err := room.SyncState(participant, msg.SyncState)
if err != nil {
pLogger.Warnw("could not sync state", err,
"state", msg.SyncState)
}
case *livekit.SignalRequest_Simulate:
err := room.SimulateScenario(participant, msg.Simulate)
if err != nil {
pLogger.Warnw("could not simulate scenario", err,
"simulate", msg.Simulate)
}
}
return nil
}