mirror of
https://github.com/livekit/livekit.git
synced 2026-06-07 11:01:58 +00:00
~Send initial participant update only after a participant becomes active.~ - General clean up (#3655)
* Send initial participant update only after a participant becomes active. There are cases where apps send data to remote participant as soon as client emits `ParticipantConnected`. But, that time point would not have a fully established client (i. e. the media connection + data channel establishment is still in progress). This PR changes the initial participant update to be sent from server side only when a participant becomes `ACTIVE`, i.e fully connected (media channel established and data channels open). It is supported for clients using protocol version > 15. @cnderrauber bumping up the protocol version in this PR. Move support is also conditioned on protocol version > 15, but that PR did not ump protocol version. Please let me know if there are issues bumping protocol version. * check for joining states in broadcast * have to check on other participant * test * make helper for sending participant updates * test * make utility of pushAndDeque * test * consolidate getting other participants * remove extra cast * debug * debug * typo * stop transceiver that is not bound * logs * log * check for ever bound * clean up * clean up
This commit is contained in:
@@ -608,7 +608,7 @@ func (t *MediaTrackReceiver) AddSubscriber(sub types.LocalParticipant) (types.Su
|
||||
t.lock.RUnlock()
|
||||
|
||||
if remove {
|
||||
t.params.Logger.Debugw("removing susbcriber on a not-open track", "subscriberID", subID, "isExpectedToResume", isExpectedToResume)
|
||||
t.params.Logger.Debugw("removing subscriber on a not-open track", "subscriberID", subID, "isExpectedToResume", isExpectedToResume)
|
||||
_ = t.MediaTrackSubscriptions.RemoveSubscriber(subID, isExpectedToResume)
|
||||
return nil, ErrNotOpen
|
||||
}
|
||||
|
||||
@@ -453,9 +453,16 @@ func (t *MediaTrackSubscriptions) downTrackClosed(
|
||||
// delete the subscribed track only after caching.
|
||||
if isExpectedToResume {
|
||||
dt := subTrack.DownTrack()
|
||||
tr := dt.GetTransceiver()
|
||||
tr, wasBound := dt.GetTransceiver()
|
||||
if tr != nil {
|
||||
sub.CacheDownTrack(subTrack.ID(), tr, dt.GetState())
|
||||
if wasBound {
|
||||
sub.CacheDownTrack(subTrack.ID(), tr, dt.GetState())
|
||||
} else {
|
||||
// unbound transceivers cannot be re-used as pion will not fire Bind() in
|
||||
// ReplaceTrack().
|
||||
t.params.Logger.Infow("stopping unbound transceiver")
|
||||
tr.Stop()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
+25
-5
@@ -1867,7 +1867,11 @@ func (p *ParticipantImpl) onMediaTrack(rtcTrack *webrtc.TrackRemote, rtpReceiver
|
||||
_, _, err := rtcTrack.Read(bytes)
|
||||
if err != nil {
|
||||
if !errors.Is(err, io.EOF) {
|
||||
p.params.Logger.Warnw("could not read first packet to determine codec, track will be ignored", err, "trackID", rtcTrack.ID(), "StreamID", rtcTrack.StreamID())
|
||||
p.params.Logger.Warnw(
|
||||
"could not read first packet to determine codec, track will be ignored", err,
|
||||
"trackID", rtcTrack.ID(),
|
||||
"StreamID", rtcTrack.StreamID(),
|
||||
)
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -1880,13 +1884,23 @@ func (p *ParticipantImpl) onMediaTrack(rtcTrack *webrtc.TrackRemote, rtpReceiver
|
||||
// track fired by sdp
|
||||
codecs := rtpReceiver.GetParameters().Codecs
|
||||
if len(codecs) == 0 {
|
||||
p.pubLogger.Errorw("no negotiated codecs for track, track will be ignored", nil, "trackID", rtcTrack.ID(), "StreamID", rtcTrack.StreamID())
|
||||
p.pubLogger.Errorw(
|
||||
"no negotiated codecs for track, track will be ignored", nil,
|
||||
"trackID", rtcTrack.ID(),
|
||||
"StreamID", rtcTrack.StreamID(),
|
||||
)
|
||||
return
|
||||
}
|
||||
codec = codecs[0]
|
||||
fromSdp = true
|
||||
}
|
||||
p.params.Logger.Debugw("onMediaTrack", "codec", codec, "payloadType", codec.PayloadType, "fromSdp", fromSdp, "parameters", rtpReceiver.GetParameters())
|
||||
p.params.Logger.Debugw(
|
||||
"onMediaTrack",
|
||||
"codec", codec,
|
||||
"payloadType", codec.PayloadType,
|
||||
"fromSdp", fromSdp,
|
||||
"parameters", rtpReceiver.GetParameters(),
|
||||
)
|
||||
|
||||
var track sfu.TrackRemote = sfu.NewTrackRemoteFromSdp(rtcTrack, codec)
|
||||
publishedTrack, isNewTrack := p.mediaTrackReceived(track, rtpReceiver)
|
||||
@@ -2526,7 +2540,10 @@ func (p *ParticipantImpl) mediaTrackReceived(track sfu.TrackRemote, rtpReceiver
|
||||
"mid", mid,
|
||||
)
|
||||
if mid == "" {
|
||||
p.pendingRemoteTracks = append(p.pendingRemoteTracks, &pendingRemoteTrack{track: track.RTCTrack(), receiver: rtpReceiver})
|
||||
p.pendingRemoteTracks = append(
|
||||
p.pendingRemoteTracks,
|
||||
&pendingRemoteTrack{track: track.RTCTrack(), receiver: rtpReceiver},
|
||||
)
|
||||
p.pendingTracksLock.Unlock()
|
||||
p.pubLogger.Warnw("could not get mid for track", nil, "trackID", track.ID())
|
||||
return nil, false
|
||||
@@ -2539,7 +2556,10 @@ func (p *ParticipantImpl) mediaTrackReceived(track sfu.TrackRemote, rtpReceiver
|
||||
if !ok {
|
||||
signalCid, ti, migrated, createdAt := p.getPendingTrack(track.ID(), ToProtoTrackKind(track.Kind()), true)
|
||||
if ti == nil {
|
||||
p.pendingRemoteTracks = append(p.pendingRemoteTracks, &pendingRemoteTrack{track: track.RTCTrack(), receiver: rtpReceiver})
|
||||
p.pendingRemoteTracks = append(
|
||||
p.pendingRemoteTracks,
|
||||
&pendingRemoteTrack{track: track.RTCTrack(), receiver: rtpReceiver},
|
||||
)
|
||||
p.pendingTracksLock.Unlock()
|
||||
return nil, false
|
||||
}
|
||||
|
||||
+164
-136
@@ -79,12 +79,6 @@ type broadcastOptions struct {
|
||||
immediate bool
|
||||
}
|
||||
|
||||
type participantUpdate struct {
|
||||
pi *livekit.ParticipantInfo
|
||||
isSynthesizedDisconnect bool
|
||||
closeReason types.ParticipantCloseReason
|
||||
}
|
||||
|
||||
type disconnectSignalOnResumeNoMessages struct {
|
||||
expiry time.Time
|
||||
closedCount int
|
||||
@@ -128,7 +122,7 @@ type Room struct {
|
||||
bufferFactory *buffer.FactoryOfBufferFactory
|
||||
|
||||
// batch update participant info for non-publishers
|
||||
batchedUpdates map[livekit.ParticipantIdentity]*participantUpdate
|
||||
batchedUpdates map[livekit.ParticipantIdentity]*ParticipantUpdate
|
||||
batchedUpdatesMu sync.Mutex
|
||||
|
||||
closed chan struct{}
|
||||
@@ -267,7 +261,7 @@ func NewRoom(
|
||||
hasPublished: make(map[livekit.ParticipantIdentity]bool),
|
||||
agentParticpants: make(map[livekit.ParticipantIdentity]*agentJob),
|
||||
bufferFactory: buffer.NewFactoryOfBufferFactory(config.Receiver.PacketBufferSizeVideo, config.Receiver.PacketBufferSizeAudio),
|
||||
batchedUpdates: make(map[livekit.ParticipantIdentity]*participantUpdate),
|
||||
batchedUpdates: make(map[livekit.ParticipantIdentity]*ParticipantUpdate),
|
||||
closed: make(chan struct{}),
|
||||
trailer: []byte(utils.RandomSecret()),
|
||||
disconnectSignalOnResumeParticipants: make(map[livekit.ParticipantIdentity]time.Time),
|
||||
@@ -324,6 +318,7 @@ func (r *Room) Trailer() []byte {
|
||||
func (r *Room) GetParticipant(identity livekit.ParticipantIdentity) types.LocalParticipant {
|
||||
r.lock.RLock()
|
||||
defer r.lock.RUnlock()
|
||||
|
||||
return r.participants[identity]
|
||||
}
|
||||
|
||||
@@ -644,7 +639,7 @@ func (r *Room) ResumeParticipant(
|
||||
}
|
||||
|
||||
// include the local participant's info as well, since metadata could have been changed
|
||||
updates := r.getOtherParticipantInfo("")
|
||||
updates := GetOtherParticipantInfo(nil, false, toParticipants(r.GetParticipants()), false)
|
||||
if err := p.SendParticipantUpdate(updates); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -1135,18 +1130,6 @@ func (r *Room) SimulateScenario(participant types.LocalParticipant, simulateScen
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Room) getOtherParticipantInfo(identity livekit.ParticipantIdentity) []*livekit.ParticipantInfo {
|
||||
participants := r.GetParticipants()
|
||||
pi := make([]*livekit.ParticipantInfo, 0, len(participants))
|
||||
for _, p := range participants {
|
||||
if !p.Hidden() && p.Identity() != identity {
|
||||
pi = append(pi, p.ToProto())
|
||||
}
|
||||
}
|
||||
|
||||
return pi
|
||||
}
|
||||
|
||||
// checks if participant should be autosubscribed to new tracks, assumes lock is already acquired
|
||||
func (r *Room) autoSubscribe(participant types.LocalParticipant) bool {
|
||||
opts := r.participantOpts[participant.Identity()]
|
||||
@@ -1158,21 +1141,18 @@ func (r *Room) autoSubscribe(participant types.LocalParticipant) bool {
|
||||
}
|
||||
|
||||
func (r *Room) createJoinResponseLocked(participant types.LocalParticipant, iceServers []*livekit.ICEServer) *livekit.JoinResponse {
|
||||
// gather other participants and send join response
|
||||
otherParticipants := make([]*livekit.ParticipantInfo, 0, len(r.participants))
|
||||
for _, p := range r.participants {
|
||||
if p.ID() != participant.ID() && !p.Hidden() {
|
||||
otherParticipants = append(otherParticipants, p.ToProto())
|
||||
}
|
||||
}
|
||||
|
||||
iceConfig := participant.GetICEConfig()
|
||||
hasICEFallback := iceConfig.GetPreferencePublisher() != livekit.ICECandidateType_ICT_NONE || iceConfig.GetPreferenceSubscriber() != livekit.ICECandidateType_ICT_NONE
|
||||
return &livekit.JoinResponse{
|
||||
Room: r.ToProto(),
|
||||
Participant: participant.ToProto(),
|
||||
OtherParticipants: otherParticipants,
|
||||
IceServers: iceServers,
|
||||
Room: r.ToProto(),
|
||||
Participant: participant.ToProto(),
|
||||
OtherParticipants: GetOtherParticipantInfo(
|
||||
participant,
|
||||
false, // isMigratingIn
|
||||
toParticipants(maps.Values(r.participants)),
|
||||
false, // skipSubscriberBroadcast
|
||||
),
|
||||
IceServers: iceServers,
|
||||
// indicates both server and client support subscriber as primary
|
||||
SubscriberPrimary: participant.SubscriberAsPrimary(),
|
||||
ClientConfiguration: participant.GetClientConfiguration(),
|
||||
@@ -1358,47 +1338,17 @@ func (r *Room) broadcastParticipantState(p types.LocalParticipant, opts broadcas
|
||||
return
|
||||
}
|
||||
|
||||
updates := r.pushAndDequeueUpdates(pi, p.CloseReason(), opts.immediate)
|
||||
updates := PushAndDequeueUpdates(
|
||||
pi,
|
||||
p.CloseReason(),
|
||||
opts.immediate,
|
||||
r.GetParticipant(livekit.ParticipantIdentity(pi.Identity)),
|
||||
&r.batchedUpdatesMu,
|
||||
r.batchedUpdates,
|
||||
)
|
||||
if len(updates) != 0 {
|
||||
selfSent = true
|
||||
r.sendParticipantUpdates(updates)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Room) sendParticipantUpdates(updates []*participantUpdate) {
|
||||
if len(updates) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// For filtered updates, skip
|
||||
// 1. synthesized DISCONNECT - this happens on SID change
|
||||
// 2. close reasons of DUPLICATE_IDENTITY/STALE - A newer session for that identity exists.
|
||||
//
|
||||
// Filtered updates are used with clients that can handle identity based reconnect and hence those
|
||||
// conditions can be skipped.
|
||||
var filteredUpdates []*livekit.ParticipantInfo
|
||||
for _, update := range updates {
|
||||
if update.isSynthesizedDisconnect || IsCloseNotifySkippable(update.closeReason) {
|
||||
continue
|
||||
}
|
||||
filteredUpdates = append(filteredUpdates, update.pi)
|
||||
}
|
||||
|
||||
var fullUpdates []*livekit.ParticipantInfo
|
||||
for _, update := range updates {
|
||||
fullUpdates = append(fullUpdates, update.pi)
|
||||
}
|
||||
|
||||
for _, op := range r.GetParticipants() {
|
||||
var err error
|
||||
if op.ProtocolVersion().SupportsIdentityBasedReconnection() {
|
||||
err = op.SendParticipantUpdate(filteredUpdates)
|
||||
} else {
|
||||
err = op.SendParticipantUpdate(fullUpdates)
|
||||
}
|
||||
if err != nil {
|
||||
op.GetLogger().Errorw("could not send update to participant", err)
|
||||
}
|
||||
SendParticipantUpdates(updates, r.GetParticipants())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1411,68 +1361,6 @@ func (r *Room) sendSpeakerChanges(speakers []*livekit.SpeakerInfo) {
|
||||
}
|
||||
}
|
||||
|
||||
// push a participant update for batched broadcast, optionally returning immediate updates to broadcast.
|
||||
// it handles the following scenarios
|
||||
// * subscriber-only updates will be queued for batch updates
|
||||
// * publisher & immediate updates will be returned without queuing
|
||||
// * when the SID changes, it will return both updates, with the earlier participant set to disconnected
|
||||
func (r *Room) pushAndDequeueUpdates(
|
||||
pi *livekit.ParticipantInfo,
|
||||
closeReason types.ParticipantCloseReason,
|
||||
isImmediate bool,
|
||||
) []*participantUpdate {
|
||||
r.batchedUpdatesMu.Lock()
|
||||
defer r.batchedUpdatesMu.Unlock()
|
||||
|
||||
var updates []*participantUpdate
|
||||
identity := livekit.ParticipantIdentity(pi.Identity)
|
||||
existing := r.batchedUpdates[identity]
|
||||
shouldSend := isImmediate || pi.IsPublisher
|
||||
|
||||
if existing != nil {
|
||||
if pi.Sid == existing.pi.Sid {
|
||||
// same participant session
|
||||
if pi.Version < existing.pi.Version {
|
||||
// out of order update
|
||||
return nil
|
||||
}
|
||||
} else {
|
||||
// different participant sessions
|
||||
if CompareParticipant(existing.pi, pi) < 0 {
|
||||
// existing is older, synthesize a DISCONNECT for older and
|
||||
// send immediately along with newer session to signal switch
|
||||
shouldSend = true
|
||||
existing.pi.State = livekit.ParticipantInfo_DISCONNECTED
|
||||
existing.isSynthesizedDisconnect = true
|
||||
updates = append(updates, existing)
|
||||
} else {
|
||||
// older session update, newer session has already become active, so nothing to do
|
||||
return nil
|
||||
}
|
||||
}
|
||||
} else {
|
||||
ep := r.GetParticipant(identity)
|
||||
if ep != nil {
|
||||
epi := ep.ToProto()
|
||||
if CompareParticipant(epi, pi) > 0 {
|
||||
// older session update, newer session has already become active, so nothing to do
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if shouldSend {
|
||||
// include any queued update, and return
|
||||
delete(r.batchedUpdates, identity)
|
||||
updates = append(updates, &participantUpdate{pi: pi, closeReason: closeReason})
|
||||
} else {
|
||||
// enqueue for batch
|
||||
r.batchedUpdates[identity] = &participantUpdate{pi: pi, closeReason: closeReason}
|
||||
}
|
||||
|
||||
return updates
|
||||
}
|
||||
|
||||
func (r *Room) updateProto() *livekit.Room {
|
||||
r.lock.RLock()
|
||||
room := utils.CloneProto(r.protoRoom)
|
||||
@@ -1508,14 +1396,14 @@ func (r *Room) changeUpdateWorker() {
|
||||
case <-subTicker.C:
|
||||
r.batchedUpdatesMu.Lock()
|
||||
updatesMap := r.batchedUpdates
|
||||
r.batchedUpdates = make(map[livekit.ParticipantIdentity]*participantUpdate)
|
||||
r.batchedUpdates = make(map[livekit.ParticipantIdentity]*ParticipantUpdate)
|
||||
r.batchedUpdatesMu.Unlock()
|
||||
|
||||
if len(updatesMap) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
r.sendParticipantUpdates(maps.Values(updatesMap))
|
||||
SendParticipantUpdates(maps.Values(updatesMap), r.GetParticipants())
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1915,6 +1803,138 @@ func CompareParticipant(pi1 *livekit.ParticipantInfo, pi2 *livekit.ParticipantIn
|
||||
return 0
|
||||
}
|
||||
|
||||
type ParticipantUpdate struct {
|
||||
ParticipantInfo *livekit.ParticipantInfo
|
||||
IsSynthesizedDisconnect bool
|
||||
CloseReason types.ParticipantCloseReason
|
||||
}
|
||||
|
||||
// push a participant update for batched broadcast, optionally returning immediate updates to broadcast.
|
||||
// it handles the following scenarios
|
||||
// * subscriber-only updates will be queued for batch updates
|
||||
// * publisher & immediate updates will be returned without queuing
|
||||
// * when the SID changes, it will return both updates, with the earlier participant set to disconnected
|
||||
func PushAndDequeueUpdates(
|
||||
pi *livekit.ParticipantInfo,
|
||||
closeReason types.ParticipantCloseReason,
|
||||
isImmediate bool,
|
||||
existingParticipant types.Participant,
|
||||
lock *sync.Mutex,
|
||||
cache map[livekit.ParticipantIdentity]*ParticipantUpdate,
|
||||
) []*ParticipantUpdate {
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
|
||||
var updates []*ParticipantUpdate
|
||||
identity := livekit.ParticipantIdentity(pi.Identity)
|
||||
existing := cache[identity]
|
||||
shouldSend := isImmediate || pi.IsPublisher
|
||||
|
||||
if existing != nil {
|
||||
if pi.Sid == existing.ParticipantInfo.Sid {
|
||||
// same participant session
|
||||
if pi.Version < existing.ParticipantInfo.Version {
|
||||
// out of order update
|
||||
return nil
|
||||
}
|
||||
} else {
|
||||
// different participant sessions
|
||||
if CompareParticipant(existing.ParticipantInfo, pi) < 0 {
|
||||
// existing is older, synthesize a DISCONNECT for older and
|
||||
// send immediately along with newer session to signal switch
|
||||
shouldSend = true
|
||||
existing.ParticipantInfo.State = livekit.ParticipantInfo_DISCONNECTED
|
||||
existing.IsSynthesizedDisconnect = true
|
||||
updates = append(updates, existing)
|
||||
} else {
|
||||
// older session update, newer session has already become active, so nothing to do
|
||||
return nil
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if existingParticipant != nil {
|
||||
epi := existingParticipant.ToProto()
|
||||
if CompareParticipant(epi, pi) > 0 {
|
||||
// older session update, newer session has already become active, so nothing to do
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if shouldSend {
|
||||
// include any queued update, and return
|
||||
delete(cache, identity)
|
||||
updates = append(updates, &ParticipantUpdate{ParticipantInfo: pi, CloseReason: closeReason})
|
||||
} else {
|
||||
// enqueue for batch
|
||||
cache[identity] = &ParticipantUpdate{ParticipantInfo: pi, CloseReason: closeReason}
|
||||
}
|
||||
|
||||
return updates
|
||||
}
|
||||
|
||||
func SendParticipantUpdates(updates []*ParticipantUpdate, participants []types.LocalParticipant) {
|
||||
if len(updates) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// For filtered updates, skip
|
||||
// 1. synthesized DISCONNECT - this happens on SID change
|
||||
// 2. close reasons of DUPLICATE_IDENTITY/STALE - A newer session for that identity exists.
|
||||
//
|
||||
// Filtered updates are used with clients that can handle identity based reconnect and hence those
|
||||
// conditions can be skipped.
|
||||
var filteredUpdates []*livekit.ParticipantInfo
|
||||
for _, update := range updates {
|
||||
if update.IsSynthesizedDisconnect || IsCloseNotifySkippable(update.CloseReason) {
|
||||
continue
|
||||
}
|
||||
filteredUpdates = append(filteredUpdates, update.ParticipantInfo)
|
||||
}
|
||||
|
||||
var fullUpdates []*livekit.ParticipantInfo
|
||||
for _, update := range updates {
|
||||
fullUpdates = append(fullUpdates, update.ParticipantInfo)
|
||||
}
|
||||
|
||||
for _, op := range participants {
|
||||
var err error
|
||||
if op.ProtocolVersion().SupportsIdentityBasedReconnection() {
|
||||
err = op.SendParticipantUpdate(filteredUpdates)
|
||||
} else {
|
||||
err = op.SendParticipantUpdate(fullUpdates)
|
||||
}
|
||||
if err != nil {
|
||||
op.GetLogger().Errorw("could not send update to participant", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// GetOtherParticipantInfo returns ParticipantInfo for everyone in the room except for the participant identified by lp.Identity()
|
||||
func GetOtherParticipantInfo(
|
||||
lp types.LocalParticipant,
|
||||
isMigratingIn bool,
|
||||
allParticipants []types.Participant,
|
||||
skipSubscriberBroadcast bool,
|
||||
) []*livekit.ParticipantInfo {
|
||||
var lpIdentity livekit.ParticipantIdentity
|
||||
if lp != nil {
|
||||
lpIdentity = lp.Identity()
|
||||
}
|
||||
|
||||
pInfos := make([]*livekit.ParticipantInfo, 0, len(allParticipants))
|
||||
for _, op := range allParticipants {
|
||||
if !op.Hidden() &&
|
||||
op.Identity() != lpIdentity &&
|
||||
!isMigratingIn &&
|
||||
!(skipSubscriberBroadcast && op.CanSkipBroadcast()) {
|
||||
pInfos = append(pInfos, op.ToProto())
|
||||
}
|
||||
}
|
||||
|
||||
return pInfos
|
||||
}
|
||||
|
||||
func connectionDetailsFields(infos []*types.ICEConnectionInfo) []interface{} {
|
||||
var fields []interface{}
|
||||
connectionType := types.ICEConnectionTypeUnknown
|
||||
@@ -1962,3 +1982,11 @@ func connectionDetailsFields(infos []*types.ICEConnectionInfo) []interface{} {
|
||||
fields = append(fields, "connectionType", connectionType)
|
||||
return fields
|
||||
}
|
||||
|
||||
func toParticipants(lps []types.LocalParticipant) []types.Participant {
|
||||
participants := make([]types.Participant, len(lps))
|
||||
for idx, lp := range lps {
|
||||
participants[idx] = lp
|
||||
}
|
||||
return participants
|
||||
}
|
||||
|
||||
+34
-27
@@ -272,14 +272,14 @@ func TestPushAndDequeueUpdates(t *testing.T) {
|
||||
pi *livekit.ParticipantInfo
|
||||
closeReason types.ParticipantCloseReason
|
||||
immediate bool
|
||||
existing *participantUpdate
|
||||
expected []*participantUpdate
|
||||
validate func(t *testing.T, rm *Room, updates []*participantUpdate)
|
||||
existing *ParticipantUpdate
|
||||
expected []*ParticipantUpdate
|
||||
validate func(t *testing.T, rm *Room, updates []*ParticipantUpdate)
|
||||
}{
|
||||
{
|
||||
name: "publisher updates are immediate",
|
||||
pi: publisher1v1,
|
||||
expected: []*participantUpdate{{pi: publisher1v1}},
|
||||
expected: []*ParticipantUpdate{{ParticipantInfo: publisher1v1}},
|
||||
},
|
||||
{
|
||||
name: "subscriber updates are queued",
|
||||
@@ -288,20 +288,20 @@ func TestPushAndDequeueUpdates(t *testing.T) {
|
||||
{
|
||||
name: "last version is enqueued",
|
||||
pi: subscriber1v2,
|
||||
existing: &participantUpdate{pi: utils.CloneProto(subscriber1v1)}, // clone the existing value since it can be modified when setting to disconnected
|
||||
validate: func(t *testing.T, rm *Room, _ []*participantUpdate) {
|
||||
existing: &ParticipantUpdate{ParticipantInfo: utils.CloneProto(subscriber1v1)}, // clone the existing value since it can be modified when setting to disconnected
|
||||
validate: func(t *testing.T, rm *Room, _ []*ParticipantUpdate) {
|
||||
queued := rm.batchedUpdates[livekit.ParticipantIdentity(identity)]
|
||||
require.NotNil(t, queued)
|
||||
requirePIEquals(t, subscriber1v2, queued.pi)
|
||||
requirePIEquals(t, subscriber1v2, queued.ParticipantInfo)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "latest version when immediate",
|
||||
pi: subscriber1v2,
|
||||
existing: &participantUpdate{pi: utils.CloneProto(subscriber1v1)},
|
||||
existing: &ParticipantUpdate{ParticipantInfo: utils.CloneProto(subscriber1v1)},
|
||||
immediate: true,
|
||||
expected: []*participantUpdate{{pi: subscriber1v2}},
|
||||
validate: func(t *testing.T, rm *Room, _ []*participantUpdate) {
|
||||
expected: []*ParticipantUpdate{{ParticipantInfo: subscriber1v2}},
|
||||
validate: func(t *testing.T, rm *Room, _ []*ParticipantUpdate) {
|
||||
queued := rm.batchedUpdates[livekit.ParticipantIdentity(identity)]
|
||||
require.Nil(t, queued)
|
||||
},
|
||||
@@ -309,37 +309,37 @@ func TestPushAndDequeueUpdates(t *testing.T) {
|
||||
{
|
||||
name: "out of order updates are rejected",
|
||||
pi: subscriber1v1,
|
||||
existing: &participantUpdate{pi: utils.CloneProto(subscriber1v2)},
|
||||
validate: func(t *testing.T, rm *Room, updates []*participantUpdate) {
|
||||
existing: &ParticipantUpdate{ParticipantInfo: utils.CloneProto(subscriber1v2)},
|
||||
validate: func(t *testing.T, rm *Room, updates []*ParticipantUpdate) {
|
||||
queued := rm.batchedUpdates[livekit.ParticipantIdentity(identity)]
|
||||
requirePIEquals(t, subscriber1v2, queued.pi)
|
||||
requirePIEquals(t, subscriber1v2, queued.ParticipantInfo)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "sid change is broadcasted immediately with synthsized disconnect",
|
||||
pi: publisher2,
|
||||
closeReason: types.ParticipantCloseReasonServiceRequestRemoveParticipant, // just to test if update contain the close reason
|
||||
existing: &participantUpdate{pi: utils.CloneProto(subscriber1v2), closeReason: types.ParticipantCloseReasonStale},
|
||||
expected: []*participantUpdate{
|
||||
existing: &ParticipantUpdate{ParticipantInfo: utils.CloneProto(subscriber1v2), CloseReason: types.ParticipantCloseReasonStale},
|
||||
expected: []*ParticipantUpdate{
|
||||
{
|
||||
pi: &livekit.ParticipantInfo{
|
||||
ParticipantInfo: &livekit.ParticipantInfo{
|
||||
Identity: identity,
|
||||
Sid: "1",
|
||||
Version: 2,
|
||||
State: livekit.ParticipantInfo_DISCONNECTED,
|
||||
},
|
||||
isSynthesizedDisconnect: true,
|
||||
closeReason: types.ParticipantCloseReasonStale,
|
||||
IsSynthesizedDisconnect: true,
|
||||
CloseReason: types.ParticipantCloseReasonStale,
|
||||
},
|
||||
{pi: publisher2, closeReason: types.ParticipantCloseReasonServiceRequestRemoveParticipant},
|
||||
{ParticipantInfo: publisher2, CloseReason: types.ParticipantCloseReasonServiceRequestRemoveParticipant},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "when switching to publisher, queue is cleared",
|
||||
pi: publisher1v2,
|
||||
existing: &participantUpdate{pi: utils.CloneProto(subscriber1v1)},
|
||||
expected: []*participantUpdate{{pi: publisher1v2}},
|
||||
validate: func(t *testing.T, rm *Room, updates []*participantUpdate) {
|
||||
existing: &ParticipantUpdate{ParticipantInfo: utils.CloneProto(subscriber1v1)},
|
||||
expected: []*ParticipantUpdate{{ParticipantInfo: publisher1v2}},
|
||||
validate: func(t *testing.T, rm *Room, updates []*ParticipantUpdate) {
|
||||
require.Empty(t, rm.batchedUpdates)
|
||||
},
|
||||
},
|
||||
@@ -349,14 +349,21 @@ func TestPushAndDequeueUpdates(t *testing.T) {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
rm := newRoomWithParticipants(t, testRoomOpts{num: 1})
|
||||
if tc.existing != nil {
|
||||
rm.batchedUpdates[livekit.ParticipantIdentity(tc.existing.pi.Identity)] = tc.existing
|
||||
rm.batchedUpdates[livekit.ParticipantIdentity(tc.existing.ParticipantInfo.Identity)] = tc.existing
|
||||
}
|
||||
updates := rm.pushAndDequeueUpdates(tc.pi, tc.closeReason, tc.immediate)
|
||||
updates := PushAndDequeueUpdates(
|
||||
tc.pi,
|
||||
tc.closeReason,
|
||||
tc.immediate,
|
||||
rm.GetParticipant(livekit.ParticipantIdentity(tc.pi.Identity)),
|
||||
&rm.batchedUpdatesMu,
|
||||
rm.batchedUpdates,
|
||||
)
|
||||
require.Equal(t, len(tc.expected), len(updates))
|
||||
for i, item := range tc.expected {
|
||||
requirePIEquals(t, item.pi, updates[i].pi)
|
||||
require.Equal(t, item.isSynthesizedDisconnect, updates[i].isSynthesizedDisconnect)
|
||||
require.Equal(t, item.closeReason, updates[i].closeReason)
|
||||
requirePIEquals(t, item.ParticipantInfo, updates[i].ParticipantInfo)
|
||||
require.Equal(t, item.IsSynthesizedDisconnect, updates[i].IsSynthesizedDisconnect)
|
||||
require.Equal(t, item.CloseReason, updates[i].CloseReason)
|
||||
}
|
||||
|
||||
if tc.validate != nil {
|
||||
|
||||
@@ -16,7 +16,7 @@ package types
|
||||
|
||||
type ProtocolVersion int
|
||||
|
||||
const CurrentProtocol = 15
|
||||
const CurrentProtocol = 16
|
||||
|
||||
func (v ProtocolVersion) SupportsPackedStreamId() bool {
|
||||
return v > 0
|
||||
|
||||
@@ -291,6 +291,7 @@ type DownTrack struct {
|
||||
|
||||
bindLock sync.Mutex
|
||||
bindState atomic.Value
|
||||
wasEverBound atomic.Bool
|
||||
onBinding func(error)
|
||||
bindOnReceiverReady func()
|
||||
onBindAndConnected func()
|
||||
@@ -584,6 +585,7 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters,
|
||||
d.onBinding(nil)
|
||||
}
|
||||
d.setBindStateLocked(bindStateBound)
|
||||
d.wasEverBound.Store(true)
|
||||
d.bindLock.Unlock()
|
||||
|
||||
d.forwarder.DetermineCodec(codec.RTPCodecCapability, d.Receiver().HeaderExtensions())
|
||||
@@ -852,8 +854,8 @@ func (d *DownTrack) SetTransceiver(transceiver *webrtc.RTPTransceiver) {
|
||||
d.transceiver.Store(transceiver)
|
||||
}
|
||||
|
||||
func (d *DownTrack) GetTransceiver() *webrtc.RTPTransceiver {
|
||||
return d.transceiver.Load()
|
||||
func (d *DownTrack) GetTransceiver() (*webrtc.RTPTransceiver, bool) {
|
||||
return d.transceiver.Load(), d.wasEverBound.Load()
|
||||
}
|
||||
|
||||
func (d *DownTrack) postKeyFrameRequestEvent() {
|
||||
|
||||
Reference in New Issue
Block a user