Add SyncStreams flag to Room (#2110)

* Add SyncStreams flag to Room

* Increase protocol version

* Revert version change

* Move flags to internal & solve comment
This commit is contained in:
cnderrauber
2023-09-28 15:41:44 +08:00
committed by GitHub
parent 4c9d95149d
commit 92a355e1f3
10 changed files with 132 additions and 53 deletions

View File

@@ -162,6 +162,9 @@ keys:
# enabled: true
# min: 100
# max: 2000
# # improves A/V sync when playout_delay set to a value larger than 200ms. It will disables transceiver re-use
# # so not recommended for rooms with frequent subscription changes
# sync_streams: true
# Webhooks
# when configured, LiveKit notifies your URL handler with room events

2
go.mod
View File

@@ -18,7 +18,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20230919184714-b8f0fa0133c5
github.com/livekit/protocol v1.7.3-0.20230920084913-821c244d8ce2
github.com/livekit/protocol v1.7.3-0.20230928065809-281e00a4a67d
github.com/livekit/psrpc v0.3.3
github.com/mackerelio/go-osstat v0.2.4
github.com/magefile/mage v1.15.0

6
go.sum
View File

@@ -127,10 +127,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20230919184714-b8f0fa0133c5 h1:CjXYkNKSrdIn7GMD1ySoXrURhL5U9d6vG32vxcUhzIU=
github.com/livekit/mediatransportutil v0.0.0-20230919184714-b8f0fa0133c5/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY=
github.com/livekit/protocol v1.7.3-0.20230919182418-0708b5a5bb84 h1:4WOaspDesbbrjGPsu6Vp1VjcWxTXVjyjdtQAzIUXn5s=
github.com/livekit/protocol v1.7.3-0.20230919182418-0708b5a5bb84/go.mod h1:zbh0QPUcLGOeZeIO/VeigwWWbudz4Lv+Px94FnVfQH0=
github.com/livekit/protocol v1.7.3-0.20230920084913-821c244d8ce2 h1:yIRqvyO3qDPO+4EdcHMjsINJYl6KE9AXJUgfChsw+0s=
github.com/livekit/protocol v1.7.3-0.20230920084913-821c244d8ce2/go.mod h1:zbh0QPUcLGOeZeIO/VeigwWWbudz4Lv+Px94FnVfQH0=
github.com/livekit/protocol v1.7.3-0.20230928065809-281e00a4a67d h1:JLc/seGGKdnv0JUDCkMprJYzud2E8ahQ3QZgP/Imb14=
github.com/livekit/protocol v1.7.3-0.20230928065809-281e00a4a67d/go.mod h1:zbh0QPUcLGOeZeIO/VeigwWWbudz4Lv+Px94FnVfQH0=
github.com/livekit/psrpc v0.3.3 h1:+lltbuN39IdaynXhLLxRShgYqYsRMWeeXKzv60oqyWo=
github.com/livekit/psrpc v0.3.3/go.mod h1:n6JntEg+zT6Ji8InoyTpV7wusPNwGqqtxmHlkNhDN0U=
github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs=

View File

@@ -222,6 +222,7 @@ type RoomConfig struct {
EnableRemoteUnmute bool `yaml:"enable_remote_unmute,omitempty"`
MaxMetadataSize uint32 `yaml:"max_metadata_size,omitempty"`
PlayoutDelay PlayoutDelayConfig `yaml:"playout_delay,omitempty"`
SyncStreams bool `yaml:"sync_streams,omitempty"`
}
type CodecSpec struct {

View File

@@ -115,7 +115,7 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr *
}
streamID := wr.StreamID()
if sub.SupportSyncStreamID() && t.params.MediaTrack.Stream() != "" {
if sub.SupportsSyncStreamID() && t.params.MediaTrack.Stream() != "" {
streamID = PackSyncStreamID(t.params.MediaTrack.PublisherID(), t.params.MediaTrack.Stream())
}
@@ -246,7 +246,7 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr *
}
sub.VerifySubscribeParticipantInfo(subTrack.PublisherID(), subTrack.PublisherVersion())
if sub.ProtocolVersion().SupportsTransceiverReuse() {
if sub.SupportsTransceiverReuse() {
//
// AddTrack will create a new transceiver or re-use an unused one
// if the attributes match. This prevents SDP from bloating

View File

@@ -122,6 +122,7 @@ type ParticipantParams struct {
SubscriptionLimitAudio int32
SubscriptionLimitVideo int32
PlayoutDelay *livekit.PlayoutDelay
SyncStreams bool
}
type ParticipantImpl struct {
@@ -1113,7 +1114,7 @@ func (p *ParticipantImpl) setupTransportManager() error {
TCPFallbackRTTThreshold: p.params.TCPFallbackRTTThreshold,
AllowUDPUnstableFallback: p.params.AllowUDPUnstableFallback,
TURNSEnabled: p.params.TURNSEnabled,
AllowPlayoutDelay: p.params.PlayoutDelay.GetEnabled() && p.SupportSyncStreamID(),
AllowPlayoutDelay: p.params.PlayoutDelay.GetEnabled() && p.SupportsSyncStreamID(),
Logger: p.params.Logger.WithComponent(sutils.ComponentTransport),
})
if err != nil {
@@ -2250,8 +2251,12 @@ func (p *ParticipantImpl) GetPlayoutDelayConfig() *livekit.PlayoutDelay {
return p.params.PlayoutDelay
}
func (p *ParticipantImpl) SupportSyncStreamID() bool {
return p.ProtocolVersion().SupportSyncStreamID() && !p.params.ClientInfo.isFirefox()
func (p *ParticipantImpl) SupportsSyncStreamID() bool {
return p.ProtocolVersion().SupportSyncStreamID() && !p.params.ClientInfo.isFirefox() && p.params.SyncStreams
}
func (p *ParticipantImpl) SupportsTransceiverReuse() bool {
return p.ProtocolVersion().SupportsTransceiverReuse() && !p.SupportsSyncStreamID()
}
func codecsFromMediaDescription(m *sdp.MediaDescription) (out []sdp.Codec, err error) {

View File

@@ -303,7 +303,8 @@ type LocalParticipant interface {
GetLogger() logger.Logger
GetAdaptiveStream() bool
ProtocolVersion() ProtocolVersion
SupportSyncStreamID() bool
SupportsSyncStreamID() bool
SupportsTransceiverReuse() bool
ConnectedAt() time.Time
IsClosed() bool
IsReady() bool

View File

@@ -793,14 +793,24 @@ type FakeLocalParticipant struct {
arg2 livekit.TrackID
arg3 bool
}
SupportSyncStreamIDStub func() bool
supportSyncStreamIDMutex sync.RWMutex
supportSyncStreamIDArgsForCall []struct {
SupportsSyncStreamIDStub func() bool
supportsSyncStreamIDMutex sync.RWMutex
supportsSyncStreamIDArgsForCall []struct {
}
supportSyncStreamIDReturns struct {
supportsSyncStreamIDReturns struct {
result1 bool
}
supportSyncStreamIDReturnsOnCall map[int]struct {
supportsSyncStreamIDReturnsOnCall map[int]struct {
result1 bool
}
SupportsTransceiverReuseStub func() bool
supportsTransceiverReuseMutex sync.RWMutex
supportsTransceiverReuseArgsForCall []struct {
}
supportsTransceiverReuseReturns struct {
result1 bool
}
supportsTransceiverReuseReturnsOnCall map[int]struct {
result1 bool
}
ToProtoStub func() *livekit.ParticipantInfo
@@ -5153,15 +5163,15 @@ func (fake *FakeLocalParticipant) SubscriptionPermissionUpdateArgsForCall(i int)
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
}
func (fake *FakeLocalParticipant) SupportSyncStreamID() bool {
fake.supportSyncStreamIDMutex.Lock()
ret, specificReturn := fake.supportSyncStreamIDReturnsOnCall[len(fake.supportSyncStreamIDArgsForCall)]
fake.supportSyncStreamIDArgsForCall = append(fake.supportSyncStreamIDArgsForCall, struct {
func (fake *FakeLocalParticipant) SupportsSyncStreamID() bool {
fake.supportsSyncStreamIDMutex.Lock()
ret, specificReturn := fake.supportsSyncStreamIDReturnsOnCall[len(fake.supportsSyncStreamIDArgsForCall)]
fake.supportsSyncStreamIDArgsForCall = append(fake.supportsSyncStreamIDArgsForCall, struct {
}{})
stub := fake.SupportSyncStreamIDStub
fakeReturns := fake.supportSyncStreamIDReturns
fake.recordInvocation("SupportSyncStreamID", []interface{}{})
fake.supportSyncStreamIDMutex.Unlock()
stub := fake.SupportsSyncStreamIDStub
fakeReturns := fake.supportsSyncStreamIDReturns
fake.recordInvocation("SupportsSyncStreamID", []interface{}{})
fake.supportsSyncStreamIDMutex.Unlock()
if stub != nil {
return stub()
}
@@ -5171,37 +5181,90 @@ func (fake *FakeLocalParticipant) SupportSyncStreamID() bool {
return fakeReturns.result1
}
func (fake *FakeLocalParticipant) SupportSyncStreamIDCallCount() int {
fake.supportSyncStreamIDMutex.RLock()
defer fake.supportSyncStreamIDMutex.RUnlock()
return len(fake.supportSyncStreamIDArgsForCall)
func (fake *FakeLocalParticipant) SupportsSyncStreamIDCallCount() int {
fake.supportsSyncStreamIDMutex.RLock()
defer fake.supportsSyncStreamIDMutex.RUnlock()
return len(fake.supportsSyncStreamIDArgsForCall)
}
func (fake *FakeLocalParticipant) SupportSyncStreamIDCalls(stub func() bool) {
fake.supportSyncStreamIDMutex.Lock()
defer fake.supportSyncStreamIDMutex.Unlock()
fake.SupportSyncStreamIDStub = stub
func (fake *FakeLocalParticipant) SupportsSyncStreamIDCalls(stub func() bool) {
fake.supportsSyncStreamIDMutex.Lock()
defer fake.supportsSyncStreamIDMutex.Unlock()
fake.SupportsSyncStreamIDStub = stub
}
func (fake *FakeLocalParticipant) SupportSyncStreamIDReturns(result1 bool) {
fake.supportSyncStreamIDMutex.Lock()
defer fake.supportSyncStreamIDMutex.Unlock()
fake.SupportSyncStreamIDStub = nil
fake.supportSyncStreamIDReturns = struct {
func (fake *FakeLocalParticipant) SupportsSyncStreamIDReturns(result1 bool) {
fake.supportsSyncStreamIDMutex.Lock()
defer fake.supportsSyncStreamIDMutex.Unlock()
fake.SupportsSyncStreamIDStub = nil
fake.supportsSyncStreamIDReturns = struct {
result1 bool
}{result1}
}
func (fake *FakeLocalParticipant) SupportSyncStreamIDReturnsOnCall(i int, result1 bool) {
fake.supportSyncStreamIDMutex.Lock()
defer fake.supportSyncStreamIDMutex.Unlock()
fake.SupportSyncStreamIDStub = nil
if fake.supportSyncStreamIDReturnsOnCall == nil {
fake.supportSyncStreamIDReturnsOnCall = make(map[int]struct {
func (fake *FakeLocalParticipant) SupportsSyncStreamIDReturnsOnCall(i int, result1 bool) {
fake.supportsSyncStreamIDMutex.Lock()
defer fake.supportsSyncStreamIDMutex.Unlock()
fake.SupportsSyncStreamIDStub = nil
if fake.supportsSyncStreamIDReturnsOnCall == nil {
fake.supportsSyncStreamIDReturnsOnCall = make(map[int]struct {
result1 bool
})
}
fake.supportSyncStreamIDReturnsOnCall[i] = struct {
fake.supportsSyncStreamIDReturnsOnCall[i] = struct {
result1 bool
}{result1}
}
func (fake *FakeLocalParticipant) SupportsTransceiverReuse() bool {
fake.supportsTransceiverReuseMutex.Lock()
ret, specificReturn := fake.supportsTransceiverReuseReturnsOnCall[len(fake.supportsTransceiverReuseArgsForCall)]
fake.supportsTransceiverReuseArgsForCall = append(fake.supportsTransceiverReuseArgsForCall, struct {
}{})
stub := fake.SupportsTransceiverReuseStub
fakeReturns := fake.supportsTransceiverReuseReturns
fake.recordInvocation("SupportsTransceiverReuse", []interface{}{})
fake.supportsTransceiverReuseMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeLocalParticipant) SupportsTransceiverReuseCallCount() int {
fake.supportsTransceiverReuseMutex.RLock()
defer fake.supportsTransceiverReuseMutex.RUnlock()
return len(fake.supportsTransceiverReuseArgsForCall)
}
func (fake *FakeLocalParticipant) SupportsTransceiverReuseCalls(stub func() bool) {
fake.supportsTransceiverReuseMutex.Lock()
defer fake.supportsTransceiverReuseMutex.Unlock()
fake.SupportsTransceiverReuseStub = stub
}
func (fake *FakeLocalParticipant) SupportsTransceiverReuseReturns(result1 bool) {
fake.supportsTransceiverReuseMutex.Lock()
defer fake.supportsTransceiverReuseMutex.Unlock()
fake.SupportsTransceiverReuseStub = nil
fake.supportsTransceiverReuseReturns = struct {
result1 bool
}{result1}
}
func (fake *FakeLocalParticipant) SupportsTransceiverReuseReturnsOnCall(i int, result1 bool) {
fake.supportsTransceiverReuseMutex.Lock()
defer fake.supportsTransceiverReuseMutex.Unlock()
fake.SupportsTransceiverReuseStub = nil
if fake.supportsTransceiverReuseReturnsOnCall == nil {
fake.supportsTransceiverReuseReturnsOnCall = make(map[int]struct {
result1 bool
})
}
fake.supportsTransceiverReuseReturnsOnCall[i] = struct {
result1 bool
}{result1}
}
@@ -6033,8 +6096,10 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.subscriptionPermissionMutex.RUnlock()
fake.subscriptionPermissionUpdateMutex.RLock()
defer fake.subscriptionPermissionUpdateMutex.RUnlock()
fake.supportSyncStreamIDMutex.RLock()
defer fake.supportSyncStreamIDMutex.RUnlock()
fake.supportsSyncStreamIDMutex.RLock()
defer fake.supportsSyncStreamIDMutex.RUnlock()
fake.supportsTransceiverReuseMutex.RLock()
defer fake.supportsTransceiverReuseMutex.RUnlock()
fake.toProtoMutex.RLock()
defer fake.toProtoMutex.RUnlock()
fake.toProtoWithVersionMutex.RLock()

View File

@@ -68,7 +68,8 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre
CreationTime: time.Now().Unix(),
TurnPassword: utils.RandomSecret(),
}
applyDefaultRoomConfig(rm, &r.config.Room)
internal = &livekit.RoomInternal{}
applyDefaultRoomConfig(rm, internal, &r.config.Room)
} else if err != nil {
return nil, err
}
@@ -83,15 +84,18 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre
rm.Metadata = req.Metadata
}
if req.Egress != nil && req.Egress.Tracks != nil {
internal = &livekit.RoomInternal{TrackEgress: req.Egress.Tracks}
internal.TrackEgress = req.Egress.Tracks
}
if req.MinPlayoutDelay > 0 || req.MaxPlayoutDelay > 0 {
rm.PlayoutDelay = &livekit.PlayoutDelay{
internal.PlayoutDelay = &livekit.PlayoutDelay{
Enabled: true,
Min: req.MinPlayoutDelay,
Max: req.MaxPlayoutDelay,
}
}
if req.SyncStreams {
internal.SyncStreams = true
}
if err = r.roomStore.StoreRoom(ctx, rm, internal); err != nil {
return nil, err
@@ -149,7 +153,7 @@ func (r *StandardRoomAllocator) ValidateCreateRoom(ctx context.Context, roomName
return nil
}
func applyDefaultRoomConfig(room *livekit.Room, conf *config.RoomConfig) {
func applyDefaultRoomConfig(room *livekit.Room, internal *livekit.RoomInternal, conf *config.RoomConfig) {
room.EmptyTimeout = conf.EmptyTimeout
room.MaxParticipants = conf.MaxParticipants
for _, codec := range conf.EnabledCodecs {
@@ -158,9 +162,10 @@ func applyDefaultRoomConfig(room *livekit.Room, conf *config.RoomConfig) {
FmtpLine: codec.FmtpLine,
})
}
room.PlayoutDelay = &livekit.PlayoutDelay{
internal.PlayoutDelay = &livekit.PlayoutDelay{
Enabled: conf.PlayoutDelay.Enabled,
Min: uint32(conf.PlayoutDelay.Min),
Max: uint32(conf.PlayoutDelay.Max),
}
internal.SyncStreams = conf.SyncStreams
}

View File

@@ -240,7 +240,7 @@ func (r *RoomManager) StartSession(
}
defer room.Release()
protoRoom := room.ToProto()
protoRoom, roomInternal := room.ToProto(), room.Internal()
// only create the room, but don't start a participant session
if pi.Identity == "" {
@@ -408,7 +408,8 @@ func (r *RoomManager) StartSession(
SubscriberAllowPause: subscriberAllowPause,
SubscriptionLimitAudio: r.config.Limit.SubscriptionLimitAudio,
SubscriptionLimitVideo: r.config.Limit.SubscriptionLimitVideo,
PlayoutDelay: protoRoom.PlayoutDelay,
PlayoutDelay: roomInternal.GetPlayoutDelay(),
SyncStreams: roomInternal.GetSyncStreams(),
})
if err != nil {
return err