diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index bff233137..42bdd25ed 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -47,6 +47,7 @@ import ( "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/metric" "github.com/livekit/livekit-server/pkg/routing" + "github.com/livekit/livekit-server/pkg/rtc/signalling" "github.com/livekit/livekit-server/pkg/rtc/supervisor" "github.com/livekit/livekit-server/pkg/rtc/transport" "github.com/livekit/livekit-server/pkg/rtc/types" @@ -280,16 +281,21 @@ type ParticipantImpl struct { version atomic.Uint32 // callbacks & handlers - onTrackPublished func(types.LocalParticipant, types.MediaTrack) - onTrackUpdated func(types.LocalParticipant, types.MediaTrack) - onTrackUnpublished func(types.LocalParticipant, types.MediaTrack) - onStateChange func(p types.LocalParticipant) - onSubscriberReady func(p types.LocalParticipant) - onMigrateStateChange func(p types.LocalParticipant, migrateState types.MigrateState) - onParticipantUpdate func(types.LocalParticipant) - onDataPacket func(types.LocalParticipant, livekit.DataPacket_Kind, *livekit.DataPacket) - onDataMessage func(types.LocalParticipant, []byte) - onMetrics func(types.Participant, *livekit.DataPacket) + onTrackPublished func(types.LocalParticipant, types.MediaTrack) + onTrackUpdated func(types.LocalParticipant, types.MediaTrack) + onTrackUnpublished func(types.LocalParticipant, types.MediaTrack) + onStateChange func(p types.LocalParticipant) + onSubscriberReady func(p types.LocalParticipant) + onMigrateStateChange func(p types.LocalParticipant, migrateState types.MigrateState) + onParticipantUpdate func(types.LocalParticipant) + onDataPacket func(types.LocalParticipant, livekit.DataPacket_Kind, *livekit.DataPacket) + onDataMessage func(types.LocalParticipant, []byte) + onMetrics func(types.Participant, *livekit.DataPacket) + onUpdateSubscriptions func(types.LocalParticipant, []livekit.TrackID, []*livekit.ParticipantTracks, bool) + onUpdateSubscriptionPermission func(types.LocalParticipant, *livekit.SubscriptionPermission) error + onSyncState func(types.LocalParticipant, *livekit.SyncState) error + onSimulateScenario func(types.LocalParticipant, *livekit.SimulateScenario) error + onLeave func(types.LocalParticipant, types.ParticipantCloseReason) migrateState atomic.Value // types.MigrateState migratedTracksPublishedFuse core.Fuse @@ -309,6 +315,9 @@ type ParticipantImpl struct { metricsCollector *metric.MetricsCollector metricsReporter *metric.MetricsReporter + signalling signalling.ParticipantSignalling + signaller signalling.ParticipantSignaller + // loggers for publisher and subscriber pubLogger logger.Logger subLogger logger.Logger @@ -345,6 +354,14 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { joiningMessageLastWrittenSeqs: make(map[livekit.ParticipantID]uint32), }, } + p.signalling = signalling.NewSignalling(signalling.SignallingParams{ + Logger: params.Logger, + }) + p.signaller = signalling.NewSignallerAsync(signalling.SignallerAsyncParams{ + Logger: params.Logger, + Participant: p, + }) + p.id.Store(params.SID) p.dataChannelStats = telemetry.NewBytesTrackStats( telemetry.BytesTrackIDForParticipantID(telemetry.BytesTrackTypeData, p.ID()), @@ -908,6 +925,66 @@ func (p *ParticipantImpl) getOnMetrics() func(types.Participant, *livekit.DataPa return p.onMetrics } +func (p *ParticipantImpl) OnUpdateSubscriptions(callback func(types.LocalParticipant, []livekit.TrackID, []*livekit.ParticipantTracks, bool)) { + p.lock.Lock() + p.onUpdateSubscriptions = callback + p.lock.Unlock() +} + +func (p *ParticipantImpl) getOnUpdateSubscriptions() func(types.LocalParticipant, []livekit.TrackID, []*livekit.ParticipantTracks, bool) { + p.lock.RLock() + defer p.lock.RUnlock() + return p.onUpdateSubscriptions +} + +func (p *ParticipantImpl) OnUpdateSubscriptionPermission(callback func(types.LocalParticipant, *livekit.SubscriptionPermission) error) { + p.lock.Lock() + p.onUpdateSubscriptionPermission = callback + p.lock.Unlock() +} + +func (p *ParticipantImpl) getOnUpdateSubscriptionPermission() func(types.LocalParticipant, *livekit.SubscriptionPermission) error { + p.lock.RLock() + defer p.lock.RUnlock() + return p.onUpdateSubscriptionPermission +} + +func (p *ParticipantImpl) OnSyncState(callback func(types.LocalParticipant, *livekit.SyncState) error) { + p.lock.Lock() + p.onSyncState = callback + p.lock.Unlock() +} + +func (p *ParticipantImpl) getOnSyncState() func(types.LocalParticipant, *livekit.SyncState) error { + p.lock.RLock() + defer p.lock.RUnlock() + return p.onSyncState +} + +func (p *ParticipantImpl) OnSimulateScenario(callback func(types.LocalParticipant, *livekit.SimulateScenario) error) { + p.lock.Lock() + p.onSimulateScenario = callback + p.lock.Unlock() +} + +func (p *ParticipantImpl) getOnSimulateScenario() func(types.LocalParticipant, *livekit.SimulateScenario) error { + p.lock.RLock() + defer p.lock.RUnlock() + return p.onSimulateScenario +} + +func (p *ParticipantImpl) OnLeave(callback func(types.LocalParticipant, types.ParticipantCloseReason)) { + p.lock.Lock() + p.onLeave = callback + p.lock.Unlock() +} + +func (p *ParticipantImpl) getOnLeave() func(types.LocalParticipant, types.ParticipantCloseReason) { + p.lock.RLock() + defer p.lock.RUnlock() + return p.onLeave +} + func (p *ParticipantImpl) OnClose(callback func(types.LocalParticipant)) { if p.isClosed.Load() { go callback(p) @@ -3625,3 +3702,43 @@ func (p *ParticipantImpl) GetLastReliableSequence(migrateOut bool) uint32 { } return p.reliableDataInfo.lastPubReliableSeq.Load() } + +func (p *ParticipantImpl) HandleUpdateSubscriptions( + trackIDs []livekit.TrackID, + participantTracks []*livekit.ParticipantTracks, + subscribe bool, +) { + if onUpdateSubscriptions := p.getOnUpdateSubscriptions(); onUpdateSubscriptions != nil { + onUpdateSubscriptions(p, trackIDs, participantTracks, subscribe) + } +} + +func (p *ParticipantImpl) HandleUpdateSubscriptionPermission(subscriptionPermission *livekit.SubscriptionPermission) error { + if onUpdateSubscriptionPermission := p.getOnUpdateSubscriptionPermission(); onUpdateSubscriptionPermission != nil { + return onUpdateSubscriptionPermission(p, subscriptionPermission) + } + + return errors.New("no handler") +} + +func (p *ParticipantImpl) HandleSyncState(syncState *livekit.SyncState) error { + if onSyncState := p.getOnSyncState(); onSyncState != nil { + return onSyncState(p, syncState) + } + + return errors.New("no handler") +} + +func (p *ParticipantImpl) HandleSimulateScenario(simulateScenario *livekit.SimulateScenario) error { + if onSimulateScenario := p.getOnSimulateScenario(); onSimulateScenario != nil { + return onSimulateScenario(p, simulateScenario) + } + + return errors.New("no handler") +} + +func (p *ParticipantImpl) HandleLeaveRequest(reason types.ParticipantCloseReason) { + if onLeave := p.getOnLeave(); onLeave != nil { + onLeave(p, reason) + } +} diff --git a/pkg/rtc/participant_internal_test.go b/pkg/rtc/participant_internal_test.go index f8ee0e297..3d0cf1dc3 100644 --- a/pkg/rtc/participant_internal_test.go +++ b/pkg/rtc/participant_internal_test.go @@ -227,7 +227,7 @@ func TestOutOfOrderUpdates(t *testing.T) { p := newParticipantForTest("test") p.updateState(livekit.ParticipantInfo_JOINED) p.SetMetadata("initial metadata") - sink := p.getResponseSink().(*routingfakes.FakeMessageSink) + sink := p.GetResponseSink().(*routingfakes.FakeMessageSink) pi1 := p.ToProto() p.SetMetadata("second update") pi2 := p.ToProto() diff --git a/pkg/rtc/participant_signal.go b/pkg/rtc/participant_signal.go index 9cb0d70e8..473134ae3 100644 --- a/pkg/rtc/participant_signal.go +++ b/pkg/rtc/participant_signal.go @@ -15,30 +15,29 @@ package rtc import ( - "fmt" "time" "github.com/pion/webrtc/v4" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" - "github.com/livekit/protocol/utils" - "github.com/livekit/psrpc" "github.com/livekit/livekit-server/pkg/routing" "github.com/livekit/livekit-server/pkg/rtc/types" + + "google.golang.org/protobuf/proto" ) -func (p *ParticipantImpl) getResponseSink() routing.MessageSink { - p.resSinkMu.Lock() - defer p.resSinkMu.Unlock() - return p.resSink +func (p *ParticipantImpl) SetResponseSink(sink routing.MessageSink) { + p.signaller.SetResponseSink(sink) } -func (p *ParticipantImpl) SetResponseSink(sink routing.MessageSink) { - p.resSinkMu.Lock() - defer p.resSinkMu.Unlock() - p.resSink = sink +func (p *ParticipantImpl) GetResponseSink() routing.MessageSink { + return p.signaller.GetResponseSink() +} + +func (p *ParticipantImpl) CloseSignalConnection(reason types.SignallingCloseReason) { + p.signaller.CloseSignalConnection(reason) } func (p *ParticipantImpl) SendJoinResponse(joinResponse *livekit.JoinResponse) error { @@ -55,11 +54,7 @@ func (p *ParticipantImpl) SendJoinResponse(joinResponse *livekit.JoinResponse) e p.updateLock.Unlock() // send Join response - err := p.writeMessage(&livekit.SignalResponse{ - Message: &livekit.SignalResponse_Join{ - Join: joinResponse, - }, - }) + err := p.signaller.WriteMessage(p.signalling.SignalJoinResponse(joinResponse)) if err != nil { return err } @@ -127,17 +122,7 @@ func (p *ParticipantImpl) SendParticipantUpdate(participantsToUpdate []*livekit. } p.updateLock.Unlock() - if len(validUpdates) == 0 { - return nil - } - - return p.writeMessage(&livekit.SignalResponse{ - Message: &livekit.SignalResponse_Update{ - Update: &livekit.ParticipantUpdate{ - Participants: validUpdates, - }, - }, - }) + return p.signaller.WriteMessage(p.signalling.SignalParticipantUpdate(validUpdates)) } // SendSpeakerUpdate notifies participant changes to speakers. only send members that have changed since last update @@ -158,43 +143,19 @@ func (p *ParticipantImpl) SendSpeakerUpdate(speakers []*livekit.SpeakerInfo, for } } - if len(scopedSpeakers) == 0 { - return nil - } - - return p.writeMessage(&livekit.SignalResponse{ - Message: &livekit.SignalResponse_SpeakersChanged{ - SpeakersChanged: &livekit.SpeakersChanged{ - Speakers: scopedSpeakers, - }, - }, - }) + return p.signaller.WriteMessage(p.signalling.SignalSpeakerUpdate(scopedSpeakers)) } func (p *ParticipantImpl) SendRoomUpdate(room *livekit.Room) error { - return p.writeMessage(&livekit.SignalResponse{ - Message: &livekit.SignalResponse_RoomUpdate{ - RoomUpdate: &livekit.RoomUpdate{ - Room: room, - }, - }, - }) + return p.signaller.WriteMessage(p.signalling.SignalRoomUpdate(room)) } func (p *ParticipantImpl) SendConnectionQualityUpdate(update *livekit.ConnectionQualityUpdate) error { - return p.writeMessage(&livekit.SignalResponse{ - Message: &livekit.SignalResponse_ConnectionQuality{ - ConnectionQuality: update, - }, - }) + return p.signaller.WriteMessage(p.signalling.SignalConnectionQualityUpdate(update)) } func (p *ParticipantImpl) SendRefreshToken(token string) error { - return p.writeMessage(&livekit.SignalResponse{ - Message: &livekit.SignalResponse_RefreshToken{ - RefreshToken: token, - }, - }) + return p.signaller.WriteMessage(p.signalling.SignalRefreshToken(token)) } func (p *ParticipantImpl) SendRequestResponse(requestResponse *livekit.RequestResponse) error { @@ -206,19 +167,11 @@ func (p *ParticipantImpl) SendRequestResponse(requestResponse *livekit.RequestRe return nil } - return p.writeMessage(&livekit.SignalResponse{ - Message: &livekit.SignalResponse_RequestResponse{ - RequestResponse: requestResponse, - }, - }) + return p.signaller.WriteMessage(p.signalling.SignalRequestResponse(requestResponse)) } func (p *ParticipantImpl) SendRoomMovedResponse(roomMovedResponse *livekit.RoomMovedResponse) error { - return p.writeMessage(&livekit.SignalResponse{ - Message: &livekit.SignalResponse_RoomMoved{ - RoomMoved: roomMovedResponse, - }, - }) + return p.signaller.WriteMessage(p.signalling.SignalRoomMovedResponse(roomMovedResponse)) } func (p *ParticipantImpl) HandleReconnectAndSendResponse(reconnectReason livekit.ReconnectReason, reconnectResponse *livekit.ReconnectResponse) error { @@ -227,11 +180,7 @@ func (p *ParticipantImpl) HandleReconnectAndSendResponse(reconnectReason livekit if !p.params.ClientInfo.CanHandleReconnectResponse() { return nil } - if err := p.writeMessage(&livekit.SignalResponse{ - Message: &livekit.SignalResponse_Reconnect{ - Reconnect: reconnectResponse, - }, - }); err != nil { + if err := p.signaller.WriteMessage(p.signalling.SignalReconnectResponse(reconnectResponse)); err != nil { return err } @@ -263,17 +212,7 @@ func (p *ParticipantImpl) sendDisconnectUpdatesForReconnect() error { } p.updateLock.Unlock() - if len(disconnectedParticipants) == 0 { - return nil - } - - return p.writeMessage(&livekit.SignalResponse{ - Message: &livekit.SignalResponse_Update{ - Update: &livekit.ParticipantUpdate{ - Participants: disconnectedParticipants, - }, - }, - }) + return p.signaller.WriteMessage(p.signalling.SignalParticipantUpdate(disconnectedParticipants)) } func (p *ParticipantImpl) sendICECandidate(ic *webrtc.ICECandidate, target livekit.SignalTarget) error { @@ -285,87 +224,40 @@ func (p *ParticipantImpl) sendICECandidate(ic *webrtc.ICECandidate, target livek trickle := ToProtoTrickle(prevIC.ToJSON(), target, ic == nil) p.params.Logger.Debugw("sending ICE candidate", "transport", target, "trickle", logger.Proto(trickle)) - return p.writeMessage(&livekit.SignalResponse{ - Message: &livekit.SignalResponse_Trickle{ - Trickle: trickle, - }, - }) + return p.signaller.WriteMessage(p.signalling.SignalICECandidate(trickle)) } func (p *ParticipantImpl) sendTrackMuted(trackID livekit.TrackID, muted bool) { - _ = p.writeMessage(&livekit.SignalResponse{ - Message: &livekit.SignalResponse_Mute{ - Mute: &livekit.MuteTrackRequest{ - Sid: string(trackID), - Muted: muted, - }, - }, - }) + _ = p.signaller.WriteMessage(p.signalling.SignalTrackMuted(&livekit.MuteTrackRequest{ + Sid: string(trackID), + Muted: muted, + })) +} + +func (p *ParticipantImpl) sendTrackPublished(cid string, ti *livekit.TrackInfo) error { + p.pubLogger.Debugw("sending track published", "cid", cid, "trackInfo", logger.Proto(ti)) + return p.signaller.WriteMessage(p.signalling.SignalTrackPublished(&livekit.TrackPublishedResponse{ + Cid: cid, + Track: ti, + })) } func (p *ParticipantImpl) sendTrackUnpublished(trackID livekit.TrackID) { - _ = p.writeMessage(&livekit.SignalResponse{ - Message: &livekit.SignalResponse_TrackUnpublished{ - TrackUnpublished: &livekit.TrackUnpublishedResponse{ - TrackSid: string(trackID), - }, - }, - }) + _ = p.signaller.WriteMessage(p.signalling.SignalTrackUnpublished(&livekit.TrackUnpublishedResponse{ + TrackSid: string(trackID), + })) } func (p *ParticipantImpl) sendTrackHasBeenSubscribed(trackID livekit.TrackID) { if !p.params.ClientInfo.SupportTrackSubscribedEvent() { return } - _ = p.writeMessage(&livekit.SignalResponse{ - Message: &livekit.SignalResponse_TrackSubscribed{ - TrackSubscribed: &livekit.TrackSubscribed{ - TrackSid: string(trackID), - }, - }, - }) + _ = p.signaller.WriteMessage(p.signalling.SignalTrackSubscribed(&livekit.TrackSubscribed{ + TrackSid: string(trackID), + })) p.params.Logger.Debugw("track has been subscribed", "trackID", trackID) } -func (p *ParticipantImpl) writeMessage(msg *livekit.SignalResponse) error { - if p.IsDisconnected() || (!p.IsReady() && msg.GetJoin() == nil) { - return nil - } - - sink := p.getResponseSink() - if sink == nil { - p.params.Logger.Debugw("could not send message to participant", "messageType", fmt.Sprintf("%T", msg.Message)) - return nil - } - - err := sink.WriteMessage(msg) - if utils.ErrorIsOneOf(err, psrpc.Canceled, routing.ErrChannelClosed) { - p.params.Logger.Debugw( - "could not send message to participant", - "error", err, - "messageType", fmt.Sprintf("%T", msg.Message), - ) - return nil - } else if err != nil { - p.params.Logger.Warnw( - "could not send message to participant", err, - "messageType", fmt.Sprintf("%T", msg.Message), - ) - return err - } - return nil -} - -// closes signal connection to notify client to resume/reconnect -func (p *ParticipantImpl) CloseSignalConnection(reason types.SignallingCloseReason) { - sink := p.getResponseSink() - if sink != nil { - p.params.Logger.Debugw("closing signal connection", "reason", reason, "connID", sink.ConnectionID()) - sink.Close() - p.SetResponseSink(nil) - } -} - func (p *ParticipantImpl) sendLeaveRequest( reason types.ParticipantCloseReason, isExpectedToResume bool, @@ -397,85 +289,75 @@ func (p *ParticipantImpl) sendLeaveRequest( } } } - if leave != nil { - return p.writeMessage(&livekit.SignalResponse{ - Message: &livekit.SignalResponse_Leave{ - Leave: leave, - }, - }) - } - - return nil + return p.signaller.WriteMessage(p.signalling.SignalLeaveRequest(leave)) } func (p *ParticipantImpl) sendSdpAnswer(answer webrtc.SessionDescription, answerId uint32) error { - return p.writeMessage(&livekit.SignalResponse{ - Message: &livekit.SignalResponse_Answer{ - Answer: ToProtoSessionDescription(answer, answerId), - }, - }) + return p.signaller.WriteMessage(p.signalling.SignalSdpAnswer(ToProtoSessionDescription(answer, answerId))) } func (p *ParticipantImpl) sendSdpOffer(offer webrtc.SessionDescription, offerId uint32) error { - return p.writeMessage(&livekit.SignalResponse{ - Message: &livekit.SignalResponse_Offer{ - Offer: ToProtoSessionDescription(offer, offerId), - }, - }) + return p.signaller.WriteMessage(p.signalling.SignalSdpOffer(ToProtoSessionDescription(offer, offerId))) } func (p *ParticipantImpl) sendStreamStateUpdate(streamStateUpdate *livekit.StreamStateUpdate) error { - return p.writeMessage(&livekit.SignalResponse{ - Message: &livekit.SignalResponse_StreamStateUpdate{ - StreamStateUpdate: streamStateUpdate, - }, - }) + return p.signaller.WriteMessage(p.signalling.SignalStreamStateUpdate(streamStateUpdate)) } func (p *ParticipantImpl) sendSubscribedQualityUpdate(subscribedQualityUpdate *livekit.SubscribedQualityUpdate) error { - return p.writeMessage(&livekit.SignalResponse{ - Message: &livekit.SignalResponse_SubscribedQualityUpdate{ - SubscribedQualityUpdate: subscribedQualityUpdate, - }, - }) -} - -func (p *ParticipantImpl) sendTrackPublished(cid string, ti *livekit.TrackInfo) error { - p.pubLogger.Debugw("sending track published", "cid", cid, "trackInfo", logger.Proto(ti)) - return p.writeMessage(&livekit.SignalResponse{ - Message: &livekit.SignalResponse_TrackPublished{ - TrackPublished: &livekit.TrackPublishedResponse{ - Cid: cid, - Track: ti, - }, - }, - }) + return p.signaller.WriteMessage(p.signalling.SignalSubscribedQualityUpdate(subscribedQualityUpdate)) } func (p *ParticipantImpl) sendSubscriptionResponse(trackID livekit.TrackID, subErr livekit.SubscriptionError) error { - return p.writeMessage(&livekit.SignalResponse{ - Message: &livekit.SignalResponse_SubscriptionResponse{ - SubscriptionResponse: &livekit.SubscriptionResponse{ - TrackSid: string(trackID), - Err: subErr, - }, - }, - }) + return p.signaller.WriteMessage(p.signalling.SignalSubscriptionResponse(&livekit.SubscriptionResponse{ + TrackSid: string(trackID), + Err: subErr, + })) } func (p *ParticipantImpl) SendSubscriptionPermissionUpdate(publisherID livekit.ParticipantID, trackID livekit.TrackID, allowed bool) error { p.subLogger.Debugw("sending subscription permission update", "publisherID", publisherID, "trackID", trackID, "allowed", allowed) - err := p.writeMessage(&livekit.SignalResponse{ - Message: &livekit.SignalResponse_SubscriptionPermissionUpdate{ - SubscriptionPermissionUpdate: &livekit.SubscriptionPermissionUpdate{ - ParticipantSid: string(publisherID), - TrackSid: string(trackID), - Allowed: allowed, - }, - }, - }) + err := p.signaller.WriteMessage(p.signalling.SignalSubscriptionPermissionUpdate(&livekit.SubscriptionPermissionUpdate{ + ParticipantSid: string(publisherID), + TrackSid: string(trackID), + Allowed: allowed, + })) if err != nil { p.subLogger.Errorw("could not send subscription permission update", err) } return err } + +func (p *ParticipantImpl) SendConnectResponse(connectResponse *livekit.ConnectResponse) error { + // keep track of participant updates and versions + p.updateLock.Lock() + for _, op := range connectResponse.OtherParticipants { + p.updateCache.Add(livekit.ParticipantID(op.Sid), participantUpdateInfo{ + identity: livekit.ParticipantIdentity(op.Identity), + version: op.Version, + state: op.State, + updatedAt: time.Now(), + }) + } + p.updateLock.Unlock() + + err := p.signaller.WriteMessage(p.signalling.SignalConnectResponse(connectResponse)) + if err != nil { + return err + } + + // update state after sending message, so that no participant updates could slip through before JoinResponse is sent + p.updateLock.Lock() + if p.State() == livekit.ParticipantInfo_JOINING { + p.updateState(livekit.ParticipantInfo_JOINED) + } + queuedUpdates := p.queuedUpdates + p.queuedUpdates = nil + p.updateLock.Unlock() + + return p.SendParticipantUpdate(queuedUpdates) +} + +func (p *ParticipantImpl) SignalPendingMessages() proto.Message { + return p.signalling.PendingMessages() +} diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 108649f9d..a2db7bd96 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -552,6 +552,11 @@ func (r *Room) Join( }, true) } }) + participant.OnUpdateSubscriptions(r.onUpdateSubscriptions) + participant.OnUpdateSubscriptionPermission(r.onUpdateSubscriptionPermission) + participant.OnSyncState(r.onSyncState) + participant.OnSimulateScenario(r.onSimulateScenario) + participant.OnLeave(r.onLeave) r.launchTargetAgents(maps.Values(r.agentDispatches), participant, livekit.JobType_JT_PARTICIPANT) @@ -792,6 +797,27 @@ func (r *Room) Joinv2( return nil, err } connectResponse.SubscriberSdp = ToProtoSessionDescription(offer, 0) // SIGNALLING-V2-TODO - need to proper offerId? + // for sync response, this does not actually send, only generates messageId and caches the message + if err := participant.SendConnectResponse(connectResponse); err != nil { + prometheus.ServiceOperationCounter.WithLabelValues("participant_join", "error", "send_response").Add(1) + return nil, err + } + + if wireMessage := participant.SignalPendingMessages(); wireMessage != nil { + if wireMessage, ok := wireMessage.(*livekit.Signalv2WireMessage); ok { + switch msg := wireMessage.GetMessage().(type) { + case *livekit.Signalv2WireMessage_Envelope: + got_connect_response: + for _, innerMsg := range msg.Envelope.GetServerMessages() { + switch serverMessage := innerMsg.GetMessage().(type) { + case *livekit.Signalv2ServerMessage_ConnectResponse: + connectResponse = serverMessage.ConnectResponse + break got_connect_response + } + } + } + } + } prometheus.ServiceOperationCounter.WithLabelValues("participant_join", "success", "").Add(1) return connectResponse, nil @@ -878,129 +904,7 @@ func (r *Room) ResumeParticipant( return nil } -func (r *Room) RemoveParticipant(identity livekit.ParticipantIdentity, pID livekit.ParticipantID, reason types.ParticipantCloseReason) { - r.lock.Lock() - p, ok := r.participants[identity] - if !ok { - r.lock.Unlock() - return - } - - if pID != "" && p.ID() != pID { - // participant session has been replaced - r.lock.Unlock() - return - } - - agentJob := r.agentParticpants[identity] - - delete(r.participants, identity) - delete(r.participantOpts, identity) - delete(r.participantRequestSources, identity) - delete(r.hasPublished, identity) - delete(r.agentParticpants, identity) - if !p.Hidden() { - r.protoRoom.NumParticipants-- - } - - immediateChange := false - if p.IsRecorder() { - activeRecording := false - for _, op := range r.participants { - if op.IsRecorder() { - activeRecording = true - break - } - } - - if r.protoRoom.ActiveRecording != activeRecording { - r.protoRoom.ActiveRecording = activeRecording - immediateChange = true - } - } - r.lock.Unlock() - r.protoProxy.MarkDirty(immediateChange) - - if !p.HasConnected() { - fields := append( - connectionDetailsFields(p.GetICEConnectionInfo()), - "reason", reason.String(), - "clientInfo", logger.Proto(sutils.ClientInfoWithoutAddress(p.GetClientInfo())), - ) - p.GetLogger().Infow("removing participant without connection", fields...) - } - - // send broadcast only if it's not already closed - sendUpdates := !p.IsDisconnected() - - // remove all published tracks - for _, t := range p.GetPublishedTracks() { - p.RemovePublishedTrack(t, false) - r.trackManager.RemoveTrack(t) - } - - if agentJob != nil { - agentJob.participantLeft() - - go func() { - _, err := r.agentClient.TerminateJob(context.Background(), agentJob.Id, rpc.JobTerminateReason_AGENT_LEFT_ROOM) - if err != nil { - r.logger.Infow("failed sending TerminateJob RPC", "error", err, "jobID", agentJob.Id, "participant", identity) - } - }() - } - - p.OnTrackUpdated(nil) - p.OnTrackPublished(nil) - p.OnTrackUnpublished(nil) - p.OnStateChange(nil) - p.OnSubscriberReady(nil) - p.OnParticipantUpdate(nil) - p.OnDataPacket(nil) - p.OnDataMessage(nil) - p.OnMetrics(nil) - p.OnSubscribeStatusChanged(nil) - - // close participant as well - _ = p.Close(true, reason, false) - - r.leftAt.Store(time.Now().Unix()) - - if sendUpdates { - if r.onParticipantChanged != nil { - r.onParticipantChanged(p) - } - r.broadcastParticipantState(p, broadcastOptions{skipSource: true}) - } -} - -func (r *Room) UpdateSubscriptions( - participant types.LocalParticipant, - trackIDs []livekit.TrackID, - participantTracks []*livekit.ParticipantTracks, - subscribe bool, -) { - // handle subscription changes - for _, trackID := range trackIDs { - if subscribe { - participant.SubscribeToTrack(trackID, false) - } else { - participant.UnsubscribeFromTrack(trackID) - } - } - - for _, pt := range participantTracks { - for _, trackID := range livekit.StringsAsIDs[livekit.TrackID](pt.TrackSids) { - if subscribe { - participant.SubscribeToTrack(trackID, false) - } else { - participant.UnsubscribeFromTrack(trackID) - } - } - } -} - -func (r *Room) SyncState(participant types.LocalParticipant, state *livekit.SyncState) error { +func (r *Room) onSyncState(participant types.LocalParticipant, state *livekit.SyncState) error { pLogger := participant.GetLogger() pLogger.Infow("setting sync state", "state", logger.Proto(state)) @@ -1046,8 +950,7 @@ func (r *Room) SyncState(participant types.LocalParticipant, state *livekit.Sync participant.UpdateSubscribedTrackSettings(livekit.TrackID(trackSid), &livekit.UpdateTrackSettings{Disabled: true}) } - r.UpdateSubscriptions( - participant, + participant.HandleUpdateSubscriptions( livekit.StringsAsIDs[livekit.TrackID](state.Subscription.TrackSids), state.Subscription.ParticipantTracks, state.Subscription.Subscribe, @@ -1055,7 +958,7 @@ func (r *Room) SyncState(participant types.LocalParticipant, state *livekit.Sync return nil } -func (r *Room) UpdateSubscriptionPermission(participant types.LocalParticipant, subscriptionPermission *livekit.SubscriptionPermission) error { +func (r *Room) onUpdateSubscriptionPermission(participant types.LocalParticipant, subscriptionPermission *livekit.SubscriptionPermission) error { if err := participant.UpdateSubscriptionPermission(subscriptionPermission, utils.TimedVersion(0), r.GetParticipantByID); err != nil { return err } @@ -1285,7 +1188,7 @@ func (r *Room) OnRoomUpdated(f func()) { r.onRoomUpdated = f } -func (r *Room) SimulateScenario(participant types.LocalParticipant, simulateScenario *livekit.SimulateScenario) error { +func (r *Room) onSimulateScenario(participant types.LocalParticipant, simulateScenario *livekit.SimulateScenario) error { switch scenario := simulateScenario.Scenario.(type) { case *livekit.SimulateScenario_SpeakerUpdate: r.logger.Infow("simulating speaker update", "participant", participant.Identity(), "duration", scenario.SpeakerUpdate) @@ -1543,6 +1446,150 @@ func (r *Room) onMetrics(source types.Participant, dp *livekit.DataPacket) { BroadcastMetricsForRoom(r, source, dp, r.logger) } +func (r *Room) onUpdateSubscriptions( + participant types.LocalParticipant, + trackIDs []livekit.TrackID, + participantTracks []*livekit.ParticipantTracks, + subscribe bool, +) { + r.UpdateSubscriptions(participant, trackIDs, participantTracks, subscribe) +} + +func (r *Room) UpdateSubscriptions( + participant types.LocalParticipant, + trackIDs []livekit.TrackID, + participantTracks []*livekit.ParticipantTracks, + subscribe bool, +) { + // handle subscription changes + for _, trackID := range trackIDs { + if subscribe { + participant.SubscribeToTrack(trackID, false) + } else { + participant.UnsubscribeFromTrack(trackID) + } + } + + for _, pt := range participantTracks { + for _, trackID := range livekit.StringsAsIDs[livekit.TrackID](pt.TrackSids) { + if subscribe { + participant.SubscribeToTrack(trackID, false) + } else { + participant.UnsubscribeFromTrack(trackID) + } + } + } +} + +func (r *Room) onLeave(p types.LocalParticipant, reason types.ParticipantCloseReason) { + r.RemoveParticipant(p.Identity(), p.ID(), reason) +} + +func (r *Room) RemoveParticipant( + identity livekit.ParticipantIdentity, + pID livekit.ParticipantID, + reason types.ParticipantCloseReason, +) { + r.lock.Lock() + p, ok := r.participants[identity] + if !ok { + r.lock.Unlock() + return + } + + if pID != "" && p.ID() != pID { + // participant session has been replaced + r.lock.Unlock() + return + } + + agentJob := r.agentParticpants[identity] + + delete(r.participants, identity) + delete(r.participantOpts, identity) + delete(r.participantRequestSources, identity) + delete(r.hasPublished, identity) + delete(r.agentParticpants, identity) + if !p.Hidden() { + r.protoRoom.NumParticipants-- + } + + immediateChange := false + if p.IsRecorder() { + activeRecording := false + for _, op := range r.participants { + if op.IsRecorder() { + activeRecording = true + break + } + } + + if r.protoRoom.ActiveRecording != activeRecording { + r.protoRoom.ActiveRecording = activeRecording + immediateChange = true + } + } + r.lock.Unlock() + r.protoProxy.MarkDirty(immediateChange) + + if !p.HasConnected() { + fields := append( + connectionDetailsFields(p.GetICEConnectionInfo()), + "reason", reason.String(), + "clientInfo", logger.Proto(sutils.ClientInfoWithoutAddress(p.GetClientInfo())), + ) + p.GetLogger().Infow("removing participant without connection", fields...) + } + + // send broadcast only if it's not already closed + sendUpdates := !p.IsDisconnected() + + // remove all published tracks + for _, t := range p.GetPublishedTracks() { + p.RemovePublishedTrack(t, false) + r.trackManager.RemoveTrack(t) + } + + if agentJob != nil { + agentJob.participantLeft() + + go func() { + _, err := r.agentClient.TerminateJob(context.Background(), agentJob.Id, rpc.JobTerminateReason_AGENT_LEFT_ROOM) + if err != nil { + r.logger.Infow("failed sending TerminateJob RPC", "error", err, "jobID", agentJob.Id, "participant", identity) + } + }() + } + + p.OnTrackUpdated(nil) + p.OnTrackPublished(nil) + p.OnTrackUnpublished(nil) + p.OnStateChange(nil) + p.OnSubscriberReady(nil) + p.OnParticipantUpdate(nil) + p.OnDataPacket(nil) + p.OnDataMessage(nil) + p.OnMetrics(nil) + p.OnSubscribeStatusChanged(nil) + p.OnUpdateSubscriptions(nil) + p.OnUpdateSubscriptionPermission(nil) + p.OnSyncState(nil) + p.OnSimulateScenario(nil) + p.OnLeave(nil) + + // close participant as well + _ = p.Close(true, reason, false) + + r.leftAt.Store(time.Now().Unix()) + + if sendUpdates { + if r.onParticipantChanged != nil { + r.onParticipantChanged(p) + } + r.broadcastParticipantState(p, broadcastOptions{skipSource: true}) + } +} + func (r *Room) subscribeToExistingTracks(p types.LocalParticipant, isSync bool) { r.lock.RLock() shouldSubscribe := r.autoSubscribe(p) diff --git a/pkg/rtc/signalcache_test.go b/pkg/rtc/signalcache_test.go deleted file mode 100644 index 2046d68cb..000000000 --- a/pkg/rtc/signalcache_test.go +++ /dev/null @@ -1,75 +0,0 @@ -package rtc - -import ( - "testing" - - "github.com/stretchr/testify/require" - - "github.com/livekit/protocol/livekit" -) - -func TestSignalCache(t *testing.T) { - firstMessageId := uint32(10) - cache := NewSignalCache(SignalCacheParams{ - FirstMessageId: firstMessageId, - }) - - inputMessages := []*livekit.Signalv2ServerMessage{ - &livekit.Signalv2ServerMessage{ - Message: &livekit.Signalv2ServerMessage_ConnectResponse{}, - }, - // SIGNALLING-V2-TODO: replace with other kinds of messages when more types are added - &livekit.Signalv2ServerMessage{ - Message: &livekit.Signalv2ServerMessage_ConnectResponse{}, - }, - &livekit.Signalv2ServerMessage{ - Message: &livekit.Signalv2ServerMessage_ConnectResponse{}, - }, - &livekit.Signalv2ServerMessage{ - Message: &livekit.Signalv2ServerMessage_ConnectResponse{}, - }, - } - - // Add() - add one message at a time - for _, inputMessage := range inputMessages { - cache.Add(inputMessage, 2345) - } - - // get all messages in cache - outputMessages := cache.GetFromFront() - require.Equal(t, inputMessages, outputMessages) - - // clear one and get again - cache.Clear(firstMessageId) - - outputMessages = cache.GetFromFront() - require.Equal(t, inputMessages[1:], outputMessages) - - // clearing some evicted messages should not clear anything - cache.Clear(firstMessageId) // firstMessageId has been cleared already at this point - - outputMessages = cache.GetFromFront() - require.Equal(t, inputMessages[1:], outputMessages) - - // clear some and get rest in one go - outputMessages = cache.ClearAndGetFrom(firstMessageId + 3) - require.Equal(t, 1, len(outputMessages)) - require.Equal(t, inputMessages[3:], outputMessages) - - // getting again should get the same messages again as they sill should in cache - outputMessages = cache.GetFromFront() - require.Equal(t, inputMessages[3:], outputMessages) - - // clearing all and getting should return nil - require.Nil(t, cache.ClearAndGetFrom(firstMessageId+uint32(len(inputMessages)))) - - // getting again should return nil as the cache is fully cleared above - require.Nil(t, cache.GetFromFront()) - - // AddBatch() - add all messages at once - cache.AddBatch(inputMessages, 4567) - - // get all messages in cache - outputMessages = cache.GetFromFront() - require.Equal(t, inputMessages, outputMessages) -} diff --git a/pkg/rtc/signalhandler.go b/pkg/rtc/signalhandler.go index 35ea7034b..a348476a9 100644 --- a/pkg/rtc/signalhandler.go +++ b/pkg/rtc/signalhandler.go @@ -16,12 +16,11 @@ package rtc import ( "github.com/livekit/protocol/livekit" - "github.com/livekit/protocol/logger" "github.com/livekit/livekit-server/pkg/rtc/types" ) -func HandleParticipantSignal(room types.Room, participant types.LocalParticipant, req *livekit.SignalRequest, pLogger logger.Logger) error { +func HandleParticipantSignal(participant types.LocalParticipant, req *livekit.SignalRequest) error { participant.UpdateLastSeenSignal() switch msg := req.GetMessage().(type) { @@ -34,13 +33,13 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant case *livekit.SignalRequest_Trickle: candidateInit, err := FromProtoTrickle(msg.Trickle) if err != nil { - pLogger.Warnw("could not decode trickle", err) + participant.GetLogger().Warnw("could not decode trickle", err) return nil } participant.AddICECandidate(candidateInit, msg.Trickle.Target) case *livekit.SignalRequest_AddTrack: - pLogger.Debugw("add track request", "trackID", msg.AddTrack.Cid) + participant.GetLogger().Debugw("add track request", "trackID", msg.AddTrack.Cid) participant.AddTrack(msg.AddTrack) case *livekit.SignalRequest_Mute: @@ -49,8 +48,7 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant case *livekit.SignalRequest_Subscription: // allow participant to indicate their interest in the subscription // permission check happens later in SubscriptionManager - room.UpdateSubscriptions( - participant, + participant.HandleUpdateSubscriptions( livekit.StringsAsIDs[livekit.TrackID](msg.Subscription.TrackSids), msg.Subscription.ParticipantTracks, msg.Subscription.Subscribe, @@ -71,28 +69,34 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant case livekit.DisconnectReason_USER_REJECTED: reason = types.ParticipantCloseReasonUserRejected } - pLogger.Debugw("client leaving room", "reason", reason) - room.RemoveParticipant(participant.Identity(), participant.ID(), reason) + participant.GetLogger().Debugw("client leaving room", "reason", reason) + participant.HandleLeaveRequest(reason) case *livekit.SignalRequest_SubscriptionPermission: - err := room.UpdateSubscriptionPermission(participant, msg.SubscriptionPermission) + err := participant.HandleUpdateSubscriptionPermission(msg.SubscriptionPermission) if err != nil { - pLogger.Warnw("could not update subscription permission", err, - "permissions", msg.SubscriptionPermission) + participant.GetLogger().Warnw( + "could not update subscription permission", err, + "permissions", msg.SubscriptionPermission, + ) } case *livekit.SignalRequest_SyncState: - err := room.SyncState(participant, msg.SyncState) + err := participant.HandleSyncState(msg.SyncState) if err != nil { - pLogger.Warnw("could not sync state", err, - "state", msg.SyncState) + participant.GetLogger().Warnw( + "could not sync state", err, + "state", msg.SyncState, + ) } case *livekit.SignalRequest_Simulate: - err := room.SimulateScenario(participant, msg.Simulate) + err := participant.HandleSimulateScenario(msg.Simulate) if err != nil { - pLogger.Warnw("could not simulate scenario", err, - "simulate", msg.Simulate) + participant.GetLogger().Warnw( + "could not simulate scenario", err, + "simulate", msg.Simulate, + ) } case *livekit.SignalRequest_PingReq: @@ -121,7 +125,7 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant participant.SetAttributes(msg.UpdateMetadata.Attributes) } } else { - pLogger.Warnw("could not update metadata", err) + participant.GetLogger().Warnw("could not update metadata", err) switch err { case ErrNameExceedsLimits: @@ -146,12 +150,12 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant case *livekit.SignalRequest_UpdateAudioTrack: if err := participant.UpdateAudioTrack(msg.UpdateAudioTrack); err != nil { - pLogger.Warnw("could not update audio track", err, "update", msg.UpdateAudioTrack) + participant.GetLogger().Warnw("could not update audio track", err, "update", msg.UpdateAudioTrack) } case *livekit.SignalRequest_UpdateVideoTrack: if err := participant.UpdateVideoTrack(msg.UpdateVideoTrack); err != nil { - pLogger.Warnw("could not update video track", err, "update", msg.UpdateVideoTrack) + participant.GetLogger().Warnw("could not update video track", err, "update", msg.UpdateVideoTrack) } } diff --git a/pkg/rtc/signalling/interfaces.go b/pkg/rtc/signalling/interfaces.go new file mode 100644 index 000000000..db5bb42e1 --- /dev/null +++ b/pkg/rtc/signalling/interfaces.go @@ -0,0 +1,63 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package signalling + +import ( + "github.com/livekit/protocol/livekit" + + "github.com/livekit/livekit-server/pkg/routing" + "github.com/livekit/livekit-server/pkg/rtc/types" + + "google.golang.org/protobuf/proto" +) + +type ParticipantSignaller interface { + SetResponseSink(sink routing.MessageSink) + GetResponseSink() routing.MessageSink + CloseSignalConnection(reason types.SignallingCloseReason) + + WriteMessage(msg proto.Message) error +} + +type ParticipantSignalling interface { + SignalJoinResponse(join *livekit.JoinResponse) proto.Message + SignalParticipantUpdate(participants []*livekit.ParticipantInfo) proto.Message + SignalSpeakerUpdate(speakers []*livekit.SpeakerInfo) proto.Message + SignalRoomUpdate(room *livekit.Room) proto.Message + SignalConnectionQualityUpdate(connectionQuality *livekit.ConnectionQualityUpdate) proto.Message + SignalRefreshToken(token string) proto.Message + SignalRequestResponse(requestResponse *livekit.RequestResponse) proto.Message + SignalRoomMovedResponse(roomMoved *livekit.RoomMovedResponse) proto.Message + SignalReconnectResponse(reconnect *livekit.ReconnectResponse) proto.Message + SignalICECandidate(trickle *livekit.TrickleRequest) proto.Message + SignalTrackMuted(mute *livekit.MuteTrackRequest) proto.Message + SignalTrackPublished(trackPublished *livekit.TrackPublishedResponse) proto.Message + SignalTrackUnpublished(trackUnpublished *livekit.TrackUnpublishedResponse) proto.Message + SignalTrackSubscribed(trackSubscribed *livekit.TrackSubscribed) proto.Message + SignalLeaveRequest(leave *livekit.LeaveRequest) proto.Message + SignalSdpAnswer(answer *livekit.SessionDescription) proto.Message + SignalSdpOffer(offer *livekit.SessionDescription) proto.Message + SignalStreamStateUpdate(streamStateUpdate *livekit.StreamStateUpdate) proto.Message + SignalSubscribedQualityUpdate(subscribedQualityUpdate *livekit.SubscribedQualityUpdate) proto.Message + SignalSubscriptionResponse(subscriptionResponse *livekit.SubscriptionResponse) proto.Message + SignalSubscriptionPermissionUpdate(subscriptionPermissionUpdate *livekit.SubscriptionPermissionUpdate) proto.Message + + AckMessageId(ackMessageId uint32) + SetLastProcessedRemoteMessageId(lastProcessedRemoteMessageId uint32) + + PendingMessages() proto.Message + + SignalConnectResponse(connectResponse *livekit.ConnectResponse) proto.Message +} diff --git a/pkg/rtc/signalcache.go b/pkg/rtc/signalling/signalcache.go similarity index 74% rename from pkg/rtc/signalcache.go rename to pkg/rtc/signalling/signalcache.go index 6afbe8061..4d39505c1 100644 --- a/pkg/rtc/signalcache.go +++ b/pkg/rtc/signalling/signalcache.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package rtc +package signalling import ( "math/rand" @@ -22,6 +22,7 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" + "github.com/livekit/protocol/utils" ) type SignalCacheParams struct { @@ -32,9 +33,10 @@ type SignalCacheParams struct { type SignalCache struct { params SignalCacheParams - lock sync.Mutex - messageId uint32 - messages deque.Deque[*livekit.Signalv2ServerMessage] + lock sync.Mutex + messageId uint32 + lastProcessedRemoteMessageId uint32 + messages deque.Deque[*livekit.Signalv2ServerMessage] } func NewSignalCache(params SignalCacheParams) *SignalCache { @@ -49,18 +51,26 @@ func NewSignalCache(params SignalCacheParams) *SignalCache { return s } -func (s *SignalCache) Add(msg *livekit.Signalv2ServerMessage, lastRemoteId uint32) { - s.AddBatch([]*livekit.Signalv2ServerMessage{msg}, lastRemoteId) +func (s *SignalCache) SetLastProcessedRemoteMessageId(lastProcessedRemoteMessageId uint32) { + s.lock.Lock() + defer s.lock.Unlock() + + s.lastProcessedRemoteMessageId = lastProcessedRemoteMessageId } -func (s *SignalCache) AddBatch(msgs []*livekit.Signalv2ServerMessage, lastRemoteId uint32) { +func (s *SignalCache) Add(msg *livekit.Signalv2ServerMessage) { + if msg != nil { + s.AddBatch([]*livekit.Signalv2ServerMessage{msg}) + } +} + +func (s *SignalCache) AddBatch(msgs []*livekit.Signalv2ServerMessage) { s.lock.Lock() defer s.lock.Unlock() for _, msg := range msgs { msg.Sequencer = &livekit.Sequencer{ - MessageId: s.messageId, - LastProcessedRemoteMessageId: lastRemoteId, + MessageId: s.messageId, } s.messageId++ @@ -95,7 +105,9 @@ func (s *SignalCache) GetFromFront() []*livekit.Signalv2ServerMessage { func (s *SignalCache) getFromFrontLocked() []*livekit.Signalv2ServerMessage { var msgs []*livekit.Signalv2ServerMessage for msg := range s.messages.Iter() { - msgs = append(msgs, msg) + clone := utils.CloneProto(msg) + clone.Sequencer.LastProcessedRemoteMessageId = s.lastProcessedRemoteMessageId + msgs = append(msgs, clone) } return msgs diff --git a/pkg/rtc/signalling/signalcache_test.go b/pkg/rtc/signalling/signalcache_test.go new file mode 100644 index 000000000..e7e2a669c --- /dev/null +++ b/pkg/rtc/signalling/signalcache_test.go @@ -0,0 +1,174 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package signalling + +import ( + "testing" + + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + + "github.com/livekit/protocol/livekit" +) + +func TestSignalCache(t *testing.T) { + firstMessageId := uint32(10) + lastProcessedRemoteMessageId := uint32(2345) + cache := NewSignalCache(SignalCacheParams{ + FirstMessageId: firstMessageId, + }) + + inputMessages := []*livekit.Signalv2ServerMessage{ + &livekit.Signalv2ServerMessage{ + Message: &livekit.Signalv2ServerMessage_ConnectResponse{}, + }, + // SIGNALLING-V2-TODO: replace with other kinds of messages when more types are added + &livekit.Signalv2ServerMessage{ + Message: &livekit.Signalv2ServerMessage_ConnectResponse{}, + }, + &livekit.Signalv2ServerMessage{ + Message: &livekit.Signalv2ServerMessage_ConnectResponse{}, + }, + &livekit.Signalv2ServerMessage{ + Message: &livekit.Signalv2ServerMessage_ConnectResponse{}, + }, + } + + expectedOutputMessages := []*livekit.Signalv2ServerMessage{ + &livekit.Signalv2ServerMessage{ + Sequencer: &livekit.Sequencer{ + MessageId: firstMessageId, + LastProcessedRemoteMessageId: lastProcessedRemoteMessageId, + }, + Message: &livekit.Signalv2ServerMessage_ConnectResponse{}, + }, + // SIGNALLING-V2-TODO: replace with other kinds of messages when more types are added + &livekit.Signalv2ServerMessage{ + Sequencer: &livekit.Sequencer{ + MessageId: firstMessageId + 1, + LastProcessedRemoteMessageId: lastProcessedRemoteMessageId, + }, + Message: &livekit.Signalv2ServerMessage_ConnectResponse{}, + }, + &livekit.Signalv2ServerMessage{ + Sequencer: &livekit.Sequencer{ + MessageId: firstMessageId + 2, + LastProcessedRemoteMessageId: lastProcessedRemoteMessageId, + }, + Message: &livekit.Signalv2ServerMessage_ConnectResponse{}, + }, + &livekit.Signalv2ServerMessage{ + Sequencer: &livekit.Sequencer{ + MessageId: firstMessageId + 3, + LastProcessedRemoteMessageId: lastProcessedRemoteMessageId, + }, + Message: &livekit.Signalv2ServerMessage_ConnectResponse{}, + }, + } + + cache.SetLastProcessedRemoteMessageId(lastProcessedRemoteMessageId) + + // Add() - add one message at a time + for _, inputMessage := range inputMessages { + cache.Add(inputMessage) + } + + // get all messages in cache + outputMessages := cache.GetFromFront() + require.True(t, compareProtoSlices(expectedOutputMessages, outputMessages)) + + // clear one and get again + cache.Clear(firstMessageId) + + outputMessages = cache.GetFromFront() + require.True(t, compareProtoSlices(expectedOutputMessages[1:], outputMessages)) + + // clearing some evicted messages should not clear anything + cache.Clear(firstMessageId) // firstMessageId has been cleared already at this point + + outputMessages = cache.GetFromFront() + require.True(t, compareProtoSlices(expectedOutputMessages[1:], outputMessages)) + + // clear some and get rest in one go + outputMessages = cache.ClearAndGetFrom(firstMessageId + 3) + require.Equal(t, 1, len(outputMessages)) + require.True(t, compareProtoSlices(expectedOutputMessages[3:], outputMessages)) + + // getting again should get the same messages again as they sill should in cache + outputMessages = cache.GetFromFront() + require.True(t, compareProtoSlices(expectedOutputMessages[3:], outputMessages)) + + // clearing all and getting should return nil + require.Nil(t, cache.ClearAndGetFrom(firstMessageId+uint32(len(inputMessages)))) + + // getting again should return nil as the cache is fully cleared above + require.Nil(t, cache.GetFromFront()) + + lastProcessedRemoteMessageId = 4567 + cache.SetLastProcessedRemoteMessageId(lastProcessedRemoteMessageId) + + expectedOutputMessages = []*livekit.Signalv2ServerMessage{ + &livekit.Signalv2ServerMessage{ + Sequencer: &livekit.Sequencer{ + MessageId: firstMessageId + 4, + LastProcessedRemoteMessageId: lastProcessedRemoteMessageId, + }, + Message: &livekit.Signalv2ServerMessage_ConnectResponse{}, + }, + // SIGNALLING-V2-TODO: replace with other kinds of messages when more types are added + &livekit.Signalv2ServerMessage{ + Sequencer: &livekit.Sequencer{ + MessageId: firstMessageId + 1 + 4, + LastProcessedRemoteMessageId: lastProcessedRemoteMessageId, + }, + Message: &livekit.Signalv2ServerMessage_ConnectResponse{}, + }, + &livekit.Signalv2ServerMessage{ + Sequencer: &livekit.Sequencer{ + MessageId: firstMessageId + 2 + 4, + LastProcessedRemoteMessageId: lastProcessedRemoteMessageId, + }, + Message: &livekit.Signalv2ServerMessage_ConnectResponse{}, + }, + &livekit.Signalv2ServerMessage{ + Sequencer: &livekit.Sequencer{ + MessageId: firstMessageId + 3 + 4, + LastProcessedRemoteMessageId: lastProcessedRemoteMessageId, + }, + Message: &livekit.Signalv2ServerMessage_ConnectResponse{}, + }, + } + + // AddBatch() - add all messages at once + cache.AddBatch(inputMessages) + + // get all messages in cache + outputMessages = cache.GetFromFront() + require.True(t, compareProtoSlices(expectedOutputMessages, outputMessages)) +} + +func compareProtoSlices(a []*livekit.Signalv2ServerMessage, b []*livekit.Signalv2ServerMessage) bool { + if len(a) != len(b) { + return false + } + + for i := 0; i < len(a); i++ { + if !proto.Equal(a[i], b[i]) { + return false + } + } + + return true +} diff --git a/pkg/rtc/signalfragment.go b/pkg/rtc/signalling/signalfragment.go similarity index 99% rename from pkg/rtc/signalfragment.go rename to pkg/rtc/signalling/signalfragment.go index d3adf431e..911d4b38c 100644 --- a/pkg/rtc/signalfragment.go +++ b/pkg/rtc/signalling/signalfragment.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package rtc +package signalling import ( "math/rand" diff --git a/pkg/rtc/signalfragment_test.go b/pkg/rtc/signalling/signalfragment_test.go similarity index 84% rename from pkg/rtc/signalfragment_test.go rename to pkg/rtc/signalling/signalfragment_test.go index a7b6ac09b..e539fd0dd 100644 --- a/pkg/rtc/signalfragment_test.go +++ b/pkg/rtc/signalling/signalfragment_test.go @@ -1,4 +1,18 @@ -package rtc +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package signalling import ( "testing" diff --git a/pkg/rtc/signalling/signallerasync.go b/pkg/rtc/signalling/signallerasync.go new file mode 100644 index 000000000..080cf3194 --- /dev/null +++ b/pkg/rtc/signalling/signallerasync.go @@ -0,0 +1,107 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package signalling + +import ( + "fmt" + + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" + "github.com/livekit/protocol/utils" + "github.com/livekit/psrpc" + + "github.com/livekit/livekit-server/pkg/routing" + "github.com/livekit/livekit-server/pkg/rtc/types" + + "google.golang.org/protobuf/proto" +) + +var _ ParticipantSignaller = (*signallerAsync)(nil) + +type SignallerAsyncParams struct { + Logger logger.Logger + Participant types.LocalParticipant +} + +type signallerAsync struct { + params SignallerAsyncParams + + *signallerAsyncBase +} + +func NewSignallerAsync(params SignallerAsyncParams) ParticipantSignaller { + return &signallerAsync{ + params: params, + signallerAsyncBase: newSignallerAsyncBase(signallerAsyncBaseParams{Logger: params.Logger}), + } +} + +func (s *signallerAsync) WriteMessage(msg proto.Message) error { + if msg == nil { + return nil + } + + if s.params.Participant.IsDisconnected() { + return nil + } + + if !s.params.Participant.IsReady() { + if typed, ok := msg.(*livekit.SignalResponse); !ok { + s.params.Logger.Warnw( + "unknown message type", nil, + "messageType", fmt.Sprintf("%T", msg), + ) + } else { + if typed.GetJoin() == nil { + return nil + } + } + } + + sink := s.GetResponseSink() + if sink == nil { + if typed, ok := msg.(*livekit.SignalResponse); ok { + s.params.Logger.Debugw( + "could not send message to participant", + "messageType", fmt.Sprintf("%T", typed.Message), + ) + } + return nil + } + + err := sink.WriteMessage(msg) + if err != nil { + // SIGNALLING-V2-TODO: check for data channel errors to treat as non-error + if utils.ErrorIsOneOf(err, psrpc.Canceled, routing.ErrChannelClosed) { + if typed, ok := msg.(*livekit.SignalResponse); ok { + s.params.Logger.Debugw( + "could not send message to participant", + "error", err, + "messageType", fmt.Sprintf("%T", typed.Message), + ) + } + return nil + } else { + if typed, ok := msg.(*livekit.SignalResponse); ok { + s.params.Logger.Warnw( + "could not send message to participant", err, + "messageType", fmt.Sprintf("%T", typed.Message), + ) + } + return err + } + } + return nil +} diff --git a/pkg/rtc/signalling/signallerasyncbase.go b/pkg/rtc/signalling/signallerasyncbase.go new file mode 100644 index 000000000..271b3618a --- /dev/null +++ b/pkg/rtc/signalling/signallerasyncbase.go @@ -0,0 +1,67 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package signalling + +import ( + "sync" + + "github.com/livekit/protocol/logger" + + "github.com/livekit/livekit-server/pkg/routing" + "github.com/livekit/livekit-server/pkg/rtc/types" +) + +type signallerAsyncBaseParams struct { + Logger logger.Logger +} + +type signallerAsyncBase struct { + signallerUnimplemented + + params signallerAsyncBaseParams + + resSinkMu sync.Mutex + resSink routing.MessageSink +} + +func newSignallerAsyncBase(params signallerAsyncBaseParams) *signallerAsyncBase { + return &signallerAsyncBase{ + params: params, + } +} + +func (s *signallerAsyncBase) SetResponseSink(sink routing.MessageSink) { + s.resSinkMu.Lock() + defer s.resSinkMu.Unlock() + s.resSink = sink +} + +func (s *signallerAsyncBase) GetResponseSink() routing.MessageSink { + s.resSinkMu.Lock() + defer s.resSinkMu.Unlock() + return s.resSink +} + +// closes signal connection to notify client to resume/reconnect +func (s *signallerAsyncBase) CloseSignalConnection(reason types.SignallingCloseReason) { + sink := s.GetResponseSink() + if sink == nil { + return + } + + s.params.Logger.Debugw("closing signal connection", "reason", reason, "connID", sink.ConnectionID()) + sink.Close() + s.SetResponseSink(nil) +} diff --git a/pkg/rtc/signalling/signallerunimplemented.go b/pkg/rtc/signalling/signallerunimplemented.go new file mode 100644 index 000000000..4c4be8bff --- /dev/null +++ b/pkg/rtc/signalling/signallerunimplemented.go @@ -0,0 +1,38 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package signalling + +import ( + "github.com/livekit/livekit-server/pkg/routing" + "github.com/livekit/livekit-server/pkg/rtc/types" + + "google.golang.org/protobuf/proto" +) + +var _ ParticipantSignaller = (*signallerUnimplemented)(nil) + +type signallerUnimplemented struct{} + +func (u *signallerUnimplemented) SetResponseSink(sink routing.MessageSink) {} + +func (u *signallerUnimplemented) GetResponseSink() routing.MessageSink { + return nil +} + +func (u *signallerUnimplemented) CloseSignalConnection(reason types.SignallingCloseReason) {} + +func (u *signallerUnimplemented) WriteMessage(msg proto.Message) error { + return nil +} diff --git a/pkg/rtc/signalling/signallerv2async.go b/pkg/rtc/signalling/signallerv2async.go new file mode 100644 index 000000000..2d7cfc647 --- /dev/null +++ b/pkg/rtc/signalling/signallerv2async.go @@ -0,0 +1,172 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package signalling + +import ( + "fmt" + + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" + "github.com/livekit/protocol/utils" + "github.com/livekit/psrpc" + + "github.com/livekit/livekit-server/pkg/routing" + "github.com/livekit/livekit-server/pkg/rtc/types" + + "google.golang.org/protobuf/proto" +) + +var _ ParticipantSignaller = (*signallerv2Async)(nil) + +type Signallerv2AsyncParams struct { + Logger logger.Logger + Participant types.LocalParticipant +} + +type signallerv2Async struct { + params Signallerv2AsyncParams + + *signallerAsyncBase + + signalFragment *SignalFragment +} + +func NewSignallerv2Async(params Signallerv2AsyncParams) ParticipantSignaller { + return &signallerv2Async{ + params: params, + signallerAsyncBase: newSignallerAsyncBase(signallerAsyncBaseParams{Logger: params.Logger}), + signalFragment: NewSignalFragment(SignalFragmentParams{ + Logger: params.Logger, + }), + } +} + +// SIGNALLING-V2-TODO: need to lock write so that fragments do not get interrupted +func (s *signallerv2Async) WriteMessage(msg proto.Message) error { + if msg == nil { + return nil + } + + if s.params.Participant.IsDisconnected() { + return nil + } + + if !s.params.Participant.IsReady() { + if typed, ok := msg.(*livekit.Signalv2WireMessage); !ok { + s.params.Logger.Warnw( + "unknown message type", nil, + "messageType", fmt.Sprintf("%T", msg), + ) + } else { + if !hasConnectResponse(typed) { + return nil + } + } + } + + sink := s.GetResponseSink() + if sink == nil { + if typed, ok := msg.(*livekit.Signalv2WireMessage); ok { + s.params.Logger.Debugw( + "could not send message to participant", + "messageType", fmt.Sprintf("%T", typed.Message), + ) + } + return nil + } + + // SIGNALLING-V2-TODO: avoid double marshalling, + // have to marshal once to get size of serialised packet and decide if it needs fragmentation, + // should used the marshaled bytes if fragmentation is not needed + var fragments []*livekit.Fragment + marshaled, err := proto.Marshal(msg) + if err != nil { + if typed, ok := msg.(*livekit.Signalv2WireMessage); ok { + s.params.Logger.Warnw( + "could not send message to participant", err, + "messageType", fmt.Sprintf("%T", typed.Message), + ) + } + } else { + fragments = s.signalFragment.Segment(marshaled) + } + + sendMsg := func(m proto.Message) error { + if err := sink.WriteMessage(m); err != nil { + // SIGNALLING-V2-TODO: check for data channel errors to treat as debug too + if utils.ErrorIsOneOf(err, psrpc.Canceled, routing.ErrChannelClosed) { + if typed, ok := m.(*livekit.Signalv2WireMessage); ok { + s.params.Logger.Debugw( + "could not send message to participant", + "error", err, + "messageType", fmt.Sprintf("%T", typed.Message), + ) + } + return nil + } else { + if typed, ok := m.(*livekit.Signalv2WireMessage); ok { + s.params.Logger.Warnw( + "could not send message to participant", err, + "messageType", fmt.Sprintf("%T", typed.Message), + ) + } + return err + } + } + + return nil + } + + if len(fragments) != 0 { + for _, fragment := range fragments { + wireMessage := &livekit.Signalv2WireMessage{ + Message: &livekit.Signalv2WireMessage_Fragment{ + Fragment: fragment, + }, + } + if err := sendMsg(wireMessage); err != nil { + return err + } + } + } else { + if err := sendMsg(msg); err != nil { + return err + } + } + return nil +} + +// ---------------------------- + +func hasConnectResponse(wireMessage *livekit.Signalv2WireMessage) bool { + switch msg := wireMessage.GetMessage().(type) { + case *livekit.Signalv2WireMessage_Envelope: + for _, innerMsg := range msg.Envelope.GetServerMessages() { + switch innerMsg.GetMessage().(type) { + case *livekit.Signalv2ServerMessage_ConnectResponse: + return true + + default: + return false // first message should be `ConnectResponse` + } + } + + default: + // SIGNALLING-V2-TODO: handle ConnectResponse getting fragmented. + return false + } + + return false +} diff --git a/pkg/rtc/signalling/signallerv2hybrid.go b/pkg/rtc/signalling/signallerv2hybrid.go new file mode 100644 index 000000000..01c4fddd0 --- /dev/null +++ b/pkg/rtc/signalling/signallerv2hybrid.go @@ -0,0 +1,38 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package signalling + +import ( + "github.com/livekit/protocol/logger" +) + +var _ ParticipantSignaller = (*signallerv2Hybrid)(nil) + +type Signallerv2HybridParams struct { + Logger logger.Logger +} + +type signallerv2Hybrid struct { + params Signallerv2HybridParams + + *signallerv2Async +} + +func NewSignallerv2Hybrid(params Signallerv2HybridParams) ParticipantSignaller { + return &signallerv2Hybrid{ + params: params, + signallerv2Async: NewSignallerv2Async(Signallerv2AsyncParams{Logger: params.Logger}).(*signallerv2Async), + } +} diff --git a/pkg/rtc/signalling/signalling.go b/pkg/rtc/signalling/signalling.go new file mode 100644 index 000000000..0251efa35 --- /dev/null +++ b/pkg/rtc/signalling/signalling.go @@ -0,0 +1,298 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package signalling + +import ( + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" + + "google.golang.org/protobuf/proto" +) + +var _ ParticipantSignalling = (*signalling)(nil) + +type SignallingParams struct { + Logger logger.Logger +} + +type signalling struct { + signallingUnimplemented + + params SignallingParams +} + +func NewSignalling(params SignallingParams) ParticipantSignalling { + return &signalling{ + params: params, + } +} + +func (s *signalling) SignalJoinResponse(join *livekit.JoinResponse) proto.Message { + if join == nil { + return nil + } + + return &livekit.SignalResponse{ + Message: &livekit.SignalResponse_Join{ + Join: join, + }, + } +} + +func (s *signalling) SignalParticipantUpdate(participants []*livekit.ParticipantInfo) proto.Message { + if len(participants) == 0 { + return nil + } + + return &livekit.SignalResponse{ + Message: &livekit.SignalResponse_Update{ + Update: &livekit.ParticipantUpdate{ + Participants: participants, + }, + }, + } +} + +func (s *signalling) SignalSpeakerUpdate(speakers []*livekit.SpeakerInfo) proto.Message { + if len(speakers) == 0 { + return nil + } + + return &livekit.SignalResponse{ + Message: &livekit.SignalResponse_SpeakersChanged{ + SpeakersChanged: &livekit.SpeakersChanged{ + Speakers: speakers, + }, + }, + } +} + +func (s *signalling) SignalRoomUpdate(room *livekit.Room) proto.Message { + if room == nil { + return nil + } + + return &livekit.SignalResponse{ + Message: &livekit.SignalResponse_RoomUpdate{ + RoomUpdate: &livekit.RoomUpdate{ + Room: room, + }, + }, + } +} + +func (s *signalling) SignalConnectionQualityUpdate(connectionQuality *livekit.ConnectionQualityUpdate) proto.Message { + if connectionQuality == nil { + return nil + } + + return &livekit.SignalResponse{ + Message: &livekit.SignalResponse_ConnectionQuality{ + ConnectionQuality: connectionQuality, + }, + } +} + +func (s *signalling) SignalRefreshToken(token string) proto.Message { + if token == "" { + return nil + } + + return &livekit.SignalResponse{ + Message: &livekit.SignalResponse_RefreshToken{ + RefreshToken: token, + }, + } +} + +func (s *signalling) SignalRequestResponse(requestResponse *livekit.RequestResponse) proto.Message { + if requestResponse == nil { + return nil + } + + return &livekit.SignalResponse{ + Message: &livekit.SignalResponse_RequestResponse{ + RequestResponse: requestResponse, + }, + } +} + +func (s *signalling) SignalRoomMovedResponse(roomMoved *livekit.RoomMovedResponse) proto.Message { + if roomMoved == nil { + return nil + } + + return &livekit.SignalResponse{ + Message: &livekit.SignalResponse_RoomMoved{ + RoomMoved: roomMoved, + }, + } +} + +func (s *signalling) SignalReconnectResponse(reconnect *livekit.ReconnectResponse) proto.Message { + if reconnect == nil { + return nil + } + + return &livekit.SignalResponse{ + Message: &livekit.SignalResponse_Reconnect{ + Reconnect: reconnect, + }, + } +} + +func (s *signalling) SignalICECandidate(trickle *livekit.TrickleRequest) proto.Message { + if trickle == nil { + return nil + } + + return &livekit.SignalResponse{ + Message: &livekit.SignalResponse_Trickle{ + Trickle: trickle, + }, + } +} + +func (s *signalling) SignalTrackMuted(mute *livekit.MuteTrackRequest) proto.Message { + if mute == nil { + return nil + } + + return &livekit.SignalResponse{ + Message: &livekit.SignalResponse_Mute{ + Mute: mute, + }, + } +} + +func (s *signalling) SignalTrackPublished(trackPublished *livekit.TrackPublishedResponse) proto.Message { + if trackPublished == nil { + return nil + } + + return &livekit.SignalResponse{ + Message: &livekit.SignalResponse_TrackPublished{ + TrackPublished: trackPublished, + }, + } +} + +func (s *signalling) SignalTrackUnpublished(trackUnpublished *livekit.TrackUnpublishedResponse) proto.Message { + if trackUnpublished == nil { + return nil + } + + return &livekit.SignalResponse{ + Message: &livekit.SignalResponse_TrackUnpublished{ + TrackUnpublished: trackUnpublished, + }, + } +} + +func (s *signalling) SignalTrackSubscribed(trackSubscribed *livekit.TrackSubscribed) proto.Message { + if trackSubscribed == nil { + return nil + } + + return &livekit.SignalResponse{ + Message: &livekit.SignalResponse_TrackSubscribed{ + TrackSubscribed: trackSubscribed, + }, + } +} + +func (s *signalling) SignalLeaveRequest(leave *livekit.LeaveRequest) proto.Message { + if leave == nil { + return nil + } + + return &livekit.SignalResponse{ + Message: &livekit.SignalResponse_Leave{ + Leave: leave, + }, + } +} + +func (s *signalling) SignalSdpAnswer(answer *livekit.SessionDescription) proto.Message { + if answer == nil { + return nil + } + + return &livekit.SignalResponse{ + Message: &livekit.SignalResponse_Answer{ + Answer: answer, + }, + } +} + +func (s *signalling) SignalSdpOffer(offer *livekit.SessionDescription) proto.Message { + if offer == nil { + return nil + } + + return &livekit.SignalResponse{ + Message: &livekit.SignalResponse_Offer{ + Offer: offer, + }, + } +} + +func (s *signalling) SignalStreamStateUpdate(streamStateUpdate *livekit.StreamStateUpdate) proto.Message { + if streamStateUpdate == nil { + return nil + } + + return &livekit.SignalResponse{ + Message: &livekit.SignalResponse_StreamStateUpdate{ + StreamStateUpdate: streamStateUpdate, + }, + } +} + +func (s *signalling) SignalSubscribedQualityUpdate(subscribedQualityUpdate *livekit.SubscribedQualityUpdate) proto.Message { + if subscribedQualityUpdate == nil { + return nil + } + + return &livekit.SignalResponse{ + Message: &livekit.SignalResponse_SubscribedQualityUpdate{ + SubscribedQualityUpdate: subscribedQualityUpdate, + }, + } +} + +func (s *signalling) SignalSubscriptionResponse(subscriptionResponse *livekit.SubscriptionResponse) proto.Message { + if subscriptionResponse == nil { + return nil + } + + return &livekit.SignalResponse{ + Message: &livekit.SignalResponse_SubscriptionResponse{ + SubscriptionResponse: subscriptionResponse, + }, + } +} + +func (s *signalling) SignalSubscriptionPermissionUpdate(subscriptionPermissionUpdate *livekit.SubscriptionPermissionUpdate) proto.Message { + if subscriptionPermissionUpdate == nil { + return nil + } + + return &livekit.SignalResponse{ + Message: &livekit.SignalResponse_SubscriptionPermissionUpdate{ + SubscriptionPermissionUpdate: subscriptionPermissionUpdate, + }, + } +} diff --git a/pkg/rtc/signalling/signallingunimplemented.go b/pkg/rtc/signalling/signallingunimplemented.go new file mode 100644 index 000000000..eefd011e5 --- /dev/null +++ b/pkg/rtc/signalling/signallingunimplemented.go @@ -0,0 +1,122 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package signalling + +import ( + "github.com/livekit/protocol/livekit" + + "google.golang.org/protobuf/proto" +) + +var _ ParticipantSignalling = (*signallingUnimplemented)(nil) + +type signallingUnimplemented struct{} + +func (u *signallingUnimplemented) SignalJoinResponse(join *livekit.JoinResponse) proto.Message { + return nil +} + +func (u *signallingUnimplemented) SignalParticipantUpdate(participants []*livekit.ParticipantInfo) proto.Message { + return nil +} + +func (u *signallingUnimplemented) SignalSpeakerUpdate(speakers []*livekit.SpeakerInfo) proto.Message { + return nil +} + +func (u *signallingUnimplemented) SignalRoomUpdate(room *livekit.Room) proto.Message { + return nil +} + +func (u *signallingUnimplemented) SignalConnectionQualityUpdate(connectionQuality *livekit.ConnectionQualityUpdate) proto.Message { + return nil +} + +func (u *signallingUnimplemented) SignalRefreshToken(token string) proto.Message { + return nil +} + +func (u *signallingUnimplemented) SignalRequestResponse(requestResponse *livekit.RequestResponse) proto.Message { + return nil +} + +func (u *signallingUnimplemented) SignalRoomMovedResponse(roomMoved *livekit.RoomMovedResponse) proto.Message { + return nil +} + +func (u *signallingUnimplemented) SignalReconnectResponse(reconnect *livekit.ReconnectResponse) proto.Message { + return nil +} + +func (u *signallingUnimplemented) SignalICECandidate(trickle *livekit.TrickleRequest) proto.Message { + return nil +} + +func (u *signallingUnimplemented) SignalTrackMuted(mute *livekit.MuteTrackRequest) proto.Message { + return nil +} + +func (u *signallingUnimplemented) SignalTrackPublished(trackPublished *livekit.TrackPublishedResponse) proto.Message { + return nil +} + +func (u *signallingUnimplemented) SignalTrackUnpublished(trackUnpublished *livekit.TrackUnpublishedResponse) proto.Message { + return nil +} + +func (u *signallingUnimplemented) SignalTrackSubscribed(trackSubscribed *livekit.TrackSubscribed) proto.Message { + return nil +} + +func (u *signallingUnimplemented) SignalLeaveRequest(leave *livekit.LeaveRequest) proto.Message { + return nil +} + +func (u *signallingUnimplemented) SignalSdpAnswer(answer *livekit.SessionDescription) proto.Message { + return nil +} + +func (u *signallingUnimplemented) SignalSdpOffer(offer *livekit.SessionDescription) proto.Message { + return nil +} + +func (u *signallingUnimplemented) SignalStreamStateUpdate(streamStateUpdate *livekit.StreamStateUpdate) proto.Message { + return nil +} + +func (u *signallingUnimplemented) SignalSubscribedQualityUpdate(subscribedQualityUpdate *livekit.SubscribedQualityUpdate) proto.Message { + return nil +} + +func (u *signallingUnimplemented) SignalSubscriptionResponse(subscriptionResponse *livekit.SubscriptionResponse) proto.Message { + return nil +} + +func (u *signallingUnimplemented) SignalSubscriptionPermissionUpdate(subscriptionPermissionUpdate *livekit.SubscriptionPermissionUpdate) proto.Message { + return nil +} + +func (u *signallingUnimplemented) AckMessageId(ackMessageId uint32) {} + +func (u *signallingUnimplemented) SetLastProcessedRemoteMessageId(lastProcessedRemoteMessageId uint32) { +} + +func (u *signallingUnimplemented) PendingMessages() proto.Message { + return nil +} + +func (u *signallingUnimplemented) SignalConnectResponse(connectResponse *livekit.ConnectResponse) proto.Message { + return nil +} diff --git a/pkg/rtc/signalling/signallingv2.go b/pkg/rtc/signalling/signallingv2.go new file mode 100644 index 000000000..32706626f --- /dev/null +++ b/pkg/rtc/signalling/signallingv2.go @@ -0,0 +1,91 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package signalling + +import ( + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" + "google.golang.org/protobuf/proto" +) + +var _ ParticipantSignalling = (*signallingv2)(nil) + +type Signallingv2Params struct { + Logger logger.Logger +} + +type signallingv2 struct { + signallingUnimplemented + + params Signallingv2Params + + signalCache *SignalCache + signalFragment *SignalFragment +} + +func NewSignallingv2(params Signallingv2Params) ParticipantSignalling { + return &signallingv2{ + params: params, + signalCache: NewSignalCache(SignalCacheParams{ + Logger: params.Logger, + }), + signalFragment: NewSignalFragment(SignalFragmentParams{ + Logger: params.Logger, + }), + } +} + +func (s *signallingv2) AckMessageId(ackMessageId uint32) { + s.signalCache.Clear(ackMessageId) +} + +func (s *signallingv2) SetLastProcessedRemoteMessageId(lastProcessedRemoteMessageId uint32) { + s.signalCache.SetLastProcessedRemoteMessageId(lastProcessedRemoteMessageId) +} + +func (s *signallingv2) PendingMessages() proto.Message { + serverMessages := s.signalCache.GetFromFront() + if len(serverMessages) == 0 { + return nil + } + + return &livekit.Signalv2WireMessage{ + Message: &livekit.Signalv2WireMessage_Envelope{ + Envelope: &livekit.Envelope{ + ServerMessages: serverMessages, + }, + }, + } +} + +func (s *signallingv2) SignalConnectResponse(connectResponse *livekit.ConnectResponse) proto.Message { + if connectResponse == nil { + return nil + } + + serverMessage := &livekit.Signalv2ServerMessage{ + Message: &livekit.Signalv2ServerMessage_ConnectResponse{ + ConnectResponse: connectResponse, + }, + } + s.signalCache.Add(serverMessage) + return &livekit.Signalv2WireMessage{ + Message: &livekit.Signalv2WireMessage_Envelope{ + Envelope: &livekit.Envelope{ + ServerMessages: []*livekit.Signalv2ServerMessage{serverMessage}, + }, + }, + } +} diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index ffdc60675..b18da362e 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -32,6 +32,8 @@ import ( "github.com/livekit/livekit-server/pkg/sfu/buffer" "github.com/livekit/livekit-server/pkg/sfu/mime" "github.com/livekit/livekit-server/pkg/sfu/pacer" + + "google.golang.org/protobuf/proto" ) //go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate @@ -376,6 +378,7 @@ type LocalParticipant interface { GetLastReliableSequence(migrateOut bool) uint32 SetResponseSink(sink routing.MessageSink) + GetResponseSink() routing.MessageSink CloseSignalConnection(reason SignallingCloseReason) UpdateLastSeenSignal() SetSignalSourceValid(valid bool) @@ -450,6 +453,8 @@ type LocalParticipant interface { HandleReconnectAndSendResponse(reconnectReason livekit.ReconnectReason, reconnectResponse *livekit.ReconnectResponse) error IssueFullReconnect(reason ParticipantCloseReason) SendRoomMovedResponse(moved *livekit.RoomMovedResponse) error + SendConnectResponse(connectResponse *livekit.ConnectResponse) error + SignalPendingMessages() proto.Message // callbacks OnStateChange(func(p LocalParticipant)) @@ -468,6 +473,16 @@ type LocalParticipant interface { OnSubscribeStatusChanged(fn func(publisherID livekit.ParticipantID, subscribed bool)) OnClose(callback func(LocalParticipant)) OnClaimsChanged(callback func(LocalParticipant)) + OnUpdateSubscriptions(func( + LocalParticipant, + []livekit.TrackID, + []*livekit.ParticipantTracks, + bool, + )) + OnUpdateSubscriptionPermission(func(LocalParticipant, *livekit.SubscriptionPermission) error) + OnSyncState(func(LocalParticipant, *livekit.SyncState) error) + OnSimulateScenario(func(LocalParticipant, *livekit.SimulateScenario) error) + OnLeave(func(LocalParticipant, ParticipantCloseReason)) HandleReceiverReport(dt *sfu.DownTrack, report *rtcp.ReceiverReport) @@ -508,6 +523,15 @@ type LocalParticipant interface { GetDisableSenderReportPassThrough() bool HandleMetrics(senderParticipantID livekit.ParticipantID, batch *livekit.MetricsBatch) error + HandleUpdateSubscriptions( + []livekit.TrackID, + []*livekit.ParticipantTracks, + bool, + ) + HandleUpdateSubscriptionPermission(*livekit.SubscriptionPermission) error + HandleSyncState(*livekit.SyncState) error + HandleSimulateScenario(*livekit.SimulateScenario) error + HandleLeaveRequest(reason ParticipantCloseReason) } // Room is a container of participants, and can provide room-level actions @@ -517,10 +541,12 @@ type Room interface { Name() livekit.RoomName ID() livekit.RoomID RemoveParticipant(identity livekit.ParticipantIdentity, pID livekit.ParticipantID, reason ParticipantCloseReason) - UpdateSubscriptions(participant LocalParticipant, trackIDs []livekit.TrackID, participantTracks []*livekit.ParticipantTracks, subscribe bool) - UpdateSubscriptionPermission(participant LocalParticipant, permissions *livekit.SubscriptionPermission) error - SyncState(participant LocalParticipant, state *livekit.SyncState) error - SimulateScenario(participant LocalParticipant, scenario *livekit.SimulateScenario) error + UpdateSubscriptions( + participant LocalParticipant, + trackIDs []livekit.TrackID, + participantTracks []*livekit.ParticipantTracks, + subscribe bool, + ) ResolveMediaTrackForSubscriber(sub LocalParticipant, trackID livekit.TrackID) MediaResolverResult GetLocalParticipants() []LocalParticipant IsDataMessageUserPacketDuplicate(ip *livekit.UserPacket) bool diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index 6a309360d..cc3c4df94 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -17,6 +17,7 @@ import ( "github.com/livekit/protocol/utils" "github.com/pion/rtcp" webrtc "github.com/pion/webrtc/v4" + "google.golang.org/protobuf/proto" ) type FakeLocalParticipant struct { @@ -456,6 +457,16 @@ type FakeLocalParticipant struct { getReporterResolverReturnsOnCall map[int]struct { result1 roomobs.ParticipantReporterResolver } + GetResponseSinkStub func() routing.MessageSink + getResponseSinkMutex sync.RWMutex + getResponseSinkArgsForCall []struct { + } + getResponseSinkReturns struct { + result1 routing.MessageSink + } + getResponseSinkReturnsOnCall map[int]struct { + result1 routing.MessageSink + } GetSubscribedParticipantsStub func() []livekit.ParticipantID getSubscribedParticipantsMutex sync.RWMutex getSubscribedParticipantsArgsForCall []struct { @@ -516,6 +527,11 @@ type FakeLocalParticipant struct { handleICETrickleSDPFragmentReturnsOnCall map[int]struct { result1 error } + HandleLeaveRequestStub func(types.ParticipantCloseReason) + handleLeaveRequestMutex sync.RWMutex + handleLeaveRequestArgsForCall []struct { + arg1 types.ParticipantCloseReason + } HandleMetricsStub func(livekit.ParticipantID, *livekit.MetricsBatch) error handleMetricsMutex sync.RWMutex handleMetricsArgsForCall []struct { @@ -562,6 +578,46 @@ type FakeLocalParticipant struct { handleSignalSourceCloseMutex sync.RWMutex handleSignalSourceCloseArgsForCall []struct { } + HandleSimulateScenarioStub func(*livekit.SimulateScenario) error + handleSimulateScenarioMutex sync.RWMutex + handleSimulateScenarioArgsForCall []struct { + arg1 *livekit.SimulateScenario + } + handleSimulateScenarioReturns struct { + result1 error + } + handleSimulateScenarioReturnsOnCall map[int]struct { + result1 error + } + HandleSyncStateStub func(*livekit.SyncState) error + handleSyncStateMutex sync.RWMutex + handleSyncStateArgsForCall []struct { + arg1 *livekit.SyncState + } + handleSyncStateReturns struct { + result1 error + } + handleSyncStateReturnsOnCall map[int]struct { + result1 error + } + HandleUpdateSubscriptionPermissionStub func(*livekit.SubscriptionPermission) error + handleUpdateSubscriptionPermissionMutex sync.RWMutex + handleUpdateSubscriptionPermissionArgsForCall []struct { + arg1 *livekit.SubscriptionPermission + } + handleUpdateSubscriptionPermissionReturns struct { + result1 error + } + handleUpdateSubscriptionPermissionReturnsOnCall map[int]struct { + result1 error + } + HandleUpdateSubscriptionsStub func([]livekit.TrackID, []*livekit.ParticipantTracks, bool) + handleUpdateSubscriptionsMutex sync.RWMutex + handleUpdateSubscriptionsArgsForCall []struct { + arg1 []livekit.TrackID + arg2 []*livekit.ParticipantTracks + arg3 bool + } HasConnectedStub func() bool hasConnectedMutex sync.RWMutex hasConnectedArgsForCall []struct { @@ -808,6 +864,11 @@ type FakeLocalParticipant struct { onICEConfigChangedArgsForCall []struct { arg1 func(participant types.LocalParticipant, iceConfig *livekit.ICEConfig) } + OnLeaveStub func(func(types.LocalParticipant, types.ParticipantCloseReason)) + onLeaveMutex sync.RWMutex + onLeaveArgsForCall []struct { + arg1 func(types.LocalParticipant, types.ParticipantCloseReason) + } OnMetricsStub func(func(types.Participant, *livekit.DataPacket)) onMetricsMutex sync.RWMutex onMetricsArgsForCall []struct { @@ -823,6 +884,11 @@ type FakeLocalParticipant struct { onParticipantUpdateArgsForCall []struct { arg1 func(types.LocalParticipant) } + OnSimulateScenarioStub func(func(types.LocalParticipant, *livekit.SimulateScenario) error) + onSimulateScenarioMutex sync.RWMutex + onSimulateScenarioArgsForCall []struct { + arg1 func(types.LocalParticipant, *livekit.SimulateScenario) error + } OnStateChangeStub func(func(p types.LocalParticipant)) onStateChangeMutex sync.RWMutex onStateChangeArgsForCall []struct { @@ -838,6 +904,11 @@ type FakeLocalParticipant struct { onSubscriberReadyArgsForCall []struct { arg1 func(types.LocalParticipant) } + OnSyncStateStub func(func(types.LocalParticipant, *livekit.SyncState) error) + onSyncStateMutex sync.RWMutex + onSyncStateArgsForCall []struct { + arg1 func(types.LocalParticipant, *livekit.SyncState) error + } OnTrackPublishedStub func(func(types.LocalParticipant, types.MediaTrack)) onTrackPublishedMutex sync.RWMutex onTrackPublishedArgsForCall []struct { @@ -853,6 +924,16 @@ type FakeLocalParticipant struct { onTrackUpdatedArgsForCall []struct { arg1 func(types.LocalParticipant, types.MediaTrack) } + OnUpdateSubscriptionPermissionStub func(func(types.LocalParticipant, *livekit.SubscriptionPermission) error) + onUpdateSubscriptionPermissionMutex sync.RWMutex + onUpdateSubscriptionPermissionArgsForCall []struct { + arg1 func(types.LocalParticipant, *livekit.SubscriptionPermission) error + } + OnUpdateSubscriptionsStub func(func(types.LocalParticipant, []livekit.TrackID, []*livekit.ParticipantTracks, bool)) + onUpdateSubscriptionsMutex sync.RWMutex + onUpdateSubscriptionsArgsForCall []struct { + arg1 func(types.LocalParticipant, []livekit.TrackID, []*livekit.ParticipantTracks, bool) + } ProtocolVersionStub func() types.ProtocolVersion protocolVersionMutex sync.RWMutex protocolVersionArgsForCall []struct { @@ -880,6 +961,17 @@ type FakeLocalParticipant struct { removeTrackLocalReturnsOnCall map[int]struct { result1 error } + SendConnectResponseStub func(*livekit.ConnectResponse) error + sendConnectResponseMutex sync.RWMutex + sendConnectResponseArgsForCall []struct { + arg1 *livekit.ConnectResponse + } + sendConnectResponseReturns struct { + result1 error + } + sendConnectResponseReturnsOnCall map[int]struct { + result1 error + } SendConnectionQualityUpdateStub func(*livekit.ConnectionQualityUpdate) error sendConnectionQualityUpdateMutex sync.RWMutex sendConnectionQualityUpdateArgsForCall []struct { @@ -1087,6 +1179,16 @@ type FakeLocalParticipant struct { setTrackMutedReturnsOnCall map[int]struct { result1 *livekit.TrackInfo } + SignalPendingMessagesStub func() proto.Message + signalPendingMessagesMutex sync.RWMutex + signalPendingMessagesArgsForCall []struct { + } + signalPendingMessagesReturns struct { + result1 proto.Message + } + signalPendingMessagesReturnsOnCall map[int]struct { + result1 proto.Message + } StateStub func() livekit.ParticipantInfo_State stateMutex sync.RWMutex stateArgsForCall []struct { @@ -3590,6 +3692,59 @@ func (fake *FakeLocalParticipant) GetReporterResolverReturnsOnCall(i int, result }{result1} } +func (fake *FakeLocalParticipant) GetResponseSink() routing.MessageSink { + fake.getResponseSinkMutex.Lock() + ret, specificReturn := fake.getResponseSinkReturnsOnCall[len(fake.getResponseSinkArgsForCall)] + fake.getResponseSinkArgsForCall = append(fake.getResponseSinkArgsForCall, struct { + }{}) + stub := fake.GetResponseSinkStub + fakeReturns := fake.getResponseSinkReturns + fake.recordInvocation("GetResponseSink", []interface{}{}) + fake.getResponseSinkMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeLocalParticipant) GetResponseSinkCallCount() int { + fake.getResponseSinkMutex.RLock() + defer fake.getResponseSinkMutex.RUnlock() + return len(fake.getResponseSinkArgsForCall) +} + +func (fake *FakeLocalParticipant) GetResponseSinkCalls(stub func() routing.MessageSink) { + fake.getResponseSinkMutex.Lock() + defer fake.getResponseSinkMutex.Unlock() + fake.GetResponseSinkStub = stub +} + +func (fake *FakeLocalParticipant) GetResponseSinkReturns(result1 routing.MessageSink) { + fake.getResponseSinkMutex.Lock() + defer fake.getResponseSinkMutex.Unlock() + fake.GetResponseSinkStub = nil + fake.getResponseSinkReturns = struct { + result1 routing.MessageSink + }{result1} +} + +func (fake *FakeLocalParticipant) GetResponseSinkReturnsOnCall(i int, result1 routing.MessageSink) { + fake.getResponseSinkMutex.Lock() + defer fake.getResponseSinkMutex.Unlock() + fake.GetResponseSinkStub = nil + if fake.getResponseSinkReturnsOnCall == nil { + fake.getResponseSinkReturnsOnCall = make(map[int]struct { + result1 routing.MessageSink + }) + } + fake.getResponseSinkReturnsOnCall[i] = struct { + result1 routing.MessageSink + }{result1} +} + func (fake *FakeLocalParticipant) GetSubscribedParticipants() []livekit.ParticipantID { fake.getSubscribedParticipantsMutex.Lock() ret, specificReturn := fake.getSubscribedParticipantsReturnsOnCall[len(fake.getSubscribedParticipantsArgsForCall)] @@ -3907,6 +4062,38 @@ func (fake *FakeLocalParticipant) HandleICETrickleSDPFragmentReturnsOnCall(i int }{result1} } +func (fake *FakeLocalParticipant) HandleLeaveRequest(arg1 types.ParticipantCloseReason) { + fake.handleLeaveRequestMutex.Lock() + fake.handleLeaveRequestArgsForCall = append(fake.handleLeaveRequestArgsForCall, struct { + arg1 types.ParticipantCloseReason + }{arg1}) + stub := fake.HandleLeaveRequestStub + fake.recordInvocation("HandleLeaveRequest", []interface{}{arg1}) + fake.handleLeaveRequestMutex.Unlock() + if stub != nil { + fake.HandleLeaveRequestStub(arg1) + } +} + +func (fake *FakeLocalParticipant) HandleLeaveRequestCallCount() int { + fake.handleLeaveRequestMutex.RLock() + defer fake.handleLeaveRequestMutex.RUnlock() + return len(fake.handleLeaveRequestArgsForCall) +} + +func (fake *FakeLocalParticipant) HandleLeaveRequestCalls(stub func(types.ParticipantCloseReason)) { + fake.handleLeaveRequestMutex.Lock() + defer fake.handleLeaveRequestMutex.Unlock() + fake.HandleLeaveRequestStub = stub +} + +func (fake *FakeLocalParticipant) HandleLeaveRequestArgsForCall(i int) types.ParticipantCloseReason { + fake.handleLeaveRequestMutex.RLock() + defer fake.handleLeaveRequestMutex.RUnlock() + argsForCall := fake.handleLeaveRequestArgsForCall[i] + return argsForCall.arg1 +} + func (fake *FakeLocalParticipant) HandleMetrics(arg1 livekit.ParticipantID, arg2 *livekit.MetricsBatch) error { fake.handleMetricsMutex.Lock() ret, specificReturn := fake.handleMetricsReturnsOnCall[len(fake.handleMetricsArgsForCall)] @@ -4150,6 +4337,233 @@ func (fake *FakeLocalParticipant) HandleSignalSourceCloseCalls(stub func()) { fake.HandleSignalSourceCloseStub = stub } +func (fake *FakeLocalParticipant) HandleSimulateScenario(arg1 *livekit.SimulateScenario) error { + fake.handleSimulateScenarioMutex.Lock() + ret, specificReturn := fake.handleSimulateScenarioReturnsOnCall[len(fake.handleSimulateScenarioArgsForCall)] + fake.handleSimulateScenarioArgsForCall = append(fake.handleSimulateScenarioArgsForCall, struct { + arg1 *livekit.SimulateScenario + }{arg1}) + stub := fake.HandleSimulateScenarioStub + fakeReturns := fake.handleSimulateScenarioReturns + fake.recordInvocation("HandleSimulateScenario", []interface{}{arg1}) + fake.handleSimulateScenarioMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeLocalParticipant) HandleSimulateScenarioCallCount() int { + fake.handleSimulateScenarioMutex.RLock() + defer fake.handleSimulateScenarioMutex.RUnlock() + return len(fake.handleSimulateScenarioArgsForCall) +} + +func (fake *FakeLocalParticipant) HandleSimulateScenarioCalls(stub func(*livekit.SimulateScenario) error) { + fake.handleSimulateScenarioMutex.Lock() + defer fake.handleSimulateScenarioMutex.Unlock() + fake.HandleSimulateScenarioStub = stub +} + +func (fake *FakeLocalParticipant) HandleSimulateScenarioArgsForCall(i int) *livekit.SimulateScenario { + fake.handleSimulateScenarioMutex.RLock() + defer fake.handleSimulateScenarioMutex.RUnlock() + argsForCall := fake.handleSimulateScenarioArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeLocalParticipant) HandleSimulateScenarioReturns(result1 error) { + fake.handleSimulateScenarioMutex.Lock() + defer fake.handleSimulateScenarioMutex.Unlock() + fake.HandleSimulateScenarioStub = nil + fake.handleSimulateScenarioReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeLocalParticipant) HandleSimulateScenarioReturnsOnCall(i int, result1 error) { + fake.handleSimulateScenarioMutex.Lock() + defer fake.handleSimulateScenarioMutex.Unlock() + fake.HandleSimulateScenarioStub = nil + if fake.handleSimulateScenarioReturnsOnCall == nil { + fake.handleSimulateScenarioReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.handleSimulateScenarioReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeLocalParticipant) HandleSyncState(arg1 *livekit.SyncState) error { + fake.handleSyncStateMutex.Lock() + ret, specificReturn := fake.handleSyncStateReturnsOnCall[len(fake.handleSyncStateArgsForCall)] + fake.handleSyncStateArgsForCall = append(fake.handleSyncStateArgsForCall, struct { + arg1 *livekit.SyncState + }{arg1}) + stub := fake.HandleSyncStateStub + fakeReturns := fake.handleSyncStateReturns + fake.recordInvocation("HandleSyncState", []interface{}{arg1}) + fake.handleSyncStateMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeLocalParticipant) HandleSyncStateCallCount() int { + fake.handleSyncStateMutex.RLock() + defer fake.handleSyncStateMutex.RUnlock() + return len(fake.handleSyncStateArgsForCall) +} + +func (fake *FakeLocalParticipant) HandleSyncStateCalls(stub func(*livekit.SyncState) error) { + fake.handleSyncStateMutex.Lock() + defer fake.handleSyncStateMutex.Unlock() + fake.HandleSyncStateStub = stub +} + +func (fake *FakeLocalParticipant) HandleSyncStateArgsForCall(i int) *livekit.SyncState { + fake.handleSyncStateMutex.RLock() + defer fake.handleSyncStateMutex.RUnlock() + argsForCall := fake.handleSyncStateArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeLocalParticipant) HandleSyncStateReturns(result1 error) { + fake.handleSyncStateMutex.Lock() + defer fake.handleSyncStateMutex.Unlock() + fake.HandleSyncStateStub = nil + fake.handleSyncStateReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeLocalParticipant) HandleSyncStateReturnsOnCall(i int, result1 error) { + fake.handleSyncStateMutex.Lock() + defer fake.handleSyncStateMutex.Unlock() + fake.HandleSyncStateStub = nil + if fake.handleSyncStateReturnsOnCall == nil { + fake.handleSyncStateReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.handleSyncStateReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeLocalParticipant) HandleUpdateSubscriptionPermission(arg1 *livekit.SubscriptionPermission) error { + fake.handleUpdateSubscriptionPermissionMutex.Lock() + ret, specificReturn := fake.handleUpdateSubscriptionPermissionReturnsOnCall[len(fake.handleUpdateSubscriptionPermissionArgsForCall)] + fake.handleUpdateSubscriptionPermissionArgsForCall = append(fake.handleUpdateSubscriptionPermissionArgsForCall, struct { + arg1 *livekit.SubscriptionPermission + }{arg1}) + stub := fake.HandleUpdateSubscriptionPermissionStub + fakeReturns := fake.handleUpdateSubscriptionPermissionReturns + fake.recordInvocation("HandleUpdateSubscriptionPermission", []interface{}{arg1}) + fake.handleUpdateSubscriptionPermissionMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeLocalParticipant) HandleUpdateSubscriptionPermissionCallCount() int { + fake.handleUpdateSubscriptionPermissionMutex.RLock() + defer fake.handleUpdateSubscriptionPermissionMutex.RUnlock() + return len(fake.handleUpdateSubscriptionPermissionArgsForCall) +} + +func (fake *FakeLocalParticipant) HandleUpdateSubscriptionPermissionCalls(stub func(*livekit.SubscriptionPermission) error) { + fake.handleUpdateSubscriptionPermissionMutex.Lock() + defer fake.handleUpdateSubscriptionPermissionMutex.Unlock() + fake.HandleUpdateSubscriptionPermissionStub = stub +} + +func (fake *FakeLocalParticipant) HandleUpdateSubscriptionPermissionArgsForCall(i int) *livekit.SubscriptionPermission { + fake.handleUpdateSubscriptionPermissionMutex.RLock() + defer fake.handleUpdateSubscriptionPermissionMutex.RUnlock() + argsForCall := fake.handleUpdateSubscriptionPermissionArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeLocalParticipant) HandleUpdateSubscriptionPermissionReturns(result1 error) { + fake.handleUpdateSubscriptionPermissionMutex.Lock() + defer fake.handleUpdateSubscriptionPermissionMutex.Unlock() + fake.HandleUpdateSubscriptionPermissionStub = nil + fake.handleUpdateSubscriptionPermissionReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeLocalParticipant) HandleUpdateSubscriptionPermissionReturnsOnCall(i int, result1 error) { + fake.handleUpdateSubscriptionPermissionMutex.Lock() + defer fake.handleUpdateSubscriptionPermissionMutex.Unlock() + fake.HandleUpdateSubscriptionPermissionStub = nil + if fake.handleUpdateSubscriptionPermissionReturnsOnCall == nil { + fake.handleUpdateSubscriptionPermissionReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.handleUpdateSubscriptionPermissionReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeLocalParticipant) HandleUpdateSubscriptions(arg1 []livekit.TrackID, arg2 []*livekit.ParticipantTracks, arg3 bool) { + var arg1Copy []livekit.TrackID + if arg1 != nil { + arg1Copy = make([]livekit.TrackID, len(arg1)) + copy(arg1Copy, arg1) + } + var arg2Copy []*livekit.ParticipantTracks + if arg2 != nil { + arg2Copy = make([]*livekit.ParticipantTracks, len(arg2)) + copy(arg2Copy, arg2) + } + fake.handleUpdateSubscriptionsMutex.Lock() + fake.handleUpdateSubscriptionsArgsForCall = append(fake.handleUpdateSubscriptionsArgsForCall, struct { + arg1 []livekit.TrackID + arg2 []*livekit.ParticipantTracks + arg3 bool + }{arg1Copy, arg2Copy, arg3}) + stub := fake.HandleUpdateSubscriptionsStub + fake.recordInvocation("HandleUpdateSubscriptions", []interface{}{arg1Copy, arg2Copy, arg3}) + fake.handleUpdateSubscriptionsMutex.Unlock() + if stub != nil { + fake.HandleUpdateSubscriptionsStub(arg1, arg2, arg3) + } +} + +func (fake *FakeLocalParticipant) HandleUpdateSubscriptionsCallCount() int { + fake.handleUpdateSubscriptionsMutex.RLock() + defer fake.handleUpdateSubscriptionsMutex.RUnlock() + return len(fake.handleUpdateSubscriptionsArgsForCall) +} + +func (fake *FakeLocalParticipant) HandleUpdateSubscriptionsCalls(stub func([]livekit.TrackID, []*livekit.ParticipantTracks, bool)) { + fake.handleUpdateSubscriptionsMutex.Lock() + defer fake.handleUpdateSubscriptionsMutex.Unlock() + fake.HandleUpdateSubscriptionsStub = stub +} + +func (fake *FakeLocalParticipant) HandleUpdateSubscriptionsArgsForCall(i int) ([]livekit.TrackID, []*livekit.ParticipantTracks, bool) { + fake.handleUpdateSubscriptionsMutex.RLock() + defer fake.handleUpdateSubscriptionsMutex.RUnlock() + argsForCall := fake.handleUpdateSubscriptionsArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 +} + func (fake *FakeLocalParticipant) HasConnected() bool { fake.hasConnectedMutex.Lock() ret, specificReturn := fake.hasConnectedReturnsOnCall[len(fake.hasConnectedArgsForCall)] @@ -5504,6 +5918,38 @@ func (fake *FakeLocalParticipant) OnICEConfigChangedArgsForCall(i int) func(part return argsForCall.arg1 } +func (fake *FakeLocalParticipant) OnLeave(arg1 func(types.LocalParticipant, types.ParticipantCloseReason)) { + fake.onLeaveMutex.Lock() + fake.onLeaveArgsForCall = append(fake.onLeaveArgsForCall, struct { + arg1 func(types.LocalParticipant, types.ParticipantCloseReason) + }{arg1}) + stub := fake.OnLeaveStub + fake.recordInvocation("OnLeave", []interface{}{arg1}) + fake.onLeaveMutex.Unlock() + if stub != nil { + fake.OnLeaveStub(arg1) + } +} + +func (fake *FakeLocalParticipant) OnLeaveCallCount() int { + fake.onLeaveMutex.RLock() + defer fake.onLeaveMutex.RUnlock() + return len(fake.onLeaveArgsForCall) +} + +func (fake *FakeLocalParticipant) OnLeaveCalls(stub func(func(types.LocalParticipant, types.ParticipantCloseReason))) { + fake.onLeaveMutex.Lock() + defer fake.onLeaveMutex.Unlock() + fake.OnLeaveStub = stub +} + +func (fake *FakeLocalParticipant) OnLeaveArgsForCall(i int) func(types.LocalParticipant, types.ParticipantCloseReason) { + fake.onLeaveMutex.RLock() + defer fake.onLeaveMutex.RUnlock() + argsForCall := fake.onLeaveArgsForCall[i] + return argsForCall.arg1 +} + func (fake *FakeLocalParticipant) OnMetrics(arg1 func(types.Participant, *livekit.DataPacket)) { fake.onMetricsMutex.Lock() fake.onMetricsArgsForCall = append(fake.onMetricsArgsForCall, struct { @@ -5600,6 +6046,38 @@ func (fake *FakeLocalParticipant) OnParticipantUpdateArgsForCall(i int) func(typ return argsForCall.arg1 } +func (fake *FakeLocalParticipant) OnSimulateScenario(arg1 func(types.LocalParticipant, *livekit.SimulateScenario) error) { + fake.onSimulateScenarioMutex.Lock() + fake.onSimulateScenarioArgsForCall = append(fake.onSimulateScenarioArgsForCall, struct { + arg1 func(types.LocalParticipant, *livekit.SimulateScenario) error + }{arg1}) + stub := fake.OnSimulateScenarioStub + fake.recordInvocation("OnSimulateScenario", []interface{}{arg1}) + fake.onSimulateScenarioMutex.Unlock() + if stub != nil { + fake.OnSimulateScenarioStub(arg1) + } +} + +func (fake *FakeLocalParticipant) OnSimulateScenarioCallCount() int { + fake.onSimulateScenarioMutex.RLock() + defer fake.onSimulateScenarioMutex.RUnlock() + return len(fake.onSimulateScenarioArgsForCall) +} + +func (fake *FakeLocalParticipant) OnSimulateScenarioCalls(stub func(func(types.LocalParticipant, *livekit.SimulateScenario) error)) { + fake.onSimulateScenarioMutex.Lock() + defer fake.onSimulateScenarioMutex.Unlock() + fake.OnSimulateScenarioStub = stub +} + +func (fake *FakeLocalParticipant) OnSimulateScenarioArgsForCall(i int) func(types.LocalParticipant, *livekit.SimulateScenario) error { + fake.onSimulateScenarioMutex.RLock() + defer fake.onSimulateScenarioMutex.RUnlock() + argsForCall := fake.onSimulateScenarioArgsForCall[i] + return argsForCall.arg1 +} + func (fake *FakeLocalParticipant) OnStateChange(arg1 func(p types.LocalParticipant)) { fake.onStateChangeMutex.Lock() fake.onStateChangeArgsForCall = append(fake.onStateChangeArgsForCall, struct { @@ -5696,6 +6174,38 @@ func (fake *FakeLocalParticipant) OnSubscriberReadyArgsForCall(i int) func(types return argsForCall.arg1 } +func (fake *FakeLocalParticipant) OnSyncState(arg1 func(types.LocalParticipant, *livekit.SyncState) error) { + fake.onSyncStateMutex.Lock() + fake.onSyncStateArgsForCall = append(fake.onSyncStateArgsForCall, struct { + arg1 func(types.LocalParticipant, *livekit.SyncState) error + }{arg1}) + stub := fake.OnSyncStateStub + fake.recordInvocation("OnSyncState", []interface{}{arg1}) + fake.onSyncStateMutex.Unlock() + if stub != nil { + fake.OnSyncStateStub(arg1) + } +} + +func (fake *FakeLocalParticipant) OnSyncStateCallCount() int { + fake.onSyncStateMutex.RLock() + defer fake.onSyncStateMutex.RUnlock() + return len(fake.onSyncStateArgsForCall) +} + +func (fake *FakeLocalParticipant) OnSyncStateCalls(stub func(func(types.LocalParticipant, *livekit.SyncState) error)) { + fake.onSyncStateMutex.Lock() + defer fake.onSyncStateMutex.Unlock() + fake.OnSyncStateStub = stub +} + +func (fake *FakeLocalParticipant) OnSyncStateArgsForCall(i int) func(types.LocalParticipant, *livekit.SyncState) error { + fake.onSyncStateMutex.RLock() + defer fake.onSyncStateMutex.RUnlock() + argsForCall := fake.onSyncStateArgsForCall[i] + return argsForCall.arg1 +} + func (fake *FakeLocalParticipant) OnTrackPublished(arg1 func(types.LocalParticipant, types.MediaTrack)) { fake.onTrackPublishedMutex.Lock() fake.onTrackPublishedArgsForCall = append(fake.onTrackPublishedArgsForCall, struct { @@ -5792,6 +6302,70 @@ func (fake *FakeLocalParticipant) OnTrackUpdatedArgsForCall(i int) func(types.Lo return argsForCall.arg1 } +func (fake *FakeLocalParticipant) OnUpdateSubscriptionPermission(arg1 func(types.LocalParticipant, *livekit.SubscriptionPermission) error) { + fake.onUpdateSubscriptionPermissionMutex.Lock() + fake.onUpdateSubscriptionPermissionArgsForCall = append(fake.onUpdateSubscriptionPermissionArgsForCall, struct { + arg1 func(types.LocalParticipant, *livekit.SubscriptionPermission) error + }{arg1}) + stub := fake.OnUpdateSubscriptionPermissionStub + fake.recordInvocation("OnUpdateSubscriptionPermission", []interface{}{arg1}) + fake.onUpdateSubscriptionPermissionMutex.Unlock() + if stub != nil { + fake.OnUpdateSubscriptionPermissionStub(arg1) + } +} + +func (fake *FakeLocalParticipant) OnUpdateSubscriptionPermissionCallCount() int { + fake.onUpdateSubscriptionPermissionMutex.RLock() + defer fake.onUpdateSubscriptionPermissionMutex.RUnlock() + return len(fake.onUpdateSubscriptionPermissionArgsForCall) +} + +func (fake *FakeLocalParticipant) OnUpdateSubscriptionPermissionCalls(stub func(func(types.LocalParticipant, *livekit.SubscriptionPermission) error)) { + fake.onUpdateSubscriptionPermissionMutex.Lock() + defer fake.onUpdateSubscriptionPermissionMutex.Unlock() + fake.OnUpdateSubscriptionPermissionStub = stub +} + +func (fake *FakeLocalParticipant) OnUpdateSubscriptionPermissionArgsForCall(i int) func(types.LocalParticipant, *livekit.SubscriptionPermission) error { + fake.onUpdateSubscriptionPermissionMutex.RLock() + defer fake.onUpdateSubscriptionPermissionMutex.RUnlock() + argsForCall := fake.onUpdateSubscriptionPermissionArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeLocalParticipant) OnUpdateSubscriptions(arg1 func(types.LocalParticipant, []livekit.TrackID, []*livekit.ParticipantTracks, bool)) { + fake.onUpdateSubscriptionsMutex.Lock() + fake.onUpdateSubscriptionsArgsForCall = append(fake.onUpdateSubscriptionsArgsForCall, struct { + arg1 func(types.LocalParticipant, []livekit.TrackID, []*livekit.ParticipantTracks, bool) + }{arg1}) + stub := fake.OnUpdateSubscriptionsStub + fake.recordInvocation("OnUpdateSubscriptions", []interface{}{arg1}) + fake.onUpdateSubscriptionsMutex.Unlock() + if stub != nil { + fake.OnUpdateSubscriptionsStub(arg1) + } +} + +func (fake *FakeLocalParticipant) OnUpdateSubscriptionsCallCount() int { + fake.onUpdateSubscriptionsMutex.RLock() + defer fake.onUpdateSubscriptionsMutex.RUnlock() + return len(fake.onUpdateSubscriptionsArgsForCall) +} + +func (fake *FakeLocalParticipant) OnUpdateSubscriptionsCalls(stub func(func(types.LocalParticipant, []livekit.TrackID, []*livekit.ParticipantTracks, bool))) { + fake.onUpdateSubscriptionsMutex.Lock() + defer fake.onUpdateSubscriptionsMutex.Unlock() + fake.OnUpdateSubscriptionsStub = stub +} + +func (fake *FakeLocalParticipant) OnUpdateSubscriptionsArgsForCall(i int) func(types.LocalParticipant, []livekit.TrackID, []*livekit.ParticipantTracks, bool) { + fake.onUpdateSubscriptionsMutex.RLock() + defer fake.onUpdateSubscriptionsMutex.RUnlock() + argsForCall := fake.onUpdateSubscriptionsArgsForCall[i] + return argsForCall.arg1 +} + func (fake *FakeLocalParticipant) ProtocolVersion() types.ProtocolVersion { fake.protocolVersionMutex.Lock() ret, specificReturn := fake.protocolVersionReturnsOnCall[len(fake.protocolVersionArgsForCall)] @@ -5939,6 +6513,67 @@ func (fake *FakeLocalParticipant) RemoveTrackLocalReturnsOnCall(i int, result1 e }{result1} } +func (fake *FakeLocalParticipant) SendConnectResponse(arg1 *livekit.ConnectResponse) error { + fake.sendConnectResponseMutex.Lock() + ret, specificReturn := fake.sendConnectResponseReturnsOnCall[len(fake.sendConnectResponseArgsForCall)] + fake.sendConnectResponseArgsForCall = append(fake.sendConnectResponseArgsForCall, struct { + arg1 *livekit.ConnectResponse + }{arg1}) + stub := fake.SendConnectResponseStub + fakeReturns := fake.sendConnectResponseReturns + fake.recordInvocation("SendConnectResponse", []interface{}{arg1}) + fake.sendConnectResponseMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeLocalParticipant) SendConnectResponseCallCount() int { + fake.sendConnectResponseMutex.RLock() + defer fake.sendConnectResponseMutex.RUnlock() + return len(fake.sendConnectResponseArgsForCall) +} + +func (fake *FakeLocalParticipant) SendConnectResponseCalls(stub func(*livekit.ConnectResponse) error) { + fake.sendConnectResponseMutex.Lock() + defer fake.sendConnectResponseMutex.Unlock() + fake.SendConnectResponseStub = stub +} + +func (fake *FakeLocalParticipant) SendConnectResponseArgsForCall(i int) *livekit.ConnectResponse { + fake.sendConnectResponseMutex.RLock() + defer fake.sendConnectResponseMutex.RUnlock() + argsForCall := fake.sendConnectResponseArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeLocalParticipant) SendConnectResponseReturns(result1 error) { + fake.sendConnectResponseMutex.Lock() + defer fake.sendConnectResponseMutex.Unlock() + fake.SendConnectResponseStub = nil + fake.sendConnectResponseReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeLocalParticipant) SendConnectResponseReturnsOnCall(i int, result1 error) { + fake.sendConnectResponseMutex.Lock() + defer fake.sendConnectResponseMutex.Unlock() + fake.SendConnectResponseStub = nil + if fake.sendConnectResponseReturnsOnCall == nil { + fake.sendConnectResponseReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.sendConnectResponseReturnsOnCall[i] = struct { + result1 error + }{result1} +} + func (fake *FakeLocalParticipant) SendConnectionQualityUpdate(arg1 *livekit.ConnectionQualityUpdate) error { fake.sendConnectionQualityUpdateMutex.Lock() ret, specificReturn := fake.sendConnectionQualityUpdateReturnsOnCall[len(fake.sendConnectionQualityUpdateArgsForCall)] @@ -7101,6 +7736,59 @@ func (fake *FakeLocalParticipant) SetTrackMutedReturnsOnCall(i int, result1 *liv }{result1} } +func (fake *FakeLocalParticipant) SignalPendingMessages() proto.Message { + fake.signalPendingMessagesMutex.Lock() + ret, specificReturn := fake.signalPendingMessagesReturnsOnCall[len(fake.signalPendingMessagesArgsForCall)] + fake.signalPendingMessagesArgsForCall = append(fake.signalPendingMessagesArgsForCall, struct { + }{}) + stub := fake.SignalPendingMessagesStub + fakeReturns := fake.signalPendingMessagesReturns + fake.recordInvocation("SignalPendingMessages", []interface{}{}) + fake.signalPendingMessagesMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeLocalParticipant) SignalPendingMessagesCallCount() int { + fake.signalPendingMessagesMutex.RLock() + defer fake.signalPendingMessagesMutex.RUnlock() + return len(fake.signalPendingMessagesArgsForCall) +} + +func (fake *FakeLocalParticipant) SignalPendingMessagesCalls(stub func() proto.Message) { + fake.signalPendingMessagesMutex.Lock() + defer fake.signalPendingMessagesMutex.Unlock() + fake.SignalPendingMessagesStub = stub +} + +func (fake *FakeLocalParticipant) SignalPendingMessagesReturns(result1 proto.Message) { + fake.signalPendingMessagesMutex.Lock() + defer fake.signalPendingMessagesMutex.Unlock() + fake.SignalPendingMessagesStub = nil + fake.signalPendingMessagesReturns = struct { + result1 proto.Message + }{result1} +} + +func (fake *FakeLocalParticipant) SignalPendingMessagesReturnsOnCall(i int, result1 proto.Message) { + fake.signalPendingMessagesMutex.Lock() + defer fake.signalPendingMessagesMutex.Unlock() + fake.SignalPendingMessagesStub = nil + if fake.signalPendingMessagesReturnsOnCall == nil { + fake.signalPendingMessagesReturnsOnCall = make(map[int]struct { + result1 proto.Message + }) + } + fake.signalPendingMessagesReturnsOnCall[i] = struct { + result1 proto.Message + }{result1} +} + func (fake *FakeLocalParticipant) State() livekit.ParticipantInfo_State { fake.stateMutex.Lock() ret, specificReturn := fake.stateReturnsOnCall[len(fake.stateArgsForCall)] @@ -8524,6 +9212,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.getReporterMutex.RUnlock() fake.getReporterResolverMutex.RLock() defer fake.getReporterResolverMutex.RUnlock() + fake.getResponseSinkMutex.RLock() + defer fake.getResponseSinkMutex.RUnlock() fake.getSubscribedParticipantsMutex.RLock() defer fake.getSubscribedParticipantsMutex.RUnlock() fake.getSubscribedTracksMutex.RLock() @@ -8536,6 +9226,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.handleICERestartSDPFragmentMutex.RUnlock() fake.handleICETrickleSDPFragmentMutex.RLock() defer fake.handleICETrickleSDPFragmentMutex.RUnlock() + fake.handleLeaveRequestMutex.RLock() + defer fake.handleLeaveRequestMutex.RUnlock() fake.handleMetricsMutex.RLock() defer fake.handleMetricsMutex.RUnlock() fake.handleOfferMutex.RLock() @@ -8546,6 +9238,14 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.handleReconnectAndSendResponseMutex.RUnlock() fake.handleSignalSourceCloseMutex.RLock() defer fake.handleSignalSourceCloseMutex.RUnlock() + fake.handleSimulateScenarioMutex.RLock() + defer fake.handleSimulateScenarioMutex.RUnlock() + fake.handleSyncStateMutex.RLock() + defer fake.handleSyncStateMutex.RUnlock() + fake.handleUpdateSubscriptionPermissionMutex.RLock() + defer fake.handleUpdateSubscriptionPermissionMutex.RUnlock() + fake.handleUpdateSubscriptionsMutex.RLock() + defer fake.handleUpdateSubscriptionsMutex.RUnlock() fake.hasConnectedMutex.RLock() defer fake.hasConnectedMutex.RUnlock() fake.hasPermissionMutex.RLock() @@ -8604,30 +9304,42 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.onDataPacketMutex.RUnlock() fake.onICEConfigChangedMutex.RLock() defer fake.onICEConfigChangedMutex.RUnlock() + fake.onLeaveMutex.RLock() + defer fake.onLeaveMutex.RUnlock() fake.onMetricsMutex.RLock() defer fake.onMetricsMutex.RUnlock() fake.onMigrateStateChangeMutex.RLock() defer fake.onMigrateStateChangeMutex.RUnlock() fake.onParticipantUpdateMutex.RLock() defer fake.onParticipantUpdateMutex.RUnlock() + fake.onSimulateScenarioMutex.RLock() + defer fake.onSimulateScenarioMutex.RUnlock() fake.onStateChangeMutex.RLock() defer fake.onStateChangeMutex.RUnlock() fake.onSubscribeStatusChangedMutex.RLock() defer fake.onSubscribeStatusChangedMutex.RUnlock() fake.onSubscriberReadyMutex.RLock() defer fake.onSubscriberReadyMutex.RUnlock() + fake.onSyncStateMutex.RLock() + defer fake.onSyncStateMutex.RUnlock() fake.onTrackPublishedMutex.RLock() defer fake.onTrackPublishedMutex.RUnlock() fake.onTrackUnpublishedMutex.RLock() defer fake.onTrackUnpublishedMutex.RUnlock() fake.onTrackUpdatedMutex.RLock() defer fake.onTrackUpdatedMutex.RUnlock() + fake.onUpdateSubscriptionPermissionMutex.RLock() + defer fake.onUpdateSubscriptionPermissionMutex.RUnlock() + fake.onUpdateSubscriptionsMutex.RLock() + defer fake.onUpdateSubscriptionsMutex.RUnlock() fake.protocolVersionMutex.RLock() defer fake.protocolVersionMutex.RUnlock() fake.removePublishedTrackMutex.RLock() defer fake.removePublishedTrackMutex.RUnlock() fake.removeTrackLocalMutex.RLock() defer fake.removeTrackLocalMutex.RUnlock() + fake.sendConnectResponseMutex.RLock() + defer fake.sendConnectResponseMutex.RUnlock() fake.sendConnectionQualityUpdateMutex.RLock() defer fake.sendConnectionQualityUpdateMutex.RUnlock() fake.sendDataMessageMutex.RLock() @@ -8674,6 +9386,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.setSubscriberChannelCapacityMutex.RUnlock() fake.setTrackMutedMutex.RLock() defer fake.setTrackMutedMutex.RUnlock() + fake.signalPendingMessagesMutex.RLock() + defer fake.signalPendingMessagesMutex.RUnlock() fake.stateMutex.RLock() defer fake.stateMutex.RUnlock() fake.stopAndGetSubscribedTracksForwarderStateMutex.RLock() diff --git a/pkg/rtc/types/typesfakes/fake_room.go b/pkg/rtc/types/typesfakes/fake_room.go index 70fbc3245..e4ae5e3df 100644 --- a/pkg/rtc/types/typesfakes/fake_room.go +++ b/pkg/rtc/types/typesfakes/fake_room.go @@ -69,42 +69,6 @@ type FakeRoom struct { resolveMediaTrackForSubscriberReturnsOnCall map[int]struct { result1 types.MediaResolverResult } - SimulateScenarioStub func(types.LocalParticipant, *livekit.SimulateScenario) error - simulateScenarioMutex sync.RWMutex - simulateScenarioArgsForCall []struct { - arg1 types.LocalParticipant - arg2 *livekit.SimulateScenario - } - simulateScenarioReturns struct { - result1 error - } - simulateScenarioReturnsOnCall map[int]struct { - result1 error - } - SyncStateStub func(types.LocalParticipant, *livekit.SyncState) error - syncStateMutex sync.RWMutex - syncStateArgsForCall []struct { - arg1 types.LocalParticipant - arg2 *livekit.SyncState - } - syncStateReturns struct { - result1 error - } - syncStateReturnsOnCall map[int]struct { - result1 error - } - UpdateSubscriptionPermissionStub func(types.LocalParticipant, *livekit.SubscriptionPermission) error - updateSubscriptionPermissionMutex sync.RWMutex - updateSubscriptionPermissionArgsForCall []struct { - arg1 types.LocalParticipant - arg2 *livekit.SubscriptionPermission - } - updateSubscriptionPermissionReturns struct { - result1 error - } - updateSubscriptionPermissionReturnsOnCall map[int]struct { - result1 error - } UpdateSubscriptionsStub func(types.LocalParticipant, []livekit.TrackID, []*livekit.ParticipantTracks, bool) updateSubscriptionsMutex sync.RWMutex updateSubscriptionsArgsForCall []struct { @@ -433,192 +397,6 @@ func (fake *FakeRoom) ResolveMediaTrackForSubscriberReturnsOnCall(i int, result1 }{result1} } -func (fake *FakeRoom) SimulateScenario(arg1 types.LocalParticipant, arg2 *livekit.SimulateScenario) error { - fake.simulateScenarioMutex.Lock() - ret, specificReturn := fake.simulateScenarioReturnsOnCall[len(fake.simulateScenarioArgsForCall)] - fake.simulateScenarioArgsForCall = append(fake.simulateScenarioArgsForCall, struct { - arg1 types.LocalParticipant - arg2 *livekit.SimulateScenario - }{arg1, arg2}) - stub := fake.SimulateScenarioStub - fakeReturns := fake.simulateScenarioReturns - fake.recordInvocation("SimulateScenario", []interface{}{arg1, arg2}) - fake.simulateScenarioMutex.Unlock() - if stub != nil { - return stub(arg1, arg2) - } - if specificReturn { - return ret.result1 - } - return fakeReturns.result1 -} - -func (fake *FakeRoom) SimulateScenarioCallCount() int { - fake.simulateScenarioMutex.RLock() - defer fake.simulateScenarioMutex.RUnlock() - return len(fake.simulateScenarioArgsForCall) -} - -func (fake *FakeRoom) SimulateScenarioCalls(stub func(types.LocalParticipant, *livekit.SimulateScenario) error) { - fake.simulateScenarioMutex.Lock() - defer fake.simulateScenarioMutex.Unlock() - fake.SimulateScenarioStub = stub -} - -func (fake *FakeRoom) SimulateScenarioArgsForCall(i int) (types.LocalParticipant, *livekit.SimulateScenario) { - fake.simulateScenarioMutex.RLock() - defer fake.simulateScenarioMutex.RUnlock() - argsForCall := fake.simulateScenarioArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 -} - -func (fake *FakeRoom) SimulateScenarioReturns(result1 error) { - fake.simulateScenarioMutex.Lock() - defer fake.simulateScenarioMutex.Unlock() - fake.SimulateScenarioStub = nil - fake.simulateScenarioReturns = struct { - result1 error - }{result1} -} - -func (fake *FakeRoom) SimulateScenarioReturnsOnCall(i int, result1 error) { - fake.simulateScenarioMutex.Lock() - defer fake.simulateScenarioMutex.Unlock() - fake.SimulateScenarioStub = nil - if fake.simulateScenarioReturnsOnCall == nil { - fake.simulateScenarioReturnsOnCall = make(map[int]struct { - result1 error - }) - } - fake.simulateScenarioReturnsOnCall[i] = struct { - result1 error - }{result1} -} - -func (fake *FakeRoom) SyncState(arg1 types.LocalParticipant, arg2 *livekit.SyncState) error { - fake.syncStateMutex.Lock() - ret, specificReturn := fake.syncStateReturnsOnCall[len(fake.syncStateArgsForCall)] - fake.syncStateArgsForCall = append(fake.syncStateArgsForCall, struct { - arg1 types.LocalParticipant - arg2 *livekit.SyncState - }{arg1, arg2}) - stub := fake.SyncStateStub - fakeReturns := fake.syncStateReturns - fake.recordInvocation("SyncState", []interface{}{arg1, arg2}) - fake.syncStateMutex.Unlock() - if stub != nil { - return stub(arg1, arg2) - } - if specificReturn { - return ret.result1 - } - return fakeReturns.result1 -} - -func (fake *FakeRoom) SyncStateCallCount() int { - fake.syncStateMutex.RLock() - defer fake.syncStateMutex.RUnlock() - return len(fake.syncStateArgsForCall) -} - -func (fake *FakeRoom) SyncStateCalls(stub func(types.LocalParticipant, *livekit.SyncState) error) { - fake.syncStateMutex.Lock() - defer fake.syncStateMutex.Unlock() - fake.SyncStateStub = stub -} - -func (fake *FakeRoom) SyncStateArgsForCall(i int) (types.LocalParticipant, *livekit.SyncState) { - fake.syncStateMutex.RLock() - defer fake.syncStateMutex.RUnlock() - argsForCall := fake.syncStateArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 -} - -func (fake *FakeRoom) SyncStateReturns(result1 error) { - fake.syncStateMutex.Lock() - defer fake.syncStateMutex.Unlock() - fake.SyncStateStub = nil - fake.syncStateReturns = struct { - result1 error - }{result1} -} - -func (fake *FakeRoom) SyncStateReturnsOnCall(i int, result1 error) { - fake.syncStateMutex.Lock() - defer fake.syncStateMutex.Unlock() - fake.SyncStateStub = nil - if fake.syncStateReturnsOnCall == nil { - fake.syncStateReturnsOnCall = make(map[int]struct { - result1 error - }) - } - fake.syncStateReturnsOnCall[i] = struct { - result1 error - }{result1} -} - -func (fake *FakeRoom) UpdateSubscriptionPermission(arg1 types.LocalParticipant, arg2 *livekit.SubscriptionPermission) error { - fake.updateSubscriptionPermissionMutex.Lock() - ret, specificReturn := fake.updateSubscriptionPermissionReturnsOnCall[len(fake.updateSubscriptionPermissionArgsForCall)] - fake.updateSubscriptionPermissionArgsForCall = append(fake.updateSubscriptionPermissionArgsForCall, struct { - arg1 types.LocalParticipant - arg2 *livekit.SubscriptionPermission - }{arg1, arg2}) - stub := fake.UpdateSubscriptionPermissionStub - fakeReturns := fake.updateSubscriptionPermissionReturns - fake.recordInvocation("UpdateSubscriptionPermission", []interface{}{arg1, arg2}) - fake.updateSubscriptionPermissionMutex.Unlock() - if stub != nil { - return stub(arg1, arg2) - } - if specificReturn { - return ret.result1 - } - return fakeReturns.result1 -} - -func (fake *FakeRoom) UpdateSubscriptionPermissionCallCount() int { - fake.updateSubscriptionPermissionMutex.RLock() - defer fake.updateSubscriptionPermissionMutex.RUnlock() - return len(fake.updateSubscriptionPermissionArgsForCall) -} - -func (fake *FakeRoom) UpdateSubscriptionPermissionCalls(stub func(types.LocalParticipant, *livekit.SubscriptionPermission) error) { - fake.updateSubscriptionPermissionMutex.Lock() - defer fake.updateSubscriptionPermissionMutex.Unlock() - fake.UpdateSubscriptionPermissionStub = stub -} - -func (fake *FakeRoom) UpdateSubscriptionPermissionArgsForCall(i int) (types.LocalParticipant, *livekit.SubscriptionPermission) { - fake.updateSubscriptionPermissionMutex.RLock() - defer fake.updateSubscriptionPermissionMutex.RUnlock() - argsForCall := fake.updateSubscriptionPermissionArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 -} - -func (fake *FakeRoom) UpdateSubscriptionPermissionReturns(result1 error) { - fake.updateSubscriptionPermissionMutex.Lock() - defer fake.updateSubscriptionPermissionMutex.Unlock() - fake.UpdateSubscriptionPermissionStub = nil - fake.updateSubscriptionPermissionReturns = struct { - result1 error - }{result1} -} - -func (fake *FakeRoom) UpdateSubscriptionPermissionReturnsOnCall(i int, result1 error) { - fake.updateSubscriptionPermissionMutex.Lock() - defer fake.updateSubscriptionPermissionMutex.Unlock() - fake.UpdateSubscriptionPermissionStub = nil - if fake.updateSubscriptionPermissionReturnsOnCall == nil { - fake.updateSubscriptionPermissionReturnsOnCall = make(map[int]struct { - result1 error - }) - } - fake.updateSubscriptionPermissionReturnsOnCall[i] = struct { - result1 error - }{result1} -} - func (fake *FakeRoom) UpdateSubscriptions(arg1 types.LocalParticipant, arg2 []livekit.TrackID, arg3 []*livekit.ParticipantTracks, arg4 bool) { var arg2Copy []livekit.TrackID if arg2 != nil { @@ -679,12 +457,6 @@ func (fake *FakeRoom) Invocations() map[string][][]interface{} { defer fake.removeParticipantMutex.RUnlock() fake.resolveMediaTrackForSubscriberMutex.RLock() defer fake.resolveMediaTrackForSubscriberMutex.RUnlock() - fake.simulateScenarioMutex.RLock() - defer fake.simulateScenarioMutex.RUnlock() - fake.syncStateMutex.RLock() - defer fake.syncStateMutex.RUnlock() - fake.updateSubscriptionPermissionMutex.RLock() - defer fake.updateSubscriptionPermissionMutex.RUnlock() fake.updateSubscriptionsMutex.RLock() defer fake.updateSubscriptionsMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index b847094d6..abf31a375 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -863,12 +863,7 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, createRoom *livekit.C // manages an RTC session for a participant, runs on the RTC node func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.LocalParticipant, requestSource routing.MessageSource) { - pLogger := rtc.LoggerWithParticipant( - rtc.LoggerWithRoom(logger.GetLogger(), room.Name(), room.ID()), - participant.Identity(), - participant.ID(), - false, - ) + pLogger := participant.GetLogger() defer func() { pLogger.Debugw("RTC session finishing", "connID", requestSource.ConnectionID()) requestSource.Close() @@ -888,11 +883,13 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.LocalPa select { case <-participant.Disconnected(): return + case <-tokenTicker.C: // refresh token with the first API Key/secret pair if err := r.refreshToken(participant); err != nil { pLogger.Errorw("could not refresh token", err, "connID", requestSource.ConnectionID()) } + case obj := <-requestSource.ReadChan(): if obj == nil { if room.GetParticipantRequestSource(participant.Identity()) == requestSource { @@ -902,7 +899,7 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.LocalPa } req := obj.(*livekit.SignalRequest) - if err := rtc.HandleParticipantSignal(room, participant, req, pLogger); err != nil { + if err := rtc.HandleParticipantSignal(participant, req); err != nil { // more specific errors are already logged // treat errors returned as fatal return diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index 15d27d803..7675e1cb6 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -371,12 +371,15 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { switch m := res.Message.(type) { case *livekit.SignalResponse_Offer: pLogger.Debugw("sending offer", "offer", m) + case *livekit.SignalResponse_Answer: pLogger.Debugw("sending answer", "answer", m) + case *livekit.SignalResponse_Join: pLogger.Debugw("sending join", "join", m) signalStats.ResolveRoom(m.Join.GetRoom()) signalStats.ResolveParticipant(m.Join.GetParticipant()) + case *livekit.SignalResponse_RoomUpdate: updateRoomID := livekit.RoomID(m.RoomUpdate.GetRoom().GetSid()) if updateRoomID != "" { @@ -385,11 +388,14 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { } pLogger.Debugw("sending room update", "roomUpdate", m) signalStats.ResolveRoom(m.RoomUpdate.GetRoom()) + case *livekit.SignalResponse_Update: pLogger.Debugw("sending participant update", "participantUpdate", m) + case *livekit.SignalResponse_RoomMoved: resetLogger() signalStats.Reset() + roomName = livekit.RoomName(m.RoomMoved.GetRoom().GetName()) moveRoomID := livekit.RoomID(m.RoomMoved.GetRoom().GetSid()) if moveRoomID != "" { @@ -398,6 +404,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { participantIdentity = livekit.ParticipantIdentity(m.RoomMoved.GetParticipant().GetIdentity()) pID = livekit.ParticipantID(m.RoomMoved.GetParticipant().GetSid()) resolveLogger(false) + signalStats.ResolveRoom(m.RoomMoved.GetRoom()) signalStats.ResolveParticipant(m.RoomMoved.GetParticipant()) pLogger.Debugw("sending room moved", "roomMoved", m)