// Copyright 2023 LiveKit, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package rtc import ( "time" "github.com/pion/webrtc/v4" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" protosignalling "github.com/livekit/protocol/signalling" "github.com/livekit/livekit-server/pkg/routing" "github.com/livekit/livekit-server/pkg/rtc/types" ) func (p *ParticipantImpl) SwapResponseSink(sink routing.MessageSink, reason types.SignallingCloseReason) { p.signaller.SwapResponseSink(sink, reason) } func (p *ParticipantImpl) GetResponseSink() routing.MessageSink { return p.signaller.GetResponseSink() } func (p *ParticipantImpl) CloseSignalConnection(reason types.SignallingCloseReason) { p.signaller.CloseSignalConnection(reason) } func (p *ParticipantImpl) SendJoinResponse(joinResponse *livekit.JoinResponse) error { // keep track of participant updates and versions p.updateLock.Lock() for _, op := range joinResponse.OtherParticipants { p.updateCache.Add(livekit.ParticipantID(op.Sid), participantUpdateInfo{ identity: livekit.ParticipantIdentity(op.Identity), version: op.Version, state: op.State, updatedAt: time.Now(), }) } p.updateLock.Unlock() // send Join response err := p.signaller.WriteMessage(p.signalling.SignalJoinResponse(joinResponse)) if err != nil { return err } // update state after sending message, so that no participant updates could slip through before JoinResponse is sent p.updateLock.Lock() if p.State() == livekit.ParticipantInfo_JOINING { p.updateState(livekit.ParticipantInfo_JOINED) } queuedUpdates := p.queuedUpdates p.queuedUpdates = nil p.updateLock.Unlock() if p.params.RequireMediaSectionWithJoinResponse && p.params.UseSinglePeerConnection { p.sendMediaSectionsRequirement(audioSectionsCountWithJoinResponse, videoSectionsCountWithJoinResponse) } if len(queuedUpdates) > 0 { return p.SendParticipantUpdate(queuedUpdates) } return nil } func (p *ParticipantImpl) SendParticipantUpdate(participantsToUpdate []*livekit.ParticipantInfo) error { p.updateLock.Lock() if p.IsDisconnected() { p.updateLock.Unlock() return nil } if !p.IsReady() { // queue up updates p.queuedUpdates = append(p.queuedUpdates, participantsToUpdate...) p.updateLock.Unlock() return nil } validUpdates := make([]*livekit.ParticipantInfo, 0, len(participantsToUpdate)) for _, pi := range participantsToUpdate { isValid := true pID := livekit.ParticipantID(pi.Sid) if lastVersion, ok := p.updateCache.Get(pID); ok { // this is a message delivered out of order, a more recent version of the message had already been // sent. if pi.Version < lastVersion.version { p.params.Logger.Debugw( "skipping outdated participant update", "otherParticipant", pi.Identity, "otherPID", pi.Sid, "version", pi.Version, "lastVersion", lastVersion, ) isValid = false } } if pi.Permission != nil && pi.Permission.Hidden && pi.Sid != string(p.ID()) { p.params.Logger.Debugw("skipping hidden participant update", "otherParticipant", pi.Identity) isValid = false } if isValid { p.updateCache.Add(pID, participantUpdateInfo{ identity: livekit.ParticipantIdentity(pi.Identity), version: pi.Version, state: pi.State, updatedAt: time.Now(), }) validUpdates = append(validUpdates, pi) } } p.updateLock.Unlock() return p.signaller.WriteMessage(p.signalling.SignalParticipantUpdate(validUpdates)) } // SendSpeakerUpdate notifies participant changes to speakers. only send members that have changed since last update func (p *ParticipantImpl) SendSpeakerUpdate(speakers []*livekit.SpeakerInfo, force bool) error { if !p.IsReady() { return nil } var scopedSpeakers []*livekit.SpeakerInfo if force { scopedSpeakers = speakers } else { for _, s := range speakers { participantID := livekit.ParticipantID(s.Sid) if p.IsSubscribedTo(participantID) || participantID == p.ID() { scopedSpeakers = append(scopedSpeakers, s) } } } return p.signaller.WriteMessage(p.signalling.SignalSpeakerUpdate(scopedSpeakers)) } func (p *ParticipantImpl) SendRoomUpdate(room *livekit.Room) error { return p.signaller.WriteMessage(p.signalling.SignalRoomUpdate(room)) } func (p *ParticipantImpl) SendConnectionQualityUpdate(update *livekit.ConnectionQualityUpdate) error { return p.signaller.WriteMessage(p.signalling.SignalConnectionQualityUpdate(update)) } func (p *ParticipantImpl) SendRefreshToken(token string) error { return p.signaller.WriteMessage(p.signalling.SignalRefreshToken(token)) } func (p *ParticipantImpl) sendRequestResponse(requestResponse *livekit.RequestResponse) error { if !p.params.ClientInfo.SupportsRequestResponse() { return nil } if requestResponse.Reason == livekit.RequestResponse_OK && !p.ProtocolVersion().SupportsNonErrorSignalResponse() { return nil } return p.signaller.WriteMessage(p.signalling.SignalRequestResponse(requestResponse)) } func (p *ParticipantImpl) SendRoomMovedResponse(roomMovedResponse *livekit.RoomMovedResponse) error { return p.signaller.WriteMessage(p.signalling.SignalRoomMovedResponse(roomMovedResponse)) } func (p *ParticipantImpl) HandleReconnectAndSendResponse(reconnectReason livekit.ReconnectReason, reconnectResponse *livekit.ReconnectResponse) error { p.TransportManager.HandleClientReconnect(reconnectReason) if !p.params.ClientInfo.CanHandleReconnectResponse() { return nil } if err := p.signaller.WriteMessage(p.signalling.SignalReconnectResponse(reconnectResponse)); err != nil { return err } if p.params.ProtocolVersion.SupportsDisconnectedUpdate() { return p.sendDisconnectUpdatesForReconnect() } return nil } func (p *ParticipantImpl) sendDisconnectUpdatesForReconnect() error { lastSignalAt := p.TransportManager.LastSeenSignalAt() var disconnectedParticipants []*livekit.ParticipantInfo p.updateLock.Lock() keys := p.updateCache.Keys() for i := len(keys) - 1; i >= 0; i-- { if info, ok := p.updateCache.Get(keys[i]); ok { if info.updatedAt.Before(lastSignalAt) { break } else if info.state == livekit.ParticipantInfo_DISCONNECTED { disconnectedParticipants = append(disconnectedParticipants, &livekit.ParticipantInfo{ Sid: string(keys[i]), Identity: string(info.identity), Version: info.version, State: livekit.ParticipantInfo_DISCONNECTED, }) } } } p.updateLock.Unlock() return p.signaller.WriteMessage(p.signalling.SignalParticipantUpdate(disconnectedParticipants)) } func (p *ParticipantImpl) sendICECandidate(ic *webrtc.ICECandidate, target livekit.SignalTarget) error { prevIC := p.icQueue[target].Swap(ic) if prevIC == nil { return nil } trickle := protosignalling.ToProtoTrickle(prevIC.ToJSON(), target, ic == nil) p.params.Logger.Debugw("sending ICE candidate", "transport", target, "trickle", logger.Proto(trickle)) return p.signaller.WriteMessage(p.signalling.SignalICECandidate(trickle)) } func (p *ParticipantImpl) sendTrackMuted(trackID livekit.TrackID, muted bool) { _ = p.signaller.WriteMessage(p.signalling.SignalTrackMuted(&livekit.MuteTrackRequest{ Sid: string(trackID), Muted: muted, })) } func (p *ParticipantImpl) sendTrackPublished(cid string, ti *livekit.TrackInfo) error { p.pubLogger.Debugw("sending track published", "cid", cid, "trackInfo", logger.Proto(ti)) return p.signaller.WriteMessage(p.signalling.SignalTrackPublished(&livekit.TrackPublishedResponse{ Cid: cid, Track: ti, })) } func (p *ParticipantImpl) sendTrackUnpublished(trackID livekit.TrackID) { _ = p.signaller.WriteMessage(p.signalling.SignalTrackUnpublished(&livekit.TrackUnpublishedResponse{ TrackSid: string(trackID), })) } func (p *ParticipantImpl) sendTrackHasBeenSubscribed(trackID livekit.TrackID) { if !p.params.ClientInfo.SupportsTrackSubscribedEvent() { return } _ = p.signaller.WriteMessage(p.signalling.SignalTrackSubscribed(&livekit.TrackSubscribed{ TrackSid: string(trackID), })) p.params.Logger.Debugw("track has been subscribed", "trackID", trackID) } func (p *ParticipantImpl) sendLeaveRequest( reason types.ParticipantCloseReason, isExpectedToResume bool, isExpectedToReconnect bool, sendOnlyIfSupportingLeaveRequestWithAction bool, ) error { var leave *livekit.LeaveRequest if p.ProtocolVersion().SupportsRegionsInLeaveRequest() { leave = &livekit.LeaveRequest{ Reason: reason.ToDisconnectReason(), } switch { case isExpectedToResume: leave.Action = livekit.LeaveRequest_RESUME case isExpectedToReconnect: leave.Action = livekit.LeaveRequest_RECONNECT default: leave.Action = livekit.LeaveRequest_DISCONNECT } if leave.Action != livekit.LeaveRequest_DISCONNECT { // sending region settings even for RESUME just in case client wants to a full reconnect despite server saying RESUME leave.Regions = p.helper().GetRegionSettings(p.params.ClientInfo.Address) } } else { if !sendOnlyIfSupportingLeaveRequestWithAction { leave = &livekit.LeaveRequest{ CanReconnect: isExpectedToReconnect, Reason: reason.ToDisconnectReason(), } } } if leave != nil { return p.signaller.WriteMessage(p.signalling.SignalLeaveRequest(leave)) } return nil } func (p *ParticipantImpl) sendSdpAnswer(answer webrtc.SessionDescription, answerId uint32, midToTrackID map[string]string) error { return p.signaller.WriteMessage(p.signalling.SignalSdpAnswer(protosignalling.ToProtoSessionDescription(answer, answerId, midToTrackID))) } func (p *ParticipantImpl) sendSdpOffer(offer webrtc.SessionDescription, offerId uint32, midToTrackID map[string]string) error { return p.signaller.WriteMessage(p.signalling.SignalSdpOffer(protosignalling.ToProtoSessionDescription(offer, offerId, midToTrackID))) } func (p *ParticipantImpl) sendStreamStateUpdate(streamStateUpdate *livekit.StreamStateUpdate) error { return p.signaller.WriteMessage(p.signalling.SignalStreamStateUpdate(streamStateUpdate)) } func (p *ParticipantImpl) sendSubscribedQualityUpdate(subscribedQualityUpdate *livekit.SubscribedQualityUpdate) error { return p.signaller.WriteMessage(p.signalling.SignalSubscribedQualityUpdate(subscribedQualityUpdate)) } func (p *ParticipantImpl) sendSubscribedAudioCodecUpdate(subscribedAudioCodecUpdate *livekit.SubscribedAudioCodecUpdate) error { return p.signaller.WriteMessage(p.signalling.SignalSubscribedAudioCodecUpdate(subscribedAudioCodecUpdate)) } func (p *ParticipantImpl) sendSubscriptionResponse(trackID livekit.TrackID, subErr livekit.SubscriptionError) error { return p.signaller.WriteMessage(p.signalling.SignalSubscriptionResponse(&livekit.SubscriptionResponse{ TrackSid: string(trackID), Err: subErr, })) } func (p *ParticipantImpl) SendSubscriptionPermissionUpdate(publisherID livekit.ParticipantID, trackID livekit.TrackID, allowed bool) error { p.subLogger.Debugw("sending subscription permission update", "publisherID", publisherID, "trackID", trackID, "allowed", allowed) err := p.signaller.WriteMessage(p.signalling.SignalSubscriptionPermissionUpdate(&livekit.SubscriptionPermissionUpdate{ ParticipantSid: string(publisherID), TrackSid: string(trackID), Allowed: allowed, })) if err != nil { p.subLogger.Errorw("could not send subscription permission update", err) } return err } func (p *ParticipantImpl) sendMediaSectionsRequirement(numAudios uint32, numVideos uint32) error { p.pubLogger.Debugw( "sending media sections requirement", "numAudios", numAudios, "numVideos", numVideos, ) err := p.signaller.WriteMessage(p.signalling.SignalMediaSectionsRequirement(&livekit.MediaSectionsRequirement{ NumAudios: numAudios, NumVideos: numVideos, })) if err != nil { p.subLogger.Errorw("could not send media sections requirement", err) } return err } func (p *ParticipantImpl) sendPublishDataTrackResponse(dti *livekit.DataTrackInfo) error { return p.signaller.WriteMessage(p.signalling.SignalPublishDataTrackResponse(&livekit.PublishDataTrackResponse{ Info: dti, })) } func (p *ParticipantImpl) sendUnpublishDataTrackResponse(dti *livekit.DataTrackInfo) error { return p.signaller.WriteMessage(p.signalling.SignalUnpublishDataTrackResponse(&livekit.UnpublishDataTrackResponse{ Info: dti, })) } func (p *ParticipantImpl) SendDataTrackSubscriberHandles(handles map[uint32]*livekit.DataTrackSubscriberHandles_PublishedDataTrack) error { return p.signaller.WriteMessage(p.signalling.SignalDataTrackSubscriberHandles(&livekit.DataTrackSubscriberHandles{ SubHandles: handles, })) }