diff --git a/config-sample.yaml b/config-sample.yaml index b204340c4..4f9fb3a1a 100644 --- a/config-sample.yaml +++ b/config-sample.yaml @@ -112,12 +112,6 @@ rtc: # batch_io: # batch_size: 128 # max_flush_interval: 2ms - # # force a reconnect on a publication error - # reconnect_on_publication_error: true - # # force a reconnect on a subscription error - # reconnect_on_subscription_error: true - # # force a reconnect on a data channel error - # reconnect_on_data_channel_error: true # # max number of bytes to buffer for data channel. 0 means unlimited. # # when this limit is breached, data messages will be dropped till the buffered amount drops below this limit. # data_channel_max_buffered_amount: 0 diff --git a/go.mod b/go.mod index 8a990ac1a..8ec9cd47e 100644 --- a/go.mod +++ b/go.mod @@ -18,8 +18,8 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e - github.com/livekit/protocol v1.8.1-0.20231023052729-5fd3ded802ca - github.com/livekit/psrpc v0.3.3 + github.com/livekit/protocol v1.8.1-0.20231024024326-07ca9d4e47bd + github.com/livekit/psrpc v0.4.0 github.com/mackerelio/go-osstat v0.2.4 github.com/magefile/mage v1.15.0 github.com/maxbrunsfeld/counterfeiter/v6 v6.7.0 @@ -78,7 +78,7 @@ require ( github.com/mdlayher/netlink v1.7.1 // indirect github.com/mdlayher/socket v0.4.0 // indirect github.com/nats-io/nats.go v1.31.0 // indirect - github.com/nats-io/nkeys v0.4.5 // indirect + github.com/nats-io/nkeys v0.4.6 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/pion/datachannel v1.5.5 // indirect github.com/pion/logging v0.2.2 // indirect diff --git a/go.sum b/go.sum index 9927c1032..53e811338 100644 --- a/go.sum +++ b/go.sum @@ -125,10 +125,10 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e h1:yNeIo7MSMUWgoLu7LkNKnBYnJBFPFH9Wq4S6h1kS44M= github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY= -github.com/livekit/protocol v1.8.1-0.20231023052729-5fd3ded802ca h1:M3gymSJFriayMERPin3BSGgYTUpLVfGaMgmqp+3JV6I= -github.com/livekit/protocol v1.8.1-0.20231023052729-5fd3ded802ca/go.mod h1:K47UnOyc9C0L9LEBDznwd1NMBSsohgyhfurshU5f+4Y= -github.com/livekit/psrpc v0.3.3 h1:+lltbuN39IdaynXhLLxRShgYqYsRMWeeXKzv60oqyWo= -github.com/livekit/psrpc v0.3.3/go.mod h1:n6JntEg+zT6Ji8InoyTpV7wusPNwGqqtxmHlkNhDN0U= +github.com/livekit/protocol v1.8.1-0.20231024024326-07ca9d4e47bd h1:5By2nxIMS9MApyJ654STmtwb0V4MeuaH77lqUTxV3nw= +github.com/livekit/protocol v1.8.1-0.20231024024326-07ca9d4e47bd/go.mod h1:oTWtPGfpZSJGKRrbSvDQK0jiuUylYzhiw/bnGB4Cqko= +github.com/livekit/psrpc v0.4.0 h1:oC4l99HSot/aUza4ZR3ZcZW1SRZm34KCHX3wAbmw6Lo= +github.com/livekit/psrpc v0.4.0/go.mod h1:1XYH1LLoD/YbvBvt6xg2KQ/J3InLXSJK6PL/+DKmuAU= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= github.com/mackerelio/go-osstat v0.2.4/go.mod h1:Zy+qzGdZs3A9cuIqmgbJvwbmLQH9dJvtio5ZjJTbdlQ= github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg= @@ -162,8 +162,8 @@ github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E= github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8= -github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk= -github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= +github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY= +github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADymtkpts= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= diff --git a/pkg/routing/signal.go b/pkg/routing/signal.go index ecb1c6c7c..e7181038d 100644 --- a/pkg/routing/signal.go +++ b/pkg/routing/signal.go @@ -133,7 +133,7 @@ func (r *signalClient) StartParticipantSignal( signalResponseMessageReader{}, r.config, ) - l.Infow("signal stream closed", "error", err) + l.Debugw("signal stream closed", "error", err) resChan.Close() }() diff --git a/pkg/rtc/clientinfo.go b/pkg/rtc/clientinfo.go index 7912968b0..62117e0d1 100644 --- a/pkg/rtc/clientinfo.go +++ b/pkg/rtc/clientinfo.go @@ -37,6 +37,10 @@ func (c ClientInfo) isGo() bool { return c.ClientInfo != nil && c.ClientInfo.Sdk == livekit.ClientInfo_GO } +func (c ClientInfo) isLinux() bool { + return c.ClientInfo != nil && strings.EqualFold(c.ClientInfo.Os, "linux") +} + func (c ClientInfo) SupportsAudioRED() bool { return !c.isFirefox() && !c.isSafari() } @@ -80,6 +84,10 @@ func (c ClientInfo) SupportsChangeRTPSenderEncodingActive() bool { return !c.isFirefox() } +func (c ClientInfo) ComplyWithCodecOrderInSDPAnswer() bool { + return !(c.isLinux() && c.isFirefox()) +} + // compareVersion compares a semver against the current client SDK version // returning 1 if current version is greater than version // 0 if they are the same, and -1 if it's an earlier version diff --git a/pkg/rtc/participant_sdp.go b/pkg/rtc/participant_sdp.go index 9bacce907..70829a34c 100644 --- a/pkg/rtc/participant_sdp.go +++ b/pkg/rtc/participant_sdp.go @@ -180,7 +180,10 @@ func (p *ParticipantImpl) setCodecPreferencesVideoForPublisher(offer webrtc.Sess } unmatchVideo.MediaName.Formats = append(unmatchVideo.MediaName.Formats[:0], preferredCodecs...) - unmatchVideo.MediaName.Formats = append(unmatchVideo.MediaName.Formats, leftCodecs...) + // if the client don't comply with codec order in SDP answer, only keep preferred codecs to force client to use it + if p.params.ClientInfo.ComplyWithCodecOrderInSDPAnswer() { + unmatchVideo.MediaName.Formats = append(unmatchVideo.MediaName.Formats, leftCodecs...) + } } } diff --git a/pkg/service/egress.go b/pkg/service/egress.go index 0a77a231f..d971aaa34 100644 --- a/pkg/service/egress.go +++ b/pkg/service/egress.go @@ -190,7 +190,17 @@ func (s *EgressService) startEgress(ctx context.Context, roomName livekit.RoomNa } func (s *egressLauncher) StartEgress(ctx context.Context, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error) { - return s.StartEgressWithClusterId(ctx, "", req) + info, err := s.StartEgressWithClusterId(ctx, "", req) + if err != nil { + return nil, err + } + + _, err = s.io.CreateEgress(ctx, info) + if err != nil { + logger.Errorw("failed to create egress", err) + } + + return info, nil } func (s *egressLauncher) StartEgressWithClusterId(ctx context.Context, clusterId string, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error) { @@ -203,17 +213,7 @@ func (s *egressLauncher) StartEgressWithClusterId(ctx context.Context, clusterId req.EgressId = utils.NewGuid(utils.EgressPrefix) } - info, err := s.client.StartEgress(ctx, clusterId, req) - if err != nil { - return nil, err - } - - _, err = s.io.CreateEgress(ctx, info) - if err != nil { - logger.Errorw("failed to create egress", err) - } - - return info, nil + return s.client.StartEgress(ctx, clusterId, req) } type LayoutMetadata struct { @@ -283,10 +283,6 @@ func (s *EgressService) UpdateStream(ctx context.Context, req *livekit.UpdateStr return info, nil } -func (s *EgressService) UpdateOutputs(ctx context.Context, req *livekit.UpdateOutputsRequest) (*livekit.EgressInfo, error) { - return nil, twirp.NewError(twirp.Unimplemented, "Update Outputs unimplemented") -} - func (s *EgressService) ListEgress(ctx context.Context, req *livekit.ListEgressRequest) (*livekit.ListEgressResponse, error) { if req.RoomName != "" { AppendLogFields(ctx, "room", req.RoomName) diff --git a/pkg/service/interfaces.go b/pkg/service/interfaces.go index ba390554f..653269225 100644 --- a/pkg/service/interfaces.go +++ b/pkg/service/interfaces.go @@ -83,6 +83,6 @@ type IOClient interface { //counterfeiter:generate . RoomAllocator type RoomAllocator interface { - CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, error) + CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, bool, error) ValidateCreateRoom(ctx context.Context, roomName livekit.RoomName) error } diff --git a/pkg/service/roomallocator.go b/pkg/service/roomallocator.go index 2d538cdd1..f222f7b15 100644 --- a/pkg/service/roomallocator.go +++ b/pkg/service/roomallocator.go @@ -50,10 +50,10 @@ func NewRoomAllocator(conf *config.Config, router routing.Router, rs ObjectStore // CreateRoom creates a new room from a request and allocates it to a node to handle // it'll also monitor its state, and cleans it up when appropriate -func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, error) { +func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, bool, error) { token, err := r.roomStore.LockRoom(ctx, livekit.RoomName(req.Name), 5*time.Second) if err != nil { - return nil, err + return nil, false, err } defer func() { _ = r.roomStore.UnlockRoom(ctx, livekit.RoomName(req.Name), token) @@ -71,7 +71,7 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre internal = &livekit.RoomInternal{} applyDefaultRoomConfig(rm, internal, &r.config.Room) } else if err != nil { - return nil, err + return nil, false, err } if req.EmptyTimeout > 0 { @@ -98,23 +98,23 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre } if err = r.roomStore.StoreRoom(ctx, rm, internal); err != nil { - return nil, err + return nil, false, err } // check if room already assigned existing, err := r.router.GetNodeForRoom(ctx, livekit.RoomName(rm.Name)) if err != routing.ErrNotFound && err != nil { - return nil, err + return nil, false, err } // if already assigned and still available, keep it on that node if err == nil && selector.IsAvailable(existing) { // if node hosting the room is full, deny entry if selector.LimitsReached(r.config.Limit, existing.Stats) { - return nil, routing.ErrNodeLimitReached + return nil, false, routing.ErrNodeLimitReached } - return rm, nil + return rm, false, nil } // select a new node @@ -122,12 +122,12 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre if nodeID == "" { nodes, err := r.router.ListNodes() if err != nil { - return nil, err + return nil, false, err } node, err := r.selector.SelectNode(nodes) if err != nil { - return nil, err + return nil, false, err } nodeID = livekit.NodeID(node.Id) @@ -136,10 +136,10 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre logger.Infow("selected node for room", "room", rm.Name, "roomID", rm.Sid, "selectedNodeID", nodeID) err = r.router.SetNodeForRoom(ctx, livekit.RoomName(rm.Name), nodeID) if err != nil { - return nil, err + return nil, false, err } - return rm, nil + return rm, true, nil } func (r *StandardRoomAllocator) ValidateCreateRoom(ctx context.Context, roomName livekit.RoomName) error { diff --git a/pkg/service/roomallocator_test.go b/pkg/service/roomallocator_test.go index 4397e22c4..8c87dc8c2 100644 --- a/pkg/service/roomallocator_test.go +++ b/pkg/service/roomallocator_test.go @@ -39,7 +39,7 @@ func TestCreateRoom(t *testing.T) { ra, conf := newTestRoomAllocator(t, conf, node) - room, err := ra.CreateRoom(context.Background(), &livekit.CreateRoomRequest{Name: "myroom"}) + room, _, err := ra.CreateRoom(context.Background(), &livekit.CreateRoomRequest{Name: "myroom"}) require.NoError(t, err) require.Equal(t, conf.Room.EmptyTimeout, room.EmptyTimeout) require.NotEmpty(t, room.EnabledCodecs) @@ -57,7 +57,7 @@ func TestCreateRoom(t *testing.T) { ra, _ := newTestRoomAllocator(t, conf, node) - _, err = ra.CreateRoom(context.Background(), &livekit.CreateRoomRequest{Name: "low-limit-room"}) + _, _, err = ra.CreateRoom(context.Background(), &livekit.CreateRoomRequest{Name: "low-limit-room"}) require.ErrorIs(t, err, routing.ErrNodeLimitReached) }) @@ -73,7 +73,7 @@ func TestCreateRoom(t *testing.T) { ra, _ := newTestRoomAllocator(t, conf, node) - _, err = ra.CreateRoom(context.Background(), &livekit.CreateRoomRequest{Name: "low-limit-room"}) + _, _, err = ra.CreateRoom(context.Background(), &livekit.CreateRoomRequest{Name: "low-limit-room"}) require.ErrorIs(t, err, routing.ErrNodeLimitReached) }) } diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 8a21933dd..14caa049e 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -433,10 +433,12 @@ func (r *RoomManager) StartSession( _ = participant.Close(true, types.ParticipantCloseReasonJoinFailed, false) return err } - if err := r.roomServer.RegisterAllParticipantTopics(rpc.FormatParticipantTopic(roomName, participant.Identity())); err != nil { - pLogger.Errorw("could not join register participant topic", err) - _ = participant.Close(true, types.ParticipantCloseReasonMessageBusFailed, false) - return err + if r.config.PSRPC.Enabled { + if err := r.roomServer.RegisterAllParticipantTopics(rpc.FormatParticipantTopic(roomName, participant.Identity())); err != nil { + pLogger.Errorw("could not join register participant topic", err) + _ = participant.Close(true, types.ParticipantCloseReasonMessageBusFailed, false) + return err + } } if err = r.roomStore.StoreParticipant(ctx, roomName, participant.ToProto()); err != nil { pLogger.Errorw("could not store participant", err) @@ -461,7 +463,9 @@ func (r *RoomManager) StartSession( pLogger.Errorw("could not delete participant", err) } - r.roomServer.DeregisterAllParticipantTopics(rpc.FormatParticipantTopic(roomName, participant.Identity())) + if r.config.PSRPC.Enabled { + r.roomServer.DeregisterAllParticipantTopics(rpc.FormatParticipantTopic(roomName, participant.Identity())) + } // update room store with new numParticipants proto := room.ToProto() @@ -503,8 +507,10 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room return nil, err } - if err := r.roomServer.RegisterAllRoomTopics(rpc.FormatRoomTopic(roomName)); err != nil { - return nil, err + if r.config.PSRPC.Enabled { + if err := r.roomServer.RegisterAllRoomTopics(rpc.FormatRoomTopic(roomName)); err != nil { + return nil, err + } } r.lock.Lock() @@ -525,7 +531,9 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room newRoom := rtc.NewRoom(ri, internal, *r.rtcConfig, &r.config.Audio, r.serverInfo, r.telemetry, r.egressLauncher) newRoom.OnClose(func() { - r.roomServer.DeregisterAllRoomTopics(rpc.FormatRoomTopic(roomName)) + if r.config.PSRPC.Enabled { + r.roomServer.DeregisterAllRoomTopics(rpc.FormatRoomTopic(roomName)) + } roomInfo := newRoom.ToProto() r.telemetry.RoomEnded(ctx, roomInfo) diff --git a/pkg/service/roomservice.go b/pkg/service/roomservice.go index 2f75ac00c..dd9cefebc 100644 --- a/pkg/service/roomservice.go +++ b/pkg/service/roomservice.go @@ -79,7 +79,7 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq return nil, ErrEgressNotConnected } - rm, err := s.roomAllocator.CreateRoom(ctx, req) + rm, created, err := s.roomAllocator.CreateRoom(ctx, req) if err != nil { err = errors.Wrap(err, "could not create room") return nil, err @@ -109,7 +109,7 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq return nil, err } - if req.Egress != nil && req.Egress.Room != nil { + if created && req.Egress != nil && req.Egress.Room != nil { egress := &rpc.StartEgressRequest{ Request: &rpc.StartEgressRequest_RoomComposite{ RoomComposite: req.Egress.Room, @@ -424,7 +424,7 @@ func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat // no one has joined the room, would not have been created on an RTC node. // in this case, we'd want to run create again - _, err = s.roomAllocator.CreateRoom(ctx, &livekit.CreateRoomRequest{ + _, _, err = s.roomAllocator.CreateRoom(ctx, &livekit.CreateRoomRequest{ Name: req.Room, Metadata: req.Metadata, }) diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index dc203fec9..b0683115b 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -517,7 +517,7 @@ type connectionResult struct { func (s *RTCService) startConnection(ctx context.Context, roomName livekit.RoomName, pi routing.ParticipantInit, timeout time.Duration) (connectionResult, *livekit.SignalResponse, error) { var cr connectionResult var err error - cr.Room, err = s.roomAllocator.CreateRoom(ctx, &livekit.CreateRoomRequest{Name: string(roomName)}) + cr.Room, _, err = s.roomAllocator.CreateRoom(ctx, &livekit.CreateRoomRequest{Name: string(roomName)}) if err != nil { return cr, nil, err } diff --git a/pkg/service/servicefakes/fake_room_allocator.go b/pkg/service/servicefakes/fake_room_allocator.go index e88dde554..134b9649b 100644 --- a/pkg/service/servicefakes/fake_room_allocator.go +++ b/pkg/service/servicefakes/fake_room_allocator.go @@ -10,7 +10,7 @@ import ( ) type FakeRoomAllocator struct { - CreateRoomStub func(context.Context, *livekit.CreateRoomRequest) (*livekit.Room, error) + CreateRoomStub func(context.Context, *livekit.CreateRoomRequest) (*livekit.Room, bool, error) createRoomMutex sync.RWMutex createRoomArgsForCall []struct { arg1 context.Context @@ -18,11 +18,13 @@ type FakeRoomAllocator struct { } createRoomReturns struct { result1 *livekit.Room - result2 error + result2 bool + result3 error } createRoomReturnsOnCall map[int]struct { result1 *livekit.Room - result2 error + result2 bool + result3 error } ValidateCreateRoomStub func(context.Context, livekit.RoomName) error validateCreateRoomMutex sync.RWMutex @@ -40,7 +42,7 @@ type FakeRoomAllocator struct { invocationsMutex sync.RWMutex } -func (fake *FakeRoomAllocator) CreateRoom(arg1 context.Context, arg2 *livekit.CreateRoomRequest) (*livekit.Room, error) { +func (fake *FakeRoomAllocator) CreateRoom(arg1 context.Context, arg2 *livekit.CreateRoomRequest) (*livekit.Room, bool, error) { fake.createRoomMutex.Lock() ret, specificReturn := fake.createRoomReturnsOnCall[len(fake.createRoomArgsForCall)] fake.createRoomArgsForCall = append(fake.createRoomArgsForCall, struct { @@ -55,9 +57,9 @@ func (fake *FakeRoomAllocator) CreateRoom(arg1 context.Context, arg2 *livekit.Cr return stub(arg1, arg2) } if specificReturn { - return ret.result1, ret.result2 + return ret.result1, ret.result2, ret.result3 } - return fakeReturns.result1, fakeReturns.result2 + return fakeReturns.result1, fakeReturns.result2, fakeReturns.result3 } func (fake *FakeRoomAllocator) CreateRoomCallCount() int { @@ -66,7 +68,7 @@ func (fake *FakeRoomAllocator) CreateRoomCallCount() int { return len(fake.createRoomArgsForCall) } -func (fake *FakeRoomAllocator) CreateRoomCalls(stub func(context.Context, *livekit.CreateRoomRequest) (*livekit.Room, error)) { +func (fake *FakeRoomAllocator) CreateRoomCalls(stub func(context.Context, *livekit.CreateRoomRequest) (*livekit.Room, bool, error)) { fake.createRoomMutex.Lock() defer fake.createRoomMutex.Unlock() fake.CreateRoomStub = stub @@ -79,30 +81,33 @@ func (fake *FakeRoomAllocator) CreateRoomArgsForCall(i int) (context.Context, *l return argsForCall.arg1, argsForCall.arg2 } -func (fake *FakeRoomAllocator) CreateRoomReturns(result1 *livekit.Room, result2 error) { +func (fake *FakeRoomAllocator) CreateRoomReturns(result1 *livekit.Room, result2 bool, result3 error) { fake.createRoomMutex.Lock() defer fake.createRoomMutex.Unlock() fake.CreateRoomStub = nil fake.createRoomReturns = struct { result1 *livekit.Room - result2 error - }{result1, result2} + result2 bool + result3 error + }{result1, result2, result3} } -func (fake *FakeRoomAllocator) CreateRoomReturnsOnCall(i int, result1 *livekit.Room, result2 error) { +func (fake *FakeRoomAllocator) CreateRoomReturnsOnCall(i int, result1 *livekit.Room, result2 bool, result3 error) { fake.createRoomMutex.Lock() defer fake.createRoomMutex.Unlock() fake.CreateRoomStub = nil if fake.createRoomReturnsOnCall == nil { fake.createRoomReturnsOnCall = make(map[int]struct { result1 *livekit.Room - result2 error + result2 bool + result3 error }) } fake.createRoomReturnsOnCall[i] = struct { result1 *livekit.Room - result2 error - }{result1, result2} + result2 bool + result3 error + }{result1, result2, result3} } func (fake *FakeRoomAllocator) ValidateCreateRoom(arg1 context.Context, arg2 livekit.RoomName) error { diff --git a/pkg/service/signal.go b/pkg/service/signal.go index 862d79e44..4c9e085be 100644 --- a/pkg/service/signal.go +++ b/pkg/service/signal.go @@ -164,7 +164,7 @@ func (r *signalService) RelaySignal(stream psrpc.ServerStream[*rpc.RelaySignalRe signalRequestMessageReader{}, r.config, ) - l.Infow("signal stream closed", "error", err) + l.Debugw("signal stream closed", "error", err) reqChan.Close() }() diff --git a/pkg/sfu/buffer/rtpstats_receiver.go b/pkg/sfu/buffer/rtpstats_receiver.go index 77300f7ab..3869ebae8 100644 --- a/pkg/sfu/buffer/rtpstats_receiver.go +++ b/pkg/sfu/buffer/rtpstats_receiver.go @@ -154,7 +154,7 @@ func (r *RTPStatsReceiver) Update( return } } - if -gapSN >= cNumSequenceNumbers { + if -gapSN >= cNumSequenceNumbers/2 { r.logger.Warnw( "large sequence number gap negative", nil, "extStartSN", r.sequenceNumber.GetExtendedStart(), @@ -226,7 +226,7 @@ func (r *RTPStatsReceiver) Update( flowState.ExtSequenceNumber = resSN.ExtendedVal flowState.ExtTimestamp = resTS.ExtendedVal } else { // in-order - if gapSN >= cNumSequenceNumbers { + if gapSN >= cNumSequenceNumbers/2 { r.logger.Warnw( "large sequence number gap", nil, "extStartSN", r.sequenceNumber.GetExtendedStart(), @@ -333,7 +333,7 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData) if (timeSinceLast > 0.2 && math.Abs(float64(r.params.ClockRate)-calculatedClockRateFromLast) > 0.2*float64(r.params.ClockRate)) || (timeSinceFirst > 0.2 && math.Abs(float64(r.params.ClockRate)-calculatedClockRateFromFirst) > 0.2*float64(r.params.ClockRate)) { - if r.clockSkewCount%10 == 0 { + if r.clockSkewCount%100 == 0 { r.logger.Infow( "clock rate skew", "first", r.srFirst.ToString(), diff --git a/pkg/sfu/buffer/rtpstats_sender.go b/pkg/sfu/buffer/rtpstats_sender.go index 59e01a831..ff262c11c 100644 --- a/pkg/sfu/buffer/rtpstats_sender.go +++ b/pkg/sfu/buffer/rtpstats_sender.go @@ -286,7 +286,7 @@ func (r *RTPStatsSender) Update( // do not start on a padding only packet return } - if -gapSN >= cNumSequenceNumbers { + if -gapSN >= cNumSequenceNumbers/2 { r.logger.Warnw( "large sequence number gap negative", nil, "extStartSN", r.extStartSN, @@ -352,7 +352,7 @@ func (r *RTPStatsSender) Update( r.setSnInfo(extSequenceNumber, r.extHighestSN, uint16(pktSize), uint8(hdrSize), uint16(payloadSize), marker, true) } } else { // in-order - if gapSN >= cNumSequenceNumbers { + if gapSN >= cNumSequenceNumbers/2 { r.logger.Warnw( "large sequence number gap", nil, "extStartSN", r.extStartSN, @@ -394,7 +394,22 @@ func (r *RTPStatsSender) Update( "tsBefore", r.extStartTS, "tsAfter", extTimestamp, ) - r.extStartTS = extTimestamp + if extTimestamp == 0 { // TODO-REMOVE-AFTER-DEBUG + r.logger.Errorw( + "invalid start timestamp", nil, + "snBefore", r.extStartSN, + "snAfter", extSequenceNumber, + "snHighest", r.extHighestSN, + "tsBefore", r.extStartTS, + "tsAfter", extTimestamp, + "tsHighest", r.extHighestTS, + "firstTime", r.firstTime.String(), + "startTime", r.startTime.String(), + ) + } + if extTimestamp != 0 { + r.extStartTS = extTimestamp + } } if extTimestamp > r.extHighestTS { diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index a7086386c..22d745eb5 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -270,7 +270,9 @@ type DownTrack struct { pacer pacer.Pacer - maxLayerNotifierCh chan struct{} + maxLayerNotifierChMu sync.RWMutex + maxLayerNotifierCh chan struct{} + maxLayerNotifierChClosed bool cbMu sync.RWMutex onStatsUpdate func(dt *DownTrack, stat *livekit.AnalyticsStat) @@ -638,14 +640,18 @@ func (d *DownTrack) keyFrameRequester(generation uint32, layer int32) { } func (d *DownTrack) postMaxLayerNotifierEvent() { - if d.IsClosed() || d.kind != webrtc.RTPCodecTypeVideo { + if d.kind != webrtc.RTPCodecTypeVideo { return } - select { - case d.maxLayerNotifierCh <- struct{}{}: - default: + d.maxLayerNotifierChMu.RLock() + if !d.maxLayerNotifierChClosed { + select { + case d.maxLayerNotifierCh <- struct{}{}: + default: + } } + d.maxLayerNotifierChMu.RUnlock() } func (d *DownTrack) maxLayerNotifierWorker() { @@ -960,6 +966,7 @@ func (d *DownTrack) CloseWithFlush(flush bool) { } d.bound.Store(false) + d.onBindAndConnectedChange() d.params.Logger.Debugw("closing sender", "kind", d.kind) } d.params.Receiver.DeleteDownTrack(d.params.SubID) @@ -975,7 +982,10 @@ func (d *DownTrack) CloseWithFlush(flush bool) { d.rtpStats.Stop() d.params.Logger.Infow("rtp stats", "direction", "downstream", "mime", d.mime, "ssrc", d.ssrc, "stats", d.rtpStats.ToString()) + d.maxLayerNotifierChMu.Lock() + d.maxLayerNotifierChClosed = true close(d.maxLayerNotifierCh) + d.maxLayerNotifierChMu.Unlock() if onCloseHandler := d.getOnCloseHandler(); onCloseHandler != nil { onCloseHandler(!flush) diff --git a/pkg/sfu/sequencer.go b/pkg/sfu/sequencer.go index ee6ed8bdb..4e26a8735 100644 --- a/pkg/sfu/sequencer.go +++ b/pkg/sfu/sequencer.go @@ -84,6 +84,7 @@ type sequencer struct { size int startTime int64 initialized bool + extStartSN uint64 extHighestSN uint64 snOffset uint64 extHighestTS uint64 @@ -133,46 +134,61 @@ func (s *sequencer) push( if !s.initialized { s.initialized = true - s.extHighestSN = extModifiedSN - 1 + s.extStartSN = extModifiedSN + s.extHighestSN = extModifiedSN s.extHighestTS = extModifiedTS s.updateSNOffset() } - snOffset := s.snOffset - diff := int64(extModifiedSN - s.extHighestSN) - if diff >= 0 { - s.extHighestSN = extModifiedSN - } else { - if diff < -int64(s.size) { - s.logger.Warnw( - "old packet, cannot be sequenced", nil, - "extHighestSN", s.extHighestSN, - "extIncomingSN", extIncomingSN, - "extModifiedSN", extModifiedSN, - ) - return - } + if extModifiedSN < s.extStartSN { + // old packet, should not happen + return + } + extHighestSNAdjusted := s.extHighestSN - s.snOffset + extModifiedSNAdjusted := extModifiedSN - s.snOffset + if extModifiedSN < s.extHighestSN { if s.snRangeMap != nil { - var err error - snOffset, err = s.snRangeMap.GetValue(extModifiedSN) + snOffset, err := s.snRangeMap.GetValue(extModifiedSN) if err != nil { s.logger.Errorw( "could not get sequence number offset", err, + "extStartSN", s.extStartSN, "extHighestSN", s.extHighestSN, "extIncomingSN", extIncomingSN, "extModifiedSN", extModifiedSN, + "snOffset", s.snOffset, ) return } + + extModifiedSNAdjusted = extModifiedSN - snOffset + } + } + + if int64(extModifiedSNAdjusted-extHighestSNAdjusted) <= -int64(s.size) { + s.logger.Warnw( + "old packet, cannot be sequenced", nil, + "extHighestSN", s.extHighestSN, + "extIncomingSN", extIncomingSN, + "extModifiedSN", extModifiedSN, + ) + return + } + + // invalidate missing sequence numbers + if extModifiedSNAdjusted > extHighestSNAdjusted { + numInvalidated := 0 + for esn := extHighestSNAdjusted + 1; esn != extModifiedSNAdjusted; esn++ { + s.invalidateSlot(int(esn % uint64(s.size))) + numInvalidated++ + if numInvalidated >= s.size { + break + } } } - if int64(extModifiedTS-s.extHighestTS) >= 0 { - s.extHighestTS = extModifiedTS - } - - slot := (extModifiedSN - snOffset) % uint64(s.size) + slot := extModifiedSNAdjusted % uint64(s.size) s.meta[slot] = packetMeta{ sourceSeqNo: uint16(extIncomingSN), targetSeqNo: uint16(extModifiedSN), @@ -183,6 +199,13 @@ func (s *sequencer) push( ddBytes: append([]byte{}, ddBytes...), lastNack: s.getRefTime(packetTime), // delay retransmissions after the original transmission } + + if extModifiedSN > s.extHighestSN { + s.extHighestSN = extModifiedSN + } + if extModifiedTS > s.extHighestTS { + s.extHighestTS = extModifiedTS + } } func (s *sequencer) pushPadding(extStartSNInclusive uint64, extEndSNInclusive uint64) { @@ -223,10 +246,7 @@ func (s *sequencer) pushPadding(extStartSNInclusive uint64, extEndSNInclusive ui } slot := (sn - snOffset) % uint64(s.size) - s.meta[slot] = packetMeta{ - sourceSeqNo: 0, - targetSeqNo: 0, - } + s.invalidateSlot(int(slot)) } return } @@ -244,6 +264,10 @@ func (s *sequencer) getExtPacketMetas(seqNo []uint16) []extPacketMeta { s.Lock() defer s.Unlock() + if !s.initialized { + return nil + } + snOffset := uint64(0) var err error extPacketMetas := make([]extPacketMeta, 0, len(seqNo)) @@ -263,11 +287,6 @@ func (s *sequencer) getExtPacketMetas(seqNo []uint16) []extPacketMeta { extSN -= (1 << 16) } - if s.extHighestSN-extSN >= uint64(s.size) { - // too old - continue - } - if s.snRangeMap != nil { snOffset, err = s.snRangeMap.GetValue(extSN) if err != nil { @@ -276,9 +295,17 @@ func (s *sequencer) getExtPacketMetas(seqNo []uint16) []extPacketMeta { } } - slot := (extSN - snOffset) % uint64(s.size) + extSNAdjusted := extSN - snOffset + extHighestSNAdjusted := s.extHighestSN - s.snOffset + if extHighestSNAdjusted-extSNAdjusted >= uint64(s.size) { + // too old + continue + } + + slot := extSNAdjusted % uint64(s.size) meta := &s.meta[slot] - if meta.targetSeqNo != sn { + if meta.targetSeqNo != sn || s.isInvalidSlot(int(slot)) { + // invalid slot access could happen if padding packets exclusion range could not be recorded continue } @@ -320,3 +347,24 @@ func (s *sequencer) updateSNOffset() { } s.snOffset = snOffset } + +func (s *sequencer) invalidateSlot(slot int) { + if slot >= len(s.meta) { + return + } + + s.meta[slot] = packetMeta{ + sourceSeqNo: 0, + targetSeqNo: 0, + lastNack: 0, + } +} + +func (s *sequencer) isInvalidSlot(slot int) bool { + if slot >= len(s.meta) { + return true + } + + meta := &s.meta[slot] + return meta.sourceSeqNo == 0 && meta.targetSeqNo == 0 && meta.lastNack == 0 +} diff --git a/pkg/sfu/streamtrackermanager.go b/pkg/sfu/streamtrackermanager.go index 9aa82f91e..0aba20ff9 100644 --- a/pkg/sfu/streamtrackermanager.go +++ b/pkg/sfu/streamtrackermanager.go @@ -651,6 +651,12 @@ func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ts uint32, layer in return 0, fmt.Errorf("invalid layer, target: %d, reference: %d", layer, referenceLayer) } + // SVC-TODO: better SVC detection + if s.isSVC { + // there is only one stream in SVC + return ts, nil + } + if layer != referenceLayer && s.layerOffsets[referenceLayer][layer] == 0 { return 0, fmt.Errorf("offset unavailable, target: %d, reference: %d", layer, referenceLayer) } diff --git a/pkg/sfu/videolayerselector/dependencydescriptor.go b/pkg/sfu/videolayerselector/dependencydescriptor.go index 9d29427be..afff336c3 100644 --- a/pkg/sfu/videolayerselector/dependencydescriptor.go +++ b/pkg/sfu/videolayerselector/dependencydescriptor.go @@ -88,11 +88,6 @@ func (d *DependencyDescriptor) Select(extPkt *buffer.ExtPacket, _layer int32) (r return } - if !d.currentLayer.IsValid() && !extPkt.KeyFrame { - d.decisions.AddDropped(extFrameNum) - return - } - if ddwdt.StructureUpdated { d.updateDependencyStructure(dd.AttachedStructure, ddwdt.DecodeTargets) } @@ -149,7 +144,7 @@ func (d *DependencyDescriptor) Select(extPkt *buffer.ExtPacket, _layer int32) (r return } - // // DD-TODO : if bandwidth in congest, could drop the 'Discardable' frame + // DD-TODO : if bandwidth in congest, could drop the 'Discardable' frame if dti == dede.DecodeTargetNotPresent { // d.logger.Debugw(fmt.Sprintf("drop packet for decode target not present, highestDecodeTarget %d, incoming %v, fn: %d/%d", // highestDecodeTarget, @@ -193,6 +188,7 @@ func (d *DependencyDescriptor) Select(extPkt *buffer.ExtPacket, _layer int32) (r "req", d.requestSpatial, "maxSeen", d.maxSeenLayer, "feed", extPkt.Packet.SSRC, + "frame", extFrameNum, ) } @@ -201,7 +197,7 @@ func (d *DependencyDescriptor) Select(extPkt *buffer.ExtPacket, _layer int32) (r d.previousActiveDecodeTargetsBitmask = d.activeDecodeTargetsBitmask d.activeDecodeTargetsBitmask = buffer.GetActiveDecodeTargetBitmask(d.currentLayer, ddwdt.DecodeTargets) - d.logger.Debugw("switch to target", "highest", highestDecodeTarget.Layer, "current", d.currentLayer, "bitmask", *d.activeDecodeTargetsBitmask) + d.logger.Debugw("switch to target", "highest", highestDecodeTarget.Layer, "current", d.currentLayer, "bitmask", *d.activeDecodeTargetsBitmask, "frame", extFrameNum) } ddExtension := &dede.DependencyDescriptorExtension{ diff --git a/pkg/sfu/videolayerselector/framechain.go b/pkg/sfu/videolayerselector/framechain.go index 5edddfad9..613f2d264 100644 --- a/pkg/sfu/videolayerselector/framechain.go +++ b/pkg/sfu/videolayerselector/framechain.go @@ -49,7 +49,7 @@ func (fc *FrameChain) OnFrame(extFrameNum uint64, fd *dd.FrameDependencyTemplate if fd.ChainDiffs[fc.chainIdx] == 0 { if fc.broken { fc.broken = false - fc.logger.Debugw("frame chain intact", "chanIdx", fc.chainIdx) + fc.logger.Debugw("frame chain intact", "chanIdx", fc.chainIdx, "frame", extFrameNum) } fc.expectFrames = fc.expectFrames[:0] return true @@ -62,7 +62,7 @@ func (fc *FrameChain) OnFrame(extFrameNum uint64, fd *dd.FrameDependencyTemplate prevFrameInChain := extFrameNum - uint64(fd.ChainDiffs[fc.chainIdx]) sd, err := fc.decisions.GetDecision(prevFrameInChain) if err != nil { - fc.logger.Debugw("could not get decision", "err", err, "frame", extFrameNum, "prevFrame", prevFrameInChain) + fc.logger.Debugw("could not get decision", "err", err, "chanIdx", fc.chainIdx, "frame", extFrameNum, "prevFrame", prevFrameInChain) } var intact bool