Merge remote-tracking branch 'origin/master' into raja_min_packets

This commit is contained in:
boks1971
2023-08-25 14:46:19 +05:30
17 changed files with 547 additions and 85 deletions
+39
View File
@@ -2,6 +2,45 @@
This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [1.4.5] - 2023-08-22
### Added
- Add ability to roll back video layer selection. (#1871)
- Allow listing ingress by id (#1874)
- E2EE trailer for server injected packets. (#1908)
- Add support for ingress URL pull (#1938 #1939)
- (experimental) Add control of playout delay (#1838 #1930)
- Add option to advertise external ip only (#1962)
- Allow data packet to be sent to participants by identity (#1982)
### Fixed
- Fix RTC IP when binding to 0.0.0.0 (#1862)
- Prevent anachronous sample reading in connection stats (#1863)
- Fixed resubscribe race due to desire changed before cleaning up (#1865)
- Fixed numPublisher computation by marking dirty after track published changes (#1878)
- Attempt to avoid out-of-order max subscribed layer notifications. (#1882)
- Improved packet loss handling for SVC codecs (#1912 )
- Frame integrity check for SVC codecs (#1914)
- Issue full reconnect if subscriber PC is closed on ICERestart (#1919)
- Do not post max layer event for audio. (#1932)
- Never use dd tracker for non-svc codec (#1952)
- Fix race condition causing new participants to have stale room metadata (#1969)
- Fixed VP9 handling for non-SVC content. (#1973)
- Ensure older session does not clobber newer session. (#1974)
- Do not start RTPStats on a padding packet. (#1984)
### Changed
- Push track quality to poor on a bandwidth constrained pause (#1867)
- AV sync improvements (#1875 #1892 #1944 #1951 #1955 #1956 #1968 #1971 #1986)
- Do not send unnecessary room updates when content isn't changed (#1881)
- start reading signal messages before session handler finishes (#1883)
- changing key file permissions control to allow group readable (#1893)
- close disconnected participants when signal channel fails (#1895)
- Improved stream allocator handling during transitions and reallocation. (#1905 #1906)
- Stream allocator tweaks to reduce re-allocation (#1936)
- Reduce NACK traffic by delaying retransmission after first send. (#1918)
- Temper stream allocator more to avoid false negative downgrades (#1920)
## [1.4.4] - 2023-07-08
### Added
+2 -2
View File
@@ -17,7 +17,7 @@ require (
github.com/hashicorp/golang-lru/v2 v2.0.5
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20230815100155-96164dbcfd8c
github.com/livekit/mediatransportutil v0.0.0-20230823131232-12f579dc9af0
github.com/livekit/protocol v1.6.1
github.com/livekit/psrpc v0.3.3
github.com/mackerelio/go-osstat v0.2.4
@@ -41,7 +41,7 @@ require (
github.com/stretchr/testify v1.8.4
github.com/thoas/go-funk v0.9.3
github.com/twitchtv/twirp v8.1.3+incompatible
github.com/ua-parser/uap-go v0.0.0-20211112212520-00c877edfe0f
github.com/ua-parser/uap-go v0.0.0-20230823192112-f8d2018c34f3
github.com/urfave/cli/v2 v2.25.7
github.com/urfave/negroni/v3 v3.0.0
go.uber.org/atomic v1.11.0
+4 -4
View File
@@ -122,8 +122,8 @@ github.com/lithammer/shortuuid/v4 v4.0.0 h1:QRbbVkfgNippHOS8PXDkti4NaWeyYfcBTHtw
github.com/lithammer/shortuuid/v4 v4.0.0/go.mod h1:Zs8puNcrvf2rV9rTH51ZLLcj7ZXqQI3lv67aw4KiB1Y=
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkDaKb5iXdynYrzB84ErPPO4LbRASk58=
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20230815100155-96164dbcfd8c h1:4udPqCusH93MK/7q8ZfDqcLJHGoQeKKsMi5b+/BpQvk=
github.com/livekit/mediatransportutil v0.0.0-20230815100155-96164dbcfd8c/go.mod h1:xirUXW8xnLGmfCwUeAv/nj1VGo1OO1BmgxrYP7jK/14=
github.com/livekit/mediatransportutil v0.0.0-20230823131232-12f579dc9af0 h1:cHNvPzn6VHFcsHx8ZC9LwU/4jj22mW3LILrNg/y5A6I=
github.com/livekit/mediatransportutil v0.0.0-20230823131232-12f579dc9af0/go.mod h1:xirUXW8xnLGmfCwUeAv/nj1VGo1OO1BmgxrYP7jK/14=
github.com/livekit/protocol v1.6.1 h1:MjRg/UBmynE636In1GD9PbrF2u/C10WwaVIkObsZYtk=
github.com/livekit/protocol v1.6.1/go.mod h1:/JuO+G/btZ5gNwX2+901L6za3UvVO6DHRXHsv8kkLsU=
github.com/livekit/psrpc v0.3.3 h1:+lltbuN39IdaynXhLLxRShgYqYsRMWeeXKzv60oqyWo=
@@ -260,8 +260,8 @@ github.com/thoas/go-funk v0.9.3 h1:7+nAEx3kn5ZJcnDm2Bh23N2yOtweO14bi//dvRtgLpw=
github.com/thoas/go-funk v0.9.3/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q=
github.com/twitchtv/twirp v8.1.3+incompatible h1:+F4TdErPgSUbMZMwp13Q/KgDVuI7HJXP61mNV3/7iuU=
github.com/twitchtv/twirp v8.1.3+incompatible/go.mod h1:RRJoFSAmTEh2weEqWtpPE3vFK5YBhA6bqp2l1kfCC5A=
github.com/ua-parser/uap-go v0.0.0-20211112212520-00c877edfe0f h1:A+MmlgpvrHLeUP8dkBVn4Pnf5Bp5Yk2OALm7SEJLLE8=
github.com/ua-parser/uap-go v0.0.0-20211112212520-00c877edfe0f/go.mod h1:OBcG9bn7sHtXgarhUEb3OfCnNsgtGnkVf41ilSZ3K3E=
github.com/ua-parser/uap-go v0.0.0-20230823192112-f8d2018c34f3 h1:YsXCA7ZdgFMgwDpNpYj4y2WPRVrOVVDAkQlFc477T54=
github.com/ua-parser/uap-go v0.0.0-20230823192112-f8d2018c34f3/go.mod h1:OBcG9bn7sHtXgarhUEb3OfCnNsgtGnkVf41ilSZ3K3E=
github.com/urfave/cli/v2 v2.25.7 h1:VAzn5oq403l5pHjc4OhD54+XGO9cdKVL/7lDjF+iKUs=
github.com/urfave/cli/v2 v2.25.7/go.mod h1:8qnjx1vcq5s2/wpsqoZFndg2CE5tNFyrTvS6SinrnYQ=
github.com/urfave/negroni/v3 v3.0.0 h1:Vo8CeZfu1lFR9gW8GnAb6dOGCJyijfil9j/jKKc/JhU=
+1 -17
View File
@@ -886,23 +886,6 @@ func (p *ParticipantImpl) OnICEConfigChanged(f func(participant types.LocalParti
// signal connection methods
//
func (p *ParticipantImpl) GetAudioLevel() (level float64, active bool) {
level = 0
for _, pt := range p.GetPublishedTracks() {
mediaTrack := pt.(types.LocalMediaTrack)
if mediaTrack.Source() == livekit.TrackSource_MICROPHONE {
tl, ta := mediaTrack.GetAudioLevel()
if ta {
active = true
if tl > level {
level = tl
}
}
}
}
return
}
func (p *ParticipantImpl) GetConnectionQuality() *livekit.ConnectionQualityInfo {
numTracks := 0
minQuality := livekit.ConnectionQuality_EXCELLENT
@@ -1704,6 +1687,7 @@ func (p *ParticipantImpl) mediaTrackReceived(track *webrtc.TrackRemote, rtpRecei
)
mid := p.TransportManager.GetPublisherMid(rtpReceiver)
if mid == "" {
p.pendingTracksLock.Unlock()
p.pubLogger.Warnw("could not get mid for track", nil, "trackID", track.ID())
return nil, false
}
+4 -2
View File
@@ -243,6 +243,8 @@ type Participant interface {
GetPublishedTracks() []MediaTrack
RemovePublishedTrack(track MediaTrack, willBeResumed bool, shouldClose bool)
GetAudioLevel() (smoothedLevel float64, active bool)
// HasPermission checks permission of the subscriber by identity. Returns true if subscriber is allowed to subscribe
// to the track with trackID
HasPermission(trackID livekit.TrackID, subIdentity livekit.ParticipantIdentity) bool
@@ -348,7 +350,6 @@ type LocalParticipant interface {
GetSubscribedParticipants() []livekit.ParticipantID
IsSubscribedTo(sid livekit.ParticipantID) bool
GetAudioLevel() (smoothedLevel float64, active bool)
GetConnectionQuality() *livekit.ConnectionQualityInfo
// server sent messages
@@ -445,6 +446,8 @@ type MediaTrack interface {
UpdateVideoLayers(layers []*livekit.VideoLayer)
IsSimulcast() bool
GetAudioLevel() (level float64, active bool)
Close(willBeResumed bool)
IsOpen() bool
@@ -480,7 +483,6 @@ type LocalMediaTrack interface {
SignalCid() string
HasSdpCid(cid string) bool
GetAudioLevel() (level float64, active bool)
GetConnectionScoreAndQuality() (float32, livekit.ConnectionQuality)
SetRTT(rtt uint32)
@@ -48,6 +48,18 @@ type FakeMediaTrack struct {
getAllSubscribersReturnsOnCall map[int]struct {
result1 []livekit.ParticipantID
}
GetAudioLevelStub func() (float64, bool)
getAudioLevelMutex sync.RWMutex
getAudioLevelArgsForCall []struct {
}
getAudioLevelReturns struct {
result1 float64
result2 bool
}
getAudioLevelReturnsOnCall map[int]struct {
result1 float64
result2 bool
}
GetNumSubscribersStub func() int
getNumSubscribersMutex sync.RWMutex
getNumSubscribersArgsForCall []struct {
@@ -478,6 +490,62 @@ func (fake *FakeMediaTrack) GetAllSubscribersReturnsOnCall(i int, result1 []live
}{result1}
}
func (fake *FakeMediaTrack) GetAudioLevel() (float64, bool) {
fake.getAudioLevelMutex.Lock()
ret, specificReturn := fake.getAudioLevelReturnsOnCall[len(fake.getAudioLevelArgsForCall)]
fake.getAudioLevelArgsForCall = append(fake.getAudioLevelArgsForCall, struct {
}{})
stub := fake.GetAudioLevelStub
fakeReturns := fake.getAudioLevelReturns
fake.recordInvocation("GetAudioLevel", []interface{}{})
fake.getAudioLevelMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1, ret.result2
}
return fakeReturns.result1, fakeReturns.result2
}
func (fake *FakeMediaTrack) GetAudioLevelCallCount() int {
fake.getAudioLevelMutex.RLock()
defer fake.getAudioLevelMutex.RUnlock()
return len(fake.getAudioLevelArgsForCall)
}
func (fake *FakeMediaTrack) GetAudioLevelCalls(stub func() (float64, bool)) {
fake.getAudioLevelMutex.Lock()
defer fake.getAudioLevelMutex.Unlock()
fake.GetAudioLevelStub = stub
}
func (fake *FakeMediaTrack) GetAudioLevelReturns(result1 float64, result2 bool) {
fake.getAudioLevelMutex.Lock()
defer fake.getAudioLevelMutex.Unlock()
fake.GetAudioLevelStub = nil
fake.getAudioLevelReturns = struct {
result1 float64
result2 bool
}{result1, result2}
}
func (fake *FakeMediaTrack) GetAudioLevelReturnsOnCall(i int, result1 float64, result2 bool) {
fake.getAudioLevelMutex.Lock()
defer fake.getAudioLevelMutex.Unlock()
fake.GetAudioLevelStub = nil
if fake.getAudioLevelReturnsOnCall == nil {
fake.getAudioLevelReturnsOnCall = make(map[int]struct {
result1 float64
result2 bool
})
}
fake.getAudioLevelReturnsOnCall[i] = struct {
result1 float64
result2 bool
}{result1, result2}
}
func (fake *FakeMediaTrack) GetNumSubscribers() int {
fake.getNumSubscribersMutex.Lock()
ret, specificReturn := fake.getNumSubscribersReturnsOnCall[len(fake.getNumSubscribersArgsForCall)]
@@ -1640,6 +1708,8 @@ func (fake *FakeMediaTrack) Invocations() map[string][][]interface{} {
defer fake.closeMutex.RUnlock()
fake.getAllSubscribersMutex.RLock()
defer fake.getAllSubscribersMutex.RUnlock()
fake.getAudioLevelMutex.RLock()
defer fake.getAudioLevelMutex.RUnlock()
fake.getNumSubscribersMutex.RLock()
defer fake.getNumSubscribersMutex.RUnlock()
fake.getQualityForDimensionMutex.RLock()
@@ -43,6 +43,18 @@ type FakeParticipant struct {
debugInfoReturnsOnCall map[int]struct {
result1 map[string]interface{}
}
GetAudioLevelStub func() (float64, bool)
getAudioLevelMutex sync.RWMutex
getAudioLevelArgsForCall []struct {
}
getAudioLevelReturns struct {
result1 float64
result2 bool
}
getAudioLevelReturnsOnCall map[int]struct {
result1 float64
result2 bool
}
GetPublishedTrackStub func(livekit.TrackID) types.MediaTrack
getPublishedTrackMutex sync.RWMutex
getPublishedTrackArgsForCall []struct {
@@ -377,6 +389,62 @@ func (fake *FakeParticipant) DebugInfoReturnsOnCall(i int, result1 map[string]in
}{result1}
}
func (fake *FakeParticipant) GetAudioLevel() (float64, bool) {
fake.getAudioLevelMutex.Lock()
ret, specificReturn := fake.getAudioLevelReturnsOnCall[len(fake.getAudioLevelArgsForCall)]
fake.getAudioLevelArgsForCall = append(fake.getAudioLevelArgsForCall, struct {
}{})
stub := fake.GetAudioLevelStub
fakeReturns := fake.getAudioLevelReturns
fake.recordInvocation("GetAudioLevel", []interface{}{})
fake.getAudioLevelMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1, ret.result2
}
return fakeReturns.result1, fakeReturns.result2
}
func (fake *FakeParticipant) GetAudioLevelCallCount() int {
fake.getAudioLevelMutex.RLock()
defer fake.getAudioLevelMutex.RUnlock()
return len(fake.getAudioLevelArgsForCall)
}
func (fake *FakeParticipant) GetAudioLevelCalls(stub func() (float64, bool)) {
fake.getAudioLevelMutex.Lock()
defer fake.getAudioLevelMutex.Unlock()
fake.GetAudioLevelStub = stub
}
func (fake *FakeParticipant) GetAudioLevelReturns(result1 float64, result2 bool) {
fake.getAudioLevelMutex.Lock()
defer fake.getAudioLevelMutex.Unlock()
fake.GetAudioLevelStub = nil
fake.getAudioLevelReturns = struct {
result1 float64
result2 bool
}{result1, result2}
}
func (fake *FakeParticipant) GetAudioLevelReturnsOnCall(i int, result1 float64, result2 bool) {
fake.getAudioLevelMutex.Lock()
defer fake.getAudioLevelMutex.Unlock()
fake.GetAudioLevelStub = nil
if fake.getAudioLevelReturnsOnCall == nil {
fake.getAudioLevelReturnsOnCall = make(map[int]struct {
result1 float64
result2 bool
})
}
fake.getAudioLevelReturnsOnCall[i] = struct {
result1 float64
result2 bool
}{result1, result2}
}
func (fake *FakeParticipant) GetPublishedTrack(arg1 livekit.TrackID) types.MediaTrack {
fake.getPublishedTrackMutex.Lock()
ret, specificReturn := fake.getPublishedTrackReturnsOnCall[len(fake.getPublishedTrackArgsForCall)]
@@ -1236,6 +1304,8 @@ func (fake *FakeParticipant) Invocations() map[string][][]interface{} {
defer fake.closeMutex.RUnlock()
fake.debugInfoMutex.RLock()
defer fake.debugInfoMutex.RUnlock()
fake.getAudioLevelMutex.RLock()
defer fake.getAudioLevelMutex.RUnlock()
fake.getPublishedTrackMutex.RLock()
defer fake.getPublishedTrackMutex.RUnlock()
fake.getPublishedTracksMutex.RLock()
+16
View File
@@ -418,3 +418,19 @@ func (u *UpTrackManager) DebugInfo() map[string]interface{} {
return info
}
func (u *UpTrackManager) GetAudioLevel() (level float64, active bool) {
level = 0
for _, pt := range u.GetPublishedTracks() {
if pt.Source() == livekit.TrackSource_MICROPHONE {
tl, ta := pt.GetAudioLevel()
if ta {
active = true
if tl > level {
level = tl
}
}
}
}
return
}
+4 -1
View File
@@ -139,7 +139,10 @@ func (s *IngressService) CreateIngressWithUrl(ctx context.Context, urlStr string
urlStr = urlObj.String()
}
sk := utils.NewGuid("")
var sk string
if req.InputType != livekit.IngressInput_URL_INPUT {
sk = utils.NewGuid("")
}
info := &livekit.IngressInfo{
IngressId: utils.NewGuid(utils.IngressPrefix),
+7 -3
View File
@@ -514,7 +514,7 @@ func (s *RedisStore) storeIngress(_ context.Context, info *livekit.IngressInfo)
if info.IngressId == "" {
return errors.New("Missing IngressId")
}
if info.StreamKey == "" {
if info.StreamKey == "" && info.InputType != livekit.IngressInput_URL_INPUT {
return errors.New("Missing StreamKey")
}
@@ -543,7 +543,9 @@ func (s *RedisStore) storeIngress(_ context.Context, info *livekit.IngressInfo)
results, err := tx.TxPipelined(s.ctx, func(p redis.Pipeliner) error {
p.HSet(s.ctx, IngressKey, info.IngressId, data)
p.HSet(s.ctx, StreamKeyKey, info.StreamKey, info.IngressId)
if info.StreamKey != "" {
p.HSet(s.ctx, StreamKeyKey, info.StreamKey, info.IngressId)
}
if oldRoom != info.RoomName {
if oldRoom != "" {
@@ -799,7 +801,9 @@ func (s *RedisStore) UpdateIngressState(ctx context.Context, ingressId string, s
func (s *RedisStore) DeleteIngress(_ context.Context, info *livekit.IngressInfo) error {
tx := s.rc.TxPipeline()
tx.SRem(s.ctx, RoomIngressPrefix+info.RoomName, info.IngressId)
tx.HDel(s.ctx, StreamKeyKey, info.IngressId)
if info.StreamKey != "" {
tx.HDel(s.ctx, StreamKeyKey, info.StreamKey)
}
tx.HDel(s.ctx, IngressKey, info.IngressId)
tx.Del(s.ctx, IngressStatePrefix+info.IngressId)
if _, err := tx.Exec(s.ctx); err != nil {
+32 -27
View File
@@ -30,6 +30,7 @@ import (
"go.uber.org/atomic"
"github.com/livekit/livekit-server/pkg/sfu/audio"
"github.com/livekit/livekit-server/pkg/sfu/utils"
sutils "github.com/livekit/livekit-server/pkg/utils"
"github.com/livekit/mediatransportutil"
"github.com/livekit/mediatransportutil/pkg/bucket"
@@ -80,6 +81,8 @@ type Buffer struct {
closed atomic.Bool
mime string
snRangeMap *utils.RangeMap[uint32, uint32]
// supported feedbacks
latestTSForAudioLevelInitialized bool
latestTSForAudioLevel uint32
@@ -124,6 +127,7 @@ func NewBuffer(ssrc uint32, vp, ap *sync.Pool) *Buffer {
mediaSSRC: ssrc,
videoPool: vp,
audioPool: ap,
snRangeMap: utils.NewRangeMap[uint32, uint32](100),
pliThrottle: int64(500 * time.Millisecond),
logger: l.WithComponent(sutils.ComponentPub).WithComponent(sutils.ComponentSFU),
}
@@ -404,42 +408,40 @@ func (b *Buffer) SetRTT(rtt uint32) {
}
func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) {
pktBuf, err := b.bucket.AddPacket(pkt)
if err != nil {
//
// Even when erroring, do
// 1. state update
// 2. TWCC just in case remote side is retransmitting an old packet for probing
//
// But, do not forward those packets
//
var rtpPacket rtp.Packet
if uerr := rtpPacket.Unmarshal(pkt); uerr == nil {
b.updateStreamState(&rtpPacket, arrivalTime)
b.processHeaderExtensions(&rtpPacket, arrivalTime)
}
var rtpPacket rtp.Packet
if err := rtpPacket.Unmarshal(pkt); err != nil {
b.logger.Errorw("could not unmarshal RTP packet", err)
return
}
extSeqNumber, isOutOfOrder := b.updateStreamState(&rtpPacket, arrivalTime)
b.processHeaderExtensions(&rtpPacket, arrivalTime)
if !isOutOfOrder && len(rtpPacket.Payload) == 0 {
// drop padding only in-order packet
b.snRangeMap.IncValue(1)
return
}
// add to RTX buffer using sequence number after accounting for dropped padding only packets
snAdjustment, err := b.snRangeMap.GetValue(extSeqNumber)
if err != nil {
b.logger.Errorw("could not get sequence number adjustment", err)
return
}
rtpPacket.Header.SequenceNumber = uint16(extSeqNumber - snAdjustment)
_, err = b.bucket.AddPacketWithSequenceNumber(pkt, rtpPacket.Header.SequenceNumber)
if err != nil {
if err != bucket.ErrRTXPacket {
b.logger.Warnw("could not add RTP packet to bucket", err)
}
return
}
var p rtp.Packet
err = p.Unmarshal(pktBuf)
if err != nil {
b.logger.Warnw("error unmarshaling RTP packet", err)
return
}
b.updateStreamState(&p, arrivalTime)
b.processHeaderExtensions(&p, arrivalTime)
b.doNACKs()
b.doReports(arrivalTime)
ep := b.getExtPacket(&p, arrivalTime)
ep := b.getExtPacket(&rtpPacket, arrivalTime)
if ep == nil {
return
}
@@ -497,18 +499,21 @@ func (b *Buffer) doFpsCalc(ep *ExtPacket) {
}
}
func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime time.Time) {
func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime time.Time) (uint32, bool) {
flowState := b.rtpStats.Update(&p.Header, len(p.Payload), int(p.PaddingSize), arrivalTime)
if b.nacker != nil {
b.nacker.Remove(p.SequenceNumber)
if flowState.HasLoss {
b.snRangeMap.AddRange(flowState.LossStartInclusive, flowState.LossEndExclusive)
for lost := flowState.LossStartInclusive; lost != flowState.LossEndExclusive; lost++ {
b.nacker.Push(lost)
b.nacker.Push(uint16(lost))
}
}
}
return flowState.ExtSeqNumber, flowState.IsOutOfOrder
}
func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime time.Time) {
+33 -8
View File
@@ -65,8 +65,12 @@ func (d driftResult) String() string {
type RTPFlowState struct {
HasLoss bool
LossStartInclusive uint16
LossEndExclusive uint16
LossStartInclusive uint32
LossEndExclusive uint32
IsOutOfOrder bool
ExtSeqNumber uint32
}
type IntervalStats struct {
@@ -449,14 +453,16 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
}
}
flowState.IsOutOfOrder = true
cycles := r.cycles
if rtph.SequenceNumber > r.highestSN {
cycles--
}
flowState.ExtSeqNumber = getExtSN(rtph.SequenceNumber, cycles)
// in-order
default:
if diff > 1 {
flowState.HasLoss = true
flowState.LossStartInclusive = r.highestSN + 1
flowState.LossEndExclusive = rtph.SequenceNumber
}
// update gap histogram
r.updateGapHistogram(int(diff))
@@ -466,6 +472,16 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker, false)
if diff > 1 {
flowState.HasLoss = true
cycles := r.cycles
if r.highestSN+1 < r.highestSN {
cycles++
}
flowState.LossStartInclusive = getExtSN(r.highestSN+1, cycles)
}
if rtph.SequenceNumber < r.highestSN && !first {
r.cycles++
}
@@ -481,6 +497,11 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
// NOTE: this may not be the first packet with this time stamp if there is packet loss.
r.highestTime = packetTime
}
if flowState.HasLoss {
flowState.LossEndExclusive = getExtSN(rtph.SequenceNumber, r.cycles)
}
flowState.ExtSeqNumber = getExtSN(rtph.SequenceNumber, r.cycles)
}
if !isDuplicate {
@@ -1733,6 +1754,10 @@ func (r *RTPStats) getAndResetSnapshot(snapshotId uint32, override bool) (*Snaps
// ----------------------------------
func getExtSN(sn uint16, cycles uint16) uint32 {
return (uint32(cycles) << 16) | uint32(sn)
}
func getExtTS(ts uint32, cycles uint32) uint64 {
return (uint64(cycles) << 32) | uint64(ts)
}
+2 -2
View File
@@ -126,8 +126,8 @@ func TestRTPStats_Update(t *testing.T) {
packet = getPacket(sequenceNumber, timestamp, 1000)
flowState = r.Update(&packet.Header, len(packet.Payload), 0, time.Now())
require.True(t, flowState.HasLoss)
require.Equal(t, sequenceNumber-9, flowState.LossStartInclusive)
require.Equal(t, sequenceNumber, flowState.LossEndExclusive)
require.Equal(t, uint32(sequenceNumber-9), flowState.LossStartInclusive)
require.Equal(t, uint32(sequenceNumber), flowState.LossEndExclusive)
require.Equal(t, uint32(17), r.packetsLost)
// out-of-order should decrement number of lost packets
+17 -18
View File
@@ -154,12 +154,13 @@ type TranslationParams struct {
// -------------------------------------------------------------------
type ForwarderState struct {
Started bool
PreStartTime time.Time
FirstTS uint32
RefTSOffset uint32
RTP RTPMungerState
Codec interface{}
Started bool
ReferenceLayerSpatial int32
PreStartTime time.Time
FirstTS uint32
RefTSOffset uint32
RTP RTPMungerState
Codec interface{}
}
func (f ForwarderState) String() string {
@@ -168,8 +169,9 @@ func (f ForwarderState) String() string {
case codecmunger.VP8State:
codecString = codecState.String()
}
return fmt.Sprintf("ForwarderState{started: %v, preStartTime: %s, firstTS: %d, refTSOffset: %d, rtp: %s, codec: %s}",
return fmt.Sprintf("ForwarderState{started: %v, referenceLayerSpatial: %d, preStartTime: %s, firstTS: %d, refTSOffset: %d, rtp: %s, codec: %s}",
f.Started,
f.ReferenceLayerSpatial,
f.PreStartTime.String(),
f.FirstTS,
f.RefTSOffset,
@@ -330,12 +332,13 @@ func (f *Forwarder) GetState() ForwarderState {
}
return ForwarderState{
Started: f.started,
PreStartTime: f.preStartTime,
FirstTS: f.firstTS,
RefTSOffset: f.refTSOffset,
RTP: f.rtpMunger.GetLast(),
Codec: f.codecMunger.GetState(),
Started: f.started,
ReferenceLayerSpatial: f.referenceLayerSpatial,
PreStartTime: f.preStartTime,
FirstTS: f.firstTS,
RefTSOffset: f.refTSOffset,
RTP: f.rtpMunger.GetLast(),
Codec: f.codecMunger.GetState(),
}
}
@@ -351,6 +354,7 @@ func (f *Forwarder) SeedState(state ForwarderState) {
f.codecMunger.SeedState(state.Codec)
f.started = true
f.referenceLayerSpatial = state.ReferenceLayerSpatial
f.preStartTime = state.PreStartTime
f.firstTS = state.FirstTS
f.refTSOffset = state.RefTSOffset
@@ -1451,11 +1455,6 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
)
}
if f.referenceLayerSpatial == buffer.InvalidLayerSpatial {
// on a resume, reference layer may not be set, so only set when it is invalid
f.referenceLayerSpatial = layer
}
// Compute how much time passed between the previous forwarded packet
// and the current incoming (to be forwarded) packet and calculate
// timestamp offset on source change.
+123
View File
@@ -0,0 +1,123 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package utils
import (
"errors"
"math"
"unsafe"
)
const (
minRanges = 1
)
var (
errReversedOrder = errors.New("end is before start")
errKeyNotFound = errors.New("key not found")
)
type rangeType interface {
uint32
}
type valueType interface {
uint32
}
type rangeVal[RT rangeType, VT valueType] struct {
start RT
end RT
value VT
}
type RangeMap[RT rangeType, VT valueType] struct {
halfRange RT
size int
ranges []rangeVal[RT, VT]
runningValue VT
}
func NewRangeMap[RT rangeType, VT valueType](size int) *RangeMap[RT, VT] {
var t RT
return &RangeMap[RT, VT]{
halfRange: 1 << ((unsafe.Sizeof(t) * 8) - 1),
size: int(math.Max(float64(size), float64(minRanges))),
}
}
func (r *RangeMap[RT, VT]) IncValue(inc VT) {
r.runningValue += inc
}
func (r *RangeMap[RT, VT]) AddRange(startInclusive RT, endExclusive RT) error {
if endExclusive-startInclusive > r.halfRange {
return errReversedOrder
}
isNewRange := true
// check if last range can be extended
if len(r.ranges) != 0 {
lr := &r.ranges[len(r.ranges)-1]
if startInclusive <= lr.end {
return errReversedOrder
}
if lr.value == r.runningValue {
lr.end = endExclusive - 1
isNewRange = false
} else {
// end last range before start and start a new range
lr.end = startInclusive - 1
}
}
if isNewRange {
r.ranges = append(r.ranges, rangeVal[RT, VT]{
start: startInclusive,
end: endExclusive - 1,
value: r.runningValue,
})
}
r.prune()
return nil
}
func (r *RangeMap[RT, VT]) GetValue(key RT) (VT, error) {
numRanges := len(r.ranges)
if numRanges != 0 {
if key > r.ranges[numRanges-1].end {
return r.runningValue, nil
}
if key < r.ranges[0].start {
return 0, errKeyNotFound
}
}
for _, rv := range r.ranges {
if key-rv.start < r.halfRange && rv.end-key < r.halfRange {
return rv.value, nil
}
}
return r.runningValue, nil
}
func (r *RangeMap[RT, VT]) prune() {
if len(r.ranges) > r.size {
r.ranges = r.ranges[len(r.ranges)-r.size:]
}
}
+122
View File
@@ -0,0 +1,122 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package utils
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestRangeMapUint32(t *testing.T) {
r := NewRangeMap[uint32, uint32](2)
// getting value for any key should be 0 default
value, err := r.GetValue(33333)
require.NoError(t, err)
require.Equal(t, uint32(0), value)
value, err = r.GetValue(0xffffffff)
require.NoError(t, err)
require.Equal(t, uint32(0), value)
// getting value for any key should be incremented value
r.IncValue(2)
value, err = r.GetValue(66666666)
require.NoError(t, err)
require.Equal(t, uint32(2), value)
value, err = r.GetValue(0)
require.NoError(t, err)
require.Equal(t, uint32(2), value)
// add a couple of ranges, as the value is same should just extend
err = r.AddRange(10, 20)
require.NoError(t, err)
err = r.AddRange(30, 40)
require.NoError(t, err)
require.Equal(t, 1, len(r.ranges))
require.Equal(t, uint32(10), r.ranges[0].start)
require.Equal(t, uint32(39), r.ranges[0].end)
require.Equal(t, uint32(2), r.ranges[0].value)
// bump value
r.IncValue(1)
// getting value in previously added range should return 2
value, err = r.GetValue(22)
require.NoError(t, err)
require.Equal(t, uint32(2), value)
// outside range should return 3
value, err = r.GetValue(662)
require.NoError(t, err)
require.Equal(t, uint32(3), value)
// adding out-of-order range should return error
err = r.AddRange(60, 50)
require.Error(t, err, errReversedOrder)
// adding overlapping should return error
err = r.AddRange(30, 50)
require.Error(t, err, errReversedOrder)
// adding a non-overlapping range should extend previous range and add new one
err = r.AddRange(50, 60)
require.NoError(t, err)
require.Equal(t, 2, len(r.ranges))
require.Equal(t, uint32(10), r.ranges[0].start)
require.Equal(t, uint32(49), r.ranges[0].end)
require.Equal(t, uint32(2), r.ranges[0].value)
require.Equal(t, uint32(50), r.ranges[1].start)
require.Equal(t, uint32(59), r.ranges[1].end)
require.Equal(t, uint32(3), r.ranges[1].value)
// getting an old value should not succeed, but start of first range should return no error
value, err = r.GetValue(9)
require.Error(t, err, errKeyNotFound)
value, err = r.GetValue(10)
require.NoError(t, err)
require.Equal(t, uint32(2), value)
// adding another range should prune the first one as size if set to 2
r.IncValue(10)
err = r.AddRange(1000, 1233)
require.NoError(t, err)
require.Equal(t, 2, len(r.ranges))
require.Equal(t, uint32(50), r.ranges[0].start)
require.Equal(t, uint32(999), r.ranges[0].end)
require.Equal(t, uint32(3), r.ranges[0].value)
require.Equal(t, uint32(1000), r.ranges[1].start)
require.Equal(t, uint32(1232), r.ranges[1].end)
require.Equal(t, uint32(13), r.ranges[1].value)
// previously valid range should return key not found after pruning
value, err = r.GetValue(10)
require.Error(t, err, errKeyNotFound)
value, err = r.GetValue(999)
require.NoError(t, err)
require.Equal(t, uint32(3), value)
value, err = r.GetValue(1200)
require.NoError(t, err)
require.Equal(t, uint32(13), value)
// something newer than what is in ranges should return running value
value, err = r.GetValue(3000)
require.NoError(t, err)
require.Equal(t, uint32(13), value)
}
+1 -1
View File
@@ -14,4 +14,4 @@
package version
const Version = "1.4.4"
const Version = "1.4.5"