Merge remote-tracking branch 'origin/master' into raja_fr

This commit is contained in:
boks1971
2023-10-26 00:00:41 +05:30
22 changed files with 217 additions and 128 deletions
-6
View File
@@ -112,12 +112,6 @@ rtc:
# batch_io:
# batch_size: 128
# max_flush_interval: 2ms
# # force a reconnect on a publication error
# reconnect_on_publication_error: true
# # force a reconnect on a subscription error
# reconnect_on_subscription_error: true
# # force a reconnect on a data channel error
# reconnect_on_data_channel_error: true
# # max number of bytes to buffer for data channel. 0 means unlimited.
# # when this limit is breached, data messages will be dropped till the buffered amount drops below this limit.
# data_channel_max_buffered_amount: 0
+3 -3
View File
@@ -18,8 +18,8 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e
github.com/livekit/protocol v1.8.1-0.20231023052729-5fd3ded802ca
github.com/livekit/psrpc v0.3.3
github.com/livekit/protocol v1.8.1-0.20231024024326-07ca9d4e47bd
github.com/livekit/psrpc v0.4.0
github.com/mackerelio/go-osstat v0.2.4
github.com/magefile/mage v1.15.0
github.com/maxbrunsfeld/counterfeiter/v6 v6.7.0
@@ -78,7 +78,7 @@ require (
github.com/mdlayher/netlink v1.7.1 // indirect
github.com/mdlayher/socket v0.4.0 // indirect
github.com/nats-io/nats.go v1.31.0 // indirect
github.com/nats-io/nkeys v0.4.5 // indirect
github.com/nats-io/nkeys v0.4.6 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pion/datachannel v1.5.5 // indirect
github.com/pion/logging v0.2.2 // indirect
+6 -6
View File
@@ -125,10 +125,10 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e h1:yNeIo7MSMUWgoLu7LkNKnBYnJBFPFH9Wq4S6h1kS44M=
github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY=
github.com/livekit/protocol v1.8.1-0.20231023052729-5fd3ded802ca h1:M3gymSJFriayMERPin3BSGgYTUpLVfGaMgmqp+3JV6I=
github.com/livekit/protocol v1.8.1-0.20231023052729-5fd3ded802ca/go.mod h1:K47UnOyc9C0L9LEBDznwd1NMBSsohgyhfurshU5f+4Y=
github.com/livekit/psrpc v0.3.3 h1:+lltbuN39IdaynXhLLxRShgYqYsRMWeeXKzv60oqyWo=
github.com/livekit/psrpc v0.3.3/go.mod h1:n6JntEg+zT6Ji8InoyTpV7wusPNwGqqtxmHlkNhDN0U=
github.com/livekit/protocol v1.8.1-0.20231024024326-07ca9d4e47bd h1:5By2nxIMS9MApyJ654STmtwb0V4MeuaH77lqUTxV3nw=
github.com/livekit/protocol v1.8.1-0.20231024024326-07ca9d4e47bd/go.mod h1:oTWtPGfpZSJGKRrbSvDQK0jiuUylYzhiw/bnGB4Cqko=
github.com/livekit/psrpc v0.4.0 h1:oC4l99HSot/aUza4ZR3ZcZW1SRZm34KCHX3wAbmw6Lo=
github.com/livekit/psrpc v0.4.0/go.mod h1:1XYH1LLoD/YbvBvt6xg2KQ/J3InLXSJK6PL/+DKmuAU=
github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs=
github.com/mackerelio/go-osstat v0.2.4/go.mod h1:Zy+qzGdZs3A9cuIqmgbJvwbmLQH9dJvtio5ZjJTbdlQ=
github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg=
@@ -162,8 +162,8 @@ github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E=
github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8=
github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk=
github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY=
github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADymtkpts=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
+1 -1
View File
@@ -133,7 +133,7 @@ func (r *signalClient) StartParticipantSignal(
signalResponseMessageReader{},
r.config,
)
l.Infow("signal stream closed", "error", err)
l.Debugw("signal stream closed", "error", err)
resChan.Close()
}()
+8
View File
@@ -37,6 +37,10 @@ func (c ClientInfo) isGo() bool {
return c.ClientInfo != nil && c.ClientInfo.Sdk == livekit.ClientInfo_GO
}
func (c ClientInfo) isLinux() bool {
return c.ClientInfo != nil && strings.EqualFold(c.ClientInfo.Os, "linux")
}
func (c ClientInfo) SupportsAudioRED() bool {
return !c.isFirefox() && !c.isSafari()
}
@@ -80,6 +84,10 @@ func (c ClientInfo) SupportsChangeRTPSenderEncodingActive() bool {
return !c.isFirefox()
}
func (c ClientInfo) ComplyWithCodecOrderInSDPAnswer() bool {
return !(c.isLinux() && c.isFirefox())
}
// compareVersion compares a semver against the current client SDK version
// returning 1 if current version is greater than version
// 0 if they are the same, and -1 if it's an earlier version
+4 -1
View File
@@ -180,7 +180,10 @@ func (p *ParticipantImpl) setCodecPreferencesVideoForPublisher(offer webrtc.Sess
}
unmatchVideo.MediaName.Formats = append(unmatchVideo.MediaName.Formats[:0], preferredCodecs...)
unmatchVideo.MediaName.Formats = append(unmatchVideo.MediaName.Formats, leftCodecs...)
// if the client don't comply with codec order in SDP answer, only keep preferred codecs to force client to use it
if p.params.ClientInfo.ComplyWithCodecOrderInSDPAnswer() {
unmatchVideo.MediaName.Formats = append(unmatchVideo.MediaName.Formats, leftCodecs...)
}
}
}
+12 -16
View File
@@ -190,7 +190,17 @@ func (s *EgressService) startEgress(ctx context.Context, roomName livekit.RoomNa
}
func (s *egressLauncher) StartEgress(ctx context.Context, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error) {
return s.StartEgressWithClusterId(ctx, "", req)
info, err := s.StartEgressWithClusterId(ctx, "", req)
if err != nil {
return nil, err
}
_, err = s.io.CreateEgress(ctx, info)
if err != nil {
logger.Errorw("failed to create egress", err)
}
return info, nil
}
func (s *egressLauncher) StartEgressWithClusterId(ctx context.Context, clusterId string, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error) {
@@ -203,17 +213,7 @@ func (s *egressLauncher) StartEgressWithClusterId(ctx context.Context, clusterId
req.EgressId = utils.NewGuid(utils.EgressPrefix)
}
info, err := s.client.StartEgress(ctx, clusterId, req)
if err != nil {
return nil, err
}
_, err = s.io.CreateEgress(ctx, info)
if err != nil {
logger.Errorw("failed to create egress", err)
}
return info, nil
return s.client.StartEgress(ctx, clusterId, req)
}
type LayoutMetadata struct {
@@ -283,10 +283,6 @@ func (s *EgressService) UpdateStream(ctx context.Context, req *livekit.UpdateStr
return info, nil
}
func (s *EgressService) UpdateOutputs(ctx context.Context, req *livekit.UpdateOutputsRequest) (*livekit.EgressInfo, error) {
return nil, twirp.NewError(twirp.Unimplemented, "Update Outputs unimplemented")
}
func (s *EgressService) ListEgress(ctx context.Context, req *livekit.ListEgressRequest) (*livekit.ListEgressResponse, error) {
if req.RoomName != "" {
AppendLogFields(ctx, "room", req.RoomName)
+1 -1
View File
@@ -83,6 +83,6 @@ type IOClient interface {
//counterfeiter:generate . RoomAllocator
type RoomAllocator interface {
CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, error)
CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, bool, error)
ValidateCreateRoom(ctx context.Context, roomName livekit.RoomName) error
}
+11 -11
View File
@@ -50,10 +50,10 @@ func NewRoomAllocator(conf *config.Config, router routing.Router, rs ObjectStore
// CreateRoom creates a new room from a request and allocates it to a node to handle
// it'll also monitor its state, and cleans it up when appropriate
func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, error) {
func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, bool, error) {
token, err := r.roomStore.LockRoom(ctx, livekit.RoomName(req.Name), 5*time.Second)
if err != nil {
return nil, err
return nil, false, err
}
defer func() {
_ = r.roomStore.UnlockRoom(ctx, livekit.RoomName(req.Name), token)
@@ -71,7 +71,7 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre
internal = &livekit.RoomInternal{}
applyDefaultRoomConfig(rm, internal, &r.config.Room)
} else if err != nil {
return nil, err
return nil, false, err
}
if req.EmptyTimeout > 0 {
@@ -98,23 +98,23 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre
}
if err = r.roomStore.StoreRoom(ctx, rm, internal); err != nil {
return nil, err
return nil, false, err
}
// check if room already assigned
existing, err := r.router.GetNodeForRoom(ctx, livekit.RoomName(rm.Name))
if err != routing.ErrNotFound && err != nil {
return nil, err
return nil, false, err
}
// if already assigned and still available, keep it on that node
if err == nil && selector.IsAvailable(existing) {
// if node hosting the room is full, deny entry
if selector.LimitsReached(r.config.Limit, existing.Stats) {
return nil, routing.ErrNodeLimitReached
return nil, false, routing.ErrNodeLimitReached
}
return rm, nil
return rm, false, nil
}
// select a new node
@@ -122,12 +122,12 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre
if nodeID == "" {
nodes, err := r.router.ListNodes()
if err != nil {
return nil, err
return nil, false, err
}
node, err := r.selector.SelectNode(nodes)
if err != nil {
return nil, err
return nil, false, err
}
nodeID = livekit.NodeID(node.Id)
@@ -136,10 +136,10 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre
logger.Infow("selected node for room", "room", rm.Name, "roomID", rm.Sid, "selectedNodeID", nodeID)
err = r.router.SetNodeForRoom(ctx, livekit.RoomName(rm.Name), nodeID)
if err != nil {
return nil, err
return nil, false, err
}
return rm, nil
return rm, true, nil
}
func (r *StandardRoomAllocator) ValidateCreateRoom(ctx context.Context, roomName livekit.RoomName) error {
+3 -3
View File
@@ -39,7 +39,7 @@ func TestCreateRoom(t *testing.T) {
ra, conf := newTestRoomAllocator(t, conf, node)
room, err := ra.CreateRoom(context.Background(), &livekit.CreateRoomRequest{Name: "myroom"})
room, _, err := ra.CreateRoom(context.Background(), &livekit.CreateRoomRequest{Name: "myroom"})
require.NoError(t, err)
require.Equal(t, conf.Room.EmptyTimeout, room.EmptyTimeout)
require.NotEmpty(t, room.EnabledCodecs)
@@ -57,7 +57,7 @@ func TestCreateRoom(t *testing.T) {
ra, _ := newTestRoomAllocator(t, conf, node)
_, err = ra.CreateRoom(context.Background(), &livekit.CreateRoomRequest{Name: "low-limit-room"})
_, _, err = ra.CreateRoom(context.Background(), &livekit.CreateRoomRequest{Name: "low-limit-room"})
require.ErrorIs(t, err, routing.ErrNodeLimitReached)
})
@@ -73,7 +73,7 @@ func TestCreateRoom(t *testing.T) {
ra, _ := newTestRoomAllocator(t, conf, node)
_, err = ra.CreateRoom(context.Background(), &livekit.CreateRoomRequest{Name: "low-limit-room"})
_, _, err = ra.CreateRoom(context.Background(), &livekit.CreateRoomRequest{Name: "low-limit-room"})
require.ErrorIs(t, err, routing.ErrNodeLimitReached)
})
}
+16 -8
View File
@@ -433,10 +433,12 @@ func (r *RoomManager) StartSession(
_ = participant.Close(true, types.ParticipantCloseReasonJoinFailed, false)
return err
}
if err := r.roomServer.RegisterAllParticipantTopics(rpc.FormatParticipantTopic(roomName, participant.Identity())); err != nil {
pLogger.Errorw("could not join register participant topic", err)
_ = participant.Close(true, types.ParticipantCloseReasonMessageBusFailed, false)
return err
if r.config.PSRPC.Enabled {
if err := r.roomServer.RegisterAllParticipantTopics(rpc.FormatParticipantTopic(roomName, participant.Identity())); err != nil {
pLogger.Errorw("could not join register participant topic", err)
_ = participant.Close(true, types.ParticipantCloseReasonMessageBusFailed, false)
return err
}
}
if err = r.roomStore.StoreParticipant(ctx, roomName, participant.ToProto()); err != nil {
pLogger.Errorw("could not store participant", err)
@@ -461,7 +463,9 @@ func (r *RoomManager) StartSession(
pLogger.Errorw("could not delete participant", err)
}
r.roomServer.DeregisterAllParticipantTopics(rpc.FormatParticipantTopic(roomName, participant.Identity()))
if r.config.PSRPC.Enabled {
r.roomServer.DeregisterAllParticipantTopics(rpc.FormatParticipantTopic(roomName, participant.Identity()))
}
// update room store with new numParticipants
proto := room.ToProto()
@@ -503,8 +507,10 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room
return nil, err
}
if err := r.roomServer.RegisterAllRoomTopics(rpc.FormatRoomTopic(roomName)); err != nil {
return nil, err
if r.config.PSRPC.Enabled {
if err := r.roomServer.RegisterAllRoomTopics(rpc.FormatRoomTopic(roomName)); err != nil {
return nil, err
}
}
r.lock.Lock()
@@ -525,7 +531,9 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room
newRoom := rtc.NewRoom(ri, internal, *r.rtcConfig, &r.config.Audio, r.serverInfo, r.telemetry, r.egressLauncher)
newRoom.OnClose(func() {
r.roomServer.DeregisterAllRoomTopics(rpc.FormatRoomTopic(roomName))
if r.config.PSRPC.Enabled {
r.roomServer.DeregisterAllRoomTopics(rpc.FormatRoomTopic(roomName))
}
roomInfo := newRoom.ToProto()
r.telemetry.RoomEnded(ctx, roomInfo)
+3 -3
View File
@@ -79,7 +79,7 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq
return nil, ErrEgressNotConnected
}
rm, err := s.roomAllocator.CreateRoom(ctx, req)
rm, created, err := s.roomAllocator.CreateRoom(ctx, req)
if err != nil {
err = errors.Wrap(err, "could not create room")
return nil, err
@@ -109,7 +109,7 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq
return nil, err
}
if req.Egress != nil && req.Egress.Room != nil {
if created && req.Egress != nil && req.Egress.Room != nil {
egress := &rpc.StartEgressRequest{
Request: &rpc.StartEgressRequest_RoomComposite{
RoomComposite: req.Egress.Room,
@@ -424,7 +424,7 @@ func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat
// no one has joined the room, would not have been created on an RTC node.
// in this case, we'd want to run create again
_, err = s.roomAllocator.CreateRoom(ctx, &livekit.CreateRoomRequest{
_, _, err = s.roomAllocator.CreateRoom(ctx, &livekit.CreateRoomRequest{
Name: req.Room,
Metadata: req.Metadata,
})
+1 -1
View File
@@ -517,7 +517,7 @@ type connectionResult struct {
func (s *RTCService) startConnection(ctx context.Context, roomName livekit.RoomName, pi routing.ParticipantInit, timeout time.Duration) (connectionResult, *livekit.SignalResponse, error) {
var cr connectionResult
var err error
cr.Room, err = s.roomAllocator.CreateRoom(ctx, &livekit.CreateRoomRequest{Name: string(roomName)})
cr.Room, _, err = s.roomAllocator.CreateRoom(ctx, &livekit.CreateRoomRequest{Name: string(roomName)})
if err != nil {
return cr, nil, err
}
+19 -14
View File
@@ -10,7 +10,7 @@ import (
)
type FakeRoomAllocator struct {
CreateRoomStub func(context.Context, *livekit.CreateRoomRequest) (*livekit.Room, error)
CreateRoomStub func(context.Context, *livekit.CreateRoomRequest) (*livekit.Room, bool, error)
createRoomMutex sync.RWMutex
createRoomArgsForCall []struct {
arg1 context.Context
@@ -18,11 +18,13 @@ type FakeRoomAllocator struct {
}
createRoomReturns struct {
result1 *livekit.Room
result2 error
result2 bool
result3 error
}
createRoomReturnsOnCall map[int]struct {
result1 *livekit.Room
result2 error
result2 bool
result3 error
}
ValidateCreateRoomStub func(context.Context, livekit.RoomName) error
validateCreateRoomMutex sync.RWMutex
@@ -40,7 +42,7 @@ type FakeRoomAllocator struct {
invocationsMutex sync.RWMutex
}
func (fake *FakeRoomAllocator) CreateRoom(arg1 context.Context, arg2 *livekit.CreateRoomRequest) (*livekit.Room, error) {
func (fake *FakeRoomAllocator) CreateRoom(arg1 context.Context, arg2 *livekit.CreateRoomRequest) (*livekit.Room, bool, error) {
fake.createRoomMutex.Lock()
ret, specificReturn := fake.createRoomReturnsOnCall[len(fake.createRoomArgsForCall)]
fake.createRoomArgsForCall = append(fake.createRoomArgsForCall, struct {
@@ -55,9 +57,9 @@ func (fake *FakeRoomAllocator) CreateRoom(arg1 context.Context, arg2 *livekit.Cr
return stub(arg1, arg2)
}
if specificReturn {
return ret.result1, ret.result2
return ret.result1, ret.result2, ret.result3
}
return fakeReturns.result1, fakeReturns.result2
return fakeReturns.result1, fakeReturns.result2, fakeReturns.result3
}
func (fake *FakeRoomAllocator) CreateRoomCallCount() int {
@@ -66,7 +68,7 @@ func (fake *FakeRoomAllocator) CreateRoomCallCount() int {
return len(fake.createRoomArgsForCall)
}
func (fake *FakeRoomAllocator) CreateRoomCalls(stub func(context.Context, *livekit.CreateRoomRequest) (*livekit.Room, error)) {
func (fake *FakeRoomAllocator) CreateRoomCalls(stub func(context.Context, *livekit.CreateRoomRequest) (*livekit.Room, bool, error)) {
fake.createRoomMutex.Lock()
defer fake.createRoomMutex.Unlock()
fake.CreateRoomStub = stub
@@ -79,30 +81,33 @@ func (fake *FakeRoomAllocator) CreateRoomArgsForCall(i int) (context.Context, *l
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeRoomAllocator) CreateRoomReturns(result1 *livekit.Room, result2 error) {
func (fake *FakeRoomAllocator) CreateRoomReturns(result1 *livekit.Room, result2 bool, result3 error) {
fake.createRoomMutex.Lock()
defer fake.createRoomMutex.Unlock()
fake.CreateRoomStub = nil
fake.createRoomReturns = struct {
result1 *livekit.Room
result2 error
}{result1, result2}
result2 bool
result3 error
}{result1, result2, result3}
}
func (fake *FakeRoomAllocator) CreateRoomReturnsOnCall(i int, result1 *livekit.Room, result2 error) {
func (fake *FakeRoomAllocator) CreateRoomReturnsOnCall(i int, result1 *livekit.Room, result2 bool, result3 error) {
fake.createRoomMutex.Lock()
defer fake.createRoomMutex.Unlock()
fake.CreateRoomStub = nil
if fake.createRoomReturnsOnCall == nil {
fake.createRoomReturnsOnCall = make(map[int]struct {
result1 *livekit.Room
result2 error
result2 bool
result3 error
})
}
fake.createRoomReturnsOnCall[i] = struct {
result1 *livekit.Room
result2 error
}{result1, result2}
result2 bool
result3 error
}{result1, result2, result3}
}
func (fake *FakeRoomAllocator) ValidateCreateRoom(arg1 context.Context, arg2 livekit.RoomName) error {
+1 -1
View File
@@ -164,7 +164,7 @@ func (r *signalService) RelaySignal(stream psrpc.ServerStream[*rpc.RelaySignalRe
signalRequestMessageReader{},
r.config,
)
l.Infow("signal stream closed", "error", err)
l.Debugw("signal stream closed", "error", err)
reqChan.Close()
}()
+3 -3
View File
@@ -154,7 +154,7 @@ func (r *RTPStatsReceiver) Update(
return
}
}
if -gapSN >= cNumSequenceNumbers {
if -gapSN >= cNumSequenceNumbers/2 {
r.logger.Warnw(
"large sequence number gap negative", nil,
"extStartSN", r.sequenceNumber.GetExtendedStart(),
@@ -226,7 +226,7 @@ func (r *RTPStatsReceiver) Update(
flowState.ExtSequenceNumber = resSN.ExtendedVal
flowState.ExtTimestamp = resTS.ExtendedVal
} else { // in-order
if gapSN >= cNumSequenceNumbers {
if gapSN >= cNumSequenceNumbers/2 {
r.logger.Warnw(
"large sequence number gap", nil,
"extStartSN", r.sequenceNumber.GetExtendedStart(),
@@ -333,7 +333,7 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData)
if (timeSinceLast > 0.2 && math.Abs(float64(r.params.ClockRate)-calculatedClockRateFromLast) > 0.2*float64(r.params.ClockRate)) ||
(timeSinceFirst > 0.2 && math.Abs(float64(r.params.ClockRate)-calculatedClockRateFromFirst) > 0.2*float64(r.params.ClockRate)) {
if r.clockSkewCount%10 == 0 {
if r.clockSkewCount%100 == 0 {
r.logger.Infow(
"clock rate skew",
"first", r.srFirst.ToString(),
+18 -3
View File
@@ -286,7 +286,7 @@ func (r *RTPStatsSender) Update(
// do not start on a padding only packet
return
}
if -gapSN >= cNumSequenceNumbers {
if -gapSN >= cNumSequenceNumbers/2 {
r.logger.Warnw(
"large sequence number gap negative", nil,
"extStartSN", r.extStartSN,
@@ -352,7 +352,7 @@ func (r *RTPStatsSender) Update(
r.setSnInfo(extSequenceNumber, r.extHighestSN, uint16(pktSize), uint8(hdrSize), uint16(payloadSize), marker, true)
}
} else { // in-order
if gapSN >= cNumSequenceNumbers {
if gapSN >= cNumSequenceNumbers/2 {
r.logger.Warnw(
"large sequence number gap", nil,
"extStartSN", r.extStartSN,
@@ -394,7 +394,22 @@ func (r *RTPStatsSender) Update(
"tsBefore", r.extStartTS,
"tsAfter", extTimestamp,
)
r.extStartTS = extTimestamp
if extTimestamp == 0 { // TODO-REMOVE-AFTER-DEBUG
r.logger.Errorw(
"invalid start timestamp", nil,
"snBefore", r.extStartSN,
"snAfter", extSequenceNumber,
"snHighest", r.extHighestSN,
"tsBefore", r.extStartTS,
"tsAfter", extTimestamp,
"tsHighest", r.extHighestTS,
"firstTime", r.firstTime.String(),
"startTime", r.startTime.String(),
)
}
if extTimestamp != 0 {
r.extStartTS = extTimestamp
}
}
if extTimestamp > r.extHighestTS {
+15 -5
View File
@@ -270,7 +270,9 @@ type DownTrack struct {
pacer pacer.Pacer
maxLayerNotifierCh chan struct{}
maxLayerNotifierChMu sync.RWMutex
maxLayerNotifierCh chan struct{}
maxLayerNotifierChClosed bool
cbMu sync.RWMutex
onStatsUpdate func(dt *DownTrack, stat *livekit.AnalyticsStat)
@@ -638,14 +640,18 @@ func (d *DownTrack) keyFrameRequester(generation uint32, layer int32) {
}
func (d *DownTrack) postMaxLayerNotifierEvent() {
if d.IsClosed() || d.kind != webrtc.RTPCodecTypeVideo {
if d.kind != webrtc.RTPCodecTypeVideo {
return
}
select {
case d.maxLayerNotifierCh <- struct{}{}:
default:
d.maxLayerNotifierChMu.RLock()
if !d.maxLayerNotifierChClosed {
select {
case d.maxLayerNotifierCh <- struct{}{}:
default:
}
}
d.maxLayerNotifierChMu.RUnlock()
}
func (d *DownTrack) maxLayerNotifierWorker() {
@@ -960,6 +966,7 @@ func (d *DownTrack) CloseWithFlush(flush bool) {
}
d.bound.Store(false)
d.onBindAndConnectedChange()
d.params.Logger.Debugw("closing sender", "kind", d.kind)
}
d.params.Receiver.DeleteDownTrack(d.params.SubID)
@@ -975,7 +982,10 @@ func (d *DownTrack) CloseWithFlush(flush bool) {
d.rtpStats.Stop()
d.params.Logger.Infow("rtp stats", "direction", "downstream", "mime", d.mime, "ssrc", d.ssrc, "stats", d.rtpStats.ToString())
d.maxLayerNotifierChMu.Lock()
d.maxLayerNotifierChClosed = true
close(d.maxLayerNotifierCh)
d.maxLayerNotifierChMu.Unlock()
if onCloseHandler := d.getOnCloseHandler(); onCloseHandler != nil {
onCloseHandler(!flush)
+81 -33
View File
@@ -84,6 +84,7 @@ type sequencer struct {
size int
startTime int64
initialized bool
extStartSN uint64
extHighestSN uint64
snOffset uint64
extHighestTS uint64
@@ -133,46 +134,61 @@ func (s *sequencer) push(
if !s.initialized {
s.initialized = true
s.extHighestSN = extModifiedSN - 1
s.extStartSN = extModifiedSN
s.extHighestSN = extModifiedSN
s.extHighestTS = extModifiedTS
s.updateSNOffset()
}
snOffset := s.snOffset
diff := int64(extModifiedSN - s.extHighestSN)
if diff >= 0 {
s.extHighestSN = extModifiedSN
} else {
if diff < -int64(s.size) {
s.logger.Warnw(
"old packet, cannot be sequenced", nil,
"extHighestSN", s.extHighestSN,
"extIncomingSN", extIncomingSN,
"extModifiedSN", extModifiedSN,
)
return
}
if extModifiedSN < s.extStartSN {
// old packet, should not happen
return
}
extHighestSNAdjusted := s.extHighestSN - s.snOffset
extModifiedSNAdjusted := extModifiedSN - s.snOffset
if extModifiedSN < s.extHighestSN {
if s.snRangeMap != nil {
var err error
snOffset, err = s.snRangeMap.GetValue(extModifiedSN)
snOffset, err := s.snRangeMap.GetValue(extModifiedSN)
if err != nil {
s.logger.Errorw(
"could not get sequence number offset", err,
"extStartSN", s.extStartSN,
"extHighestSN", s.extHighestSN,
"extIncomingSN", extIncomingSN,
"extModifiedSN", extModifiedSN,
"snOffset", s.snOffset,
)
return
}
extModifiedSNAdjusted = extModifiedSN - snOffset
}
}
if int64(extModifiedSNAdjusted-extHighestSNAdjusted) <= -int64(s.size) {
s.logger.Warnw(
"old packet, cannot be sequenced", nil,
"extHighestSN", s.extHighestSN,
"extIncomingSN", extIncomingSN,
"extModifiedSN", extModifiedSN,
)
return
}
// invalidate missing sequence numbers
if extModifiedSNAdjusted > extHighestSNAdjusted {
numInvalidated := 0
for esn := extHighestSNAdjusted + 1; esn != extModifiedSNAdjusted; esn++ {
s.invalidateSlot(int(esn % uint64(s.size)))
numInvalidated++
if numInvalidated >= s.size {
break
}
}
}
if int64(extModifiedTS-s.extHighestTS) >= 0 {
s.extHighestTS = extModifiedTS
}
slot := (extModifiedSN - snOffset) % uint64(s.size)
slot := extModifiedSNAdjusted % uint64(s.size)
s.meta[slot] = packetMeta{
sourceSeqNo: uint16(extIncomingSN),
targetSeqNo: uint16(extModifiedSN),
@@ -183,6 +199,13 @@ func (s *sequencer) push(
ddBytes: append([]byte{}, ddBytes...),
lastNack: s.getRefTime(packetTime), // delay retransmissions after the original transmission
}
if extModifiedSN > s.extHighestSN {
s.extHighestSN = extModifiedSN
}
if extModifiedTS > s.extHighestTS {
s.extHighestTS = extModifiedTS
}
}
func (s *sequencer) pushPadding(extStartSNInclusive uint64, extEndSNInclusive uint64) {
@@ -223,10 +246,7 @@ func (s *sequencer) pushPadding(extStartSNInclusive uint64, extEndSNInclusive ui
}
slot := (sn - snOffset) % uint64(s.size)
s.meta[slot] = packetMeta{
sourceSeqNo: 0,
targetSeqNo: 0,
}
s.invalidateSlot(int(slot))
}
return
}
@@ -244,6 +264,10 @@ func (s *sequencer) getExtPacketMetas(seqNo []uint16) []extPacketMeta {
s.Lock()
defer s.Unlock()
if !s.initialized {
return nil
}
snOffset := uint64(0)
var err error
extPacketMetas := make([]extPacketMeta, 0, len(seqNo))
@@ -263,11 +287,6 @@ func (s *sequencer) getExtPacketMetas(seqNo []uint16) []extPacketMeta {
extSN -= (1 << 16)
}
if s.extHighestSN-extSN >= uint64(s.size) {
// too old
continue
}
if s.snRangeMap != nil {
snOffset, err = s.snRangeMap.GetValue(extSN)
if err != nil {
@@ -276,9 +295,17 @@ func (s *sequencer) getExtPacketMetas(seqNo []uint16) []extPacketMeta {
}
}
slot := (extSN - snOffset) % uint64(s.size)
extSNAdjusted := extSN - snOffset
extHighestSNAdjusted := s.extHighestSN - s.snOffset
if extHighestSNAdjusted-extSNAdjusted >= uint64(s.size) {
// too old
continue
}
slot := extSNAdjusted % uint64(s.size)
meta := &s.meta[slot]
if meta.targetSeqNo != sn {
if meta.targetSeqNo != sn || s.isInvalidSlot(int(slot)) {
// invalid slot access could happen if padding packets exclusion range could not be recorded
continue
}
@@ -320,3 +347,24 @@ func (s *sequencer) updateSNOffset() {
}
s.snOffset = snOffset
}
func (s *sequencer) invalidateSlot(slot int) {
if slot >= len(s.meta) {
return
}
s.meta[slot] = packetMeta{
sourceSeqNo: 0,
targetSeqNo: 0,
lastNack: 0,
}
}
func (s *sequencer) isInvalidSlot(slot int) bool {
if slot >= len(s.meta) {
return true
}
meta := &s.meta[slot]
return meta.sourceSeqNo == 0 && meta.targetSeqNo == 0 && meta.lastNack == 0
}
+6
View File
@@ -651,6 +651,12 @@ func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ts uint32, layer in
return 0, fmt.Errorf("invalid layer, target: %d, reference: %d", layer, referenceLayer)
}
// SVC-TODO: better SVC detection
if s.isSVC {
// there is only one stream in SVC
return ts, nil
}
if layer != referenceLayer && s.layerOffsets[referenceLayer][layer] == 0 {
return 0, fmt.Errorf("offset unavailable, target: %d, reference: %d", layer, referenceLayer)
}
@@ -88,11 +88,6 @@ func (d *DependencyDescriptor) Select(extPkt *buffer.ExtPacket, _layer int32) (r
return
}
if !d.currentLayer.IsValid() && !extPkt.KeyFrame {
d.decisions.AddDropped(extFrameNum)
return
}
if ddwdt.StructureUpdated {
d.updateDependencyStructure(dd.AttachedStructure, ddwdt.DecodeTargets)
}
@@ -149,7 +144,7 @@ func (d *DependencyDescriptor) Select(extPkt *buffer.ExtPacket, _layer int32) (r
return
}
// // DD-TODO : if bandwidth in congest, could drop the 'Discardable' frame
// DD-TODO : if bandwidth in congest, could drop the 'Discardable' frame
if dti == dede.DecodeTargetNotPresent {
// d.logger.Debugw(fmt.Sprintf("drop packet for decode target not present, highestDecodeTarget %d, incoming %v, fn: %d/%d",
// highestDecodeTarget,
@@ -193,6 +188,7 @@ func (d *DependencyDescriptor) Select(extPkt *buffer.ExtPacket, _layer int32) (r
"req", d.requestSpatial,
"maxSeen", d.maxSeenLayer,
"feed", extPkt.Packet.SSRC,
"frame", extFrameNum,
)
}
@@ -201,7 +197,7 @@ func (d *DependencyDescriptor) Select(extPkt *buffer.ExtPacket, _layer int32) (r
d.previousActiveDecodeTargetsBitmask = d.activeDecodeTargetsBitmask
d.activeDecodeTargetsBitmask = buffer.GetActiveDecodeTargetBitmask(d.currentLayer, ddwdt.DecodeTargets)
d.logger.Debugw("switch to target", "highest", highestDecodeTarget.Layer, "current", d.currentLayer, "bitmask", *d.activeDecodeTargetsBitmask)
d.logger.Debugw("switch to target", "highest", highestDecodeTarget.Layer, "current", d.currentLayer, "bitmask", *d.activeDecodeTargetsBitmask, "frame", extFrameNum)
}
ddExtension := &dede.DependencyDescriptorExtension{
+2 -2
View File
@@ -49,7 +49,7 @@ func (fc *FrameChain) OnFrame(extFrameNum uint64, fd *dd.FrameDependencyTemplate
if fd.ChainDiffs[fc.chainIdx] == 0 {
if fc.broken {
fc.broken = false
fc.logger.Debugw("frame chain intact", "chanIdx", fc.chainIdx)
fc.logger.Debugw("frame chain intact", "chanIdx", fc.chainIdx, "frame", extFrameNum)
}
fc.expectFrames = fc.expectFrames[:0]
return true
@@ -62,7 +62,7 @@ func (fc *FrameChain) OnFrame(extFrameNum uint64, fd *dd.FrameDependencyTemplate
prevFrameInChain := extFrameNum - uint64(fd.ChainDiffs[fc.chainIdx])
sd, err := fc.decisions.GetDecision(prevFrameInChain)
if err != nil {
fc.logger.Debugw("could not get decision", "err", err, "frame", extFrameNum, "prevFrame", prevFrameInChain)
fc.logger.Debugw("could not get decision", "err", err, "chanIdx", fc.chainIdx, "frame", extFrameNum, "prevFrame", prevFrameInChain)
}
var intact bool