diff --git a/pkg/rtc/mediatrackreceiver.go b/pkg/rtc/mediatrackreceiver.go index 9648c118a..43a7570e6 100644 --- a/pkg/rtc/mediatrackreceiver.go +++ b/pkg/rtc/mediatrackreceiver.go @@ -608,7 +608,7 @@ func (t *MediaTrackReceiver) AddSubscriber(sub types.LocalParticipant) (types.Su t.lock.RUnlock() if remove { - t.params.Logger.Debugw("removing susbcriber on a not-open track", "subscriberID", subID, "isExpectedToResume", isExpectedToResume) + t.params.Logger.Debugw("removing subscriber on a not-open track", "subscriberID", subID, "isExpectedToResume", isExpectedToResume) _ = t.MediaTrackSubscriptions.RemoveSubscriber(subID, isExpectedToResume) return nil, ErrNotOpen } diff --git a/pkg/rtc/mediatracksubscriptions.go b/pkg/rtc/mediatracksubscriptions.go index 1e8f73cdd..5a6a6374b 100644 --- a/pkg/rtc/mediatracksubscriptions.go +++ b/pkg/rtc/mediatracksubscriptions.go @@ -453,9 +453,16 @@ func (t *MediaTrackSubscriptions) downTrackClosed( // delete the subscribed track only after caching. if isExpectedToResume { dt := subTrack.DownTrack() - tr := dt.GetTransceiver() + tr, wasBound := dt.GetTransceiver() if tr != nil { - sub.CacheDownTrack(subTrack.ID(), tr, dt.GetState()) + if wasBound { + sub.CacheDownTrack(subTrack.ID(), tr, dt.GetState()) + } else { + // unbound transceivers cannot be re-used as pion will not fire Bind() in + // ReplaceTrack(). + t.params.Logger.Infow("stopping unbound transceiver") + tr.Stop() + } } } diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index f4dceeb8c..357918ea3 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -1867,7 +1867,11 @@ func (p *ParticipantImpl) onMediaTrack(rtcTrack *webrtc.TrackRemote, rtpReceiver _, _, err := rtcTrack.Read(bytes) if err != nil { if !errors.Is(err, io.EOF) { - p.params.Logger.Warnw("could not read first packet to determine codec, track will be ignored", err, "trackID", rtcTrack.ID(), "StreamID", rtcTrack.StreamID()) + p.params.Logger.Warnw( + "could not read first packet to determine codec, track will be ignored", err, + "trackID", rtcTrack.ID(), + "StreamID", rtcTrack.StreamID(), + ) } return } @@ -1880,13 +1884,23 @@ func (p *ParticipantImpl) onMediaTrack(rtcTrack *webrtc.TrackRemote, rtpReceiver // track fired by sdp codecs := rtpReceiver.GetParameters().Codecs if len(codecs) == 0 { - p.pubLogger.Errorw("no negotiated codecs for track, track will be ignored", nil, "trackID", rtcTrack.ID(), "StreamID", rtcTrack.StreamID()) + p.pubLogger.Errorw( + "no negotiated codecs for track, track will be ignored", nil, + "trackID", rtcTrack.ID(), + "StreamID", rtcTrack.StreamID(), + ) return } codec = codecs[0] fromSdp = true } - p.params.Logger.Debugw("onMediaTrack", "codec", codec, "payloadType", codec.PayloadType, "fromSdp", fromSdp, "parameters", rtpReceiver.GetParameters()) + p.params.Logger.Debugw( + "onMediaTrack", + "codec", codec, + "payloadType", codec.PayloadType, + "fromSdp", fromSdp, + "parameters", rtpReceiver.GetParameters(), + ) var track sfu.TrackRemote = sfu.NewTrackRemoteFromSdp(rtcTrack, codec) publishedTrack, isNewTrack := p.mediaTrackReceived(track, rtpReceiver) @@ -2526,7 +2540,10 @@ func (p *ParticipantImpl) mediaTrackReceived(track sfu.TrackRemote, rtpReceiver "mid", mid, ) if mid == "" { - p.pendingRemoteTracks = append(p.pendingRemoteTracks, &pendingRemoteTrack{track: track.RTCTrack(), receiver: rtpReceiver}) + p.pendingRemoteTracks = append( + p.pendingRemoteTracks, + &pendingRemoteTrack{track: track.RTCTrack(), receiver: rtpReceiver}, + ) p.pendingTracksLock.Unlock() p.pubLogger.Warnw("could not get mid for track", nil, "trackID", track.ID()) return nil, false @@ -2539,7 +2556,10 @@ func (p *ParticipantImpl) mediaTrackReceived(track sfu.TrackRemote, rtpReceiver if !ok { signalCid, ti, migrated, createdAt := p.getPendingTrack(track.ID(), ToProtoTrackKind(track.Kind()), true) if ti == nil { - p.pendingRemoteTracks = append(p.pendingRemoteTracks, &pendingRemoteTrack{track: track.RTCTrack(), receiver: rtpReceiver}) + p.pendingRemoteTracks = append( + p.pendingRemoteTracks, + &pendingRemoteTrack{track: track.RTCTrack(), receiver: rtpReceiver}, + ) p.pendingTracksLock.Unlock() return nil, false } diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 712d38692..f372c3135 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -79,12 +79,6 @@ type broadcastOptions struct { immediate bool } -type participantUpdate struct { - pi *livekit.ParticipantInfo - isSynthesizedDisconnect bool - closeReason types.ParticipantCloseReason -} - type disconnectSignalOnResumeNoMessages struct { expiry time.Time closedCount int @@ -128,7 +122,7 @@ type Room struct { bufferFactory *buffer.FactoryOfBufferFactory // batch update participant info for non-publishers - batchedUpdates map[livekit.ParticipantIdentity]*participantUpdate + batchedUpdates map[livekit.ParticipantIdentity]*ParticipantUpdate batchedUpdatesMu sync.Mutex closed chan struct{} @@ -267,7 +261,7 @@ func NewRoom( hasPublished: make(map[livekit.ParticipantIdentity]bool), agentParticpants: make(map[livekit.ParticipantIdentity]*agentJob), bufferFactory: buffer.NewFactoryOfBufferFactory(config.Receiver.PacketBufferSizeVideo, config.Receiver.PacketBufferSizeAudio), - batchedUpdates: make(map[livekit.ParticipantIdentity]*participantUpdate), + batchedUpdates: make(map[livekit.ParticipantIdentity]*ParticipantUpdate), closed: make(chan struct{}), trailer: []byte(utils.RandomSecret()), disconnectSignalOnResumeParticipants: make(map[livekit.ParticipantIdentity]time.Time), @@ -324,6 +318,7 @@ func (r *Room) Trailer() []byte { func (r *Room) GetParticipant(identity livekit.ParticipantIdentity) types.LocalParticipant { r.lock.RLock() defer r.lock.RUnlock() + return r.participants[identity] } @@ -644,7 +639,7 @@ func (r *Room) ResumeParticipant( } // include the local participant's info as well, since metadata could have been changed - updates := r.getOtherParticipantInfo("") + updates := GetOtherParticipantInfo(nil, false, toParticipants(r.GetParticipants()), false) if err := p.SendParticipantUpdate(updates); err != nil { return err } @@ -1135,18 +1130,6 @@ func (r *Room) SimulateScenario(participant types.LocalParticipant, simulateScen return nil } -func (r *Room) getOtherParticipantInfo(identity livekit.ParticipantIdentity) []*livekit.ParticipantInfo { - participants := r.GetParticipants() - pi := make([]*livekit.ParticipantInfo, 0, len(participants)) - for _, p := range participants { - if !p.Hidden() && p.Identity() != identity { - pi = append(pi, p.ToProto()) - } - } - - return pi -} - // checks if participant should be autosubscribed to new tracks, assumes lock is already acquired func (r *Room) autoSubscribe(participant types.LocalParticipant) bool { opts := r.participantOpts[participant.Identity()] @@ -1158,21 +1141,18 @@ func (r *Room) autoSubscribe(participant types.LocalParticipant) bool { } func (r *Room) createJoinResponseLocked(participant types.LocalParticipant, iceServers []*livekit.ICEServer) *livekit.JoinResponse { - // gather other participants and send join response - otherParticipants := make([]*livekit.ParticipantInfo, 0, len(r.participants)) - for _, p := range r.participants { - if p.ID() != participant.ID() && !p.Hidden() { - otherParticipants = append(otherParticipants, p.ToProto()) - } - } - iceConfig := participant.GetICEConfig() hasICEFallback := iceConfig.GetPreferencePublisher() != livekit.ICECandidateType_ICT_NONE || iceConfig.GetPreferenceSubscriber() != livekit.ICECandidateType_ICT_NONE return &livekit.JoinResponse{ - Room: r.ToProto(), - Participant: participant.ToProto(), - OtherParticipants: otherParticipants, - IceServers: iceServers, + Room: r.ToProto(), + Participant: participant.ToProto(), + OtherParticipants: GetOtherParticipantInfo( + participant, + false, // isMigratingIn + toParticipants(maps.Values(r.participants)), + false, // skipSubscriberBroadcast + ), + IceServers: iceServers, // indicates both server and client support subscriber as primary SubscriberPrimary: participant.SubscriberAsPrimary(), ClientConfiguration: participant.GetClientConfiguration(), @@ -1358,47 +1338,17 @@ func (r *Room) broadcastParticipantState(p types.LocalParticipant, opts broadcas return } - updates := r.pushAndDequeueUpdates(pi, p.CloseReason(), opts.immediate) + updates := PushAndDequeueUpdates( + pi, + p.CloseReason(), + opts.immediate, + r.GetParticipant(livekit.ParticipantIdentity(pi.Identity)), + &r.batchedUpdatesMu, + r.batchedUpdates, + ) if len(updates) != 0 { selfSent = true - r.sendParticipantUpdates(updates) - } -} - -func (r *Room) sendParticipantUpdates(updates []*participantUpdate) { - if len(updates) == 0 { - return - } - - // For filtered updates, skip - // 1. synthesized DISCONNECT - this happens on SID change - // 2. close reasons of DUPLICATE_IDENTITY/STALE - A newer session for that identity exists. - // - // Filtered updates are used with clients that can handle identity based reconnect and hence those - // conditions can be skipped. - var filteredUpdates []*livekit.ParticipantInfo - for _, update := range updates { - if update.isSynthesizedDisconnect || IsCloseNotifySkippable(update.closeReason) { - continue - } - filteredUpdates = append(filteredUpdates, update.pi) - } - - var fullUpdates []*livekit.ParticipantInfo - for _, update := range updates { - fullUpdates = append(fullUpdates, update.pi) - } - - for _, op := range r.GetParticipants() { - var err error - if op.ProtocolVersion().SupportsIdentityBasedReconnection() { - err = op.SendParticipantUpdate(filteredUpdates) - } else { - err = op.SendParticipantUpdate(fullUpdates) - } - if err != nil { - op.GetLogger().Errorw("could not send update to participant", err) - } + SendParticipantUpdates(updates, r.GetParticipants()) } } @@ -1411,68 +1361,6 @@ func (r *Room) sendSpeakerChanges(speakers []*livekit.SpeakerInfo) { } } -// push a participant update for batched broadcast, optionally returning immediate updates to broadcast. -// it handles the following scenarios -// * subscriber-only updates will be queued for batch updates -// * publisher & immediate updates will be returned without queuing -// * when the SID changes, it will return both updates, with the earlier participant set to disconnected -func (r *Room) pushAndDequeueUpdates( - pi *livekit.ParticipantInfo, - closeReason types.ParticipantCloseReason, - isImmediate bool, -) []*participantUpdate { - r.batchedUpdatesMu.Lock() - defer r.batchedUpdatesMu.Unlock() - - var updates []*participantUpdate - identity := livekit.ParticipantIdentity(pi.Identity) - existing := r.batchedUpdates[identity] - shouldSend := isImmediate || pi.IsPublisher - - if existing != nil { - if pi.Sid == existing.pi.Sid { - // same participant session - if pi.Version < existing.pi.Version { - // out of order update - return nil - } - } else { - // different participant sessions - if CompareParticipant(existing.pi, pi) < 0 { - // existing is older, synthesize a DISCONNECT for older and - // send immediately along with newer session to signal switch - shouldSend = true - existing.pi.State = livekit.ParticipantInfo_DISCONNECTED - existing.isSynthesizedDisconnect = true - updates = append(updates, existing) - } else { - // older session update, newer session has already become active, so nothing to do - return nil - } - } - } else { - ep := r.GetParticipant(identity) - if ep != nil { - epi := ep.ToProto() - if CompareParticipant(epi, pi) > 0 { - // older session update, newer session has already become active, so nothing to do - return nil - } - } - } - - if shouldSend { - // include any queued update, and return - delete(r.batchedUpdates, identity) - updates = append(updates, &participantUpdate{pi: pi, closeReason: closeReason}) - } else { - // enqueue for batch - r.batchedUpdates[identity] = &participantUpdate{pi: pi, closeReason: closeReason} - } - - return updates -} - func (r *Room) updateProto() *livekit.Room { r.lock.RLock() room := utils.CloneProto(r.protoRoom) @@ -1508,14 +1396,14 @@ func (r *Room) changeUpdateWorker() { case <-subTicker.C: r.batchedUpdatesMu.Lock() updatesMap := r.batchedUpdates - r.batchedUpdates = make(map[livekit.ParticipantIdentity]*participantUpdate) + r.batchedUpdates = make(map[livekit.ParticipantIdentity]*ParticipantUpdate) r.batchedUpdatesMu.Unlock() if len(updatesMap) == 0 { continue } - r.sendParticipantUpdates(maps.Values(updatesMap)) + SendParticipantUpdates(maps.Values(updatesMap), r.GetParticipants()) } } } @@ -1915,6 +1803,138 @@ func CompareParticipant(pi1 *livekit.ParticipantInfo, pi2 *livekit.ParticipantIn return 0 } +type ParticipantUpdate struct { + ParticipantInfo *livekit.ParticipantInfo + IsSynthesizedDisconnect bool + CloseReason types.ParticipantCloseReason +} + +// push a participant update for batched broadcast, optionally returning immediate updates to broadcast. +// it handles the following scenarios +// * subscriber-only updates will be queued for batch updates +// * publisher & immediate updates will be returned without queuing +// * when the SID changes, it will return both updates, with the earlier participant set to disconnected +func PushAndDequeueUpdates( + pi *livekit.ParticipantInfo, + closeReason types.ParticipantCloseReason, + isImmediate bool, + existingParticipant types.Participant, + lock *sync.Mutex, + cache map[livekit.ParticipantIdentity]*ParticipantUpdate, +) []*ParticipantUpdate { + lock.Lock() + defer lock.Unlock() + + var updates []*ParticipantUpdate + identity := livekit.ParticipantIdentity(pi.Identity) + existing := cache[identity] + shouldSend := isImmediate || pi.IsPublisher + + if existing != nil { + if pi.Sid == existing.ParticipantInfo.Sid { + // same participant session + if pi.Version < existing.ParticipantInfo.Version { + // out of order update + return nil + } + } else { + // different participant sessions + if CompareParticipant(existing.ParticipantInfo, pi) < 0 { + // existing is older, synthesize a DISCONNECT for older and + // send immediately along with newer session to signal switch + shouldSend = true + existing.ParticipantInfo.State = livekit.ParticipantInfo_DISCONNECTED + existing.IsSynthesizedDisconnect = true + updates = append(updates, existing) + } else { + // older session update, newer session has already become active, so nothing to do + return nil + } + } + } else { + if existingParticipant != nil { + epi := existingParticipant.ToProto() + if CompareParticipant(epi, pi) > 0 { + // older session update, newer session has already become active, so nothing to do + return nil + } + } + } + + if shouldSend { + // include any queued update, and return + delete(cache, identity) + updates = append(updates, &ParticipantUpdate{ParticipantInfo: pi, CloseReason: closeReason}) + } else { + // enqueue for batch + cache[identity] = &ParticipantUpdate{ParticipantInfo: pi, CloseReason: closeReason} + } + + return updates +} + +func SendParticipantUpdates(updates []*ParticipantUpdate, participants []types.LocalParticipant) { + if len(updates) == 0 { + return + } + + // For filtered updates, skip + // 1. synthesized DISCONNECT - this happens on SID change + // 2. close reasons of DUPLICATE_IDENTITY/STALE - A newer session for that identity exists. + // + // Filtered updates are used with clients that can handle identity based reconnect and hence those + // conditions can be skipped. + var filteredUpdates []*livekit.ParticipantInfo + for _, update := range updates { + if update.IsSynthesizedDisconnect || IsCloseNotifySkippable(update.CloseReason) { + continue + } + filteredUpdates = append(filteredUpdates, update.ParticipantInfo) + } + + var fullUpdates []*livekit.ParticipantInfo + for _, update := range updates { + fullUpdates = append(fullUpdates, update.ParticipantInfo) + } + + for _, op := range participants { + var err error + if op.ProtocolVersion().SupportsIdentityBasedReconnection() { + err = op.SendParticipantUpdate(filteredUpdates) + } else { + err = op.SendParticipantUpdate(fullUpdates) + } + if err != nil { + op.GetLogger().Errorw("could not send update to participant", err) + } + } +} + +// GetOtherParticipantInfo returns ParticipantInfo for everyone in the room except for the participant identified by lp.Identity() +func GetOtherParticipantInfo( + lp types.LocalParticipant, + isMigratingIn bool, + allParticipants []types.Participant, + skipSubscriberBroadcast bool, +) []*livekit.ParticipantInfo { + var lpIdentity livekit.ParticipantIdentity + if lp != nil { + lpIdentity = lp.Identity() + } + + pInfos := make([]*livekit.ParticipantInfo, 0, len(allParticipants)) + for _, op := range allParticipants { + if !op.Hidden() && + op.Identity() != lpIdentity && + !isMigratingIn && + !(skipSubscriberBroadcast && op.CanSkipBroadcast()) { + pInfos = append(pInfos, op.ToProto()) + } + } + + return pInfos +} + func connectionDetailsFields(infos []*types.ICEConnectionInfo) []interface{} { var fields []interface{} connectionType := types.ICEConnectionTypeUnknown @@ -1962,3 +1982,11 @@ func connectionDetailsFields(infos []*types.ICEConnectionInfo) []interface{} { fields = append(fields, "connectionType", connectionType) return fields } + +func toParticipants(lps []types.LocalParticipant) []types.Participant { + participants := make([]types.Participant, len(lps)) + for idx, lp := range lps { + participants[idx] = lp + } + return participants +} diff --git a/pkg/rtc/room_test.go b/pkg/rtc/room_test.go index a294baccb..07dccc002 100644 --- a/pkg/rtc/room_test.go +++ b/pkg/rtc/room_test.go @@ -272,14 +272,14 @@ func TestPushAndDequeueUpdates(t *testing.T) { pi *livekit.ParticipantInfo closeReason types.ParticipantCloseReason immediate bool - existing *participantUpdate - expected []*participantUpdate - validate func(t *testing.T, rm *Room, updates []*participantUpdate) + existing *ParticipantUpdate + expected []*ParticipantUpdate + validate func(t *testing.T, rm *Room, updates []*ParticipantUpdate) }{ { name: "publisher updates are immediate", pi: publisher1v1, - expected: []*participantUpdate{{pi: publisher1v1}}, + expected: []*ParticipantUpdate{{ParticipantInfo: publisher1v1}}, }, { name: "subscriber updates are queued", @@ -288,20 +288,20 @@ func TestPushAndDequeueUpdates(t *testing.T) { { name: "last version is enqueued", pi: subscriber1v2, - existing: &participantUpdate{pi: utils.CloneProto(subscriber1v1)}, // clone the existing value since it can be modified when setting to disconnected - validate: func(t *testing.T, rm *Room, _ []*participantUpdate) { + existing: &ParticipantUpdate{ParticipantInfo: utils.CloneProto(subscriber1v1)}, // clone the existing value since it can be modified when setting to disconnected + validate: func(t *testing.T, rm *Room, _ []*ParticipantUpdate) { queued := rm.batchedUpdates[livekit.ParticipantIdentity(identity)] require.NotNil(t, queued) - requirePIEquals(t, subscriber1v2, queued.pi) + requirePIEquals(t, subscriber1v2, queued.ParticipantInfo) }, }, { name: "latest version when immediate", pi: subscriber1v2, - existing: &participantUpdate{pi: utils.CloneProto(subscriber1v1)}, + existing: &ParticipantUpdate{ParticipantInfo: utils.CloneProto(subscriber1v1)}, immediate: true, - expected: []*participantUpdate{{pi: subscriber1v2}}, - validate: func(t *testing.T, rm *Room, _ []*participantUpdate) { + expected: []*ParticipantUpdate{{ParticipantInfo: subscriber1v2}}, + validate: func(t *testing.T, rm *Room, _ []*ParticipantUpdate) { queued := rm.batchedUpdates[livekit.ParticipantIdentity(identity)] require.Nil(t, queued) }, @@ -309,37 +309,37 @@ func TestPushAndDequeueUpdates(t *testing.T) { { name: "out of order updates are rejected", pi: subscriber1v1, - existing: &participantUpdate{pi: utils.CloneProto(subscriber1v2)}, - validate: func(t *testing.T, rm *Room, updates []*participantUpdate) { + existing: &ParticipantUpdate{ParticipantInfo: utils.CloneProto(subscriber1v2)}, + validate: func(t *testing.T, rm *Room, updates []*ParticipantUpdate) { queued := rm.batchedUpdates[livekit.ParticipantIdentity(identity)] - requirePIEquals(t, subscriber1v2, queued.pi) + requirePIEquals(t, subscriber1v2, queued.ParticipantInfo) }, }, { name: "sid change is broadcasted immediately with synthsized disconnect", pi: publisher2, closeReason: types.ParticipantCloseReasonServiceRequestRemoveParticipant, // just to test if update contain the close reason - existing: &participantUpdate{pi: utils.CloneProto(subscriber1v2), closeReason: types.ParticipantCloseReasonStale}, - expected: []*participantUpdate{ + existing: &ParticipantUpdate{ParticipantInfo: utils.CloneProto(subscriber1v2), CloseReason: types.ParticipantCloseReasonStale}, + expected: []*ParticipantUpdate{ { - pi: &livekit.ParticipantInfo{ + ParticipantInfo: &livekit.ParticipantInfo{ Identity: identity, Sid: "1", Version: 2, State: livekit.ParticipantInfo_DISCONNECTED, }, - isSynthesizedDisconnect: true, - closeReason: types.ParticipantCloseReasonStale, + IsSynthesizedDisconnect: true, + CloseReason: types.ParticipantCloseReasonStale, }, - {pi: publisher2, closeReason: types.ParticipantCloseReasonServiceRequestRemoveParticipant}, + {ParticipantInfo: publisher2, CloseReason: types.ParticipantCloseReasonServiceRequestRemoveParticipant}, }, }, { name: "when switching to publisher, queue is cleared", pi: publisher1v2, - existing: &participantUpdate{pi: utils.CloneProto(subscriber1v1)}, - expected: []*participantUpdate{{pi: publisher1v2}}, - validate: func(t *testing.T, rm *Room, updates []*participantUpdate) { + existing: &ParticipantUpdate{ParticipantInfo: utils.CloneProto(subscriber1v1)}, + expected: []*ParticipantUpdate{{ParticipantInfo: publisher1v2}}, + validate: func(t *testing.T, rm *Room, updates []*ParticipantUpdate) { require.Empty(t, rm.batchedUpdates) }, }, @@ -349,14 +349,21 @@ func TestPushAndDequeueUpdates(t *testing.T) { t.Run(tc.name, func(t *testing.T) { rm := newRoomWithParticipants(t, testRoomOpts{num: 1}) if tc.existing != nil { - rm.batchedUpdates[livekit.ParticipantIdentity(tc.existing.pi.Identity)] = tc.existing + rm.batchedUpdates[livekit.ParticipantIdentity(tc.existing.ParticipantInfo.Identity)] = tc.existing } - updates := rm.pushAndDequeueUpdates(tc.pi, tc.closeReason, tc.immediate) + updates := PushAndDequeueUpdates( + tc.pi, + tc.closeReason, + tc.immediate, + rm.GetParticipant(livekit.ParticipantIdentity(tc.pi.Identity)), + &rm.batchedUpdatesMu, + rm.batchedUpdates, + ) require.Equal(t, len(tc.expected), len(updates)) for i, item := range tc.expected { - requirePIEquals(t, item.pi, updates[i].pi) - require.Equal(t, item.isSynthesizedDisconnect, updates[i].isSynthesizedDisconnect) - require.Equal(t, item.closeReason, updates[i].closeReason) + requirePIEquals(t, item.ParticipantInfo, updates[i].ParticipantInfo) + require.Equal(t, item.IsSynthesizedDisconnect, updates[i].IsSynthesizedDisconnect) + require.Equal(t, item.CloseReason, updates[i].CloseReason) } if tc.validate != nil { diff --git a/pkg/rtc/types/protocol_version.go b/pkg/rtc/types/protocol_version.go index a6a7e2056..f1c54a7cb 100644 --- a/pkg/rtc/types/protocol_version.go +++ b/pkg/rtc/types/protocol_version.go @@ -16,7 +16,7 @@ package types type ProtocolVersion int -const CurrentProtocol = 15 +const CurrentProtocol = 16 func (v ProtocolVersion) SupportsPackedStreamId() bool { return v > 0 diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index d2472db61..e6417a7ad 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -291,6 +291,7 @@ type DownTrack struct { bindLock sync.Mutex bindState atomic.Value + wasEverBound atomic.Bool onBinding func(error) bindOnReceiverReady func() onBindAndConnected func() @@ -584,6 +585,7 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, d.onBinding(nil) } d.setBindStateLocked(bindStateBound) + d.wasEverBound.Store(true) d.bindLock.Unlock() d.forwarder.DetermineCodec(codec.RTPCodecCapability, d.Receiver().HeaderExtensions()) @@ -852,8 +854,8 @@ func (d *DownTrack) SetTransceiver(transceiver *webrtc.RTPTransceiver) { d.transceiver.Store(transceiver) } -func (d *DownTrack) GetTransceiver() *webrtc.RTPTransceiver { - return d.transceiver.Load() +func (d *DownTrack) GetTransceiver() (*webrtc.RTPTransceiver, bool) { + return d.transceiver.Load(), d.wasEverBound.Load() } func (d *DownTrack) postKeyFrameRequestEvent() {