starting signaller interface (#3802)

* starting signaller interface

* WIP

* WIP

* WIP

* typo

* connect response check

* WIP

* clean up

* move signal handling to participant fully

* service methods

* type assertions for interfaces
This commit is contained in:
Raja Subramanian
2025-07-20 13:48:40 +05:30
committed by GitHub
parent 18ce524455
commit 7837c8e595
24 changed files with 2376 additions and 689 deletions
+127 -10
View File
@@ -47,6 +47,7 @@ import (
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/metric"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/rtc/signalling"
"github.com/livekit/livekit-server/pkg/rtc/supervisor"
"github.com/livekit/livekit-server/pkg/rtc/transport"
"github.com/livekit/livekit-server/pkg/rtc/types"
@@ -280,16 +281,21 @@ type ParticipantImpl struct {
version atomic.Uint32
// callbacks & handlers
onTrackPublished func(types.LocalParticipant, types.MediaTrack)
onTrackUpdated func(types.LocalParticipant, types.MediaTrack)
onTrackUnpublished func(types.LocalParticipant, types.MediaTrack)
onStateChange func(p types.LocalParticipant)
onSubscriberReady func(p types.LocalParticipant)
onMigrateStateChange func(p types.LocalParticipant, migrateState types.MigrateState)
onParticipantUpdate func(types.LocalParticipant)
onDataPacket func(types.LocalParticipant, livekit.DataPacket_Kind, *livekit.DataPacket)
onDataMessage func(types.LocalParticipant, []byte)
onMetrics func(types.Participant, *livekit.DataPacket)
onTrackPublished func(types.LocalParticipant, types.MediaTrack)
onTrackUpdated func(types.LocalParticipant, types.MediaTrack)
onTrackUnpublished func(types.LocalParticipant, types.MediaTrack)
onStateChange func(p types.LocalParticipant)
onSubscriberReady func(p types.LocalParticipant)
onMigrateStateChange func(p types.LocalParticipant, migrateState types.MigrateState)
onParticipantUpdate func(types.LocalParticipant)
onDataPacket func(types.LocalParticipant, livekit.DataPacket_Kind, *livekit.DataPacket)
onDataMessage func(types.LocalParticipant, []byte)
onMetrics func(types.Participant, *livekit.DataPacket)
onUpdateSubscriptions func(types.LocalParticipant, []livekit.TrackID, []*livekit.ParticipantTracks, bool)
onUpdateSubscriptionPermission func(types.LocalParticipant, *livekit.SubscriptionPermission) error
onSyncState func(types.LocalParticipant, *livekit.SyncState) error
onSimulateScenario func(types.LocalParticipant, *livekit.SimulateScenario) error
onLeave func(types.LocalParticipant, types.ParticipantCloseReason)
migrateState atomic.Value // types.MigrateState
migratedTracksPublishedFuse core.Fuse
@@ -309,6 +315,9 @@ type ParticipantImpl struct {
metricsCollector *metric.MetricsCollector
metricsReporter *metric.MetricsReporter
signalling signalling.ParticipantSignalling
signaller signalling.ParticipantSignaller
// loggers for publisher and subscriber
pubLogger logger.Logger
subLogger logger.Logger
@@ -345,6 +354,14 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
joiningMessageLastWrittenSeqs: make(map[livekit.ParticipantID]uint32),
},
}
p.signalling = signalling.NewSignalling(signalling.SignallingParams{
Logger: params.Logger,
})
p.signaller = signalling.NewSignallerAsync(signalling.SignallerAsyncParams{
Logger: params.Logger,
Participant: p,
})
p.id.Store(params.SID)
p.dataChannelStats = telemetry.NewBytesTrackStats(
telemetry.BytesTrackIDForParticipantID(telemetry.BytesTrackTypeData, p.ID()),
@@ -908,6 +925,66 @@ func (p *ParticipantImpl) getOnMetrics() func(types.Participant, *livekit.DataPa
return p.onMetrics
}
func (p *ParticipantImpl) OnUpdateSubscriptions(callback func(types.LocalParticipant, []livekit.TrackID, []*livekit.ParticipantTracks, bool)) {
p.lock.Lock()
p.onUpdateSubscriptions = callback
p.lock.Unlock()
}
func (p *ParticipantImpl) getOnUpdateSubscriptions() func(types.LocalParticipant, []livekit.TrackID, []*livekit.ParticipantTracks, bool) {
p.lock.RLock()
defer p.lock.RUnlock()
return p.onUpdateSubscriptions
}
func (p *ParticipantImpl) OnUpdateSubscriptionPermission(callback func(types.LocalParticipant, *livekit.SubscriptionPermission) error) {
p.lock.Lock()
p.onUpdateSubscriptionPermission = callback
p.lock.Unlock()
}
func (p *ParticipantImpl) getOnUpdateSubscriptionPermission() func(types.LocalParticipant, *livekit.SubscriptionPermission) error {
p.lock.RLock()
defer p.lock.RUnlock()
return p.onUpdateSubscriptionPermission
}
func (p *ParticipantImpl) OnSyncState(callback func(types.LocalParticipant, *livekit.SyncState) error) {
p.lock.Lock()
p.onSyncState = callback
p.lock.Unlock()
}
func (p *ParticipantImpl) getOnSyncState() func(types.LocalParticipant, *livekit.SyncState) error {
p.lock.RLock()
defer p.lock.RUnlock()
return p.onSyncState
}
func (p *ParticipantImpl) OnSimulateScenario(callback func(types.LocalParticipant, *livekit.SimulateScenario) error) {
p.lock.Lock()
p.onSimulateScenario = callback
p.lock.Unlock()
}
func (p *ParticipantImpl) getOnSimulateScenario() func(types.LocalParticipant, *livekit.SimulateScenario) error {
p.lock.RLock()
defer p.lock.RUnlock()
return p.onSimulateScenario
}
func (p *ParticipantImpl) OnLeave(callback func(types.LocalParticipant, types.ParticipantCloseReason)) {
p.lock.Lock()
p.onLeave = callback
p.lock.Unlock()
}
func (p *ParticipantImpl) getOnLeave() func(types.LocalParticipant, types.ParticipantCloseReason) {
p.lock.RLock()
defer p.lock.RUnlock()
return p.onLeave
}
func (p *ParticipantImpl) OnClose(callback func(types.LocalParticipant)) {
if p.isClosed.Load() {
go callback(p)
@@ -3625,3 +3702,43 @@ func (p *ParticipantImpl) GetLastReliableSequence(migrateOut bool) uint32 {
}
return p.reliableDataInfo.lastPubReliableSeq.Load()
}
func (p *ParticipantImpl) HandleUpdateSubscriptions(
trackIDs []livekit.TrackID,
participantTracks []*livekit.ParticipantTracks,
subscribe bool,
) {
if onUpdateSubscriptions := p.getOnUpdateSubscriptions(); onUpdateSubscriptions != nil {
onUpdateSubscriptions(p, trackIDs, participantTracks, subscribe)
}
}
func (p *ParticipantImpl) HandleUpdateSubscriptionPermission(subscriptionPermission *livekit.SubscriptionPermission) error {
if onUpdateSubscriptionPermission := p.getOnUpdateSubscriptionPermission(); onUpdateSubscriptionPermission != nil {
return onUpdateSubscriptionPermission(p, subscriptionPermission)
}
return errors.New("no handler")
}
func (p *ParticipantImpl) HandleSyncState(syncState *livekit.SyncState) error {
if onSyncState := p.getOnSyncState(); onSyncState != nil {
return onSyncState(p, syncState)
}
return errors.New("no handler")
}
func (p *ParticipantImpl) HandleSimulateScenario(simulateScenario *livekit.SimulateScenario) error {
if onSimulateScenario := p.getOnSimulateScenario(); onSimulateScenario != nil {
return onSimulateScenario(p, simulateScenario)
}
return errors.New("no handler")
}
func (p *ParticipantImpl) HandleLeaveRequest(reason types.ParticipantCloseReason) {
if onLeave := p.getOnLeave(); onLeave != nil {
onLeave(p, reason)
}
}
+1 -1
View File
@@ -227,7 +227,7 @@ func TestOutOfOrderUpdates(t *testing.T) {
p := newParticipantForTest("test")
p.updateState(livekit.ParticipantInfo_JOINED)
p.SetMetadata("initial metadata")
sink := p.getResponseSink().(*routingfakes.FakeMessageSink)
sink := p.GetResponseSink().(*routingfakes.FakeMessageSink)
pi1 := p.ToProto()
p.SetMetadata("second update")
pi2 := p.ToProto()
+87 -205
View File
@@ -15,30 +15,29 @@
package rtc
import (
"fmt"
"time"
"github.com/pion/webrtc/v4"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
"github.com/livekit/psrpc"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/rtc/types"
"google.golang.org/protobuf/proto"
)
func (p *ParticipantImpl) getResponseSink() routing.MessageSink {
p.resSinkMu.Lock()
defer p.resSinkMu.Unlock()
return p.resSink
func (p *ParticipantImpl) SetResponseSink(sink routing.MessageSink) {
p.signaller.SetResponseSink(sink)
}
func (p *ParticipantImpl) SetResponseSink(sink routing.MessageSink) {
p.resSinkMu.Lock()
defer p.resSinkMu.Unlock()
p.resSink = sink
func (p *ParticipantImpl) GetResponseSink() routing.MessageSink {
return p.signaller.GetResponseSink()
}
func (p *ParticipantImpl) CloseSignalConnection(reason types.SignallingCloseReason) {
p.signaller.CloseSignalConnection(reason)
}
func (p *ParticipantImpl) SendJoinResponse(joinResponse *livekit.JoinResponse) error {
@@ -55,11 +54,7 @@ func (p *ParticipantImpl) SendJoinResponse(joinResponse *livekit.JoinResponse) e
p.updateLock.Unlock()
// send Join response
err := p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_Join{
Join: joinResponse,
},
})
err := p.signaller.WriteMessage(p.signalling.SignalJoinResponse(joinResponse))
if err != nil {
return err
}
@@ -127,17 +122,7 @@ func (p *ParticipantImpl) SendParticipantUpdate(participantsToUpdate []*livekit.
}
p.updateLock.Unlock()
if len(validUpdates) == 0 {
return nil
}
return p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_Update{
Update: &livekit.ParticipantUpdate{
Participants: validUpdates,
},
},
})
return p.signaller.WriteMessage(p.signalling.SignalParticipantUpdate(validUpdates))
}
// SendSpeakerUpdate notifies participant changes to speakers. only send members that have changed since last update
@@ -158,43 +143,19 @@ func (p *ParticipantImpl) SendSpeakerUpdate(speakers []*livekit.SpeakerInfo, for
}
}
if len(scopedSpeakers) == 0 {
return nil
}
return p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_SpeakersChanged{
SpeakersChanged: &livekit.SpeakersChanged{
Speakers: scopedSpeakers,
},
},
})
return p.signaller.WriteMessage(p.signalling.SignalSpeakerUpdate(scopedSpeakers))
}
func (p *ParticipantImpl) SendRoomUpdate(room *livekit.Room) error {
return p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_RoomUpdate{
RoomUpdate: &livekit.RoomUpdate{
Room: room,
},
},
})
return p.signaller.WriteMessage(p.signalling.SignalRoomUpdate(room))
}
func (p *ParticipantImpl) SendConnectionQualityUpdate(update *livekit.ConnectionQualityUpdate) error {
return p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_ConnectionQuality{
ConnectionQuality: update,
},
})
return p.signaller.WriteMessage(p.signalling.SignalConnectionQualityUpdate(update))
}
func (p *ParticipantImpl) SendRefreshToken(token string) error {
return p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_RefreshToken{
RefreshToken: token,
},
})
return p.signaller.WriteMessage(p.signalling.SignalRefreshToken(token))
}
func (p *ParticipantImpl) SendRequestResponse(requestResponse *livekit.RequestResponse) error {
@@ -206,19 +167,11 @@ func (p *ParticipantImpl) SendRequestResponse(requestResponse *livekit.RequestRe
return nil
}
return p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_RequestResponse{
RequestResponse: requestResponse,
},
})
return p.signaller.WriteMessage(p.signalling.SignalRequestResponse(requestResponse))
}
func (p *ParticipantImpl) SendRoomMovedResponse(roomMovedResponse *livekit.RoomMovedResponse) error {
return p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_RoomMoved{
RoomMoved: roomMovedResponse,
},
})
return p.signaller.WriteMessage(p.signalling.SignalRoomMovedResponse(roomMovedResponse))
}
func (p *ParticipantImpl) HandleReconnectAndSendResponse(reconnectReason livekit.ReconnectReason, reconnectResponse *livekit.ReconnectResponse) error {
@@ -227,11 +180,7 @@ func (p *ParticipantImpl) HandleReconnectAndSendResponse(reconnectReason livekit
if !p.params.ClientInfo.CanHandleReconnectResponse() {
return nil
}
if err := p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_Reconnect{
Reconnect: reconnectResponse,
},
}); err != nil {
if err := p.signaller.WriteMessage(p.signalling.SignalReconnectResponse(reconnectResponse)); err != nil {
return err
}
@@ -263,17 +212,7 @@ func (p *ParticipantImpl) sendDisconnectUpdatesForReconnect() error {
}
p.updateLock.Unlock()
if len(disconnectedParticipants) == 0 {
return nil
}
return p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_Update{
Update: &livekit.ParticipantUpdate{
Participants: disconnectedParticipants,
},
},
})
return p.signaller.WriteMessage(p.signalling.SignalParticipantUpdate(disconnectedParticipants))
}
func (p *ParticipantImpl) sendICECandidate(ic *webrtc.ICECandidate, target livekit.SignalTarget) error {
@@ -285,87 +224,40 @@ func (p *ParticipantImpl) sendICECandidate(ic *webrtc.ICECandidate, target livek
trickle := ToProtoTrickle(prevIC.ToJSON(), target, ic == nil)
p.params.Logger.Debugw("sending ICE candidate", "transport", target, "trickle", logger.Proto(trickle))
return p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_Trickle{
Trickle: trickle,
},
})
return p.signaller.WriteMessage(p.signalling.SignalICECandidate(trickle))
}
func (p *ParticipantImpl) sendTrackMuted(trackID livekit.TrackID, muted bool) {
_ = p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_Mute{
Mute: &livekit.MuteTrackRequest{
Sid: string(trackID),
Muted: muted,
},
},
})
_ = p.signaller.WriteMessage(p.signalling.SignalTrackMuted(&livekit.MuteTrackRequest{
Sid: string(trackID),
Muted: muted,
}))
}
func (p *ParticipantImpl) sendTrackPublished(cid string, ti *livekit.TrackInfo) error {
p.pubLogger.Debugw("sending track published", "cid", cid, "trackInfo", logger.Proto(ti))
return p.signaller.WriteMessage(p.signalling.SignalTrackPublished(&livekit.TrackPublishedResponse{
Cid: cid,
Track: ti,
}))
}
func (p *ParticipantImpl) sendTrackUnpublished(trackID livekit.TrackID) {
_ = p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_TrackUnpublished{
TrackUnpublished: &livekit.TrackUnpublishedResponse{
TrackSid: string(trackID),
},
},
})
_ = p.signaller.WriteMessage(p.signalling.SignalTrackUnpublished(&livekit.TrackUnpublishedResponse{
TrackSid: string(trackID),
}))
}
func (p *ParticipantImpl) sendTrackHasBeenSubscribed(trackID livekit.TrackID) {
if !p.params.ClientInfo.SupportTrackSubscribedEvent() {
return
}
_ = p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_TrackSubscribed{
TrackSubscribed: &livekit.TrackSubscribed{
TrackSid: string(trackID),
},
},
})
_ = p.signaller.WriteMessage(p.signalling.SignalTrackSubscribed(&livekit.TrackSubscribed{
TrackSid: string(trackID),
}))
p.params.Logger.Debugw("track has been subscribed", "trackID", trackID)
}
func (p *ParticipantImpl) writeMessage(msg *livekit.SignalResponse) error {
if p.IsDisconnected() || (!p.IsReady() && msg.GetJoin() == nil) {
return nil
}
sink := p.getResponseSink()
if sink == nil {
p.params.Logger.Debugw("could not send message to participant", "messageType", fmt.Sprintf("%T", msg.Message))
return nil
}
err := sink.WriteMessage(msg)
if utils.ErrorIsOneOf(err, psrpc.Canceled, routing.ErrChannelClosed) {
p.params.Logger.Debugw(
"could not send message to participant",
"error", err,
"messageType", fmt.Sprintf("%T", msg.Message),
)
return nil
} else if err != nil {
p.params.Logger.Warnw(
"could not send message to participant", err,
"messageType", fmt.Sprintf("%T", msg.Message),
)
return err
}
return nil
}
// closes signal connection to notify client to resume/reconnect
func (p *ParticipantImpl) CloseSignalConnection(reason types.SignallingCloseReason) {
sink := p.getResponseSink()
if sink != nil {
p.params.Logger.Debugw("closing signal connection", "reason", reason, "connID", sink.ConnectionID())
sink.Close()
p.SetResponseSink(nil)
}
}
func (p *ParticipantImpl) sendLeaveRequest(
reason types.ParticipantCloseReason,
isExpectedToResume bool,
@@ -397,85 +289,75 @@ func (p *ParticipantImpl) sendLeaveRequest(
}
}
}
if leave != nil {
return p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_Leave{
Leave: leave,
},
})
}
return nil
return p.signaller.WriteMessage(p.signalling.SignalLeaveRequest(leave))
}
func (p *ParticipantImpl) sendSdpAnswer(answer webrtc.SessionDescription, answerId uint32) error {
return p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_Answer{
Answer: ToProtoSessionDescription(answer, answerId),
},
})
return p.signaller.WriteMessage(p.signalling.SignalSdpAnswer(ToProtoSessionDescription(answer, answerId)))
}
func (p *ParticipantImpl) sendSdpOffer(offer webrtc.SessionDescription, offerId uint32) error {
return p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_Offer{
Offer: ToProtoSessionDescription(offer, offerId),
},
})
return p.signaller.WriteMessage(p.signalling.SignalSdpOffer(ToProtoSessionDescription(offer, offerId)))
}
func (p *ParticipantImpl) sendStreamStateUpdate(streamStateUpdate *livekit.StreamStateUpdate) error {
return p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_StreamStateUpdate{
StreamStateUpdate: streamStateUpdate,
},
})
return p.signaller.WriteMessage(p.signalling.SignalStreamStateUpdate(streamStateUpdate))
}
func (p *ParticipantImpl) sendSubscribedQualityUpdate(subscribedQualityUpdate *livekit.SubscribedQualityUpdate) error {
return p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_SubscribedQualityUpdate{
SubscribedQualityUpdate: subscribedQualityUpdate,
},
})
}
func (p *ParticipantImpl) sendTrackPublished(cid string, ti *livekit.TrackInfo) error {
p.pubLogger.Debugw("sending track published", "cid", cid, "trackInfo", logger.Proto(ti))
return p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_TrackPublished{
TrackPublished: &livekit.TrackPublishedResponse{
Cid: cid,
Track: ti,
},
},
})
return p.signaller.WriteMessage(p.signalling.SignalSubscribedQualityUpdate(subscribedQualityUpdate))
}
func (p *ParticipantImpl) sendSubscriptionResponse(trackID livekit.TrackID, subErr livekit.SubscriptionError) error {
return p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_SubscriptionResponse{
SubscriptionResponse: &livekit.SubscriptionResponse{
TrackSid: string(trackID),
Err: subErr,
},
},
})
return p.signaller.WriteMessage(p.signalling.SignalSubscriptionResponse(&livekit.SubscriptionResponse{
TrackSid: string(trackID),
Err: subErr,
}))
}
func (p *ParticipantImpl) SendSubscriptionPermissionUpdate(publisherID livekit.ParticipantID, trackID livekit.TrackID, allowed bool) error {
p.subLogger.Debugw("sending subscription permission update", "publisherID", publisherID, "trackID", trackID, "allowed", allowed)
err := p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_SubscriptionPermissionUpdate{
SubscriptionPermissionUpdate: &livekit.SubscriptionPermissionUpdate{
ParticipantSid: string(publisherID),
TrackSid: string(trackID),
Allowed: allowed,
},
},
})
err := p.signaller.WriteMessage(p.signalling.SignalSubscriptionPermissionUpdate(&livekit.SubscriptionPermissionUpdate{
ParticipantSid: string(publisherID),
TrackSid: string(trackID),
Allowed: allowed,
}))
if err != nil {
p.subLogger.Errorw("could not send subscription permission update", err)
}
return err
}
func (p *ParticipantImpl) SendConnectResponse(connectResponse *livekit.ConnectResponse) error {
// keep track of participant updates and versions
p.updateLock.Lock()
for _, op := range connectResponse.OtherParticipants {
p.updateCache.Add(livekit.ParticipantID(op.Sid), participantUpdateInfo{
identity: livekit.ParticipantIdentity(op.Identity),
version: op.Version,
state: op.State,
updatedAt: time.Now(),
})
}
p.updateLock.Unlock()
err := p.signaller.WriteMessage(p.signalling.SignalConnectResponse(connectResponse))
if err != nil {
return err
}
// update state after sending message, so that no participant updates could slip through before JoinResponse is sent
p.updateLock.Lock()
if p.State() == livekit.ParticipantInfo_JOINING {
p.updateState(livekit.ParticipantInfo_JOINED)
}
queuedUpdates := p.queuedUpdates
p.queuedUpdates = nil
p.updateLock.Unlock()
return p.SendParticipantUpdate(queuedUpdates)
}
func (p *ParticipantImpl) SignalPendingMessages() proto.Message {
return p.signalling.PendingMessages()
}
+174 -127
View File
@@ -552,6 +552,11 @@ func (r *Room) Join(
}, true)
}
})
participant.OnUpdateSubscriptions(r.onUpdateSubscriptions)
participant.OnUpdateSubscriptionPermission(r.onUpdateSubscriptionPermission)
participant.OnSyncState(r.onSyncState)
participant.OnSimulateScenario(r.onSimulateScenario)
participant.OnLeave(r.onLeave)
r.launchTargetAgents(maps.Values(r.agentDispatches), participant, livekit.JobType_JT_PARTICIPANT)
@@ -792,6 +797,27 @@ func (r *Room) Joinv2(
return nil, err
}
connectResponse.SubscriberSdp = ToProtoSessionDescription(offer, 0) // SIGNALLING-V2-TODO - need to proper offerId?
// for sync response, this does not actually send, only generates messageId and caches the message
if err := participant.SendConnectResponse(connectResponse); err != nil {
prometheus.ServiceOperationCounter.WithLabelValues("participant_join", "error", "send_response").Add(1)
return nil, err
}
if wireMessage := participant.SignalPendingMessages(); wireMessage != nil {
if wireMessage, ok := wireMessage.(*livekit.Signalv2WireMessage); ok {
switch msg := wireMessage.GetMessage().(type) {
case *livekit.Signalv2WireMessage_Envelope:
got_connect_response:
for _, innerMsg := range msg.Envelope.GetServerMessages() {
switch serverMessage := innerMsg.GetMessage().(type) {
case *livekit.Signalv2ServerMessage_ConnectResponse:
connectResponse = serverMessage.ConnectResponse
break got_connect_response
}
}
}
}
}
prometheus.ServiceOperationCounter.WithLabelValues("participant_join", "success", "").Add(1)
return connectResponse, nil
@@ -878,129 +904,7 @@ func (r *Room) ResumeParticipant(
return nil
}
func (r *Room) RemoveParticipant(identity livekit.ParticipantIdentity, pID livekit.ParticipantID, reason types.ParticipantCloseReason) {
r.lock.Lock()
p, ok := r.participants[identity]
if !ok {
r.lock.Unlock()
return
}
if pID != "" && p.ID() != pID {
// participant session has been replaced
r.lock.Unlock()
return
}
agentJob := r.agentParticpants[identity]
delete(r.participants, identity)
delete(r.participantOpts, identity)
delete(r.participantRequestSources, identity)
delete(r.hasPublished, identity)
delete(r.agentParticpants, identity)
if !p.Hidden() {
r.protoRoom.NumParticipants--
}
immediateChange := false
if p.IsRecorder() {
activeRecording := false
for _, op := range r.participants {
if op.IsRecorder() {
activeRecording = true
break
}
}
if r.protoRoom.ActiveRecording != activeRecording {
r.protoRoom.ActiveRecording = activeRecording
immediateChange = true
}
}
r.lock.Unlock()
r.protoProxy.MarkDirty(immediateChange)
if !p.HasConnected() {
fields := append(
connectionDetailsFields(p.GetICEConnectionInfo()),
"reason", reason.String(),
"clientInfo", logger.Proto(sutils.ClientInfoWithoutAddress(p.GetClientInfo())),
)
p.GetLogger().Infow("removing participant without connection", fields...)
}
// send broadcast only if it's not already closed
sendUpdates := !p.IsDisconnected()
// remove all published tracks
for _, t := range p.GetPublishedTracks() {
p.RemovePublishedTrack(t, false)
r.trackManager.RemoveTrack(t)
}
if agentJob != nil {
agentJob.participantLeft()
go func() {
_, err := r.agentClient.TerminateJob(context.Background(), agentJob.Id, rpc.JobTerminateReason_AGENT_LEFT_ROOM)
if err != nil {
r.logger.Infow("failed sending TerminateJob RPC", "error", err, "jobID", agentJob.Id, "participant", identity)
}
}()
}
p.OnTrackUpdated(nil)
p.OnTrackPublished(nil)
p.OnTrackUnpublished(nil)
p.OnStateChange(nil)
p.OnSubscriberReady(nil)
p.OnParticipantUpdate(nil)
p.OnDataPacket(nil)
p.OnDataMessage(nil)
p.OnMetrics(nil)
p.OnSubscribeStatusChanged(nil)
// close participant as well
_ = p.Close(true, reason, false)
r.leftAt.Store(time.Now().Unix())
if sendUpdates {
if r.onParticipantChanged != nil {
r.onParticipantChanged(p)
}
r.broadcastParticipantState(p, broadcastOptions{skipSource: true})
}
}
func (r *Room) UpdateSubscriptions(
participant types.LocalParticipant,
trackIDs []livekit.TrackID,
participantTracks []*livekit.ParticipantTracks,
subscribe bool,
) {
// handle subscription changes
for _, trackID := range trackIDs {
if subscribe {
participant.SubscribeToTrack(trackID, false)
} else {
participant.UnsubscribeFromTrack(trackID)
}
}
for _, pt := range participantTracks {
for _, trackID := range livekit.StringsAsIDs[livekit.TrackID](pt.TrackSids) {
if subscribe {
participant.SubscribeToTrack(trackID, false)
} else {
participant.UnsubscribeFromTrack(trackID)
}
}
}
}
func (r *Room) SyncState(participant types.LocalParticipant, state *livekit.SyncState) error {
func (r *Room) onSyncState(participant types.LocalParticipant, state *livekit.SyncState) error {
pLogger := participant.GetLogger()
pLogger.Infow("setting sync state", "state", logger.Proto(state))
@@ -1046,8 +950,7 @@ func (r *Room) SyncState(participant types.LocalParticipant, state *livekit.Sync
participant.UpdateSubscribedTrackSettings(livekit.TrackID(trackSid), &livekit.UpdateTrackSettings{Disabled: true})
}
r.UpdateSubscriptions(
participant,
participant.HandleUpdateSubscriptions(
livekit.StringsAsIDs[livekit.TrackID](state.Subscription.TrackSids),
state.Subscription.ParticipantTracks,
state.Subscription.Subscribe,
@@ -1055,7 +958,7 @@ func (r *Room) SyncState(participant types.LocalParticipant, state *livekit.Sync
return nil
}
func (r *Room) UpdateSubscriptionPermission(participant types.LocalParticipant, subscriptionPermission *livekit.SubscriptionPermission) error {
func (r *Room) onUpdateSubscriptionPermission(participant types.LocalParticipant, subscriptionPermission *livekit.SubscriptionPermission) error {
if err := participant.UpdateSubscriptionPermission(subscriptionPermission, utils.TimedVersion(0), r.GetParticipantByID); err != nil {
return err
}
@@ -1285,7 +1188,7 @@ func (r *Room) OnRoomUpdated(f func()) {
r.onRoomUpdated = f
}
func (r *Room) SimulateScenario(participant types.LocalParticipant, simulateScenario *livekit.SimulateScenario) error {
func (r *Room) onSimulateScenario(participant types.LocalParticipant, simulateScenario *livekit.SimulateScenario) error {
switch scenario := simulateScenario.Scenario.(type) {
case *livekit.SimulateScenario_SpeakerUpdate:
r.logger.Infow("simulating speaker update", "participant", participant.Identity(), "duration", scenario.SpeakerUpdate)
@@ -1543,6 +1446,150 @@ func (r *Room) onMetrics(source types.Participant, dp *livekit.DataPacket) {
BroadcastMetricsForRoom(r, source, dp, r.logger)
}
func (r *Room) onUpdateSubscriptions(
participant types.LocalParticipant,
trackIDs []livekit.TrackID,
participantTracks []*livekit.ParticipantTracks,
subscribe bool,
) {
r.UpdateSubscriptions(participant, trackIDs, participantTracks, subscribe)
}
func (r *Room) UpdateSubscriptions(
participant types.LocalParticipant,
trackIDs []livekit.TrackID,
participantTracks []*livekit.ParticipantTracks,
subscribe bool,
) {
// handle subscription changes
for _, trackID := range trackIDs {
if subscribe {
participant.SubscribeToTrack(trackID, false)
} else {
participant.UnsubscribeFromTrack(trackID)
}
}
for _, pt := range participantTracks {
for _, trackID := range livekit.StringsAsIDs[livekit.TrackID](pt.TrackSids) {
if subscribe {
participant.SubscribeToTrack(trackID, false)
} else {
participant.UnsubscribeFromTrack(trackID)
}
}
}
}
func (r *Room) onLeave(p types.LocalParticipant, reason types.ParticipantCloseReason) {
r.RemoveParticipant(p.Identity(), p.ID(), reason)
}
func (r *Room) RemoveParticipant(
identity livekit.ParticipantIdentity,
pID livekit.ParticipantID,
reason types.ParticipantCloseReason,
) {
r.lock.Lock()
p, ok := r.participants[identity]
if !ok {
r.lock.Unlock()
return
}
if pID != "" && p.ID() != pID {
// participant session has been replaced
r.lock.Unlock()
return
}
agentJob := r.agentParticpants[identity]
delete(r.participants, identity)
delete(r.participantOpts, identity)
delete(r.participantRequestSources, identity)
delete(r.hasPublished, identity)
delete(r.agentParticpants, identity)
if !p.Hidden() {
r.protoRoom.NumParticipants--
}
immediateChange := false
if p.IsRecorder() {
activeRecording := false
for _, op := range r.participants {
if op.IsRecorder() {
activeRecording = true
break
}
}
if r.protoRoom.ActiveRecording != activeRecording {
r.protoRoom.ActiveRecording = activeRecording
immediateChange = true
}
}
r.lock.Unlock()
r.protoProxy.MarkDirty(immediateChange)
if !p.HasConnected() {
fields := append(
connectionDetailsFields(p.GetICEConnectionInfo()),
"reason", reason.String(),
"clientInfo", logger.Proto(sutils.ClientInfoWithoutAddress(p.GetClientInfo())),
)
p.GetLogger().Infow("removing participant without connection", fields...)
}
// send broadcast only if it's not already closed
sendUpdates := !p.IsDisconnected()
// remove all published tracks
for _, t := range p.GetPublishedTracks() {
p.RemovePublishedTrack(t, false)
r.trackManager.RemoveTrack(t)
}
if agentJob != nil {
agentJob.participantLeft()
go func() {
_, err := r.agentClient.TerminateJob(context.Background(), agentJob.Id, rpc.JobTerminateReason_AGENT_LEFT_ROOM)
if err != nil {
r.logger.Infow("failed sending TerminateJob RPC", "error", err, "jobID", agentJob.Id, "participant", identity)
}
}()
}
p.OnTrackUpdated(nil)
p.OnTrackPublished(nil)
p.OnTrackUnpublished(nil)
p.OnStateChange(nil)
p.OnSubscriberReady(nil)
p.OnParticipantUpdate(nil)
p.OnDataPacket(nil)
p.OnDataMessage(nil)
p.OnMetrics(nil)
p.OnSubscribeStatusChanged(nil)
p.OnUpdateSubscriptions(nil)
p.OnUpdateSubscriptionPermission(nil)
p.OnSyncState(nil)
p.OnSimulateScenario(nil)
p.OnLeave(nil)
// close participant as well
_ = p.Close(true, reason, false)
r.leftAt.Store(time.Now().Unix())
if sendUpdates {
if r.onParticipantChanged != nil {
r.onParticipantChanged(p)
}
r.broadcastParticipantState(p, broadcastOptions{skipSource: true})
}
}
func (r *Room) subscribeToExistingTracks(p types.LocalParticipant, isSync bool) {
r.lock.RLock()
shouldSubscribe := r.autoSubscribe(p)
-75
View File
@@ -1,75 +0,0 @@
package rtc
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/livekit/protocol/livekit"
)
func TestSignalCache(t *testing.T) {
firstMessageId := uint32(10)
cache := NewSignalCache(SignalCacheParams{
FirstMessageId: firstMessageId,
})
inputMessages := []*livekit.Signalv2ServerMessage{
&livekit.Signalv2ServerMessage{
Message: &livekit.Signalv2ServerMessage_ConnectResponse{},
},
// SIGNALLING-V2-TODO: replace with other kinds of messages when more types are added
&livekit.Signalv2ServerMessage{
Message: &livekit.Signalv2ServerMessage_ConnectResponse{},
},
&livekit.Signalv2ServerMessage{
Message: &livekit.Signalv2ServerMessage_ConnectResponse{},
},
&livekit.Signalv2ServerMessage{
Message: &livekit.Signalv2ServerMessage_ConnectResponse{},
},
}
// Add() - add one message at a time
for _, inputMessage := range inputMessages {
cache.Add(inputMessage, 2345)
}
// get all messages in cache
outputMessages := cache.GetFromFront()
require.Equal(t, inputMessages, outputMessages)
// clear one and get again
cache.Clear(firstMessageId)
outputMessages = cache.GetFromFront()
require.Equal(t, inputMessages[1:], outputMessages)
// clearing some evicted messages should not clear anything
cache.Clear(firstMessageId) // firstMessageId has been cleared already at this point
outputMessages = cache.GetFromFront()
require.Equal(t, inputMessages[1:], outputMessages)
// clear some and get rest in one go
outputMessages = cache.ClearAndGetFrom(firstMessageId + 3)
require.Equal(t, 1, len(outputMessages))
require.Equal(t, inputMessages[3:], outputMessages)
// getting again should get the same messages again as they sill should in cache
outputMessages = cache.GetFromFront()
require.Equal(t, inputMessages[3:], outputMessages)
// clearing all and getting should return nil
require.Nil(t, cache.ClearAndGetFrom(firstMessageId+uint32(len(inputMessages))))
// getting again should return nil as the cache is fully cleared above
require.Nil(t, cache.GetFromFront())
// AddBatch() - add all messages at once
cache.AddBatch(inputMessages, 4567)
// get all messages in cache
outputMessages = cache.GetFromFront()
require.Equal(t, inputMessages, outputMessages)
}
+24 -20
View File
@@ -16,12 +16,11 @@ package rtc
import (
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/livekit-server/pkg/rtc/types"
)
func HandleParticipantSignal(room types.Room, participant types.LocalParticipant, req *livekit.SignalRequest, pLogger logger.Logger) error {
func HandleParticipantSignal(participant types.LocalParticipant, req *livekit.SignalRequest) error {
participant.UpdateLastSeenSignal()
switch msg := req.GetMessage().(type) {
@@ -34,13 +33,13 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant
case *livekit.SignalRequest_Trickle:
candidateInit, err := FromProtoTrickle(msg.Trickle)
if err != nil {
pLogger.Warnw("could not decode trickle", err)
participant.GetLogger().Warnw("could not decode trickle", err)
return nil
}
participant.AddICECandidate(candidateInit, msg.Trickle.Target)
case *livekit.SignalRequest_AddTrack:
pLogger.Debugw("add track request", "trackID", msg.AddTrack.Cid)
participant.GetLogger().Debugw("add track request", "trackID", msg.AddTrack.Cid)
participant.AddTrack(msg.AddTrack)
case *livekit.SignalRequest_Mute:
@@ -49,8 +48,7 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant
case *livekit.SignalRequest_Subscription:
// allow participant to indicate their interest in the subscription
// permission check happens later in SubscriptionManager
room.UpdateSubscriptions(
participant,
participant.HandleUpdateSubscriptions(
livekit.StringsAsIDs[livekit.TrackID](msg.Subscription.TrackSids),
msg.Subscription.ParticipantTracks,
msg.Subscription.Subscribe,
@@ -71,28 +69,34 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant
case livekit.DisconnectReason_USER_REJECTED:
reason = types.ParticipantCloseReasonUserRejected
}
pLogger.Debugw("client leaving room", "reason", reason)
room.RemoveParticipant(participant.Identity(), participant.ID(), reason)
participant.GetLogger().Debugw("client leaving room", "reason", reason)
participant.HandleLeaveRequest(reason)
case *livekit.SignalRequest_SubscriptionPermission:
err := room.UpdateSubscriptionPermission(participant, msg.SubscriptionPermission)
err := participant.HandleUpdateSubscriptionPermission(msg.SubscriptionPermission)
if err != nil {
pLogger.Warnw("could not update subscription permission", err,
"permissions", msg.SubscriptionPermission)
participant.GetLogger().Warnw(
"could not update subscription permission", err,
"permissions", msg.SubscriptionPermission,
)
}
case *livekit.SignalRequest_SyncState:
err := room.SyncState(participant, msg.SyncState)
err := participant.HandleSyncState(msg.SyncState)
if err != nil {
pLogger.Warnw("could not sync state", err,
"state", msg.SyncState)
participant.GetLogger().Warnw(
"could not sync state", err,
"state", msg.SyncState,
)
}
case *livekit.SignalRequest_Simulate:
err := room.SimulateScenario(participant, msg.Simulate)
err := participant.HandleSimulateScenario(msg.Simulate)
if err != nil {
pLogger.Warnw("could not simulate scenario", err,
"simulate", msg.Simulate)
participant.GetLogger().Warnw(
"could not simulate scenario", err,
"simulate", msg.Simulate,
)
}
case *livekit.SignalRequest_PingReq:
@@ -121,7 +125,7 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant
participant.SetAttributes(msg.UpdateMetadata.Attributes)
}
} else {
pLogger.Warnw("could not update metadata", err)
participant.GetLogger().Warnw("could not update metadata", err)
switch err {
case ErrNameExceedsLimits:
@@ -146,12 +150,12 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant
case *livekit.SignalRequest_UpdateAudioTrack:
if err := participant.UpdateAudioTrack(msg.UpdateAudioTrack); err != nil {
pLogger.Warnw("could not update audio track", err, "update", msg.UpdateAudioTrack)
participant.GetLogger().Warnw("could not update audio track", err, "update", msg.UpdateAudioTrack)
}
case *livekit.SignalRequest_UpdateVideoTrack:
if err := participant.UpdateVideoTrack(msg.UpdateVideoTrack); err != nil {
pLogger.Warnw("could not update video track", err, "update", msg.UpdateVideoTrack)
participant.GetLogger().Warnw("could not update video track", err, "update", msg.UpdateVideoTrack)
}
}
+63
View File
@@ -0,0 +1,63 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package signalling
import (
"github.com/livekit/protocol/livekit"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/rtc/types"
"google.golang.org/protobuf/proto"
)
type ParticipantSignaller interface {
SetResponseSink(sink routing.MessageSink)
GetResponseSink() routing.MessageSink
CloseSignalConnection(reason types.SignallingCloseReason)
WriteMessage(msg proto.Message) error
}
type ParticipantSignalling interface {
SignalJoinResponse(join *livekit.JoinResponse) proto.Message
SignalParticipantUpdate(participants []*livekit.ParticipantInfo) proto.Message
SignalSpeakerUpdate(speakers []*livekit.SpeakerInfo) proto.Message
SignalRoomUpdate(room *livekit.Room) proto.Message
SignalConnectionQualityUpdate(connectionQuality *livekit.ConnectionQualityUpdate) proto.Message
SignalRefreshToken(token string) proto.Message
SignalRequestResponse(requestResponse *livekit.RequestResponse) proto.Message
SignalRoomMovedResponse(roomMoved *livekit.RoomMovedResponse) proto.Message
SignalReconnectResponse(reconnect *livekit.ReconnectResponse) proto.Message
SignalICECandidate(trickle *livekit.TrickleRequest) proto.Message
SignalTrackMuted(mute *livekit.MuteTrackRequest) proto.Message
SignalTrackPublished(trackPublished *livekit.TrackPublishedResponse) proto.Message
SignalTrackUnpublished(trackUnpublished *livekit.TrackUnpublishedResponse) proto.Message
SignalTrackSubscribed(trackSubscribed *livekit.TrackSubscribed) proto.Message
SignalLeaveRequest(leave *livekit.LeaveRequest) proto.Message
SignalSdpAnswer(answer *livekit.SessionDescription) proto.Message
SignalSdpOffer(offer *livekit.SessionDescription) proto.Message
SignalStreamStateUpdate(streamStateUpdate *livekit.StreamStateUpdate) proto.Message
SignalSubscribedQualityUpdate(subscribedQualityUpdate *livekit.SubscribedQualityUpdate) proto.Message
SignalSubscriptionResponse(subscriptionResponse *livekit.SubscriptionResponse) proto.Message
SignalSubscriptionPermissionUpdate(subscriptionPermissionUpdate *livekit.SubscriptionPermissionUpdate) proto.Message
AckMessageId(ackMessageId uint32)
SetLastProcessedRemoteMessageId(lastProcessedRemoteMessageId uint32)
PendingMessages() proto.Message
SignalConnectResponse(connectResponse *livekit.ConnectResponse) proto.Message
}
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package rtc
package signalling
import (
"math/rand"
@@ -22,6 +22,7 @@ import (
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
)
type SignalCacheParams struct {
@@ -32,9 +33,10 @@ type SignalCacheParams struct {
type SignalCache struct {
params SignalCacheParams
lock sync.Mutex
messageId uint32
messages deque.Deque[*livekit.Signalv2ServerMessage]
lock sync.Mutex
messageId uint32
lastProcessedRemoteMessageId uint32
messages deque.Deque[*livekit.Signalv2ServerMessage]
}
func NewSignalCache(params SignalCacheParams) *SignalCache {
@@ -49,18 +51,26 @@ func NewSignalCache(params SignalCacheParams) *SignalCache {
return s
}
func (s *SignalCache) Add(msg *livekit.Signalv2ServerMessage, lastRemoteId uint32) {
s.AddBatch([]*livekit.Signalv2ServerMessage{msg}, lastRemoteId)
func (s *SignalCache) SetLastProcessedRemoteMessageId(lastProcessedRemoteMessageId uint32) {
s.lock.Lock()
defer s.lock.Unlock()
s.lastProcessedRemoteMessageId = lastProcessedRemoteMessageId
}
func (s *SignalCache) AddBatch(msgs []*livekit.Signalv2ServerMessage, lastRemoteId uint32) {
func (s *SignalCache) Add(msg *livekit.Signalv2ServerMessage) {
if msg != nil {
s.AddBatch([]*livekit.Signalv2ServerMessage{msg})
}
}
func (s *SignalCache) AddBatch(msgs []*livekit.Signalv2ServerMessage) {
s.lock.Lock()
defer s.lock.Unlock()
for _, msg := range msgs {
msg.Sequencer = &livekit.Sequencer{
MessageId: s.messageId,
LastProcessedRemoteMessageId: lastRemoteId,
MessageId: s.messageId,
}
s.messageId++
@@ -95,7 +105,9 @@ func (s *SignalCache) GetFromFront() []*livekit.Signalv2ServerMessage {
func (s *SignalCache) getFromFrontLocked() []*livekit.Signalv2ServerMessage {
var msgs []*livekit.Signalv2ServerMessage
for msg := range s.messages.Iter() {
msgs = append(msgs, msg)
clone := utils.CloneProto(msg)
clone.Sequencer.LastProcessedRemoteMessageId = s.lastProcessedRemoteMessageId
msgs = append(msgs, clone)
}
return msgs
+174
View File
@@ -0,0 +1,174 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package signalling
import (
"testing"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
"github.com/livekit/protocol/livekit"
)
func TestSignalCache(t *testing.T) {
firstMessageId := uint32(10)
lastProcessedRemoteMessageId := uint32(2345)
cache := NewSignalCache(SignalCacheParams{
FirstMessageId: firstMessageId,
})
inputMessages := []*livekit.Signalv2ServerMessage{
&livekit.Signalv2ServerMessage{
Message: &livekit.Signalv2ServerMessage_ConnectResponse{},
},
// SIGNALLING-V2-TODO: replace with other kinds of messages when more types are added
&livekit.Signalv2ServerMessage{
Message: &livekit.Signalv2ServerMessage_ConnectResponse{},
},
&livekit.Signalv2ServerMessage{
Message: &livekit.Signalv2ServerMessage_ConnectResponse{},
},
&livekit.Signalv2ServerMessage{
Message: &livekit.Signalv2ServerMessage_ConnectResponse{},
},
}
expectedOutputMessages := []*livekit.Signalv2ServerMessage{
&livekit.Signalv2ServerMessage{
Sequencer: &livekit.Sequencer{
MessageId: firstMessageId,
LastProcessedRemoteMessageId: lastProcessedRemoteMessageId,
},
Message: &livekit.Signalv2ServerMessage_ConnectResponse{},
},
// SIGNALLING-V2-TODO: replace with other kinds of messages when more types are added
&livekit.Signalv2ServerMessage{
Sequencer: &livekit.Sequencer{
MessageId: firstMessageId + 1,
LastProcessedRemoteMessageId: lastProcessedRemoteMessageId,
},
Message: &livekit.Signalv2ServerMessage_ConnectResponse{},
},
&livekit.Signalv2ServerMessage{
Sequencer: &livekit.Sequencer{
MessageId: firstMessageId + 2,
LastProcessedRemoteMessageId: lastProcessedRemoteMessageId,
},
Message: &livekit.Signalv2ServerMessage_ConnectResponse{},
},
&livekit.Signalv2ServerMessage{
Sequencer: &livekit.Sequencer{
MessageId: firstMessageId + 3,
LastProcessedRemoteMessageId: lastProcessedRemoteMessageId,
},
Message: &livekit.Signalv2ServerMessage_ConnectResponse{},
},
}
cache.SetLastProcessedRemoteMessageId(lastProcessedRemoteMessageId)
// Add() - add one message at a time
for _, inputMessage := range inputMessages {
cache.Add(inputMessage)
}
// get all messages in cache
outputMessages := cache.GetFromFront()
require.True(t, compareProtoSlices(expectedOutputMessages, outputMessages))
// clear one and get again
cache.Clear(firstMessageId)
outputMessages = cache.GetFromFront()
require.True(t, compareProtoSlices(expectedOutputMessages[1:], outputMessages))
// clearing some evicted messages should not clear anything
cache.Clear(firstMessageId) // firstMessageId has been cleared already at this point
outputMessages = cache.GetFromFront()
require.True(t, compareProtoSlices(expectedOutputMessages[1:], outputMessages))
// clear some and get rest in one go
outputMessages = cache.ClearAndGetFrom(firstMessageId + 3)
require.Equal(t, 1, len(outputMessages))
require.True(t, compareProtoSlices(expectedOutputMessages[3:], outputMessages))
// getting again should get the same messages again as they sill should in cache
outputMessages = cache.GetFromFront()
require.True(t, compareProtoSlices(expectedOutputMessages[3:], outputMessages))
// clearing all and getting should return nil
require.Nil(t, cache.ClearAndGetFrom(firstMessageId+uint32(len(inputMessages))))
// getting again should return nil as the cache is fully cleared above
require.Nil(t, cache.GetFromFront())
lastProcessedRemoteMessageId = 4567
cache.SetLastProcessedRemoteMessageId(lastProcessedRemoteMessageId)
expectedOutputMessages = []*livekit.Signalv2ServerMessage{
&livekit.Signalv2ServerMessage{
Sequencer: &livekit.Sequencer{
MessageId: firstMessageId + 4,
LastProcessedRemoteMessageId: lastProcessedRemoteMessageId,
},
Message: &livekit.Signalv2ServerMessage_ConnectResponse{},
},
// SIGNALLING-V2-TODO: replace with other kinds of messages when more types are added
&livekit.Signalv2ServerMessage{
Sequencer: &livekit.Sequencer{
MessageId: firstMessageId + 1 + 4,
LastProcessedRemoteMessageId: lastProcessedRemoteMessageId,
},
Message: &livekit.Signalv2ServerMessage_ConnectResponse{},
},
&livekit.Signalv2ServerMessage{
Sequencer: &livekit.Sequencer{
MessageId: firstMessageId + 2 + 4,
LastProcessedRemoteMessageId: lastProcessedRemoteMessageId,
},
Message: &livekit.Signalv2ServerMessage_ConnectResponse{},
},
&livekit.Signalv2ServerMessage{
Sequencer: &livekit.Sequencer{
MessageId: firstMessageId + 3 + 4,
LastProcessedRemoteMessageId: lastProcessedRemoteMessageId,
},
Message: &livekit.Signalv2ServerMessage_ConnectResponse{},
},
}
// AddBatch() - add all messages at once
cache.AddBatch(inputMessages)
// get all messages in cache
outputMessages = cache.GetFromFront()
require.True(t, compareProtoSlices(expectedOutputMessages, outputMessages))
}
func compareProtoSlices(a []*livekit.Signalv2ServerMessage, b []*livekit.Signalv2ServerMessage) bool {
if len(a) != len(b) {
return false
}
for i := 0; i < len(a); i++ {
if !proto.Equal(a[i], b[i]) {
return false
}
}
return true
}
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package rtc
package signalling
import (
"math/rand"
@@ -1,4 +1,18 @@
package rtc
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package signalling
import (
"testing"
+107
View File
@@ -0,0 +1,107 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package signalling
import (
"fmt"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
"github.com/livekit/psrpc"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/rtc/types"
"google.golang.org/protobuf/proto"
)
var _ ParticipantSignaller = (*signallerAsync)(nil)
type SignallerAsyncParams struct {
Logger logger.Logger
Participant types.LocalParticipant
}
type signallerAsync struct {
params SignallerAsyncParams
*signallerAsyncBase
}
func NewSignallerAsync(params SignallerAsyncParams) ParticipantSignaller {
return &signallerAsync{
params: params,
signallerAsyncBase: newSignallerAsyncBase(signallerAsyncBaseParams{Logger: params.Logger}),
}
}
func (s *signallerAsync) WriteMessage(msg proto.Message) error {
if msg == nil {
return nil
}
if s.params.Participant.IsDisconnected() {
return nil
}
if !s.params.Participant.IsReady() {
if typed, ok := msg.(*livekit.SignalResponse); !ok {
s.params.Logger.Warnw(
"unknown message type", nil,
"messageType", fmt.Sprintf("%T", msg),
)
} else {
if typed.GetJoin() == nil {
return nil
}
}
}
sink := s.GetResponseSink()
if sink == nil {
if typed, ok := msg.(*livekit.SignalResponse); ok {
s.params.Logger.Debugw(
"could not send message to participant",
"messageType", fmt.Sprintf("%T", typed.Message),
)
}
return nil
}
err := sink.WriteMessage(msg)
if err != nil {
// SIGNALLING-V2-TODO: check for data channel errors to treat as non-error
if utils.ErrorIsOneOf(err, psrpc.Canceled, routing.ErrChannelClosed) {
if typed, ok := msg.(*livekit.SignalResponse); ok {
s.params.Logger.Debugw(
"could not send message to participant",
"error", err,
"messageType", fmt.Sprintf("%T", typed.Message),
)
}
return nil
} else {
if typed, ok := msg.(*livekit.SignalResponse); ok {
s.params.Logger.Warnw(
"could not send message to participant", err,
"messageType", fmt.Sprintf("%T", typed.Message),
)
}
return err
}
}
return nil
}
+67
View File
@@ -0,0 +1,67 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package signalling
import (
"sync"
"github.com/livekit/protocol/logger"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/rtc/types"
)
type signallerAsyncBaseParams struct {
Logger logger.Logger
}
type signallerAsyncBase struct {
signallerUnimplemented
params signallerAsyncBaseParams
resSinkMu sync.Mutex
resSink routing.MessageSink
}
func newSignallerAsyncBase(params signallerAsyncBaseParams) *signallerAsyncBase {
return &signallerAsyncBase{
params: params,
}
}
func (s *signallerAsyncBase) SetResponseSink(sink routing.MessageSink) {
s.resSinkMu.Lock()
defer s.resSinkMu.Unlock()
s.resSink = sink
}
func (s *signallerAsyncBase) GetResponseSink() routing.MessageSink {
s.resSinkMu.Lock()
defer s.resSinkMu.Unlock()
return s.resSink
}
// closes signal connection to notify client to resume/reconnect
func (s *signallerAsyncBase) CloseSignalConnection(reason types.SignallingCloseReason) {
sink := s.GetResponseSink()
if sink == nil {
return
}
s.params.Logger.Debugw("closing signal connection", "reason", reason, "connID", sink.ConnectionID())
sink.Close()
s.SetResponseSink(nil)
}
@@ -0,0 +1,38 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package signalling
import (
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/rtc/types"
"google.golang.org/protobuf/proto"
)
var _ ParticipantSignaller = (*signallerUnimplemented)(nil)
type signallerUnimplemented struct{}
func (u *signallerUnimplemented) SetResponseSink(sink routing.MessageSink) {}
func (u *signallerUnimplemented) GetResponseSink() routing.MessageSink {
return nil
}
func (u *signallerUnimplemented) CloseSignalConnection(reason types.SignallingCloseReason) {}
func (u *signallerUnimplemented) WriteMessage(msg proto.Message) error {
return nil
}
+172
View File
@@ -0,0 +1,172 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package signalling
import (
"fmt"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
"github.com/livekit/psrpc"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/rtc/types"
"google.golang.org/protobuf/proto"
)
var _ ParticipantSignaller = (*signallerv2Async)(nil)
type Signallerv2AsyncParams struct {
Logger logger.Logger
Participant types.LocalParticipant
}
type signallerv2Async struct {
params Signallerv2AsyncParams
*signallerAsyncBase
signalFragment *SignalFragment
}
func NewSignallerv2Async(params Signallerv2AsyncParams) ParticipantSignaller {
return &signallerv2Async{
params: params,
signallerAsyncBase: newSignallerAsyncBase(signallerAsyncBaseParams{Logger: params.Logger}),
signalFragment: NewSignalFragment(SignalFragmentParams{
Logger: params.Logger,
}),
}
}
// SIGNALLING-V2-TODO: need to lock write so that fragments do not get interrupted
func (s *signallerv2Async) WriteMessage(msg proto.Message) error {
if msg == nil {
return nil
}
if s.params.Participant.IsDisconnected() {
return nil
}
if !s.params.Participant.IsReady() {
if typed, ok := msg.(*livekit.Signalv2WireMessage); !ok {
s.params.Logger.Warnw(
"unknown message type", nil,
"messageType", fmt.Sprintf("%T", msg),
)
} else {
if !hasConnectResponse(typed) {
return nil
}
}
}
sink := s.GetResponseSink()
if sink == nil {
if typed, ok := msg.(*livekit.Signalv2WireMessage); ok {
s.params.Logger.Debugw(
"could not send message to participant",
"messageType", fmt.Sprintf("%T", typed.Message),
)
}
return nil
}
// SIGNALLING-V2-TODO: avoid double marshalling,
// have to marshal once to get size of serialised packet and decide if it needs fragmentation,
// should used the marshaled bytes if fragmentation is not needed
var fragments []*livekit.Fragment
marshaled, err := proto.Marshal(msg)
if err != nil {
if typed, ok := msg.(*livekit.Signalv2WireMessage); ok {
s.params.Logger.Warnw(
"could not send message to participant", err,
"messageType", fmt.Sprintf("%T", typed.Message),
)
}
} else {
fragments = s.signalFragment.Segment(marshaled)
}
sendMsg := func(m proto.Message) error {
if err := sink.WriteMessage(m); err != nil {
// SIGNALLING-V2-TODO: check for data channel errors to treat as debug too
if utils.ErrorIsOneOf(err, psrpc.Canceled, routing.ErrChannelClosed) {
if typed, ok := m.(*livekit.Signalv2WireMessage); ok {
s.params.Logger.Debugw(
"could not send message to participant",
"error", err,
"messageType", fmt.Sprintf("%T", typed.Message),
)
}
return nil
} else {
if typed, ok := m.(*livekit.Signalv2WireMessage); ok {
s.params.Logger.Warnw(
"could not send message to participant", err,
"messageType", fmt.Sprintf("%T", typed.Message),
)
}
return err
}
}
return nil
}
if len(fragments) != 0 {
for _, fragment := range fragments {
wireMessage := &livekit.Signalv2WireMessage{
Message: &livekit.Signalv2WireMessage_Fragment{
Fragment: fragment,
},
}
if err := sendMsg(wireMessage); err != nil {
return err
}
}
} else {
if err := sendMsg(msg); err != nil {
return err
}
}
return nil
}
// ----------------------------
func hasConnectResponse(wireMessage *livekit.Signalv2WireMessage) bool {
switch msg := wireMessage.GetMessage().(type) {
case *livekit.Signalv2WireMessage_Envelope:
for _, innerMsg := range msg.Envelope.GetServerMessages() {
switch innerMsg.GetMessage().(type) {
case *livekit.Signalv2ServerMessage_ConnectResponse:
return true
default:
return false // first message should be `ConnectResponse`
}
}
default:
// SIGNALLING-V2-TODO: handle ConnectResponse getting fragmented.
return false
}
return false
}
+38
View File
@@ -0,0 +1,38 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package signalling
import (
"github.com/livekit/protocol/logger"
)
var _ ParticipantSignaller = (*signallerv2Hybrid)(nil)
type Signallerv2HybridParams struct {
Logger logger.Logger
}
type signallerv2Hybrid struct {
params Signallerv2HybridParams
*signallerv2Async
}
func NewSignallerv2Hybrid(params Signallerv2HybridParams) ParticipantSignaller {
return &signallerv2Hybrid{
params: params,
signallerv2Async: NewSignallerv2Async(Signallerv2AsyncParams{Logger: params.Logger}).(*signallerv2Async),
}
}
+298
View File
@@ -0,0 +1,298 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package signalling
import (
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"google.golang.org/protobuf/proto"
)
var _ ParticipantSignalling = (*signalling)(nil)
type SignallingParams struct {
Logger logger.Logger
}
type signalling struct {
signallingUnimplemented
params SignallingParams
}
func NewSignalling(params SignallingParams) ParticipantSignalling {
return &signalling{
params: params,
}
}
func (s *signalling) SignalJoinResponse(join *livekit.JoinResponse) proto.Message {
if join == nil {
return nil
}
return &livekit.SignalResponse{
Message: &livekit.SignalResponse_Join{
Join: join,
},
}
}
func (s *signalling) SignalParticipantUpdate(participants []*livekit.ParticipantInfo) proto.Message {
if len(participants) == 0 {
return nil
}
return &livekit.SignalResponse{
Message: &livekit.SignalResponse_Update{
Update: &livekit.ParticipantUpdate{
Participants: participants,
},
},
}
}
func (s *signalling) SignalSpeakerUpdate(speakers []*livekit.SpeakerInfo) proto.Message {
if len(speakers) == 0 {
return nil
}
return &livekit.SignalResponse{
Message: &livekit.SignalResponse_SpeakersChanged{
SpeakersChanged: &livekit.SpeakersChanged{
Speakers: speakers,
},
},
}
}
func (s *signalling) SignalRoomUpdate(room *livekit.Room) proto.Message {
if room == nil {
return nil
}
return &livekit.SignalResponse{
Message: &livekit.SignalResponse_RoomUpdate{
RoomUpdate: &livekit.RoomUpdate{
Room: room,
},
},
}
}
func (s *signalling) SignalConnectionQualityUpdate(connectionQuality *livekit.ConnectionQualityUpdate) proto.Message {
if connectionQuality == nil {
return nil
}
return &livekit.SignalResponse{
Message: &livekit.SignalResponse_ConnectionQuality{
ConnectionQuality: connectionQuality,
},
}
}
func (s *signalling) SignalRefreshToken(token string) proto.Message {
if token == "" {
return nil
}
return &livekit.SignalResponse{
Message: &livekit.SignalResponse_RefreshToken{
RefreshToken: token,
},
}
}
func (s *signalling) SignalRequestResponse(requestResponse *livekit.RequestResponse) proto.Message {
if requestResponse == nil {
return nil
}
return &livekit.SignalResponse{
Message: &livekit.SignalResponse_RequestResponse{
RequestResponse: requestResponse,
},
}
}
func (s *signalling) SignalRoomMovedResponse(roomMoved *livekit.RoomMovedResponse) proto.Message {
if roomMoved == nil {
return nil
}
return &livekit.SignalResponse{
Message: &livekit.SignalResponse_RoomMoved{
RoomMoved: roomMoved,
},
}
}
func (s *signalling) SignalReconnectResponse(reconnect *livekit.ReconnectResponse) proto.Message {
if reconnect == nil {
return nil
}
return &livekit.SignalResponse{
Message: &livekit.SignalResponse_Reconnect{
Reconnect: reconnect,
},
}
}
func (s *signalling) SignalICECandidate(trickle *livekit.TrickleRequest) proto.Message {
if trickle == nil {
return nil
}
return &livekit.SignalResponse{
Message: &livekit.SignalResponse_Trickle{
Trickle: trickle,
},
}
}
func (s *signalling) SignalTrackMuted(mute *livekit.MuteTrackRequest) proto.Message {
if mute == nil {
return nil
}
return &livekit.SignalResponse{
Message: &livekit.SignalResponse_Mute{
Mute: mute,
},
}
}
func (s *signalling) SignalTrackPublished(trackPublished *livekit.TrackPublishedResponse) proto.Message {
if trackPublished == nil {
return nil
}
return &livekit.SignalResponse{
Message: &livekit.SignalResponse_TrackPublished{
TrackPublished: trackPublished,
},
}
}
func (s *signalling) SignalTrackUnpublished(trackUnpublished *livekit.TrackUnpublishedResponse) proto.Message {
if trackUnpublished == nil {
return nil
}
return &livekit.SignalResponse{
Message: &livekit.SignalResponse_TrackUnpublished{
TrackUnpublished: trackUnpublished,
},
}
}
func (s *signalling) SignalTrackSubscribed(trackSubscribed *livekit.TrackSubscribed) proto.Message {
if trackSubscribed == nil {
return nil
}
return &livekit.SignalResponse{
Message: &livekit.SignalResponse_TrackSubscribed{
TrackSubscribed: trackSubscribed,
},
}
}
func (s *signalling) SignalLeaveRequest(leave *livekit.LeaveRequest) proto.Message {
if leave == nil {
return nil
}
return &livekit.SignalResponse{
Message: &livekit.SignalResponse_Leave{
Leave: leave,
},
}
}
func (s *signalling) SignalSdpAnswer(answer *livekit.SessionDescription) proto.Message {
if answer == nil {
return nil
}
return &livekit.SignalResponse{
Message: &livekit.SignalResponse_Answer{
Answer: answer,
},
}
}
func (s *signalling) SignalSdpOffer(offer *livekit.SessionDescription) proto.Message {
if offer == nil {
return nil
}
return &livekit.SignalResponse{
Message: &livekit.SignalResponse_Offer{
Offer: offer,
},
}
}
func (s *signalling) SignalStreamStateUpdate(streamStateUpdate *livekit.StreamStateUpdate) proto.Message {
if streamStateUpdate == nil {
return nil
}
return &livekit.SignalResponse{
Message: &livekit.SignalResponse_StreamStateUpdate{
StreamStateUpdate: streamStateUpdate,
},
}
}
func (s *signalling) SignalSubscribedQualityUpdate(subscribedQualityUpdate *livekit.SubscribedQualityUpdate) proto.Message {
if subscribedQualityUpdate == nil {
return nil
}
return &livekit.SignalResponse{
Message: &livekit.SignalResponse_SubscribedQualityUpdate{
SubscribedQualityUpdate: subscribedQualityUpdate,
},
}
}
func (s *signalling) SignalSubscriptionResponse(subscriptionResponse *livekit.SubscriptionResponse) proto.Message {
if subscriptionResponse == nil {
return nil
}
return &livekit.SignalResponse{
Message: &livekit.SignalResponse_SubscriptionResponse{
SubscriptionResponse: subscriptionResponse,
},
}
}
func (s *signalling) SignalSubscriptionPermissionUpdate(subscriptionPermissionUpdate *livekit.SubscriptionPermissionUpdate) proto.Message {
if subscriptionPermissionUpdate == nil {
return nil
}
return &livekit.SignalResponse{
Message: &livekit.SignalResponse_SubscriptionPermissionUpdate{
SubscriptionPermissionUpdate: subscriptionPermissionUpdate,
},
}
}
@@ -0,0 +1,122 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package signalling
import (
"github.com/livekit/protocol/livekit"
"google.golang.org/protobuf/proto"
)
var _ ParticipantSignalling = (*signallingUnimplemented)(nil)
type signallingUnimplemented struct{}
func (u *signallingUnimplemented) SignalJoinResponse(join *livekit.JoinResponse) proto.Message {
return nil
}
func (u *signallingUnimplemented) SignalParticipantUpdate(participants []*livekit.ParticipantInfo) proto.Message {
return nil
}
func (u *signallingUnimplemented) SignalSpeakerUpdate(speakers []*livekit.SpeakerInfo) proto.Message {
return nil
}
func (u *signallingUnimplemented) SignalRoomUpdate(room *livekit.Room) proto.Message {
return nil
}
func (u *signallingUnimplemented) SignalConnectionQualityUpdate(connectionQuality *livekit.ConnectionQualityUpdate) proto.Message {
return nil
}
func (u *signallingUnimplemented) SignalRefreshToken(token string) proto.Message {
return nil
}
func (u *signallingUnimplemented) SignalRequestResponse(requestResponse *livekit.RequestResponse) proto.Message {
return nil
}
func (u *signallingUnimplemented) SignalRoomMovedResponse(roomMoved *livekit.RoomMovedResponse) proto.Message {
return nil
}
func (u *signallingUnimplemented) SignalReconnectResponse(reconnect *livekit.ReconnectResponse) proto.Message {
return nil
}
func (u *signallingUnimplemented) SignalICECandidate(trickle *livekit.TrickleRequest) proto.Message {
return nil
}
func (u *signallingUnimplemented) SignalTrackMuted(mute *livekit.MuteTrackRequest) proto.Message {
return nil
}
func (u *signallingUnimplemented) SignalTrackPublished(trackPublished *livekit.TrackPublishedResponse) proto.Message {
return nil
}
func (u *signallingUnimplemented) SignalTrackUnpublished(trackUnpublished *livekit.TrackUnpublishedResponse) proto.Message {
return nil
}
func (u *signallingUnimplemented) SignalTrackSubscribed(trackSubscribed *livekit.TrackSubscribed) proto.Message {
return nil
}
func (u *signallingUnimplemented) SignalLeaveRequest(leave *livekit.LeaveRequest) proto.Message {
return nil
}
func (u *signallingUnimplemented) SignalSdpAnswer(answer *livekit.SessionDescription) proto.Message {
return nil
}
func (u *signallingUnimplemented) SignalSdpOffer(offer *livekit.SessionDescription) proto.Message {
return nil
}
func (u *signallingUnimplemented) SignalStreamStateUpdate(streamStateUpdate *livekit.StreamStateUpdate) proto.Message {
return nil
}
func (u *signallingUnimplemented) SignalSubscribedQualityUpdate(subscribedQualityUpdate *livekit.SubscribedQualityUpdate) proto.Message {
return nil
}
func (u *signallingUnimplemented) SignalSubscriptionResponse(subscriptionResponse *livekit.SubscriptionResponse) proto.Message {
return nil
}
func (u *signallingUnimplemented) SignalSubscriptionPermissionUpdate(subscriptionPermissionUpdate *livekit.SubscriptionPermissionUpdate) proto.Message {
return nil
}
func (u *signallingUnimplemented) AckMessageId(ackMessageId uint32) {}
func (u *signallingUnimplemented) SetLastProcessedRemoteMessageId(lastProcessedRemoteMessageId uint32) {
}
func (u *signallingUnimplemented) PendingMessages() proto.Message {
return nil
}
func (u *signallingUnimplemented) SignalConnectResponse(connectResponse *livekit.ConnectResponse) proto.Message {
return nil
}
+91
View File
@@ -0,0 +1,91 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package signalling
import (
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"google.golang.org/protobuf/proto"
)
var _ ParticipantSignalling = (*signallingv2)(nil)
type Signallingv2Params struct {
Logger logger.Logger
}
type signallingv2 struct {
signallingUnimplemented
params Signallingv2Params
signalCache *SignalCache
signalFragment *SignalFragment
}
func NewSignallingv2(params Signallingv2Params) ParticipantSignalling {
return &signallingv2{
params: params,
signalCache: NewSignalCache(SignalCacheParams{
Logger: params.Logger,
}),
signalFragment: NewSignalFragment(SignalFragmentParams{
Logger: params.Logger,
}),
}
}
func (s *signallingv2) AckMessageId(ackMessageId uint32) {
s.signalCache.Clear(ackMessageId)
}
func (s *signallingv2) SetLastProcessedRemoteMessageId(lastProcessedRemoteMessageId uint32) {
s.signalCache.SetLastProcessedRemoteMessageId(lastProcessedRemoteMessageId)
}
func (s *signallingv2) PendingMessages() proto.Message {
serverMessages := s.signalCache.GetFromFront()
if len(serverMessages) == 0 {
return nil
}
return &livekit.Signalv2WireMessage{
Message: &livekit.Signalv2WireMessage_Envelope{
Envelope: &livekit.Envelope{
ServerMessages: serverMessages,
},
},
}
}
func (s *signallingv2) SignalConnectResponse(connectResponse *livekit.ConnectResponse) proto.Message {
if connectResponse == nil {
return nil
}
serverMessage := &livekit.Signalv2ServerMessage{
Message: &livekit.Signalv2ServerMessage_ConnectResponse{
ConnectResponse: connectResponse,
},
}
s.signalCache.Add(serverMessage)
return &livekit.Signalv2WireMessage{
Message: &livekit.Signalv2WireMessage_Envelope{
Envelope: &livekit.Envelope{
ServerMessages: []*livekit.Signalv2ServerMessage{serverMessage},
},
},
}
}
+30 -4
View File
@@ -32,6 +32,8 @@ import (
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/livekit-server/pkg/sfu/mime"
"github.com/livekit/livekit-server/pkg/sfu/pacer"
"google.golang.org/protobuf/proto"
)
//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate
@@ -376,6 +378,7 @@ type LocalParticipant interface {
GetLastReliableSequence(migrateOut bool) uint32
SetResponseSink(sink routing.MessageSink)
GetResponseSink() routing.MessageSink
CloseSignalConnection(reason SignallingCloseReason)
UpdateLastSeenSignal()
SetSignalSourceValid(valid bool)
@@ -450,6 +453,8 @@ type LocalParticipant interface {
HandleReconnectAndSendResponse(reconnectReason livekit.ReconnectReason, reconnectResponse *livekit.ReconnectResponse) error
IssueFullReconnect(reason ParticipantCloseReason)
SendRoomMovedResponse(moved *livekit.RoomMovedResponse) error
SendConnectResponse(connectResponse *livekit.ConnectResponse) error
SignalPendingMessages() proto.Message
// callbacks
OnStateChange(func(p LocalParticipant))
@@ -468,6 +473,16 @@ type LocalParticipant interface {
OnSubscribeStatusChanged(fn func(publisherID livekit.ParticipantID, subscribed bool))
OnClose(callback func(LocalParticipant))
OnClaimsChanged(callback func(LocalParticipant))
OnUpdateSubscriptions(func(
LocalParticipant,
[]livekit.TrackID,
[]*livekit.ParticipantTracks,
bool,
))
OnUpdateSubscriptionPermission(func(LocalParticipant, *livekit.SubscriptionPermission) error)
OnSyncState(func(LocalParticipant, *livekit.SyncState) error)
OnSimulateScenario(func(LocalParticipant, *livekit.SimulateScenario) error)
OnLeave(func(LocalParticipant, ParticipantCloseReason))
HandleReceiverReport(dt *sfu.DownTrack, report *rtcp.ReceiverReport)
@@ -508,6 +523,15 @@ type LocalParticipant interface {
GetDisableSenderReportPassThrough() bool
HandleMetrics(senderParticipantID livekit.ParticipantID, batch *livekit.MetricsBatch) error
HandleUpdateSubscriptions(
[]livekit.TrackID,
[]*livekit.ParticipantTracks,
bool,
)
HandleUpdateSubscriptionPermission(*livekit.SubscriptionPermission) error
HandleSyncState(*livekit.SyncState) error
HandleSimulateScenario(*livekit.SimulateScenario) error
HandleLeaveRequest(reason ParticipantCloseReason)
}
// Room is a container of participants, and can provide room-level actions
@@ -517,10 +541,12 @@ type Room interface {
Name() livekit.RoomName
ID() livekit.RoomID
RemoveParticipant(identity livekit.ParticipantIdentity, pID livekit.ParticipantID, reason ParticipantCloseReason)
UpdateSubscriptions(participant LocalParticipant, trackIDs []livekit.TrackID, participantTracks []*livekit.ParticipantTracks, subscribe bool)
UpdateSubscriptionPermission(participant LocalParticipant, permissions *livekit.SubscriptionPermission) error
SyncState(participant LocalParticipant, state *livekit.SyncState) error
SimulateScenario(participant LocalParticipant, scenario *livekit.SimulateScenario) error
UpdateSubscriptions(
participant LocalParticipant,
trackIDs []livekit.TrackID,
participantTracks []*livekit.ParticipantTracks,
subscribe bool,
)
ResolveMediaTrackForSubscriber(sub LocalParticipant, trackID livekit.TrackID) MediaResolverResult
GetLocalParticipants() []LocalParticipant
IsDataMessageUserPacketDuplicate(ip *livekit.UserPacket) bool
@@ -17,6 +17,7 @@ import (
"github.com/livekit/protocol/utils"
"github.com/pion/rtcp"
webrtc "github.com/pion/webrtc/v4"
"google.golang.org/protobuf/proto"
)
type FakeLocalParticipant struct {
@@ -456,6 +457,16 @@ type FakeLocalParticipant struct {
getReporterResolverReturnsOnCall map[int]struct {
result1 roomobs.ParticipantReporterResolver
}
GetResponseSinkStub func() routing.MessageSink
getResponseSinkMutex sync.RWMutex
getResponseSinkArgsForCall []struct {
}
getResponseSinkReturns struct {
result1 routing.MessageSink
}
getResponseSinkReturnsOnCall map[int]struct {
result1 routing.MessageSink
}
GetSubscribedParticipantsStub func() []livekit.ParticipantID
getSubscribedParticipantsMutex sync.RWMutex
getSubscribedParticipantsArgsForCall []struct {
@@ -516,6 +527,11 @@ type FakeLocalParticipant struct {
handleICETrickleSDPFragmentReturnsOnCall map[int]struct {
result1 error
}
HandleLeaveRequestStub func(types.ParticipantCloseReason)
handleLeaveRequestMutex sync.RWMutex
handleLeaveRequestArgsForCall []struct {
arg1 types.ParticipantCloseReason
}
HandleMetricsStub func(livekit.ParticipantID, *livekit.MetricsBatch) error
handleMetricsMutex sync.RWMutex
handleMetricsArgsForCall []struct {
@@ -562,6 +578,46 @@ type FakeLocalParticipant struct {
handleSignalSourceCloseMutex sync.RWMutex
handleSignalSourceCloseArgsForCall []struct {
}
HandleSimulateScenarioStub func(*livekit.SimulateScenario) error
handleSimulateScenarioMutex sync.RWMutex
handleSimulateScenarioArgsForCall []struct {
arg1 *livekit.SimulateScenario
}
handleSimulateScenarioReturns struct {
result1 error
}
handleSimulateScenarioReturnsOnCall map[int]struct {
result1 error
}
HandleSyncStateStub func(*livekit.SyncState) error
handleSyncStateMutex sync.RWMutex
handleSyncStateArgsForCall []struct {
arg1 *livekit.SyncState
}
handleSyncStateReturns struct {
result1 error
}
handleSyncStateReturnsOnCall map[int]struct {
result1 error
}
HandleUpdateSubscriptionPermissionStub func(*livekit.SubscriptionPermission) error
handleUpdateSubscriptionPermissionMutex sync.RWMutex
handleUpdateSubscriptionPermissionArgsForCall []struct {
arg1 *livekit.SubscriptionPermission
}
handleUpdateSubscriptionPermissionReturns struct {
result1 error
}
handleUpdateSubscriptionPermissionReturnsOnCall map[int]struct {
result1 error
}
HandleUpdateSubscriptionsStub func([]livekit.TrackID, []*livekit.ParticipantTracks, bool)
handleUpdateSubscriptionsMutex sync.RWMutex
handleUpdateSubscriptionsArgsForCall []struct {
arg1 []livekit.TrackID
arg2 []*livekit.ParticipantTracks
arg3 bool
}
HasConnectedStub func() bool
hasConnectedMutex sync.RWMutex
hasConnectedArgsForCall []struct {
@@ -808,6 +864,11 @@ type FakeLocalParticipant struct {
onICEConfigChangedArgsForCall []struct {
arg1 func(participant types.LocalParticipant, iceConfig *livekit.ICEConfig)
}
OnLeaveStub func(func(types.LocalParticipant, types.ParticipantCloseReason))
onLeaveMutex sync.RWMutex
onLeaveArgsForCall []struct {
arg1 func(types.LocalParticipant, types.ParticipantCloseReason)
}
OnMetricsStub func(func(types.Participant, *livekit.DataPacket))
onMetricsMutex sync.RWMutex
onMetricsArgsForCall []struct {
@@ -823,6 +884,11 @@ type FakeLocalParticipant struct {
onParticipantUpdateArgsForCall []struct {
arg1 func(types.LocalParticipant)
}
OnSimulateScenarioStub func(func(types.LocalParticipant, *livekit.SimulateScenario) error)
onSimulateScenarioMutex sync.RWMutex
onSimulateScenarioArgsForCall []struct {
arg1 func(types.LocalParticipant, *livekit.SimulateScenario) error
}
OnStateChangeStub func(func(p types.LocalParticipant))
onStateChangeMutex sync.RWMutex
onStateChangeArgsForCall []struct {
@@ -838,6 +904,11 @@ type FakeLocalParticipant struct {
onSubscriberReadyArgsForCall []struct {
arg1 func(types.LocalParticipant)
}
OnSyncStateStub func(func(types.LocalParticipant, *livekit.SyncState) error)
onSyncStateMutex sync.RWMutex
onSyncStateArgsForCall []struct {
arg1 func(types.LocalParticipant, *livekit.SyncState) error
}
OnTrackPublishedStub func(func(types.LocalParticipant, types.MediaTrack))
onTrackPublishedMutex sync.RWMutex
onTrackPublishedArgsForCall []struct {
@@ -853,6 +924,16 @@ type FakeLocalParticipant struct {
onTrackUpdatedArgsForCall []struct {
arg1 func(types.LocalParticipant, types.MediaTrack)
}
OnUpdateSubscriptionPermissionStub func(func(types.LocalParticipant, *livekit.SubscriptionPermission) error)
onUpdateSubscriptionPermissionMutex sync.RWMutex
onUpdateSubscriptionPermissionArgsForCall []struct {
arg1 func(types.LocalParticipant, *livekit.SubscriptionPermission) error
}
OnUpdateSubscriptionsStub func(func(types.LocalParticipant, []livekit.TrackID, []*livekit.ParticipantTracks, bool))
onUpdateSubscriptionsMutex sync.RWMutex
onUpdateSubscriptionsArgsForCall []struct {
arg1 func(types.LocalParticipant, []livekit.TrackID, []*livekit.ParticipantTracks, bool)
}
ProtocolVersionStub func() types.ProtocolVersion
protocolVersionMutex sync.RWMutex
protocolVersionArgsForCall []struct {
@@ -880,6 +961,17 @@ type FakeLocalParticipant struct {
removeTrackLocalReturnsOnCall map[int]struct {
result1 error
}
SendConnectResponseStub func(*livekit.ConnectResponse) error
sendConnectResponseMutex sync.RWMutex
sendConnectResponseArgsForCall []struct {
arg1 *livekit.ConnectResponse
}
sendConnectResponseReturns struct {
result1 error
}
sendConnectResponseReturnsOnCall map[int]struct {
result1 error
}
SendConnectionQualityUpdateStub func(*livekit.ConnectionQualityUpdate) error
sendConnectionQualityUpdateMutex sync.RWMutex
sendConnectionQualityUpdateArgsForCall []struct {
@@ -1087,6 +1179,16 @@ type FakeLocalParticipant struct {
setTrackMutedReturnsOnCall map[int]struct {
result1 *livekit.TrackInfo
}
SignalPendingMessagesStub func() proto.Message
signalPendingMessagesMutex sync.RWMutex
signalPendingMessagesArgsForCall []struct {
}
signalPendingMessagesReturns struct {
result1 proto.Message
}
signalPendingMessagesReturnsOnCall map[int]struct {
result1 proto.Message
}
StateStub func() livekit.ParticipantInfo_State
stateMutex sync.RWMutex
stateArgsForCall []struct {
@@ -3590,6 +3692,59 @@ func (fake *FakeLocalParticipant) GetReporterResolverReturnsOnCall(i int, result
}{result1}
}
func (fake *FakeLocalParticipant) GetResponseSink() routing.MessageSink {
fake.getResponseSinkMutex.Lock()
ret, specificReturn := fake.getResponseSinkReturnsOnCall[len(fake.getResponseSinkArgsForCall)]
fake.getResponseSinkArgsForCall = append(fake.getResponseSinkArgsForCall, struct {
}{})
stub := fake.GetResponseSinkStub
fakeReturns := fake.getResponseSinkReturns
fake.recordInvocation("GetResponseSink", []interface{}{})
fake.getResponseSinkMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeLocalParticipant) GetResponseSinkCallCount() int {
fake.getResponseSinkMutex.RLock()
defer fake.getResponseSinkMutex.RUnlock()
return len(fake.getResponseSinkArgsForCall)
}
func (fake *FakeLocalParticipant) GetResponseSinkCalls(stub func() routing.MessageSink) {
fake.getResponseSinkMutex.Lock()
defer fake.getResponseSinkMutex.Unlock()
fake.GetResponseSinkStub = stub
}
func (fake *FakeLocalParticipant) GetResponseSinkReturns(result1 routing.MessageSink) {
fake.getResponseSinkMutex.Lock()
defer fake.getResponseSinkMutex.Unlock()
fake.GetResponseSinkStub = nil
fake.getResponseSinkReturns = struct {
result1 routing.MessageSink
}{result1}
}
func (fake *FakeLocalParticipant) GetResponseSinkReturnsOnCall(i int, result1 routing.MessageSink) {
fake.getResponseSinkMutex.Lock()
defer fake.getResponseSinkMutex.Unlock()
fake.GetResponseSinkStub = nil
if fake.getResponseSinkReturnsOnCall == nil {
fake.getResponseSinkReturnsOnCall = make(map[int]struct {
result1 routing.MessageSink
})
}
fake.getResponseSinkReturnsOnCall[i] = struct {
result1 routing.MessageSink
}{result1}
}
func (fake *FakeLocalParticipant) GetSubscribedParticipants() []livekit.ParticipantID {
fake.getSubscribedParticipantsMutex.Lock()
ret, specificReturn := fake.getSubscribedParticipantsReturnsOnCall[len(fake.getSubscribedParticipantsArgsForCall)]
@@ -3907,6 +4062,38 @@ func (fake *FakeLocalParticipant) HandleICETrickleSDPFragmentReturnsOnCall(i int
}{result1}
}
func (fake *FakeLocalParticipant) HandleLeaveRequest(arg1 types.ParticipantCloseReason) {
fake.handleLeaveRequestMutex.Lock()
fake.handleLeaveRequestArgsForCall = append(fake.handleLeaveRequestArgsForCall, struct {
arg1 types.ParticipantCloseReason
}{arg1})
stub := fake.HandleLeaveRequestStub
fake.recordInvocation("HandleLeaveRequest", []interface{}{arg1})
fake.handleLeaveRequestMutex.Unlock()
if stub != nil {
fake.HandleLeaveRequestStub(arg1)
}
}
func (fake *FakeLocalParticipant) HandleLeaveRequestCallCount() int {
fake.handleLeaveRequestMutex.RLock()
defer fake.handleLeaveRequestMutex.RUnlock()
return len(fake.handleLeaveRequestArgsForCall)
}
func (fake *FakeLocalParticipant) HandleLeaveRequestCalls(stub func(types.ParticipantCloseReason)) {
fake.handleLeaveRequestMutex.Lock()
defer fake.handleLeaveRequestMutex.Unlock()
fake.HandleLeaveRequestStub = stub
}
func (fake *FakeLocalParticipant) HandleLeaveRequestArgsForCall(i int) types.ParticipantCloseReason {
fake.handleLeaveRequestMutex.RLock()
defer fake.handleLeaveRequestMutex.RUnlock()
argsForCall := fake.handleLeaveRequestArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) HandleMetrics(arg1 livekit.ParticipantID, arg2 *livekit.MetricsBatch) error {
fake.handleMetricsMutex.Lock()
ret, specificReturn := fake.handleMetricsReturnsOnCall[len(fake.handleMetricsArgsForCall)]
@@ -4150,6 +4337,233 @@ func (fake *FakeLocalParticipant) HandleSignalSourceCloseCalls(stub func()) {
fake.HandleSignalSourceCloseStub = stub
}
func (fake *FakeLocalParticipant) HandleSimulateScenario(arg1 *livekit.SimulateScenario) error {
fake.handleSimulateScenarioMutex.Lock()
ret, specificReturn := fake.handleSimulateScenarioReturnsOnCall[len(fake.handleSimulateScenarioArgsForCall)]
fake.handleSimulateScenarioArgsForCall = append(fake.handleSimulateScenarioArgsForCall, struct {
arg1 *livekit.SimulateScenario
}{arg1})
stub := fake.HandleSimulateScenarioStub
fakeReturns := fake.handleSimulateScenarioReturns
fake.recordInvocation("HandleSimulateScenario", []interface{}{arg1})
fake.handleSimulateScenarioMutex.Unlock()
if stub != nil {
return stub(arg1)
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeLocalParticipant) HandleSimulateScenarioCallCount() int {
fake.handleSimulateScenarioMutex.RLock()
defer fake.handleSimulateScenarioMutex.RUnlock()
return len(fake.handleSimulateScenarioArgsForCall)
}
func (fake *FakeLocalParticipant) HandleSimulateScenarioCalls(stub func(*livekit.SimulateScenario) error) {
fake.handleSimulateScenarioMutex.Lock()
defer fake.handleSimulateScenarioMutex.Unlock()
fake.HandleSimulateScenarioStub = stub
}
func (fake *FakeLocalParticipant) HandleSimulateScenarioArgsForCall(i int) *livekit.SimulateScenario {
fake.handleSimulateScenarioMutex.RLock()
defer fake.handleSimulateScenarioMutex.RUnlock()
argsForCall := fake.handleSimulateScenarioArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) HandleSimulateScenarioReturns(result1 error) {
fake.handleSimulateScenarioMutex.Lock()
defer fake.handleSimulateScenarioMutex.Unlock()
fake.HandleSimulateScenarioStub = nil
fake.handleSimulateScenarioReturns = struct {
result1 error
}{result1}
}
func (fake *FakeLocalParticipant) HandleSimulateScenarioReturnsOnCall(i int, result1 error) {
fake.handleSimulateScenarioMutex.Lock()
defer fake.handleSimulateScenarioMutex.Unlock()
fake.HandleSimulateScenarioStub = nil
if fake.handleSimulateScenarioReturnsOnCall == nil {
fake.handleSimulateScenarioReturnsOnCall = make(map[int]struct {
result1 error
})
}
fake.handleSimulateScenarioReturnsOnCall[i] = struct {
result1 error
}{result1}
}
func (fake *FakeLocalParticipant) HandleSyncState(arg1 *livekit.SyncState) error {
fake.handleSyncStateMutex.Lock()
ret, specificReturn := fake.handleSyncStateReturnsOnCall[len(fake.handleSyncStateArgsForCall)]
fake.handleSyncStateArgsForCall = append(fake.handleSyncStateArgsForCall, struct {
arg1 *livekit.SyncState
}{arg1})
stub := fake.HandleSyncStateStub
fakeReturns := fake.handleSyncStateReturns
fake.recordInvocation("HandleSyncState", []interface{}{arg1})
fake.handleSyncStateMutex.Unlock()
if stub != nil {
return stub(arg1)
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeLocalParticipant) HandleSyncStateCallCount() int {
fake.handleSyncStateMutex.RLock()
defer fake.handleSyncStateMutex.RUnlock()
return len(fake.handleSyncStateArgsForCall)
}
func (fake *FakeLocalParticipant) HandleSyncStateCalls(stub func(*livekit.SyncState) error) {
fake.handleSyncStateMutex.Lock()
defer fake.handleSyncStateMutex.Unlock()
fake.HandleSyncStateStub = stub
}
func (fake *FakeLocalParticipant) HandleSyncStateArgsForCall(i int) *livekit.SyncState {
fake.handleSyncStateMutex.RLock()
defer fake.handleSyncStateMutex.RUnlock()
argsForCall := fake.handleSyncStateArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) HandleSyncStateReturns(result1 error) {
fake.handleSyncStateMutex.Lock()
defer fake.handleSyncStateMutex.Unlock()
fake.HandleSyncStateStub = nil
fake.handleSyncStateReturns = struct {
result1 error
}{result1}
}
func (fake *FakeLocalParticipant) HandleSyncStateReturnsOnCall(i int, result1 error) {
fake.handleSyncStateMutex.Lock()
defer fake.handleSyncStateMutex.Unlock()
fake.HandleSyncStateStub = nil
if fake.handleSyncStateReturnsOnCall == nil {
fake.handleSyncStateReturnsOnCall = make(map[int]struct {
result1 error
})
}
fake.handleSyncStateReturnsOnCall[i] = struct {
result1 error
}{result1}
}
func (fake *FakeLocalParticipant) HandleUpdateSubscriptionPermission(arg1 *livekit.SubscriptionPermission) error {
fake.handleUpdateSubscriptionPermissionMutex.Lock()
ret, specificReturn := fake.handleUpdateSubscriptionPermissionReturnsOnCall[len(fake.handleUpdateSubscriptionPermissionArgsForCall)]
fake.handleUpdateSubscriptionPermissionArgsForCall = append(fake.handleUpdateSubscriptionPermissionArgsForCall, struct {
arg1 *livekit.SubscriptionPermission
}{arg1})
stub := fake.HandleUpdateSubscriptionPermissionStub
fakeReturns := fake.handleUpdateSubscriptionPermissionReturns
fake.recordInvocation("HandleUpdateSubscriptionPermission", []interface{}{arg1})
fake.handleUpdateSubscriptionPermissionMutex.Unlock()
if stub != nil {
return stub(arg1)
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeLocalParticipant) HandleUpdateSubscriptionPermissionCallCount() int {
fake.handleUpdateSubscriptionPermissionMutex.RLock()
defer fake.handleUpdateSubscriptionPermissionMutex.RUnlock()
return len(fake.handleUpdateSubscriptionPermissionArgsForCall)
}
func (fake *FakeLocalParticipant) HandleUpdateSubscriptionPermissionCalls(stub func(*livekit.SubscriptionPermission) error) {
fake.handleUpdateSubscriptionPermissionMutex.Lock()
defer fake.handleUpdateSubscriptionPermissionMutex.Unlock()
fake.HandleUpdateSubscriptionPermissionStub = stub
}
func (fake *FakeLocalParticipant) HandleUpdateSubscriptionPermissionArgsForCall(i int) *livekit.SubscriptionPermission {
fake.handleUpdateSubscriptionPermissionMutex.RLock()
defer fake.handleUpdateSubscriptionPermissionMutex.RUnlock()
argsForCall := fake.handleUpdateSubscriptionPermissionArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) HandleUpdateSubscriptionPermissionReturns(result1 error) {
fake.handleUpdateSubscriptionPermissionMutex.Lock()
defer fake.handleUpdateSubscriptionPermissionMutex.Unlock()
fake.HandleUpdateSubscriptionPermissionStub = nil
fake.handleUpdateSubscriptionPermissionReturns = struct {
result1 error
}{result1}
}
func (fake *FakeLocalParticipant) HandleUpdateSubscriptionPermissionReturnsOnCall(i int, result1 error) {
fake.handleUpdateSubscriptionPermissionMutex.Lock()
defer fake.handleUpdateSubscriptionPermissionMutex.Unlock()
fake.HandleUpdateSubscriptionPermissionStub = nil
if fake.handleUpdateSubscriptionPermissionReturnsOnCall == nil {
fake.handleUpdateSubscriptionPermissionReturnsOnCall = make(map[int]struct {
result1 error
})
}
fake.handleUpdateSubscriptionPermissionReturnsOnCall[i] = struct {
result1 error
}{result1}
}
func (fake *FakeLocalParticipant) HandleUpdateSubscriptions(arg1 []livekit.TrackID, arg2 []*livekit.ParticipantTracks, arg3 bool) {
var arg1Copy []livekit.TrackID
if arg1 != nil {
arg1Copy = make([]livekit.TrackID, len(arg1))
copy(arg1Copy, arg1)
}
var arg2Copy []*livekit.ParticipantTracks
if arg2 != nil {
arg2Copy = make([]*livekit.ParticipantTracks, len(arg2))
copy(arg2Copy, arg2)
}
fake.handleUpdateSubscriptionsMutex.Lock()
fake.handleUpdateSubscriptionsArgsForCall = append(fake.handleUpdateSubscriptionsArgsForCall, struct {
arg1 []livekit.TrackID
arg2 []*livekit.ParticipantTracks
arg3 bool
}{arg1Copy, arg2Copy, arg3})
stub := fake.HandleUpdateSubscriptionsStub
fake.recordInvocation("HandleUpdateSubscriptions", []interface{}{arg1Copy, arg2Copy, arg3})
fake.handleUpdateSubscriptionsMutex.Unlock()
if stub != nil {
fake.HandleUpdateSubscriptionsStub(arg1, arg2, arg3)
}
}
func (fake *FakeLocalParticipant) HandleUpdateSubscriptionsCallCount() int {
fake.handleUpdateSubscriptionsMutex.RLock()
defer fake.handleUpdateSubscriptionsMutex.RUnlock()
return len(fake.handleUpdateSubscriptionsArgsForCall)
}
func (fake *FakeLocalParticipant) HandleUpdateSubscriptionsCalls(stub func([]livekit.TrackID, []*livekit.ParticipantTracks, bool)) {
fake.handleUpdateSubscriptionsMutex.Lock()
defer fake.handleUpdateSubscriptionsMutex.Unlock()
fake.HandleUpdateSubscriptionsStub = stub
}
func (fake *FakeLocalParticipant) HandleUpdateSubscriptionsArgsForCall(i int) ([]livekit.TrackID, []*livekit.ParticipantTracks, bool) {
fake.handleUpdateSubscriptionsMutex.RLock()
defer fake.handleUpdateSubscriptionsMutex.RUnlock()
argsForCall := fake.handleUpdateSubscriptionsArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
}
func (fake *FakeLocalParticipant) HasConnected() bool {
fake.hasConnectedMutex.Lock()
ret, specificReturn := fake.hasConnectedReturnsOnCall[len(fake.hasConnectedArgsForCall)]
@@ -5504,6 +5918,38 @@ func (fake *FakeLocalParticipant) OnICEConfigChangedArgsForCall(i int) func(part
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) OnLeave(arg1 func(types.LocalParticipant, types.ParticipantCloseReason)) {
fake.onLeaveMutex.Lock()
fake.onLeaveArgsForCall = append(fake.onLeaveArgsForCall, struct {
arg1 func(types.LocalParticipant, types.ParticipantCloseReason)
}{arg1})
stub := fake.OnLeaveStub
fake.recordInvocation("OnLeave", []interface{}{arg1})
fake.onLeaveMutex.Unlock()
if stub != nil {
fake.OnLeaveStub(arg1)
}
}
func (fake *FakeLocalParticipant) OnLeaveCallCount() int {
fake.onLeaveMutex.RLock()
defer fake.onLeaveMutex.RUnlock()
return len(fake.onLeaveArgsForCall)
}
func (fake *FakeLocalParticipant) OnLeaveCalls(stub func(func(types.LocalParticipant, types.ParticipantCloseReason))) {
fake.onLeaveMutex.Lock()
defer fake.onLeaveMutex.Unlock()
fake.OnLeaveStub = stub
}
func (fake *FakeLocalParticipant) OnLeaveArgsForCall(i int) func(types.LocalParticipant, types.ParticipantCloseReason) {
fake.onLeaveMutex.RLock()
defer fake.onLeaveMutex.RUnlock()
argsForCall := fake.onLeaveArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) OnMetrics(arg1 func(types.Participant, *livekit.DataPacket)) {
fake.onMetricsMutex.Lock()
fake.onMetricsArgsForCall = append(fake.onMetricsArgsForCall, struct {
@@ -5600,6 +6046,38 @@ func (fake *FakeLocalParticipant) OnParticipantUpdateArgsForCall(i int) func(typ
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) OnSimulateScenario(arg1 func(types.LocalParticipant, *livekit.SimulateScenario) error) {
fake.onSimulateScenarioMutex.Lock()
fake.onSimulateScenarioArgsForCall = append(fake.onSimulateScenarioArgsForCall, struct {
arg1 func(types.LocalParticipant, *livekit.SimulateScenario) error
}{arg1})
stub := fake.OnSimulateScenarioStub
fake.recordInvocation("OnSimulateScenario", []interface{}{arg1})
fake.onSimulateScenarioMutex.Unlock()
if stub != nil {
fake.OnSimulateScenarioStub(arg1)
}
}
func (fake *FakeLocalParticipant) OnSimulateScenarioCallCount() int {
fake.onSimulateScenarioMutex.RLock()
defer fake.onSimulateScenarioMutex.RUnlock()
return len(fake.onSimulateScenarioArgsForCall)
}
func (fake *FakeLocalParticipant) OnSimulateScenarioCalls(stub func(func(types.LocalParticipant, *livekit.SimulateScenario) error)) {
fake.onSimulateScenarioMutex.Lock()
defer fake.onSimulateScenarioMutex.Unlock()
fake.OnSimulateScenarioStub = stub
}
func (fake *FakeLocalParticipant) OnSimulateScenarioArgsForCall(i int) func(types.LocalParticipant, *livekit.SimulateScenario) error {
fake.onSimulateScenarioMutex.RLock()
defer fake.onSimulateScenarioMutex.RUnlock()
argsForCall := fake.onSimulateScenarioArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) OnStateChange(arg1 func(p types.LocalParticipant)) {
fake.onStateChangeMutex.Lock()
fake.onStateChangeArgsForCall = append(fake.onStateChangeArgsForCall, struct {
@@ -5696,6 +6174,38 @@ func (fake *FakeLocalParticipant) OnSubscriberReadyArgsForCall(i int) func(types
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) OnSyncState(arg1 func(types.LocalParticipant, *livekit.SyncState) error) {
fake.onSyncStateMutex.Lock()
fake.onSyncStateArgsForCall = append(fake.onSyncStateArgsForCall, struct {
arg1 func(types.LocalParticipant, *livekit.SyncState) error
}{arg1})
stub := fake.OnSyncStateStub
fake.recordInvocation("OnSyncState", []interface{}{arg1})
fake.onSyncStateMutex.Unlock()
if stub != nil {
fake.OnSyncStateStub(arg1)
}
}
func (fake *FakeLocalParticipant) OnSyncStateCallCount() int {
fake.onSyncStateMutex.RLock()
defer fake.onSyncStateMutex.RUnlock()
return len(fake.onSyncStateArgsForCall)
}
func (fake *FakeLocalParticipant) OnSyncStateCalls(stub func(func(types.LocalParticipant, *livekit.SyncState) error)) {
fake.onSyncStateMutex.Lock()
defer fake.onSyncStateMutex.Unlock()
fake.OnSyncStateStub = stub
}
func (fake *FakeLocalParticipant) OnSyncStateArgsForCall(i int) func(types.LocalParticipant, *livekit.SyncState) error {
fake.onSyncStateMutex.RLock()
defer fake.onSyncStateMutex.RUnlock()
argsForCall := fake.onSyncStateArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) OnTrackPublished(arg1 func(types.LocalParticipant, types.MediaTrack)) {
fake.onTrackPublishedMutex.Lock()
fake.onTrackPublishedArgsForCall = append(fake.onTrackPublishedArgsForCall, struct {
@@ -5792,6 +6302,70 @@ func (fake *FakeLocalParticipant) OnTrackUpdatedArgsForCall(i int) func(types.Lo
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) OnUpdateSubscriptionPermission(arg1 func(types.LocalParticipant, *livekit.SubscriptionPermission) error) {
fake.onUpdateSubscriptionPermissionMutex.Lock()
fake.onUpdateSubscriptionPermissionArgsForCall = append(fake.onUpdateSubscriptionPermissionArgsForCall, struct {
arg1 func(types.LocalParticipant, *livekit.SubscriptionPermission) error
}{arg1})
stub := fake.OnUpdateSubscriptionPermissionStub
fake.recordInvocation("OnUpdateSubscriptionPermission", []interface{}{arg1})
fake.onUpdateSubscriptionPermissionMutex.Unlock()
if stub != nil {
fake.OnUpdateSubscriptionPermissionStub(arg1)
}
}
func (fake *FakeLocalParticipant) OnUpdateSubscriptionPermissionCallCount() int {
fake.onUpdateSubscriptionPermissionMutex.RLock()
defer fake.onUpdateSubscriptionPermissionMutex.RUnlock()
return len(fake.onUpdateSubscriptionPermissionArgsForCall)
}
func (fake *FakeLocalParticipant) OnUpdateSubscriptionPermissionCalls(stub func(func(types.LocalParticipant, *livekit.SubscriptionPermission) error)) {
fake.onUpdateSubscriptionPermissionMutex.Lock()
defer fake.onUpdateSubscriptionPermissionMutex.Unlock()
fake.OnUpdateSubscriptionPermissionStub = stub
}
func (fake *FakeLocalParticipant) OnUpdateSubscriptionPermissionArgsForCall(i int) func(types.LocalParticipant, *livekit.SubscriptionPermission) error {
fake.onUpdateSubscriptionPermissionMutex.RLock()
defer fake.onUpdateSubscriptionPermissionMutex.RUnlock()
argsForCall := fake.onUpdateSubscriptionPermissionArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) OnUpdateSubscriptions(arg1 func(types.LocalParticipant, []livekit.TrackID, []*livekit.ParticipantTracks, bool)) {
fake.onUpdateSubscriptionsMutex.Lock()
fake.onUpdateSubscriptionsArgsForCall = append(fake.onUpdateSubscriptionsArgsForCall, struct {
arg1 func(types.LocalParticipant, []livekit.TrackID, []*livekit.ParticipantTracks, bool)
}{arg1})
stub := fake.OnUpdateSubscriptionsStub
fake.recordInvocation("OnUpdateSubscriptions", []interface{}{arg1})
fake.onUpdateSubscriptionsMutex.Unlock()
if stub != nil {
fake.OnUpdateSubscriptionsStub(arg1)
}
}
func (fake *FakeLocalParticipant) OnUpdateSubscriptionsCallCount() int {
fake.onUpdateSubscriptionsMutex.RLock()
defer fake.onUpdateSubscriptionsMutex.RUnlock()
return len(fake.onUpdateSubscriptionsArgsForCall)
}
func (fake *FakeLocalParticipant) OnUpdateSubscriptionsCalls(stub func(func(types.LocalParticipant, []livekit.TrackID, []*livekit.ParticipantTracks, bool))) {
fake.onUpdateSubscriptionsMutex.Lock()
defer fake.onUpdateSubscriptionsMutex.Unlock()
fake.OnUpdateSubscriptionsStub = stub
}
func (fake *FakeLocalParticipant) OnUpdateSubscriptionsArgsForCall(i int) func(types.LocalParticipant, []livekit.TrackID, []*livekit.ParticipantTracks, bool) {
fake.onUpdateSubscriptionsMutex.RLock()
defer fake.onUpdateSubscriptionsMutex.RUnlock()
argsForCall := fake.onUpdateSubscriptionsArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) ProtocolVersion() types.ProtocolVersion {
fake.protocolVersionMutex.Lock()
ret, specificReturn := fake.protocolVersionReturnsOnCall[len(fake.protocolVersionArgsForCall)]
@@ -5939,6 +6513,67 @@ func (fake *FakeLocalParticipant) RemoveTrackLocalReturnsOnCall(i int, result1 e
}{result1}
}
func (fake *FakeLocalParticipant) SendConnectResponse(arg1 *livekit.ConnectResponse) error {
fake.sendConnectResponseMutex.Lock()
ret, specificReturn := fake.sendConnectResponseReturnsOnCall[len(fake.sendConnectResponseArgsForCall)]
fake.sendConnectResponseArgsForCall = append(fake.sendConnectResponseArgsForCall, struct {
arg1 *livekit.ConnectResponse
}{arg1})
stub := fake.SendConnectResponseStub
fakeReturns := fake.sendConnectResponseReturns
fake.recordInvocation("SendConnectResponse", []interface{}{arg1})
fake.sendConnectResponseMutex.Unlock()
if stub != nil {
return stub(arg1)
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeLocalParticipant) SendConnectResponseCallCount() int {
fake.sendConnectResponseMutex.RLock()
defer fake.sendConnectResponseMutex.RUnlock()
return len(fake.sendConnectResponseArgsForCall)
}
func (fake *FakeLocalParticipant) SendConnectResponseCalls(stub func(*livekit.ConnectResponse) error) {
fake.sendConnectResponseMutex.Lock()
defer fake.sendConnectResponseMutex.Unlock()
fake.SendConnectResponseStub = stub
}
func (fake *FakeLocalParticipant) SendConnectResponseArgsForCall(i int) *livekit.ConnectResponse {
fake.sendConnectResponseMutex.RLock()
defer fake.sendConnectResponseMutex.RUnlock()
argsForCall := fake.sendConnectResponseArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) SendConnectResponseReturns(result1 error) {
fake.sendConnectResponseMutex.Lock()
defer fake.sendConnectResponseMutex.Unlock()
fake.SendConnectResponseStub = nil
fake.sendConnectResponseReturns = struct {
result1 error
}{result1}
}
func (fake *FakeLocalParticipant) SendConnectResponseReturnsOnCall(i int, result1 error) {
fake.sendConnectResponseMutex.Lock()
defer fake.sendConnectResponseMutex.Unlock()
fake.SendConnectResponseStub = nil
if fake.sendConnectResponseReturnsOnCall == nil {
fake.sendConnectResponseReturnsOnCall = make(map[int]struct {
result1 error
})
}
fake.sendConnectResponseReturnsOnCall[i] = struct {
result1 error
}{result1}
}
func (fake *FakeLocalParticipant) SendConnectionQualityUpdate(arg1 *livekit.ConnectionQualityUpdate) error {
fake.sendConnectionQualityUpdateMutex.Lock()
ret, specificReturn := fake.sendConnectionQualityUpdateReturnsOnCall[len(fake.sendConnectionQualityUpdateArgsForCall)]
@@ -7101,6 +7736,59 @@ func (fake *FakeLocalParticipant) SetTrackMutedReturnsOnCall(i int, result1 *liv
}{result1}
}
func (fake *FakeLocalParticipant) SignalPendingMessages() proto.Message {
fake.signalPendingMessagesMutex.Lock()
ret, specificReturn := fake.signalPendingMessagesReturnsOnCall[len(fake.signalPendingMessagesArgsForCall)]
fake.signalPendingMessagesArgsForCall = append(fake.signalPendingMessagesArgsForCall, struct {
}{})
stub := fake.SignalPendingMessagesStub
fakeReturns := fake.signalPendingMessagesReturns
fake.recordInvocation("SignalPendingMessages", []interface{}{})
fake.signalPendingMessagesMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeLocalParticipant) SignalPendingMessagesCallCount() int {
fake.signalPendingMessagesMutex.RLock()
defer fake.signalPendingMessagesMutex.RUnlock()
return len(fake.signalPendingMessagesArgsForCall)
}
func (fake *FakeLocalParticipant) SignalPendingMessagesCalls(stub func() proto.Message) {
fake.signalPendingMessagesMutex.Lock()
defer fake.signalPendingMessagesMutex.Unlock()
fake.SignalPendingMessagesStub = stub
}
func (fake *FakeLocalParticipant) SignalPendingMessagesReturns(result1 proto.Message) {
fake.signalPendingMessagesMutex.Lock()
defer fake.signalPendingMessagesMutex.Unlock()
fake.SignalPendingMessagesStub = nil
fake.signalPendingMessagesReturns = struct {
result1 proto.Message
}{result1}
}
func (fake *FakeLocalParticipant) SignalPendingMessagesReturnsOnCall(i int, result1 proto.Message) {
fake.signalPendingMessagesMutex.Lock()
defer fake.signalPendingMessagesMutex.Unlock()
fake.SignalPendingMessagesStub = nil
if fake.signalPendingMessagesReturnsOnCall == nil {
fake.signalPendingMessagesReturnsOnCall = make(map[int]struct {
result1 proto.Message
})
}
fake.signalPendingMessagesReturnsOnCall[i] = struct {
result1 proto.Message
}{result1}
}
func (fake *FakeLocalParticipant) State() livekit.ParticipantInfo_State {
fake.stateMutex.Lock()
ret, specificReturn := fake.stateReturnsOnCall[len(fake.stateArgsForCall)]
@@ -8524,6 +9212,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.getReporterMutex.RUnlock()
fake.getReporterResolverMutex.RLock()
defer fake.getReporterResolverMutex.RUnlock()
fake.getResponseSinkMutex.RLock()
defer fake.getResponseSinkMutex.RUnlock()
fake.getSubscribedParticipantsMutex.RLock()
defer fake.getSubscribedParticipantsMutex.RUnlock()
fake.getSubscribedTracksMutex.RLock()
@@ -8536,6 +9226,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.handleICERestartSDPFragmentMutex.RUnlock()
fake.handleICETrickleSDPFragmentMutex.RLock()
defer fake.handleICETrickleSDPFragmentMutex.RUnlock()
fake.handleLeaveRequestMutex.RLock()
defer fake.handleLeaveRequestMutex.RUnlock()
fake.handleMetricsMutex.RLock()
defer fake.handleMetricsMutex.RUnlock()
fake.handleOfferMutex.RLock()
@@ -8546,6 +9238,14 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.handleReconnectAndSendResponseMutex.RUnlock()
fake.handleSignalSourceCloseMutex.RLock()
defer fake.handleSignalSourceCloseMutex.RUnlock()
fake.handleSimulateScenarioMutex.RLock()
defer fake.handleSimulateScenarioMutex.RUnlock()
fake.handleSyncStateMutex.RLock()
defer fake.handleSyncStateMutex.RUnlock()
fake.handleUpdateSubscriptionPermissionMutex.RLock()
defer fake.handleUpdateSubscriptionPermissionMutex.RUnlock()
fake.handleUpdateSubscriptionsMutex.RLock()
defer fake.handleUpdateSubscriptionsMutex.RUnlock()
fake.hasConnectedMutex.RLock()
defer fake.hasConnectedMutex.RUnlock()
fake.hasPermissionMutex.RLock()
@@ -8604,30 +9304,42 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.onDataPacketMutex.RUnlock()
fake.onICEConfigChangedMutex.RLock()
defer fake.onICEConfigChangedMutex.RUnlock()
fake.onLeaveMutex.RLock()
defer fake.onLeaveMutex.RUnlock()
fake.onMetricsMutex.RLock()
defer fake.onMetricsMutex.RUnlock()
fake.onMigrateStateChangeMutex.RLock()
defer fake.onMigrateStateChangeMutex.RUnlock()
fake.onParticipantUpdateMutex.RLock()
defer fake.onParticipantUpdateMutex.RUnlock()
fake.onSimulateScenarioMutex.RLock()
defer fake.onSimulateScenarioMutex.RUnlock()
fake.onStateChangeMutex.RLock()
defer fake.onStateChangeMutex.RUnlock()
fake.onSubscribeStatusChangedMutex.RLock()
defer fake.onSubscribeStatusChangedMutex.RUnlock()
fake.onSubscriberReadyMutex.RLock()
defer fake.onSubscriberReadyMutex.RUnlock()
fake.onSyncStateMutex.RLock()
defer fake.onSyncStateMutex.RUnlock()
fake.onTrackPublishedMutex.RLock()
defer fake.onTrackPublishedMutex.RUnlock()
fake.onTrackUnpublishedMutex.RLock()
defer fake.onTrackUnpublishedMutex.RUnlock()
fake.onTrackUpdatedMutex.RLock()
defer fake.onTrackUpdatedMutex.RUnlock()
fake.onUpdateSubscriptionPermissionMutex.RLock()
defer fake.onUpdateSubscriptionPermissionMutex.RUnlock()
fake.onUpdateSubscriptionsMutex.RLock()
defer fake.onUpdateSubscriptionsMutex.RUnlock()
fake.protocolVersionMutex.RLock()
defer fake.protocolVersionMutex.RUnlock()
fake.removePublishedTrackMutex.RLock()
defer fake.removePublishedTrackMutex.RUnlock()
fake.removeTrackLocalMutex.RLock()
defer fake.removeTrackLocalMutex.RUnlock()
fake.sendConnectResponseMutex.RLock()
defer fake.sendConnectResponseMutex.RUnlock()
fake.sendConnectionQualityUpdateMutex.RLock()
defer fake.sendConnectionQualityUpdateMutex.RUnlock()
fake.sendDataMessageMutex.RLock()
@@ -8674,6 +9386,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.setSubscriberChannelCapacityMutex.RUnlock()
fake.setTrackMutedMutex.RLock()
defer fake.setTrackMutedMutex.RUnlock()
fake.signalPendingMessagesMutex.RLock()
defer fake.signalPendingMessagesMutex.RUnlock()
fake.stateMutex.RLock()
defer fake.stateMutex.RUnlock()
fake.stopAndGetSubscribedTracksForwarderStateMutex.RLock()
-228
View File
@@ -69,42 +69,6 @@ type FakeRoom struct {
resolveMediaTrackForSubscriberReturnsOnCall map[int]struct {
result1 types.MediaResolverResult
}
SimulateScenarioStub func(types.LocalParticipant, *livekit.SimulateScenario) error
simulateScenarioMutex sync.RWMutex
simulateScenarioArgsForCall []struct {
arg1 types.LocalParticipant
arg2 *livekit.SimulateScenario
}
simulateScenarioReturns struct {
result1 error
}
simulateScenarioReturnsOnCall map[int]struct {
result1 error
}
SyncStateStub func(types.LocalParticipant, *livekit.SyncState) error
syncStateMutex sync.RWMutex
syncStateArgsForCall []struct {
arg1 types.LocalParticipant
arg2 *livekit.SyncState
}
syncStateReturns struct {
result1 error
}
syncStateReturnsOnCall map[int]struct {
result1 error
}
UpdateSubscriptionPermissionStub func(types.LocalParticipant, *livekit.SubscriptionPermission) error
updateSubscriptionPermissionMutex sync.RWMutex
updateSubscriptionPermissionArgsForCall []struct {
arg1 types.LocalParticipant
arg2 *livekit.SubscriptionPermission
}
updateSubscriptionPermissionReturns struct {
result1 error
}
updateSubscriptionPermissionReturnsOnCall map[int]struct {
result1 error
}
UpdateSubscriptionsStub func(types.LocalParticipant, []livekit.TrackID, []*livekit.ParticipantTracks, bool)
updateSubscriptionsMutex sync.RWMutex
updateSubscriptionsArgsForCall []struct {
@@ -433,192 +397,6 @@ func (fake *FakeRoom) ResolveMediaTrackForSubscriberReturnsOnCall(i int, result1
}{result1}
}
func (fake *FakeRoom) SimulateScenario(arg1 types.LocalParticipant, arg2 *livekit.SimulateScenario) error {
fake.simulateScenarioMutex.Lock()
ret, specificReturn := fake.simulateScenarioReturnsOnCall[len(fake.simulateScenarioArgsForCall)]
fake.simulateScenarioArgsForCall = append(fake.simulateScenarioArgsForCall, struct {
arg1 types.LocalParticipant
arg2 *livekit.SimulateScenario
}{arg1, arg2})
stub := fake.SimulateScenarioStub
fakeReturns := fake.simulateScenarioReturns
fake.recordInvocation("SimulateScenario", []interface{}{arg1, arg2})
fake.simulateScenarioMutex.Unlock()
if stub != nil {
return stub(arg1, arg2)
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeRoom) SimulateScenarioCallCount() int {
fake.simulateScenarioMutex.RLock()
defer fake.simulateScenarioMutex.RUnlock()
return len(fake.simulateScenarioArgsForCall)
}
func (fake *FakeRoom) SimulateScenarioCalls(stub func(types.LocalParticipant, *livekit.SimulateScenario) error) {
fake.simulateScenarioMutex.Lock()
defer fake.simulateScenarioMutex.Unlock()
fake.SimulateScenarioStub = stub
}
func (fake *FakeRoom) SimulateScenarioArgsForCall(i int) (types.LocalParticipant, *livekit.SimulateScenario) {
fake.simulateScenarioMutex.RLock()
defer fake.simulateScenarioMutex.RUnlock()
argsForCall := fake.simulateScenarioArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeRoom) SimulateScenarioReturns(result1 error) {
fake.simulateScenarioMutex.Lock()
defer fake.simulateScenarioMutex.Unlock()
fake.SimulateScenarioStub = nil
fake.simulateScenarioReturns = struct {
result1 error
}{result1}
}
func (fake *FakeRoom) SimulateScenarioReturnsOnCall(i int, result1 error) {
fake.simulateScenarioMutex.Lock()
defer fake.simulateScenarioMutex.Unlock()
fake.SimulateScenarioStub = nil
if fake.simulateScenarioReturnsOnCall == nil {
fake.simulateScenarioReturnsOnCall = make(map[int]struct {
result1 error
})
}
fake.simulateScenarioReturnsOnCall[i] = struct {
result1 error
}{result1}
}
func (fake *FakeRoom) SyncState(arg1 types.LocalParticipant, arg2 *livekit.SyncState) error {
fake.syncStateMutex.Lock()
ret, specificReturn := fake.syncStateReturnsOnCall[len(fake.syncStateArgsForCall)]
fake.syncStateArgsForCall = append(fake.syncStateArgsForCall, struct {
arg1 types.LocalParticipant
arg2 *livekit.SyncState
}{arg1, arg2})
stub := fake.SyncStateStub
fakeReturns := fake.syncStateReturns
fake.recordInvocation("SyncState", []interface{}{arg1, arg2})
fake.syncStateMutex.Unlock()
if stub != nil {
return stub(arg1, arg2)
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeRoom) SyncStateCallCount() int {
fake.syncStateMutex.RLock()
defer fake.syncStateMutex.RUnlock()
return len(fake.syncStateArgsForCall)
}
func (fake *FakeRoom) SyncStateCalls(stub func(types.LocalParticipant, *livekit.SyncState) error) {
fake.syncStateMutex.Lock()
defer fake.syncStateMutex.Unlock()
fake.SyncStateStub = stub
}
func (fake *FakeRoom) SyncStateArgsForCall(i int) (types.LocalParticipant, *livekit.SyncState) {
fake.syncStateMutex.RLock()
defer fake.syncStateMutex.RUnlock()
argsForCall := fake.syncStateArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeRoom) SyncStateReturns(result1 error) {
fake.syncStateMutex.Lock()
defer fake.syncStateMutex.Unlock()
fake.SyncStateStub = nil
fake.syncStateReturns = struct {
result1 error
}{result1}
}
func (fake *FakeRoom) SyncStateReturnsOnCall(i int, result1 error) {
fake.syncStateMutex.Lock()
defer fake.syncStateMutex.Unlock()
fake.SyncStateStub = nil
if fake.syncStateReturnsOnCall == nil {
fake.syncStateReturnsOnCall = make(map[int]struct {
result1 error
})
}
fake.syncStateReturnsOnCall[i] = struct {
result1 error
}{result1}
}
func (fake *FakeRoom) UpdateSubscriptionPermission(arg1 types.LocalParticipant, arg2 *livekit.SubscriptionPermission) error {
fake.updateSubscriptionPermissionMutex.Lock()
ret, specificReturn := fake.updateSubscriptionPermissionReturnsOnCall[len(fake.updateSubscriptionPermissionArgsForCall)]
fake.updateSubscriptionPermissionArgsForCall = append(fake.updateSubscriptionPermissionArgsForCall, struct {
arg1 types.LocalParticipant
arg2 *livekit.SubscriptionPermission
}{arg1, arg2})
stub := fake.UpdateSubscriptionPermissionStub
fakeReturns := fake.updateSubscriptionPermissionReturns
fake.recordInvocation("UpdateSubscriptionPermission", []interface{}{arg1, arg2})
fake.updateSubscriptionPermissionMutex.Unlock()
if stub != nil {
return stub(arg1, arg2)
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeRoom) UpdateSubscriptionPermissionCallCount() int {
fake.updateSubscriptionPermissionMutex.RLock()
defer fake.updateSubscriptionPermissionMutex.RUnlock()
return len(fake.updateSubscriptionPermissionArgsForCall)
}
func (fake *FakeRoom) UpdateSubscriptionPermissionCalls(stub func(types.LocalParticipant, *livekit.SubscriptionPermission) error) {
fake.updateSubscriptionPermissionMutex.Lock()
defer fake.updateSubscriptionPermissionMutex.Unlock()
fake.UpdateSubscriptionPermissionStub = stub
}
func (fake *FakeRoom) UpdateSubscriptionPermissionArgsForCall(i int) (types.LocalParticipant, *livekit.SubscriptionPermission) {
fake.updateSubscriptionPermissionMutex.RLock()
defer fake.updateSubscriptionPermissionMutex.RUnlock()
argsForCall := fake.updateSubscriptionPermissionArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeRoom) UpdateSubscriptionPermissionReturns(result1 error) {
fake.updateSubscriptionPermissionMutex.Lock()
defer fake.updateSubscriptionPermissionMutex.Unlock()
fake.UpdateSubscriptionPermissionStub = nil
fake.updateSubscriptionPermissionReturns = struct {
result1 error
}{result1}
}
func (fake *FakeRoom) UpdateSubscriptionPermissionReturnsOnCall(i int, result1 error) {
fake.updateSubscriptionPermissionMutex.Lock()
defer fake.updateSubscriptionPermissionMutex.Unlock()
fake.UpdateSubscriptionPermissionStub = nil
if fake.updateSubscriptionPermissionReturnsOnCall == nil {
fake.updateSubscriptionPermissionReturnsOnCall = make(map[int]struct {
result1 error
})
}
fake.updateSubscriptionPermissionReturnsOnCall[i] = struct {
result1 error
}{result1}
}
func (fake *FakeRoom) UpdateSubscriptions(arg1 types.LocalParticipant, arg2 []livekit.TrackID, arg3 []*livekit.ParticipantTracks, arg4 bool) {
var arg2Copy []livekit.TrackID
if arg2 != nil {
@@ -679,12 +457,6 @@ func (fake *FakeRoom) Invocations() map[string][][]interface{} {
defer fake.removeParticipantMutex.RUnlock()
fake.resolveMediaTrackForSubscriberMutex.RLock()
defer fake.resolveMediaTrackForSubscriberMutex.RUnlock()
fake.simulateScenarioMutex.RLock()
defer fake.simulateScenarioMutex.RUnlock()
fake.syncStateMutex.RLock()
defer fake.syncStateMutex.RUnlock()
fake.updateSubscriptionPermissionMutex.RLock()
defer fake.updateSubscriptionPermissionMutex.RUnlock()
fake.updateSubscriptionsMutex.RLock()
defer fake.updateSubscriptionsMutex.RUnlock()
copiedInvocations := map[string][][]interface{}{}
+4 -7
View File
@@ -863,12 +863,7 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, createRoom *livekit.C
// manages an RTC session for a participant, runs on the RTC node
func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.LocalParticipant, requestSource routing.MessageSource) {
pLogger := rtc.LoggerWithParticipant(
rtc.LoggerWithRoom(logger.GetLogger(), room.Name(), room.ID()),
participant.Identity(),
participant.ID(),
false,
)
pLogger := participant.GetLogger()
defer func() {
pLogger.Debugw("RTC session finishing", "connID", requestSource.ConnectionID())
requestSource.Close()
@@ -888,11 +883,13 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.LocalPa
select {
case <-participant.Disconnected():
return
case <-tokenTicker.C:
// refresh token with the first API Key/secret pair
if err := r.refreshToken(participant); err != nil {
pLogger.Errorw("could not refresh token", err, "connID", requestSource.ConnectionID())
}
case obj := <-requestSource.ReadChan():
if obj == nil {
if room.GetParticipantRequestSource(participant.Identity()) == requestSource {
@@ -902,7 +899,7 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.LocalPa
}
req := obj.(*livekit.SignalRequest)
if err := rtc.HandleParticipantSignal(room, participant, req, pLogger); err != nil {
if err := rtc.HandleParticipantSignal(participant, req); err != nil {
// more specific errors are already logged
// treat errors returned as fatal
return
+7
View File
@@ -371,12 +371,15 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
switch m := res.Message.(type) {
case *livekit.SignalResponse_Offer:
pLogger.Debugw("sending offer", "offer", m)
case *livekit.SignalResponse_Answer:
pLogger.Debugw("sending answer", "answer", m)
case *livekit.SignalResponse_Join:
pLogger.Debugw("sending join", "join", m)
signalStats.ResolveRoom(m.Join.GetRoom())
signalStats.ResolveParticipant(m.Join.GetParticipant())
case *livekit.SignalResponse_RoomUpdate:
updateRoomID := livekit.RoomID(m.RoomUpdate.GetRoom().GetSid())
if updateRoomID != "" {
@@ -385,11 +388,14 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
pLogger.Debugw("sending room update", "roomUpdate", m)
signalStats.ResolveRoom(m.RoomUpdate.GetRoom())
case *livekit.SignalResponse_Update:
pLogger.Debugw("sending participant update", "participantUpdate", m)
case *livekit.SignalResponse_RoomMoved:
resetLogger()
signalStats.Reset()
roomName = livekit.RoomName(m.RoomMoved.GetRoom().GetName())
moveRoomID := livekit.RoomID(m.RoomMoved.GetRoom().GetSid())
if moveRoomID != "" {
@@ -398,6 +404,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
participantIdentity = livekit.ParticipantIdentity(m.RoomMoved.GetParticipant().GetIdentity())
pID = livekit.ParticipantID(m.RoomMoved.GetParticipant().GetSid())
resolveLogger(false)
signalStats.ResolveRoom(m.RoomMoved.GetRoom())
signalStats.ResolveParticipant(m.RoomMoved.GetParticipant())
pLogger.Debugw("sending room moved", "roomMoved", m)