diff --git a/CHANGELOG b/CHANGELOG index f4893fd8c..87f096580 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -2,6 +2,45 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.4.5] - 2023-08-22 + +### Added +- Add ability to roll back video layer selection. (#1871) +- Allow listing ingress by id (#1874) +- E2EE trailer for server injected packets. (#1908) +- Add support for ingress URL pull (#1938 #1939) +- (experimental) Add control of playout delay (#1838 #1930) +- Add option to advertise external ip only (#1962) +- Allow data packet to be sent to participants by identity (#1982) + +### Fixed +- Fix RTC IP when binding to 0.0.0.0 (#1862) +- Prevent anachronous sample reading in connection stats (#1863) +- Fixed resubscribe race due to desire changed before cleaning up (#1865) +- Fixed numPublisher computation by marking dirty after track published changes (#1878) +- Attempt to avoid out-of-order max subscribed layer notifications. (#1882) +- Improved packet loss handling for SVC codecs (#1912 ) +- Frame integrity check for SVC codecs (#1914) +- Issue full reconnect if subscriber PC is closed on ICERestart (#1919) +- Do not post max layer event for audio. (#1932) +- Never use dd tracker for non-svc codec (#1952) +- Fix race condition causing new participants to have stale room metadata (#1969) +- Fixed VP9 handling for non-SVC content. (#1973) +- Ensure older session does not clobber newer session. (#1974) +- Do not start RTPStats on a padding packet. (#1984) + +### Changed +- Push track quality to poor on a bandwidth constrained pause (#1867) +- AV sync improvements (#1875 #1892 #1944 #1951 #1955 #1956 #1968 #1971 #1986) +- Do not send unnecessary room updates when content isn't changed (#1881) +- start reading signal messages before session handler finishes (#1883) +- changing key file permissions control to allow group readable (#1893) +- close disconnected participants when signal channel fails (#1895) +- Improved stream allocator handling during transitions and reallocation. (#1905 #1906) +- Stream allocator tweaks to reduce re-allocation (#1936) +- Reduce NACK traffic by delaying retransmission after first send. (#1918) +- Temper stream allocator more to avoid false negative downgrades (#1920) + ## [1.4.4] - 2023-07-08 ### Added diff --git a/go.mod b/go.mod index aa8c09fd7..24d5bf8fb 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/hashicorp/golang-lru/v2 v2.0.5 github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 - github.com/livekit/mediatransportutil v0.0.0-20230815100155-96164dbcfd8c + github.com/livekit/mediatransportutil v0.0.0-20230823131232-12f579dc9af0 github.com/livekit/protocol v1.6.1 github.com/livekit/psrpc v0.3.3 github.com/mackerelio/go-osstat v0.2.4 @@ -41,7 +41,7 @@ require ( github.com/stretchr/testify v1.8.4 github.com/thoas/go-funk v0.9.3 github.com/twitchtv/twirp v8.1.3+incompatible - github.com/ua-parser/uap-go v0.0.0-20211112212520-00c877edfe0f + github.com/ua-parser/uap-go v0.0.0-20230823192112-f8d2018c34f3 github.com/urfave/cli/v2 v2.25.7 github.com/urfave/negroni/v3 v3.0.0 go.uber.org/atomic v1.11.0 diff --git a/go.sum b/go.sum index 900588264..31050de5f 100644 --- a/go.sum +++ b/go.sum @@ -122,8 +122,8 @@ github.com/lithammer/shortuuid/v4 v4.0.0 h1:QRbbVkfgNippHOS8PXDkti4NaWeyYfcBTHtw github.com/lithammer/shortuuid/v4 v4.0.0/go.mod h1:Zs8puNcrvf2rV9rTH51ZLLcj7ZXqQI3lv67aw4KiB1Y= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkDaKb5iXdynYrzB84ErPPO4LbRASk58= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= -github.com/livekit/mediatransportutil v0.0.0-20230815100155-96164dbcfd8c h1:4udPqCusH93MK/7q8ZfDqcLJHGoQeKKsMi5b+/BpQvk= -github.com/livekit/mediatransportutil v0.0.0-20230815100155-96164dbcfd8c/go.mod h1:xirUXW8xnLGmfCwUeAv/nj1VGo1OO1BmgxrYP7jK/14= +github.com/livekit/mediatransportutil v0.0.0-20230823131232-12f579dc9af0 h1:cHNvPzn6VHFcsHx8ZC9LwU/4jj22mW3LILrNg/y5A6I= +github.com/livekit/mediatransportutil v0.0.0-20230823131232-12f579dc9af0/go.mod h1:xirUXW8xnLGmfCwUeAv/nj1VGo1OO1BmgxrYP7jK/14= github.com/livekit/protocol v1.6.1 h1:MjRg/UBmynE636In1GD9PbrF2u/C10WwaVIkObsZYtk= github.com/livekit/protocol v1.6.1/go.mod h1:/JuO+G/btZ5gNwX2+901L6za3UvVO6DHRXHsv8kkLsU= github.com/livekit/psrpc v0.3.3 h1:+lltbuN39IdaynXhLLxRShgYqYsRMWeeXKzv60oqyWo= @@ -260,8 +260,8 @@ github.com/thoas/go-funk v0.9.3 h1:7+nAEx3kn5ZJcnDm2Bh23N2yOtweO14bi//dvRtgLpw= github.com/thoas/go-funk v0.9.3/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q= github.com/twitchtv/twirp v8.1.3+incompatible h1:+F4TdErPgSUbMZMwp13Q/KgDVuI7HJXP61mNV3/7iuU= github.com/twitchtv/twirp v8.1.3+incompatible/go.mod h1:RRJoFSAmTEh2weEqWtpPE3vFK5YBhA6bqp2l1kfCC5A= -github.com/ua-parser/uap-go v0.0.0-20211112212520-00c877edfe0f h1:A+MmlgpvrHLeUP8dkBVn4Pnf5Bp5Yk2OALm7SEJLLE8= -github.com/ua-parser/uap-go v0.0.0-20211112212520-00c877edfe0f/go.mod h1:OBcG9bn7sHtXgarhUEb3OfCnNsgtGnkVf41ilSZ3K3E= +github.com/ua-parser/uap-go v0.0.0-20230823192112-f8d2018c34f3 h1:YsXCA7ZdgFMgwDpNpYj4y2WPRVrOVVDAkQlFc477T54= +github.com/ua-parser/uap-go v0.0.0-20230823192112-f8d2018c34f3/go.mod h1:OBcG9bn7sHtXgarhUEb3OfCnNsgtGnkVf41ilSZ3K3E= github.com/urfave/cli/v2 v2.25.7 h1:VAzn5oq403l5pHjc4OhD54+XGO9cdKVL/7lDjF+iKUs= github.com/urfave/cli/v2 v2.25.7/go.mod h1:8qnjx1vcq5s2/wpsqoZFndg2CE5tNFyrTvS6SinrnYQ= github.com/urfave/negroni/v3 v3.0.0 h1:Vo8CeZfu1lFR9gW8GnAb6dOGCJyijfil9j/jKKc/JhU= diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 359aec5a0..51aabb7c8 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -886,23 +886,6 @@ func (p *ParticipantImpl) OnICEConfigChanged(f func(participant types.LocalParti // signal connection methods // -func (p *ParticipantImpl) GetAudioLevel() (level float64, active bool) { - level = 0 - for _, pt := range p.GetPublishedTracks() { - mediaTrack := pt.(types.LocalMediaTrack) - if mediaTrack.Source() == livekit.TrackSource_MICROPHONE { - tl, ta := mediaTrack.GetAudioLevel() - if ta { - active = true - if tl > level { - level = tl - } - } - } - } - return -} - func (p *ParticipantImpl) GetConnectionQuality() *livekit.ConnectionQualityInfo { numTracks := 0 minQuality := livekit.ConnectionQuality_EXCELLENT @@ -1704,6 +1687,7 @@ func (p *ParticipantImpl) mediaTrackReceived(track *webrtc.TrackRemote, rtpRecei ) mid := p.TransportManager.GetPublisherMid(rtpReceiver) if mid == "" { + p.pendingTracksLock.Unlock() p.pubLogger.Warnw("could not get mid for track", nil, "trackID", track.ID()) return nil, false } diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index baacff5dd..5275b5ad3 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -243,6 +243,8 @@ type Participant interface { GetPublishedTracks() []MediaTrack RemovePublishedTrack(track MediaTrack, willBeResumed bool, shouldClose bool) + GetAudioLevel() (smoothedLevel float64, active bool) + // HasPermission checks permission of the subscriber by identity. Returns true if subscriber is allowed to subscribe // to the track with trackID HasPermission(trackID livekit.TrackID, subIdentity livekit.ParticipantIdentity) bool @@ -348,7 +350,6 @@ type LocalParticipant interface { GetSubscribedParticipants() []livekit.ParticipantID IsSubscribedTo(sid livekit.ParticipantID) bool - GetAudioLevel() (smoothedLevel float64, active bool) GetConnectionQuality() *livekit.ConnectionQualityInfo // server sent messages @@ -445,6 +446,8 @@ type MediaTrack interface { UpdateVideoLayers(layers []*livekit.VideoLayer) IsSimulcast() bool + GetAudioLevel() (level float64, active bool) + Close(willBeResumed bool) IsOpen() bool @@ -480,7 +483,6 @@ type LocalMediaTrack interface { SignalCid() string HasSdpCid(cid string) bool - GetAudioLevel() (level float64, active bool) GetConnectionScoreAndQuality() (float32, livekit.ConnectionQuality) SetRTT(rtt uint32) diff --git a/pkg/rtc/types/typesfakes/fake_media_track.go b/pkg/rtc/types/typesfakes/fake_media_track.go index af0c8c1b0..fd472c52e 100644 --- a/pkg/rtc/types/typesfakes/fake_media_track.go +++ b/pkg/rtc/types/typesfakes/fake_media_track.go @@ -48,6 +48,18 @@ type FakeMediaTrack struct { getAllSubscribersReturnsOnCall map[int]struct { result1 []livekit.ParticipantID } + GetAudioLevelStub func() (float64, bool) + getAudioLevelMutex sync.RWMutex + getAudioLevelArgsForCall []struct { + } + getAudioLevelReturns struct { + result1 float64 + result2 bool + } + getAudioLevelReturnsOnCall map[int]struct { + result1 float64 + result2 bool + } GetNumSubscribersStub func() int getNumSubscribersMutex sync.RWMutex getNumSubscribersArgsForCall []struct { @@ -478,6 +490,62 @@ func (fake *FakeMediaTrack) GetAllSubscribersReturnsOnCall(i int, result1 []live }{result1} } +func (fake *FakeMediaTrack) GetAudioLevel() (float64, bool) { + fake.getAudioLevelMutex.Lock() + ret, specificReturn := fake.getAudioLevelReturnsOnCall[len(fake.getAudioLevelArgsForCall)] + fake.getAudioLevelArgsForCall = append(fake.getAudioLevelArgsForCall, struct { + }{}) + stub := fake.GetAudioLevelStub + fakeReturns := fake.getAudioLevelReturns + fake.recordInvocation("GetAudioLevel", []interface{}{}) + fake.getAudioLevelMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeMediaTrack) GetAudioLevelCallCount() int { + fake.getAudioLevelMutex.RLock() + defer fake.getAudioLevelMutex.RUnlock() + return len(fake.getAudioLevelArgsForCall) +} + +func (fake *FakeMediaTrack) GetAudioLevelCalls(stub func() (float64, bool)) { + fake.getAudioLevelMutex.Lock() + defer fake.getAudioLevelMutex.Unlock() + fake.GetAudioLevelStub = stub +} + +func (fake *FakeMediaTrack) GetAudioLevelReturns(result1 float64, result2 bool) { + fake.getAudioLevelMutex.Lock() + defer fake.getAudioLevelMutex.Unlock() + fake.GetAudioLevelStub = nil + fake.getAudioLevelReturns = struct { + result1 float64 + result2 bool + }{result1, result2} +} + +func (fake *FakeMediaTrack) GetAudioLevelReturnsOnCall(i int, result1 float64, result2 bool) { + fake.getAudioLevelMutex.Lock() + defer fake.getAudioLevelMutex.Unlock() + fake.GetAudioLevelStub = nil + if fake.getAudioLevelReturnsOnCall == nil { + fake.getAudioLevelReturnsOnCall = make(map[int]struct { + result1 float64 + result2 bool + }) + } + fake.getAudioLevelReturnsOnCall[i] = struct { + result1 float64 + result2 bool + }{result1, result2} +} + func (fake *FakeMediaTrack) GetNumSubscribers() int { fake.getNumSubscribersMutex.Lock() ret, specificReturn := fake.getNumSubscribersReturnsOnCall[len(fake.getNumSubscribersArgsForCall)] @@ -1640,6 +1708,8 @@ func (fake *FakeMediaTrack) Invocations() map[string][][]interface{} { defer fake.closeMutex.RUnlock() fake.getAllSubscribersMutex.RLock() defer fake.getAllSubscribersMutex.RUnlock() + fake.getAudioLevelMutex.RLock() + defer fake.getAudioLevelMutex.RUnlock() fake.getNumSubscribersMutex.RLock() defer fake.getNumSubscribersMutex.RUnlock() fake.getQualityForDimensionMutex.RLock() diff --git a/pkg/rtc/types/typesfakes/fake_participant.go b/pkg/rtc/types/typesfakes/fake_participant.go index a660644cb..fa92204fe 100644 --- a/pkg/rtc/types/typesfakes/fake_participant.go +++ b/pkg/rtc/types/typesfakes/fake_participant.go @@ -43,6 +43,18 @@ type FakeParticipant struct { debugInfoReturnsOnCall map[int]struct { result1 map[string]interface{} } + GetAudioLevelStub func() (float64, bool) + getAudioLevelMutex sync.RWMutex + getAudioLevelArgsForCall []struct { + } + getAudioLevelReturns struct { + result1 float64 + result2 bool + } + getAudioLevelReturnsOnCall map[int]struct { + result1 float64 + result2 bool + } GetPublishedTrackStub func(livekit.TrackID) types.MediaTrack getPublishedTrackMutex sync.RWMutex getPublishedTrackArgsForCall []struct { @@ -377,6 +389,62 @@ func (fake *FakeParticipant) DebugInfoReturnsOnCall(i int, result1 map[string]in }{result1} } +func (fake *FakeParticipant) GetAudioLevel() (float64, bool) { + fake.getAudioLevelMutex.Lock() + ret, specificReturn := fake.getAudioLevelReturnsOnCall[len(fake.getAudioLevelArgsForCall)] + fake.getAudioLevelArgsForCall = append(fake.getAudioLevelArgsForCall, struct { + }{}) + stub := fake.GetAudioLevelStub + fakeReturns := fake.getAudioLevelReturns + fake.recordInvocation("GetAudioLevel", []interface{}{}) + fake.getAudioLevelMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeParticipant) GetAudioLevelCallCount() int { + fake.getAudioLevelMutex.RLock() + defer fake.getAudioLevelMutex.RUnlock() + return len(fake.getAudioLevelArgsForCall) +} + +func (fake *FakeParticipant) GetAudioLevelCalls(stub func() (float64, bool)) { + fake.getAudioLevelMutex.Lock() + defer fake.getAudioLevelMutex.Unlock() + fake.GetAudioLevelStub = stub +} + +func (fake *FakeParticipant) GetAudioLevelReturns(result1 float64, result2 bool) { + fake.getAudioLevelMutex.Lock() + defer fake.getAudioLevelMutex.Unlock() + fake.GetAudioLevelStub = nil + fake.getAudioLevelReturns = struct { + result1 float64 + result2 bool + }{result1, result2} +} + +func (fake *FakeParticipant) GetAudioLevelReturnsOnCall(i int, result1 float64, result2 bool) { + fake.getAudioLevelMutex.Lock() + defer fake.getAudioLevelMutex.Unlock() + fake.GetAudioLevelStub = nil + if fake.getAudioLevelReturnsOnCall == nil { + fake.getAudioLevelReturnsOnCall = make(map[int]struct { + result1 float64 + result2 bool + }) + } + fake.getAudioLevelReturnsOnCall[i] = struct { + result1 float64 + result2 bool + }{result1, result2} +} + func (fake *FakeParticipant) GetPublishedTrack(arg1 livekit.TrackID) types.MediaTrack { fake.getPublishedTrackMutex.Lock() ret, specificReturn := fake.getPublishedTrackReturnsOnCall[len(fake.getPublishedTrackArgsForCall)] @@ -1236,6 +1304,8 @@ func (fake *FakeParticipant) Invocations() map[string][][]interface{} { defer fake.closeMutex.RUnlock() fake.debugInfoMutex.RLock() defer fake.debugInfoMutex.RUnlock() + fake.getAudioLevelMutex.RLock() + defer fake.getAudioLevelMutex.RUnlock() fake.getPublishedTrackMutex.RLock() defer fake.getPublishedTrackMutex.RUnlock() fake.getPublishedTracksMutex.RLock() diff --git a/pkg/rtc/uptrackmanager.go b/pkg/rtc/uptrackmanager.go index 961134c83..1d1dfa6e5 100644 --- a/pkg/rtc/uptrackmanager.go +++ b/pkg/rtc/uptrackmanager.go @@ -418,3 +418,19 @@ func (u *UpTrackManager) DebugInfo() map[string]interface{} { return info } + +func (u *UpTrackManager) GetAudioLevel() (level float64, active bool) { + level = 0 + for _, pt := range u.GetPublishedTracks() { + if pt.Source() == livekit.TrackSource_MICROPHONE { + tl, ta := pt.GetAudioLevel() + if ta { + active = true + if tl > level { + level = tl + } + } + } + } + return +} diff --git a/pkg/service/ingress.go b/pkg/service/ingress.go index 73de7663c..a8fd81d6c 100644 --- a/pkg/service/ingress.go +++ b/pkg/service/ingress.go @@ -139,7 +139,10 @@ func (s *IngressService) CreateIngressWithUrl(ctx context.Context, urlStr string urlStr = urlObj.String() } - sk := utils.NewGuid("") + var sk string + if req.InputType != livekit.IngressInput_URL_INPUT { + sk = utils.NewGuid("") + } info := &livekit.IngressInfo{ IngressId: utils.NewGuid(utils.IngressPrefix), diff --git a/pkg/service/redisstore.go b/pkg/service/redisstore.go index e8fe4ecc9..df119a5e5 100644 --- a/pkg/service/redisstore.go +++ b/pkg/service/redisstore.go @@ -514,7 +514,7 @@ func (s *RedisStore) storeIngress(_ context.Context, info *livekit.IngressInfo) if info.IngressId == "" { return errors.New("Missing IngressId") } - if info.StreamKey == "" { + if info.StreamKey == "" && info.InputType != livekit.IngressInput_URL_INPUT { return errors.New("Missing StreamKey") } @@ -543,7 +543,9 @@ func (s *RedisStore) storeIngress(_ context.Context, info *livekit.IngressInfo) results, err := tx.TxPipelined(s.ctx, func(p redis.Pipeliner) error { p.HSet(s.ctx, IngressKey, info.IngressId, data) - p.HSet(s.ctx, StreamKeyKey, info.StreamKey, info.IngressId) + if info.StreamKey != "" { + p.HSet(s.ctx, StreamKeyKey, info.StreamKey, info.IngressId) + } if oldRoom != info.RoomName { if oldRoom != "" { @@ -799,7 +801,9 @@ func (s *RedisStore) UpdateIngressState(ctx context.Context, ingressId string, s func (s *RedisStore) DeleteIngress(_ context.Context, info *livekit.IngressInfo) error { tx := s.rc.TxPipeline() tx.SRem(s.ctx, RoomIngressPrefix+info.RoomName, info.IngressId) - tx.HDel(s.ctx, StreamKeyKey, info.IngressId) + if info.StreamKey != "" { + tx.HDel(s.ctx, StreamKeyKey, info.StreamKey) + } tx.HDel(s.ctx, IngressKey, info.IngressId) tx.Del(s.ctx, IngressStatePrefix+info.IngressId) if _, err := tx.Exec(s.ctx); err != nil { diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index baae76b46..957aaa452 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -30,6 +30,7 @@ import ( "go.uber.org/atomic" "github.com/livekit/livekit-server/pkg/sfu/audio" + "github.com/livekit/livekit-server/pkg/sfu/utils" sutils "github.com/livekit/livekit-server/pkg/utils" "github.com/livekit/mediatransportutil" "github.com/livekit/mediatransportutil/pkg/bucket" @@ -80,6 +81,8 @@ type Buffer struct { closed atomic.Bool mime string + snRangeMap *utils.RangeMap[uint32, uint32] + // supported feedbacks latestTSForAudioLevelInitialized bool latestTSForAudioLevel uint32 @@ -124,6 +127,7 @@ func NewBuffer(ssrc uint32, vp, ap *sync.Pool) *Buffer { mediaSSRC: ssrc, videoPool: vp, audioPool: ap, + snRangeMap: utils.NewRangeMap[uint32, uint32](100), pliThrottle: int64(500 * time.Millisecond), logger: l.WithComponent(sutils.ComponentPub).WithComponent(sutils.ComponentSFU), } @@ -404,42 +408,40 @@ func (b *Buffer) SetRTT(rtt uint32) { } func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) { - pktBuf, err := b.bucket.AddPacket(pkt) - if err != nil { - // - // Even when erroring, do - // 1. state update - // 2. TWCC just in case remote side is retransmitting an old packet for probing - // - // But, do not forward those packets - // - var rtpPacket rtp.Packet - if uerr := rtpPacket.Unmarshal(pkt); uerr == nil { - b.updateStreamState(&rtpPacket, arrivalTime) - b.processHeaderExtensions(&rtpPacket, arrivalTime) - } + var rtpPacket rtp.Packet + if err := rtpPacket.Unmarshal(pkt); err != nil { + b.logger.Errorw("could not unmarshal RTP packet", err) + return + } + extSeqNumber, isOutOfOrder := b.updateStreamState(&rtpPacket, arrivalTime) + b.processHeaderExtensions(&rtpPacket, arrivalTime) + if !isOutOfOrder && len(rtpPacket.Payload) == 0 { + // drop padding only in-order packet + b.snRangeMap.IncValue(1) + return + } + + // add to RTX buffer using sequence number after accounting for dropped padding only packets + snAdjustment, err := b.snRangeMap.GetValue(extSeqNumber) + if err != nil { + b.logger.Errorw("could not get sequence number adjustment", err) + return + } + rtpPacket.Header.SequenceNumber = uint16(extSeqNumber - snAdjustment) + _, err = b.bucket.AddPacketWithSequenceNumber(pkt, rtpPacket.Header.SequenceNumber) + if err != nil { if err != bucket.ErrRTXPacket { b.logger.Warnw("could not add RTP packet to bucket", err) } return } - var p rtp.Packet - err = p.Unmarshal(pktBuf) - if err != nil { - b.logger.Warnw("error unmarshaling RTP packet", err) - return - } - - b.updateStreamState(&p, arrivalTime) - b.processHeaderExtensions(&p, arrivalTime) - b.doNACKs() b.doReports(arrivalTime) - ep := b.getExtPacket(&p, arrivalTime) + ep := b.getExtPacket(&rtpPacket, arrivalTime) if ep == nil { return } @@ -497,18 +499,21 @@ func (b *Buffer) doFpsCalc(ep *ExtPacket) { } } -func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime time.Time) { +func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime time.Time) (uint32, bool) { flowState := b.rtpStats.Update(&p.Header, len(p.Payload), int(p.PaddingSize), arrivalTime) if b.nacker != nil { b.nacker.Remove(p.SequenceNumber) if flowState.HasLoss { + b.snRangeMap.AddRange(flowState.LossStartInclusive, flowState.LossEndExclusive) for lost := flowState.LossStartInclusive; lost != flowState.LossEndExclusive; lost++ { - b.nacker.Push(lost) + b.nacker.Push(uint16(lost)) } } } + + return flowState.ExtSeqNumber, flowState.IsOutOfOrder } func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime time.Time) { diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index 24a0c34a7..91069d377 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -65,8 +65,12 @@ func (d driftResult) String() string { type RTPFlowState struct { HasLoss bool - LossStartInclusive uint16 - LossEndExclusive uint16 + LossStartInclusive uint32 + LossEndExclusive uint32 + + IsOutOfOrder bool + + ExtSeqNumber uint32 } type IntervalStats struct { @@ -449,14 +453,16 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa } } + flowState.IsOutOfOrder = true + + cycles := r.cycles + if rtph.SequenceNumber > r.highestSN { + cycles-- + } + flowState.ExtSeqNumber = getExtSN(rtph.SequenceNumber, cycles) + // in-order default: - if diff > 1 { - flowState.HasLoss = true - flowState.LossStartInclusive = r.highestSN + 1 - flowState.LossEndExclusive = rtph.SequenceNumber - } - // update gap histogram r.updateGapHistogram(int(diff)) @@ -466,6 +472,16 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker, false) + if diff > 1 { + flowState.HasLoss = true + + cycles := r.cycles + if r.highestSN+1 < r.highestSN { + cycles++ + } + flowState.LossStartInclusive = getExtSN(r.highestSN+1, cycles) + } + if rtph.SequenceNumber < r.highestSN && !first { r.cycles++ } @@ -481,6 +497,11 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa // NOTE: this may not be the first packet with this time stamp if there is packet loss. r.highestTime = packetTime } + + if flowState.HasLoss { + flowState.LossEndExclusive = getExtSN(rtph.SequenceNumber, r.cycles) + } + flowState.ExtSeqNumber = getExtSN(rtph.SequenceNumber, r.cycles) } if !isDuplicate { @@ -1733,6 +1754,10 @@ func (r *RTPStats) getAndResetSnapshot(snapshotId uint32, override bool) (*Snaps // ---------------------------------- +func getExtSN(sn uint16, cycles uint16) uint32 { + return (uint32(cycles) << 16) | uint32(sn) +} + func getExtTS(ts uint32, cycles uint32) uint64 { return (uint64(cycles) << 32) | uint64(ts) } diff --git a/pkg/sfu/buffer/rtpstats_test.go b/pkg/sfu/buffer/rtpstats_test.go index 389d330c3..74e4774f4 100644 --- a/pkg/sfu/buffer/rtpstats_test.go +++ b/pkg/sfu/buffer/rtpstats_test.go @@ -126,8 +126,8 @@ func TestRTPStats_Update(t *testing.T) { packet = getPacket(sequenceNumber, timestamp, 1000) flowState = r.Update(&packet.Header, len(packet.Payload), 0, time.Now()) require.True(t, flowState.HasLoss) - require.Equal(t, sequenceNumber-9, flowState.LossStartInclusive) - require.Equal(t, sequenceNumber, flowState.LossEndExclusive) + require.Equal(t, uint32(sequenceNumber-9), flowState.LossStartInclusive) + require.Equal(t, uint32(sequenceNumber), flowState.LossEndExclusive) require.Equal(t, uint32(17), r.packetsLost) // out-of-order should decrement number of lost packets diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 1d5c5ac7f..88876d307 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -154,12 +154,13 @@ type TranslationParams struct { // ------------------------------------------------------------------- type ForwarderState struct { - Started bool - PreStartTime time.Time - FirstTS uint32 - RefTSOffset uint32 - RTP RTPMungerState - Codec interface{} + Started bool + ReferenceLayerSpatial int32 + PreStartTime time.Time + FirstTS uint32 + RefTSOffset uint32 + RTP RTPMungerState + Codec interface{} } func (f ForwarderState) String() string { @@ -168,8 +169,9 @@ func (f ForwarderState) String() string { case codecmunger.VP8State: codecString = codecState.String() } - return fmt.Sprintf("ForwarderState{started: %v, preStartTime: %s, firstTS: %d, refTSOffset: %d, rtp: %s, codec: %s}", + return fmt.Sprintf("ForwarderState{started: %v, referenceLayerSpatial: %d, preStartTime: %s, firstTS: %d, refTSOffset: %d, rtp: %s, codec: %s}", f.Started, + f.ReferenceLayerSpatial, f.PreStartTime.String(), f.FirstTS, f.RefTSOffset, @@ -330,12 +332,13 @@ func (f *Forwarder) GetState() ForwarderState { } return ForwarderState{ - Started: f.started, - PreStartTime: f.preStartTime, - FirstTS: f.firstTS, - RefTSOffset: f.refTSOffset, - RTP: f.rtpMunger.GetLast(), - Codec: f.codecMunger.GetState(), + Started: f.started, + ReferenceLayerSpatial: f.referenceLayerSpatial, + PreStartTime: f.preStartTime, + FirstTS: f.firstTS, + RefTSOffset: f.refTSOffset, + RTP: f.rtpMunger.GetLast(), + Codec: f.codecMunger.GetState(), } } @@ -351,6 +354,7 @@ func (f *Forwarder) SeedState(state ForwarderState) { f.codecMunger.SeedState(state.Codec) f.started = true + f.referenceLayerSpatial = state.ReferenceLayerSpatial f.preStartTime = state.PreStartTime f.firstTS = state.FirstTS f.refTSOffset = state.RefTSOffset @@ -1451,11 +1455,6 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e ) } - if f.referenceLayerSpatial == buffer.InvalidLayerSpatial { - // on a resume, reference layer may not be set, so only set when it is invalid - f.referenceLayerSpatial = layer - } - // Compute how much time passed between the previous forwarded packet // and the current incoming (to be forwarded) packet and calculate // timestamp offset on source change. diff --git a/pkg/sfu/utils/rangemap.go b/pkg/sfu/utils/rangemap.go new file mode 100644 index 000000000..75ffd1646 --- /dev/null +++ b/pkg/sfu/utils/rangemap.go @@ -0,0 +1,123 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +import ( + "errors" + "math" + "unsafe" +) + +const ( + minRanges = 1 +) + +var ( + errReversedOrder = errors.New("end is before start") + errKeyNotFound = errors.New("key not found") +) + +type rangeType interface { + uint32 +} + +type valueType interface { + uint32 +} + +type rangeVal[RT rangeType, VT valueType] struct { + start RT + end RT + value VT +} + +type RangeMap[RT rangeType, VT valueType] struct { + halfRange RT + + size int + ranges []rangeVal[RT, VT] + runningValue VT +} + +func NewRangeMap[RT rangeType, VT valueType](size int) *RangeMap[RT, VT] { + var t RT + return &RangeMap[RT, VT]{ + halfRange: 1 << ((unsafe.Sizeof(t) * 8) - 1), + size: int(math.Max(float64(size), float64(minRanges))), + } +} + +func (r *RangeMap[RT, VT]) IncValue(inc VT) { + r.runningValue += inc +} + +func (r *RangeMap[RT, VT]) AddRange(startInclusive RT, endExclusive RT) error { + if endExclusive-startInclusive > r.halfRange { + return errReversedOrder + } + + isNewRange := true + // check if last range can be extended + if len(r.ranges) != 0 { + lr := &r.ranges[len(r.ranges)-1] + if startInclusive <= lr.end { + return errReversedOrder + } + if lr.value == r.runningValue { + lr.end = endExclusive - 1 + isNewRange = false + } else { + // end last range before start and start a new range + lr.end = startInclusive - 1 + } + } + + if isNewRange { + r.ranges = append(r.ranges, rangeVal[RT, VT]{ + start: startInclusive, + end: endExclusive - 1, + value: r.runningValue, + }) + } + r.prune() + return nil +} + +func (r *RangeMap[RT, VT]) GetValue(key RT) (VT, error) { + numRanges := len(r.ranges) + if numRanges != 0 { + if key > r.ranges[numRanges-1].end { + return r.runningValue, nil + } + + if key < r.ranges[0].start { + return 0, errKeyNotFound + } + } + + for _, rv := range r.ranges { + if key-rv.start < r.halfRange && rv.end-key < r.halfRange { + return rv.value, nil + } + } + + return r.runningValue, nil +} + +func (r *RangeMap[RT, VT]) prune() { + if len(r.ranges) > r.size { + r.ranges = r.ranges[len(r.ranges)-r.size:] + } +} diff --git a/pkg/sfu/utils/rangemap_test.go b/pkg/sfu/utils/rangemap_test.go new file mode 100644 index 000000000..dcc6a745d --- /dev/null +++ b/pkg/sfu/utils/rangemap_test.go @@ -0,0 +1,122 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestRangeMapUint32(t *testing.T) { + r := NewRangeMap[uint32, uint32](2) + + // getting value for any key should be 0 default + value, err := r.GetValue(33333) + require.NoError(t, err) + require.Equal(t, uint32(0), value) + value, err = r.GetValue(0xffffffff) + require.NoError(t, err) + require.Equal(t, uint32(0), value) + + // getting value for any key should be incremented value + r.IncValue(2) + value, err = r.GetValue(66666666) + require.NoError(t, err) + require.Equal(t, uint32(2), value) + value, err = r.GetValue(0) + require.NoError(t, err) + require.Equal(t, uint32(2), value) + + // add a couple of ranges, as the value is same should just extend + err = r.AddRange(10, 20) + require.NoError(t, err) + err = r.AddRange(30, 40) + require.NoError(t, err) + require.Equal(t, 1, len(r.ranges)) + require.Equal(t, uint32(10), r.ranges[0].start) + require.Equal(t, uint32(39), r.ranges[0].end) + require.Equal(t, uint32(2), r.ranges[0].value) + + // bump value + r.IncValue(1) + // getting value in previously added range should return 2 + value, err = r.GetValue(22) + require.NoError(t, err) + require.Equal(t, uint32(2), value) + // outside range should return 3 + value, err = r.GetValue(662) + require.NoError(t, err) + require.Equal(t, uint32(3), value) + + // adding out-of-order range should return error + err = r.AddRange(60, 50) + require.Error(t, err, errReversedOrder) + + // adding overlapping should return error + err = r.AddRange(30, 50) + require.Error(t, err, errReversedOrder) + + // adding a non-overlapping range should extend previous range and add new one + err = r.AddRange(50, 60) + require.NoError(t, err) + require.Equal(t, 2, len(r.ranges)) + + require.Equal(t, uint32(10), r.ranges[0].start) + require.Equal(t, uint32(49), r.ranges[0].end) + require.Equal(t, uint32(2), r.ranges[0].value) + + require.Equal(t, uint32(50), r.ranges[1].start) + require.Equal(t, uint32(59), r.ranges[1].end) + require.Equal(t, uint32(3), r.ranges[1].value) + + // getting an old value should not succeed, but start of first range should return no error + value, err = r.GetValue(9) + require.Error(t, err, errKeyNotFound) + value, err = r.GetValue(10) + require.NoError(t, err) + require.Equal(t, uint32(2), value) + + // adding another range should prune the first one as size if set to 2 + r.IncValue(10) + err = r.AddRange(1000, 1233) + require.NoError(t, err) + require.Equal(t, 2, len(r.ranges)) + + require.Equal(t, uint32(50), r.ranges[0].start) + require.Equal(t, uint32(999), r.ranges[0].end) + require.Equal(t, uint32(3), r.ranges[0].value) + + require.Equal(t, uint32(1000), r.ranges[1].start) + require.Equal(t, uint32(1232), r.ranges[1].end) + require.Equal(t, uint32(13), r.ranges[1].value) + + // previously valid range should return key not found after pruning + value, err = r.GetValue(10) + require.Error(t, err, errKeyNotFound) + + value, err = r.GetValue(999) + require.NoError(t, err) + require.Equal(t, uint32(3), value) + + value, err = r.GetValue(1200) + require.NoError(t, err) + require.Equal(t, uint32(13), value) + + // something newer than what is in ranges should return running value + value, err = r.GetValue(3000) + require.NoError(t, err) + require.Equal(t, uint32(13), value) +} diff --git a/version/version.go b/version/version.go index 1df2fdae5..a4b4ba000 100644 --- a/version/version.go +++ b/version/version.go @@ -14,4 +14,4 @@ package version -const Version = "1.4.4" +const Version = "1.4.5"