diff --git a/pkg/rtc/transport_test.go b/pkg/rtc/transport_test.go index e604c294a..26730b3c5 100644 --- a/pkg/rtc/transport_test.go +++ b/pkg/rtc/transport_test.go @@ -60,6 +60,9 @@ func TestMissingAnswerDuringICERestart(t *testing.T) { connectTransports(t, transportA, transportB, true, 1, 1) require.Equal(t, webrtc.ICEConnectionStateConnected, transportA.pc.ICEConnectionState()) require.Equal(t, webrtc.ICEConnectionStateConnected, transportB.pc.ICEConnectionState()) + + transportA.Close() + transportB.Close() } func TestNegotiationTiming(t *testing.T) { @@ -133,6 +136,9 @@ func TestNegotiationTiming(t *testing.T) { offer2, ok := offer.Load().(*webrtc.SessionDescription) require.True(t, ok) require.False(t, offer2 == actualOffer) + + transportA.Close() + transportB.Close() } func TestFirstOfferMissedDuringICERestart(t *testing.T) { @@ -198,6 +204,7 @@ func TestFirstOfferMissedDuringICERestart(t *testing.T) { transportB.pc.ICEConnectionState() == webrtc.ICEConnectionStateConnected && offerCount.Load() == 2 }, testutils.ConnectTimeout, 10*time.Millisecond, "transport did not connect") + transportA.Close() transportB.Close() } @@ -270,6 +277,7 @@ func TestFirstAnwserMissedDuringICERestart(t *testing.T) { transportB.pc.ICEConnectionState() == webrtc.ICEConnectionStateConnected && offerCount.Load() == 2 }, testutils.ConnectTimeout, 10*time.Millisecond, "transport did not connect") + transportA.Close() transportB.Close() } @@ -301,6 +309,8 @@ func TestNegotiationFailed(t *testing.T) { require.Eventually(t, func() bool { return failed.Load() == 1 }, negotiationFailedTimeout+time.Second, 10*time.Millisecond, "negotiation failed") + + transportA.Close() } func TestFilteringCandidates(t *testing.T) { @@ -420,6 +430,8 @@ func TestFilteringCandidates(t *testing.T) { udp, tcp = getNumTransportTypeCandidates(parsed) require.Zero(t, udp) require.Equal(t, 2, tcp) + + transport.Close() } func handleICEExchange(t *testing.T, a, b *PCTransport) { diff --git a/pkg/rtc/uptrackmanager.go b/pkg/rtc/uptrackmanager.go index e8fc1b02a..f33715956 100644 --- a/pkg/rtc/uptrackmanager.go +++ b/pkg/rtc/uptrackmanager.go @@ -46,11 +46,10 @@ type UpTrackManager struct { func NewUpTrackManager(params UpTrackManagerParams) *UpTrackManager { return &UpTrackManager{ - params: params, - publishedTracks: make(map[livekit.TrackID]types.MediaTrack), - subscriptionPermissionVersion: utils.NewTimedVersion(time.Now(), 0), - pendingSubscriptions: make(map[livekit.TrackID][]livekit.ParticipantIdentity), - opsQueue: utils.NewOpsQueue(params.Logger, "utm", 20), + params: params, + publishedTracks: make(map[livekit.TrackID]types.MediaTrack), + pendingSubscriptions: make(map[livekit.TrackID][]livekit.ParticipantIdentity), + opsQueue: utils.NewOpsQueue(params.Logger, "utm", 20), } } @@ -206,27 +205,35 @@ func (u *UpTrackManager) UpdateSubscriptionPermission( ) error { u.lock.Lock() if timedVersion != nil { - tv := utils.NewTimedVersionFromProto(timedVersion) - // ignore older version - if !tv.After(u.subscriptionPermissionVersion) { - perms := "" - if u.subscriptionPermission != nil { - perms = u.subscriptionPermission.String() + if u.subscriptionPermissionVersion != nil { + tv := utils.NewTimedVersionFromProto(timedVersion) + // ignore older version + if !tv.After(u.subscriptionPermissionVersion) { + perms := "" + if u.subscriptionPermission != nil { + perms = u.subscriptionPermission.String() + } + u.params.Logger.Infow( + "skipping older subscription permission version", + "existingValue", perms, + "existingVersion", u.subscriptionPermissionVersion.ToProto().String(), + "requestingValue", subscriptionPermission.String(), + "requestingVersion", timedVersion.String(), + ) + u.lock.Unlock() + return nil } - u.params.Logger.Infow( - "skipping older subscription permission version", - "existingValue", perms, - "existingVersion", u.subscriptionPermissionVersion.ToProto().String(), - "requestingValue", subscriptionPermission.String(), - "requestingVersion", timedVersion.String(), - ) - u.lock.Unlock() - return nil + u.subscriptionPermissionVersion.Update(time.UnixMicro(timedVersion.UnixMicro)) + } else { + u.subscriptionPermissionVersion = utils.NewTimedVersionFromProto(timedVersion) } - u.subscriptionPermissionVersion.Update(time.UnixMicro(timedVersion.UnixMicro)) } else { - // ignore older version - u.subscriptionPermissionVersion.Update(time.Now()) + // use current time as the new/updated version + if u.subscriptionPermissionVersion == nil { + u.subscriptionPermissionVersion = utils.NewTimedVersion(time.Now(), 0) + } else { + u.subscriptionPermissionVersion.Update(time.Now()) + } } // store as is for use when migrating @@ -264,6 +271,10 @@ func (u *UpTrackManager) SubscriptionPermission() (*livekit.SubscriptionPermissi u.lock.RLock() defer u.lock.RUnlock() + if u.subscriptionPermissionVersion == nil { + return nil, nil + } + return u.subscriptionPermission, u.subscriptionPermissionVersion.ToProto() }