From 92a355e1f3f7b7e8ddff43766ee4fa338922d74e Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Thu, 28 Sep 2023 15:41:44 +0800 Subject: [PATCH] Add SyncStreams flag to Room (#2110) * Add SyncStreams flag to Room * Increase protocol version * Revert version change * Move flags to internal & solve comment --- config-sample.yaml | 3 + go.mod | 2 +- go.sum | 6 +- pkg/config/config.go | 1 + pkg/rtc/mediatracksubscriptions.go | 4 +- pkg/rtc/participant.go | 11 +- pkg/rtc/types/interfaces.go | 3 +- .../typesfakes/fake_local_participant.go | 135 +++++++++++++----- pkg/service/roomallocator.go | 15 +- pkg/service/roommanager.go | 5 +- 10 files changed, 132 insertions(+), 53 deletions(-) diff --git a/config-sample.yaml b/config-sample.yaml index d7a4fe773..6025a3a9b 100644 --- a/config-sample.yaml +++ b/config-sample.yaml @@ -162,6 +162,9 @@ keys: # enabled: true # min: 100 # max: 2000 +# # improves A/V sync when playout_delay set to a value larger than 200ms. It will disables transceiver re-use +# # so not recommended for rooms with frequent subscription changes +# sync_streams: true # Webhooks # when configured, LiveKit notifies your URL handler with room events diff --git a/go.mod b/go.mod index bb0795cb2..d31a29bdd 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20230919184714-b8f0fa0133c5 - github.com/livekit/protocol v1.7.3-0.20230920084913-821c244d8ce2 + github.com/livekit/protocol v1.7.3-0.20230928065809-281e00a4a67d github.com/livekit/psrpc v0.3.3 github.com/mackerelio/go-osstat v0.2.4 github.com/magefile/mage v1.15.0 diff --git a/go.sum b/go.sum index 757166a60..b08db3a89 100644 --- a/go.sum +++ b/go.sum @@ -127,10 +127,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20230919184714-b8f0fa0133c5 h1:CjXYkNKSrdIn7GMD1ySoXrURhL5U9d6vG32vxcUhzIU= github.com/livekit/mediatransportutil v0.0.0-20230919184714-b8f0fa0133c5/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY= -github.com/livekit/protocol v1.7.3-0.20230919182418-0708b5a5bb84 h1:4WOaspDesbbrjGPsu6Vp1VjcWxTXVjyjdtQAzIUXn5s= -github.com/livekit/protocol v1.7.3-0.20230919182418-0708b5a5bb84/go.mod h1:zbh0QPUcLGOeZeIO/VeigwWWbudz4Lv+Px94FnVfQH0= -github.com/livekit/protocol v1.7.3-0.20230920084913-821c244d8ce2 h1:yIRqvyO3qDPO+4EdcHMjsINJYl6KE9AXJUgfChsw+0s= -github.com/livekit/protocol v1.7.3-0.20230920084913-821c244d8ce2/go.mod h1:zbh0QPUcLGOeZeIO/VeigwWWbudz4Lv+Px94FnVfQH0= +github.com/livekit/protocol v1.7.3-0.20230928065809-281e00a4a67d h1:JLc/seGGKdnv0JUDCkMprJYzud2E8ahQ3QZgP/Imb14= +github.com/livekit/protocol v1.7.3-0.20230928065809-281e00a4a67d/go.mod h1:zbh0QPUcLGOeZeIO/VeigwWWbudz4Lv+Px94FnVfQH0= github.com/livekit/psrpc v0.3.3 h1:+lltbuN39IdaynXhLLxRShgYqYsRMWeeXKzv60oqyWo= github.com/livekit/psrpc v0.3.3/go.mod h1:n6JntEg+zT6Ji8InoyTpV7wusPNwGqqtxmHlkNhDN0U= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= diff --git a/pkg/config/config.go b/pkg/config/config.go index b99be109e..7997e7f93 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -222,6 +222,7 @@ type RoomConfig struct { EnableRemoteUnmute bool `yaml:"enable_remote_unmute,omitempty"` MaxMetadataSize uint32 `yaml:"max_metadata_size,omitempty"` PlayoutDelay PlayoutDelayConfig `yaml:"playout_delay,omitempty"` + SyncStreams bool `yaml:"sync_streams,omitempty"` } type CodecSpec struct { diff --git a/pkg/rtc/mediatracksubscriptions.go b/pkg/rtc/mediatracksubscriptions.go index 552fc06ce..33ca4921f 100644 --- a/pkg/rtc/mediatracksubscriptions.go +++ b/pkg/rtc/mediatracksubscriptions.go @@ -115,7 +115,7 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr * } streamID := wr.StreamID() - if sub.SupportSyncStreamID() && t.params.MediaTrack.Stream() != "" { + if sub.SupportsSyncStreamID() && t.params.MediaTrack.Stream() != "" { streamID = PackSyncStreamID(t.params.MediaTrack.PublisherID(), t.params.MediaTrack.Stream()) } @@ -246,7 +246,7 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr * } sub.VerifySubscribeParticipantInfo(subTrack.PublisherID(), subTrack.PublisherVersion()) - if sub.ProtocolVersion().SupportsTransceiverReuse() { + if sub.SupportsTransceiverReuse() { // // AddTrack will create a new transceiver or re-use an unused one // if the attributes match. This prevents SDP from bloating diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 30dfea7d0..2d9d21cb7 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -122,6 +122,7 @@ type ParticipantParams struct { SubscriptionLimitAudio int32 SubscriptionLimitVideo int32 PlayoutDelay *livekit.PlayoutDelay + SyncStreams bool } type ParticipantImpl struct { @@ -1113,7 +1114,7 @@ func (p *ParticipantImpl) setupTransportManager() error { TCPFallbackRTTThreshold: p.params.TCPFallbackRTTThreshold, AllowUDPUnstableFallback: p.params.AllowUDPUnstableFallback, TURNSEnabled: p.params.TURNSEnabled, - AllowPlayoutDelay: p.params.PlayoutDelay.GetEnabled() && p.SupportSyncStreamID(), + AllowPlayoutDelay: p.params.PlayoutDelay.GetEnabled() && p.SupportsSyncStreamID(), Logger: p.params.Logger.WithComponent(sutils.ComponentTransport), }) if err != nil { @@ -2250,8 +2251,12 @@ func (p *ParticipantImpl) GetPlayoutDelayConfig() *livekit.PlayoutDelay { return p.params.PlayoutDelay } -func (p *ParticipantImpl) SupportSyncStreamID() bool { - return p.ProtocolVersion().SupportSyncStreamID() && !p.params.ClientInfo.isFirefox() +func (p *ParticipantImpl) SupportsSyncStreamID() bool { + return p.ProtocolVersion().SupportSyncStreamID() && !p.params.ClientInfo.isFirefox() && p.params.SyncStreams +} + +func (p *ParticipantImpl) SupportsTransceiverReuse() bool { + return p.ProtocolVersion().SupportsTransceiverReuse() && !p.SupportsSyncStreamID() } func codecsFromMediaDescription(m *sdp.MediaDescription) (out []sdp.Codec, err error) { diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 0b0e003ad..3a971fce4 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -303,7 +303,8 @@ type LocalParticipant interface { GetLogger() logger.Logger GetAdaptiveStream() bool ProtocolVersion() ProtocolVersion - SupportSyncStreamID() bool + SupportsSyncStreamID() bool + SupportsTransceiverReuse() bool ConnectedAt() time.Time IsClosed() bool IsReady() bool diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index fedebacf5..ac0b18932 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -793,14 +793,24 @@ type FakeLocalParticipant struct { arg2 livekit.TrackID arg3 bool } - SupportSyncStreamIDStub func() bool - supportSyncStreamIDMutex sync.RWMutex - supportSyncStreamIDArgsForCall []struct { + SupportsSyncStreamIDStub func() bool + supportsSyncStreamIDMutex sync.RWMutex + supportsSyncStreamIDArgsForCall []struct { } - supportSyncStreamIDReturns struct { + supportsSyncStreamIDReturns struct { result1 bool } - supportSyncStreamIDReturnsOnCall map[int]struct { + supportsSyncStreamIDReturnsOnCall map[int]struct { + result1 bool + } + SupportsTransceiverReuseStub func() bool + supportsTransceiverReuseMutex sync.RWMutex + supportsTransceiverReuseArgsForCall []struct { + } + supportsTransceiverReuseReturns struct { + result1 bool + } + supportsTransceiverReuseReturnsOnCall map[int]struct { result1 bool } ToProtoStub func() *livekit.ParticipantInfo @@ -5153,15 +5163,15 @@ func (fake *FakeLocalParticipant) SubscriptionPermissionUpdateArgsForCall(i int) return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 } -func (fake *FakeLocalParticipant) SupportSyncStreamID() bool { - fake.supportSyncStreamIDMutex.Lock() - ret, specificReturn := fake.supportSyncStreamIDReturnsOnCall[len(fake.supportSyncStreamIDArgsForCall)] - fake.supportSyncStreamIDArgsForCall = append(fake.supportSyncStreamIDArgsForCall, struct { +func (fake *FakeLocalParticipant) SupportsSyncStreamID() bool { + fake.supportsSyncStreamIDMutex.Lock() + ret, specificReturn := fake.supportsSyncStreamIDReturnsOnCall[len(fake.supportsSyncStreamIDArgsForCall)] + fake.supportsSyncStreamIDArgsForCall = append(fake.supportsSyncStreamIDArgsForCall, struct { }{}) - stub := fake.SupportSyncStreamIDStub - fakeReturns := fake.supportSyncStreamIDReturns - fake.recordInvocation("SupportSyncStreamID", []interface{}{}) - fake.supportSyncStreamIDMutex.Unlock() + stub := fake.SupportsSyncStreamIDStub + fakeReturns := fake.supportsSyncStreamIDReturns + fake.recordInvocation("SupportsSyncStreamID", []interface{}{}) + fake.supportsSyncStreamIDMutex.Unlock() if stub != nil { return stub() } @@ -5171,37 +5181,90 @@ func (fake *FakeLocalParticipant) SupportSyncStreamID() bool { return fakeReturns.result1 } -func (fake *FakeLocalParticipant) SupportSyncStreamIDCallCount() int { - fake.supportSyncStreamIDMutex.RLock() - defer fake.supportSyncStreamIDMutex.RUnlock() - return len(fake.supportSyncStreamIDArgsForCall) +func (fake *FakeLocalParticipant) SupportsSyncStreamIDCallCount() int { + fake.supportsSyncStreamIDMutex.RLock() + defer fake.supportsSyncStreamIDMutex.RUnlock() + return len(fake.supportsSyncStreamIDArgsForCall) } -func (fake *FakeLocalParticipant) SupportSyncStreamIDCalls(stub func() bool) { - fake.supportSyncStreamIDMutex.Lock() - defer fake.supportSyncStreamIDMutex.Unlock() - fake.SupportSyncStreamIDStub = stub +func (fake *FakeLocalParticipant) SupportsSyncStreamIDCalls(stub func() bool) { + fake.supportsSyncStreamIDMutex.Lock() + defer fake.supportsSyncStreamIDMutex.Unlock() + fake.SupportsSyncStreamIDStub = stub } -func (fake *FakeLocalParticipant) SupportSyncStreamIDReturns(result1 bool) { - fake.supportSyncStreamIDMutex.Lock() - defer fake.supportSyncStreamIDMutex.Unlock() - fake.SupportSyncStreamIDStub = nil - fake.supportSyncStreamIDReturns = struct { +func (fake *FakeLocalParticipant) SupportsSyncStreamIDReturns(result1 bool) { + fake.supportsSyncStreamIDMutex.Lock() + defer fake.supportsSyncStreamIDMutex.Unlock() + fake.SupportsSyncStreamIDStub = nil + fake.supportsSyncStreamIDReturns = struct { result1 bool }{result1} } -func (fake *FakeLocalParticipant) SupportSyncStreamIDReturnsOnCall(i int, result1 bool) { - fake.supportSyncStreamIDMutex.Lock() - defer fake.supportSyncStreamIDMutex.Unlock() - fake.SupportSyncStreamIDStub = nil - if fake.supportSyncStreamIDReturnsOnCall == nil { - fake.supportSyncStreamIDReturnsOnCall = make(map[int]struct { +func (fake *FakeLocalParticipant) SupportsSyncStreamIDReturnsOnCall(i int, result1 bool) { + fake.supportsSyncStreamIDMutex.Lock() + defer fake.supportsSyncStreamIDMutex.Unlock() + fake.SupportsSyncStreamIDStub = nil + if fake.supportsSyncStreamIDReturnsOnCall == nil { + fake.supportsSyncStreamIDReturnsOnCall = make(map[int]struct { result1 bool }) } - fake.supportSyncStreamIDReturnsOnCall[i] = struct { + fake.supportsSyncStreamIDReturnsOnCall[i] = struct { + result1 bool + }{result1} +} + +func (fake *FakeLocalParticipant) SupportsTransceiverReuse() bool { + fake.supportsTransceiverReuseMutex.Lock() + ret, specificReturn := fake.supportsTransceiverReuseReturnsOnCall[len(fake.supportsTransceiverReuseArgsForCall)] + fake.supportsTransceiverReuseArgsForCall = append(fake.supportsTransceiverReuseArgsForCall, struct { + }{}) + stub := fake.SupportsTransceiverReuseStub + fakeReturns := fake.supportsTransceiverReuseReturns + fake.recordInvocation("SupportsTransceiverReuse", []interface{}{}) + fake.supportsTransceiverReuseMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeLocalParticipant) SupportsTransceiverReuseCallCount() int { + fake.supportsTransceiverReuseMutex.RLock() + defer fake.supportsTransceiverReuseMutex.RUnlock() + return len(fake.supportsTransceiverReuseArgsForCall) +} + +func (fake *FakeLocalParticipant) SupportsTransceiverReuseCalls(stub func() bool) { + fake.supportsTransceiverReuseMutex.Lock() + defer fake.supportsTransceiverReuseMutex.Unlock() + fake.SupportsTransceiverReuseStub = stub +} + +func (fake *FakeLocalParticipant) SupportsTransceiverReuseReturns(result1 bool) { + fake.supportsTransceiverReuseMutex.Lock() + defer fake.supportsTransceiverReuseMutex.Unlock() + fake.SupportsTransceiverReuseStub = nil + fake.supportsTransceiverReuseReturns = struct { + result1 bool + }{result1} +} + +func (fake *FakeLocalParticipant) SupportsTransceiverReuseReturnsOnCall(i int, result1 bool) { + fake.supportsTransceiverReuseMutex.Lock() + defer fake.supportsTransceiverReuseMutex.Unlock() + fake.SupportsTransceiverReuseStub = nil + if fake.supportsTransceiverReuseReturnsOnCall == nil { + fake.supportsTransceiverReuseReturnsOnCall = make(map[int]struct { + result1 bool + }) + } + fake.supportsTransceiverReuseReturnsOnCall[i] = struct { result1 bool }{result1} } @@ -6033,8 +6096,10 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.subscriptionPermissionMutex.RUnlock() fake.subscriptionPermissionUpdateMutex.RLock() defer fake.subscriptionPermissionUpdateMutex.RUnlock() - fake.supportSyncStreamIDMutex.RLock() - defer fake.supportSyncStreamIDMutex.RUnlock() + fake.supportsSyncStreamIDMutex.RLock() + defer fake.supportsSyncStreamIDMutex.RUnlock() + fake.supportsTransceiverReuseMutex.RLock() + defer fake.supportsTransceiverReuseMutex.RUnlock() fake.toProtoMutex.RLock() defer fake.toProtoMutex.RUnlock() fake.toProtoWithVersionMutex.RLock() diff --git a/pkg/service/roomallocator.go b/pkg/service/roomallocator.go index 6db962549..2d538cdd1 100644 --- a/pkg/service/roomallocator.go +++ b/pkg/service/roomallocator.go @@ -68,7 +68,8 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre CreationTime: time.Now().Unix(), TurnPassword: utils.RandomSecret(), } - applyDefaultRoomConfig(rm, &r.config.Room) + internal = &livekit.RoomInternal{} + applyDefaultRoomConfig(rm, internal, &r.config.Room) } else if err != nil { return nil, err } @@ -83,15 +84,18 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre rm.Metadata = req.Metadata } if req.Egress != nil && req.Egress.Tracks != nil { - internal = &livekit.RoomInternal{TrackEgress: req.Egress.Tracks} + internal.TrackEgress = req.Egress.Tracks } if req.MinPlayoutDelay > 0 || req.MaxPlayoutDelay > 0 { - rm.PlayoutDelay = &livekit.PlayoutDelay{ + internal.PlayoutDelay = &livekit.PlayoutDelay{ Enabled: true, Min: req.MinPlayoutDelay, Max: req.MaxPlayoutDelay, } } + if req.SyncStreams { + internal.SyncStreams = true + } if err = r.roomStore.StoreRoom(ctx, rm, internal); err != nil { return nil, err @@ -149,7 +153,7 @@ func (r *StandardRoomAllocator) ValidateCreateRoom(ctx context.Context, roomName return nil } -func applyDefaultRoomConfig(room *livekit.Room, conf *config.RoomConfig) { +func applyDefaultRoomConfig(room *livekit.Room, internal *livekit.RoomInternal, conf *config.RoomConfig) { room.EmptyTimeout = conf.EmptyTimeout room.MaxParticipants = conf.MaxParticipants for _, codec := range conf.EnabledCodecs { @@ -158,9 +162,10 @@ func applyDefaultRoomConfig(room *livekit.Room, conf *config.RoomConfig) { FmtpLine: codec.FmtpLine, }) } - room.PlayoutDelay = &livekit.PlayoutDelay{ + internal.PlayoutDelay = &livekit.PlayoutDelay{ Enabled: conf.PlayoutDelay.Enabled, Min: uint32(conf.PlayoutDelay.Min), Max: uint32(conf.PlayoutDelay.Max), } + internal.SyncStreams = conf.SyncStreams } diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index b436c7bca..5797142e3 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -240,7 +240,7 @@ func (r *RoomManager) StartSession( } defer room.Release() - protoRoom := room.ToProto() + protoRoom, roomInternal := room.ToProto(), room.Internal() // only create the room, but don't start a participant session if pi.Identity == "" { @@ -408,7 +408,8 @@ func (r *RoomManager) StartSession( SubscriberAllowPause: subscriberAllowPause, SubscriptionLimitAudio: r.config.Limit.SubscriptionLimitAudio, SubscriptionLimitVideo: r.config.Limit.SubscriptionLimitVideo, - PlayoutDelay: protoRoom.PlayoutDelay, + PlayoutDelay: roomInternal.GetPlayoutDelay(), + SyncStreams: roomInternal.GetSyncStreams(), }) if err != nil { return err