Merge remote-tracking branch 'origin/master' into raja_min_packets

This commit is contained in:
boks1971
2023-09-18 09:36:56 +05:30
18 changed files with 701 additions and 459 deletions
+1 -1
View File
@@ -18,7 +18,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20230906055425-e81fd5f6fb3f
github.com/livekit/protocol v1.7.3-0.20230911160509-47d330eafb32
github.com/livekit/protocol v1.7.3-0.20230915202328-cf9f95141e0e
github.com/livekit/psrpc v0.3.3
github.com/mackerelio/go-osstat v0.2.4
github.com/magefile/mage v1.15.0
+2 -2
View File
@@ -127,8 +127,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20230906055425-e81fd5f6fb3f h1:b4ri7hQESRSzJWzXXcmANG2hJ4HTj5LM01Ekm8lnQmg=
github.com/livekit/mediatransportutil v0.0.0-20230906055425-e81fd5f6fb3f/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY=
github.com/livekit/protocol v1.7.3-0.20230911160509-47d330eafb32 h1:5PdmCpGGXA2hz1pKGgKSJYTjmk3Kkm+kNiW5NOFARCI=
github.com/livekit/protocol v1.7.3-0.20230911160509-47d330eafb32/go.mod h1:zbh0QPUcLGOeZeIO/VeigwWWbudz4Lv+Px94FnVfQH0=
github.com/livekit/protocol v1.7.3-0.20230915202328-cf9f95141e0e h1:WEet0iH/JazBFNhhH+YuZHtXpKefb7mnbCC2al3peyA=
github.com/livekit/protocol v1.7.3-0.20230915202328-cf9f95141e0e/go.mod h1:zbh0QPUcLGOeZeIO/VeigwWWbudz4Lv+Px94FnVfQH0=
github.com/livekit/psrpc v0.3.3 h1:+lltbuN39IdaynXhLLxRShgYqYsRMWeeXKzv60oqyWo=
github.com/livekit/psrpc v0.3.3/go.mod h1:n6JntEg+zT6Ji8InoyTpV7wusPNwGqqtxmHlkNhDN0U=
github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs=
+20 -3
View File
@@ -56,7 +56,7 @@ const (
sdBatchSize = 30
rttUpdateInterval = 5 * time.Second
disconnectCleanupDuration = 15 * time.Second
disconnectCleanupDuration = 5 * time.Second
migrationWaitDuration = 3 * time.Second
)
@@ -561,7 +561,9 @@ func (p *ParticipantImpl) HandleSignalSourceClose() {
p.TransportManager.SetSignalSourceValid(false)
if !p.TransportManager.HasPublisherEverConnected() && !p.TransportManager.HasSubscriberEverConnected() {
p.params.Logger.Infow("closing disconnected participant")
p.params.Logger.Infow("closing disconnected participant",
"reason", types.ParticipantCloseReasonJoinFailed,
)
_ = p.Close(false, types.ParticipantCloseReasonJoinFailed, false)
}
}
@@ -1402,7 +1404,9 @@ func (p *ParticipantImpl) setupDisconnectTimer() {
if p.IsClosed() || p.IsDisconnected() {
return
}
p.params.Logger.Infow("closing disconnected participant")
p.params.Logger.Infow("closing disconnected participant",
"reason", types.ParticipantCloseReasonPeerConnectionDisconnected,
)
_ = p.Close(true, types.ParticipantCloseReasonPeerConnectionDisconnected, false)
})
p.lock.Unlock()
@@ -1633,6 +1637,19 @@ func (p *ParticipantImpl) addPendingTrackLocked(req *livekit.AddTrackRequest) *l
return ti
}
func (p *ParticipantImpl) GetPendingTrack(trackID livekit.TrackID) *livekit.TrackInfo {
p.pendingTracksLock.RLock()
defer p.pendingTracksLock.RUnlock()
for _, t := range p.pendingTracks {
if livekit.TrackID(t.trackInfos[0].Sid) == trackID {
return t.trackInfos[0]
}
}
return nil
}
func (p *ParticipantImpl) sendTrackPublished(cid string, ti *livekit.TrackInfo) {
p.pubLogger.Debugw("sending track published", "cid", cid, "trackInfo", ti.String())
_ = p.writeMessage(&livekit.SignalResponse{
+6 -1
View File
@@ -581,6 +581,10 @@ func (r *Room) SyncState(participant types.LocalParticipant, state *livekit.Sync
break
}
}
if !found {
// is there a pending track?
found = participant.GetPendingTrack(livekit.TrackID(ti.Sid)) != nil
}
if !found {
pLogger.Warnw("unknown track during resume", nil, "trackID", ti.Sid)
shouldReconnect = true
@@ -1331,7 +1335,8 @@ func BroadcastDataPacketForRoom(r types.Room, source types.LocalParticipant, dp
utils.ParallelExec(destParticipants, dataForwardLoadBalanceThreshold, 1, func(op types.LocalParticipant) {
err := op.SendDataPacket(dp, dpData)
if err != nil && !errors.Is(err, io.ErrClosedPipe) && !errors.Is(err, sctp.ErrStreamClosed) {
if err != nil && !errors.Is(err, io.ErrClosedPipe) && !errors.Is(err, sctp.ErrStreamClosed) &&
!errors.Is(err, ErrTransportFailure) {
op.GetLogger().Infow("send data packet error", "error", err)
}
})
+1 -1
View File
@@ -60,7 +60,7 @@ const (
dtlsRetransmissionInterval = 100 * time.Millisecond
iceDisconnectedTimeout = 10 * time.Second // compatible for ice-lite with firefox client
iceFailedTimeout = 25 * time.Second // pion's default
iceFailedTimeout = 5 * time.Second // time between disconnected and failed
iceKeepaliveInterval = 2 * time.Second // pion's default
minTcpICEConnectTimeout = 5 * time.Second
+2 -1
View File
@@ -245,7 +245,7 @@ type Participant interface {
SetMetadata(metadata string)
IsPublisher() bool
GetPublishedTrack(sid livekit.TrackID) MediaTrack
GetPublishedTrack(trackID livekit.TrackID) MediaTrack
GetPublishedTracks() []MediaTrack
RemovePublishedTrack(track MediaTrack, willBeResumed bool, shouldClose bool)
@@ -315,6 +315,7 @@ type LocalParticipant interface {
GetICEConnectionType() ICEConnectionType
GetBufferFactory() *buffer.Factory
GetPlayoutDelayConfig() *livekit.PlayoutDelay
GetPendingTrack(trackID livekit.TrackID) *livekit.TrackInfo
SetResponseSink(sink routing.MessageSink)
CloseSignalConnection(reason SignallingCloseReason)
@@ -263,6 +263,17 @@ type FakeLocalParticipant struct {
getPacerReturnsOnCall map[int]struct {
result1 pacer.Pacer
}
GetPendingTrackStub func(livekit.TrackID) *livekit.TrackInfo
getPendingTrackMutex sync.RWMutex
getPendingTrackArgsForCall []struct {
arg1 livekit.TrackID
}
getPendingTrackReturns struct {
result1 *livekit.TrackInfo
}
getPendingTrackReturnsOnCall map[int]struct {
result1 *livekit.TrackInfo
}
GetPlayoutDelayConfigStub func() *livekit.PlayoutDelay
getPlayoutDelayConfigMutex sync.RWMutex
getPlayoutDelayConfigArgsForCall []struct {
@@ -2169,6 +2180,67 @@ func (fake *FakeLocalParticipant) GetPacerReturnsOnCall(i int, result1 pacer.Pac
}{result1}
}
func (fake *FakeLocalParticipant) GetPendingTrack(arg1 livekit.TrackID) *livekit.TrackInfo {
fake.getPendingTrackMutex.Lock()
ret, specificReturn := fake.getPendingTrackReturnsOnCall[len(fake.getPendingTrackArgsForCall)]
fake.getPendingTrackArgsForCall = append(fake.getPendingTrackArgsForCall, struct {
arg1 livekit.TrackID
}{arg1})
stub := fake.GetPendingTrackStub
fakeReturns := fake.getPendingTrackReturns
fake.recordInvocation("GetPendingTrack", []interface{}{arg1})
fake.getPendingTrackMutex.Unlock()
if stub != nil {
return stub(arg1)
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeLocalParticipant) GetPendingTrackCallCount() int {
fake.getPendingTrackMutex.RLock()
defer fake.getPendingTrackMutex.RUnlock()
return len(fake.getPendingTrackArgsForCall)
}
func (fake *FakeLocalParticipant) GetPendingTrackCalls(stub func(livekit.TrackID) *livekit.TrackInfo) {
fake.getPendingTrackMutex.Lock()
defer fake.getPendingTrackMutex.Unlock()
fake.GetPendingTrackStub = stub
}
func (fake *FakeLocalParticipant) GetPendingTrackArgsForCall(i int) livekit.TrackID {
fake.getPendingTrackMutex.RLock()
defer fake.getPendingTrackMutex.RUnlock()
argsForCall := fake.getPendingTrackArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) GetPendingTrackReturns(result1 *livekit.TrackInfo) {
fake.getPendingTrackMutex.Lock()
defer fake.getPendingTrackMutex.Unlock()
fake.GetPendingTrackStub = nil
fake.getPendingTrackReturns = struct {
result1 *livekit.TrackInfo
}{result1}
}
func (fake *FakeLocalParticipant) GetPendingTrackReturnsOnCall(i int, result1 *livekit.TrackInfo) {
fake.getPendingTrackMutex.Lock()
defer fake.getPendingTrackMutex.Unlock()
fake.GetPendingTrackStub = nil
if fake.getPendingTrackReturnsOnCall == nil {
fake.getPendingTrackReturnsOnCall = make(map[int]struct {
result1 *livekit.TrackInfo
})
}
fake.getPendingTrackReturnsOnCall[i] = struct {
result1 *livekit.TrackInfo
}{result1}
}
func (fake *FakeLocalParticipant) GetPlayoutDelayConfig() *livekit.PlayoutDelay {
fake.getPlayoutDelayConfigMutex.Lock()
ret, specificReturn := fake.getPlayoutDelayConfigReturnsOnCall[len(fake.getPlayoutDelayConfigArgsForCall)]
@@ -5829,6 +5901,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.getLoggerMutex.RUnlock()
fake.getPacerMutex.RLock()
defer fake.getPacerMutex.RUnlock()
fake.getPendingTrackMutex.RLock()
defer fake.getPendingTrackMutex.RUnlock()
fake.getPlayoutDelayConfigMutex.RLock()
defer fake.getPlayoutDelayConfigMutex.RUnlock()
fake.getPublishedTrackMutex.RLock()
+1
View File
@@ -38,6 +38,7 @@ var (
ErrPermissionDenied = errors.New("permissions denied")
ErrMissingAuthorization = errors.New("invalid authorization header. Must start with " + bearerPrefix)
ErrInvalidAuthorizationToken = errors.New("invalid authorization token")
ErrInvalidAPIKey = errors.New("invalid API key")
)
// authentication middleware
+52 -25
View File
@@ -66,6 +66,7 @@ type RoomManager struct {
clientConfManager clientconfiguration.ClientConfigurationManager
egressLauncher rtc.EgressLauncher
versionGenerator utils.TimedVersionGenerator
turnAuthHandler *TURNAuthHandler
rooms map[livekit.RoomName]*rtc.Room
@@ -81,6 +82,7 @@ func NewLocalRoomManager(
clientConfManager clientconfiguration.ClientConfigurationManager,
egressLauncher rtc.EgressLauncher,
versionGenerator utils.TimedVersionGenerator,
turnAuthHandler *TURNAuthHandler,
) (*RoomManager, error) {
rtcConf, err := rtc.NewWebRTCConfig(conf)
if err != nil {
@@ -97,6 +99,7 @@ func NewLocalRoomManager(
clientConfManager: clientConfManager,
egressLauncher: egressLauncher,
versionGenerator: versionGenerator,
turnAuthHandler: turnAuthHandler,
rooms: make(map[livekit.RoomName]*rtc.Room),
@@ -244,6 +247,10 @@ func (r *RoomManager) StartSession(
return nil
}
// should not error out, error is logged in iceServersForParticipant even if it fails
// since this is used for TURN server credentials, we don't want to fail the request even if there's no TURN for the session
apiKey, _, _ := r.getFirstKeyPair()
participant := room.GetParticipant(pi.Identity)
if participant != nil {
// When reconnecting, it means WS has interrupted but underlying peer connection is still ok in this state,
@@ -286,8 +293,9 @@ func (r *RoomManager) StartSession(
participant,
requestSource,
responseSink,
r.iceServersForRoom(
protoRoom,
r.iceServersForParticipant(
apiKey,
participant,
iceConfig.PreferenceSubscriber == livekit.ICECandidateType_ICT_TLS,
),
pi.ReconnectReason,
@@ -411,7 +419,8 @@ func (r *RoomManager) StartSession(
opts := rtc.ParticipantOptions{
AutoSubscribe: pi.AutoSubscribe,
}
if err = room.Join(participant, requestSource, &opts, r.iceServersForRoom(protoRoom, iceConfig.PreferenceSubscriber == livekit.ICECandidateType_ICT_TLS)); err != nil {
iceServers := r.iceServersForParticipant(apiKey, participant, iceConfig.PreferenceSubscriber == livekit.ICECandidateType_ICT_TLS)
if err = room.Join(participant, requestSource, &opts, iceServers); err != nil {
pLogger.Errorw("could not join room", err)
_ = participant.Close(true, types.ParticipantCloseReasonJoinFailed, false)
return err
@@ -684,7 +693,7 @@ func (r *RoomManager) handleRTCMessage(ctx context.Context, roomName livekit.Roo
}
}
func (r *RoomManager) iceServersForRoom(ri *livekit.Room, tlsOnly bool) []*livekit.ICEServer {
func (r *RoomManager) iceServersForParticipant(apiKey string, participant types.LocalParticipant, tlsOnly bool) []*livekit.ICEServer {
var iceServers []*livekit.ICEServer
rtcConf := r.config.RTC
@@ -705,11 +714,19 @@ func (r *RoomManager) iceServersForRoom(ri *livekit.Room, tlsOnly bool) []*livek
urls = append(urls, fmt.Sprintf("turns:%s:443?transport=tcp", r.config.TURN.Domain))
}
if len(urls) > 0 {
iceServers = append(iceServers, &livekit.ICEServer{
Urls: urls,
Username: ri.Name,
Credential: ri.TurnPassword,
})
username := r.turnAuthHandler.CreateUsername(apiKey, participant.ID())
password, err := r.turnAuthHandler.CreatePassword(apiKey, participant.ID())
if err != nil {
participant.GetLogger().Warnw("could not create turn password", err)
hasSTUN = false
} else {
logger.Infow("created TURN password", "username", username, "password", password)
iceServers = append(iceServers, &livekit.ICEServer{
Urls: urls,
Username: username,
Credential: password,
})
}
}
}
@@ -746,23 +763,26 @@ func (r *RoomManager) iceServersForRoom(ri *livekit.Room, tlsOnly bool) []*livek
}
func (r *RoomManager) refreshToken(participant types.LocalParticipant) error {
for key, secret := range r.config.Keys {
grants := participant.ClaimGrants()
token := auth.NewAccessToken(key, secret)
token.SetName(grants.Name).
SetIdentity(string(participant.Identity())).
SetValidFor(tokenDefaultTTL).
SetMetadata(grants.Metadata).
AddGrant(grants.Video)
jwt, err := token.ToJWT()
if err == nil {
err = participant.SendRefreshToken(jwt)
}
if err != nil {
return err
}
break
key, secret, err := r.getFirstKeyPair()
if err != nil {
return err
}
grants := participant.ClaimGrants()
token := auth.NewAccessToken(key, secret)
token.SetName(grants.Name).
SetIdentity(string(participant.Identity())).
SetValidFor(tokenDefaultTTL).
SetMetadata(grants.Metadata).
AddGrant(grants.Video)
jwt, err := token.ToJWT()
if err == nil {
err = participant.SendRefreshToken(jwt)
}
if err != nil {
return err
}
return nil
}
@@ -786,6 +806,13 @@ func (r *RoomManager) getIceConfig(participant types.LocalParticipant) *livekit.
return iceConfigCacheEntry.iceConfig
}
func (r *RoomManager) getFirstKeyPair() (string, string, error) {
for key, secret := range r.config.Keys {
return key, secret, nil
}
return "", "", errors.New("no API keys configured")
}
// ------------------------------------
func iceServerForStunServers(servers []string) *livekit.ICEServer {
+46 -9
View File
@@ -15,14 +15,18 @@
package service
import (
"context"
"crypto/sha256"
"crypto/tls"
"fmt"
"net"
"strconv"
"strings"
"github.com/jxskiss/base62"
"github.com/pion/turn/v2"
"github.com/pkg/errors"
"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/logger/pionlogger"
@@ -142,14 +146,47 @@ func NewTurnServer(conf *config.Config, authHandler turn.AuthHandler, standalone
return turn.NewServer(serverConfig)
}
func newTurnAuthHandler(roomStore ObjectStore) turn.AuthHandler {
return func(username, realm string, srcAddr net.Addr) (key []byte, ok bool) {
// room id should be the username, create a hashed room id
rm, _, err := roomStore.LoadRoom(context.Background(), livekit.RoomName(username), false)
if err != nil {
return nil, false
}
func getTURNAuthHandlerFunc(handler *TURNAuthHandler) turn.AuthHandler {
return handler.HandleAuth
}
return turn.GenerateAuthKey(username, LivekitRealm, rm.TurnPassword), true
type TURNAuthHandler struct {
keyProvider auth.KeyProvider
}
func NewTURNAuthHandler(keyProvider auth.KeyProvider) *TURNAuthHandler {
return &TURNAuthHandler{
keyProvider: keyProvider,
}
}
func (h *TURNAuthHandler) CreateUsername(apiKey string, pID livekit.ParticipantID) string {
return base62.EncodeToString([]byte(fmt.Sprintf("%s|%s", apiKey, pID)))
}
func (h *TURNAuthHandler) CreatePassword(apiKey string, pID livekit.ParticipantID) (string, error) {
secret := h.keyProvider.GetSecret(apiKey)
if secret == "" {
return "", ErrInvalidAPIKey
}
keyInput := fmt.Sprintf("%s|%s", secret, pID)
sum := sha256.Sum256([]byte(keyInput))
return base62.EncodeToString(sum[:]), nil
}
func (h *TURNAuthHandler) HandleAuth(username, realm string, srcAddr net.Addr) (key []byte, ok bool) {
decoded, err := base62.DecodeString(username)
if err != nil {
return nil, false
}
parts := strings.Split(string(decoded), "|")
if len(parts) != 2 {
return nil, false
}
password, err := h.CreatePassword(parts[0], livekit.ParticipantID(parts[1]))
if err != nil {
logger.Warnw("could not create TURN password", err, "username", username)
return nil, false
}
return turn.GenerateAuthKey(username, LivekitRealm, password), true
}
+2 -1
View File
@@ -73,7 +73,8 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
NewDefaultSignalServer,
routing.NewSignalClient,
NewLocalRoomManager,
newTurnAuthHandler,
NewTURNAuthHandler,
getTURNAuthHandlerFunc,
newInProcessTurnServer,
utils.NewDefaultTimedVersionGenerator,
NewLivekitServer,
+3 -2
View File
@@ -87,7 +87,8 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
rtcService := NewRTCService(conf, roomAllocator, objectStore, router, currentNode, telemetryService)
clientConfigurationManager := createClientConfiguration()
timedVersionGenerator := utils.NewDefaultTimedVersionGenerator()
roomManager, err := NewLocalRoomManager(conf, objectStore, currentNode, router, telemetryService, clientConfigurationManager, rtcEgressLauncher, timedVersionGenerator)
turnAuthHandler := NewTURNAuthHandler(keyProvider)
roomManager, err := NewLocalRoomManager(conf, objectStore, currentNode, router, telemetryService, clientConfigurationManager, rtcEgressLauncher, timedVersionGenerator, turnAuthHandler)
if err != nil {
return nil, err
}
@@ -95,7 +96,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
if err != nil {
return nil, err
}
authHandler := newTurnAuthHandler(objectStore)
authHandler := getTURNAuthHandlerFunc(turnAuthHandler)
server, err := newInProcessTurnServer(conf, authHandler)
if err != nil {
return nil, err
+96 -184
View File
@@ -30,8 +30,6 @@ const (
cGapHistogramNumBins = 101
cNumSequenceNumbers = 65536
cFirstSnapshotID = 1
cSnInfoSize = 8192
cSnInfoMask = cSnInfoSize - 1
cFirstPacketTimeAdjustWindow = 2 * time.Minute
cFirstPacketTimeAdjustThreshold = 5 * time.Second
@@ -53,18 +51,6 @@ func RTPDriftToString(r *livekit.RTPDrift) string {
// -------------------------------------------------------
type intervalStats struct {
packets uint64
bytes uint64
headerBytes uint64
packetsPadding uint64
bytesPadding uint64
headerBytesPadding uint64
packetsLost uint64
packetsOutOfOrder uint64
frames uint32
}
type RTPDeltaInfo struct {
StartTime time.Time
Duration time.Duration
@@ -89,25 +75,34 @@ type RTPDeltaInfo struct {
}
type snapshot struct {
startTime time.Time
extStartSN uint64
packetsDuplicate uint64
bytesDuplicate uint64
headerBytesDuplicate uint64
packetsLostOverridden uint64
nacks uint32
plis uint32
firs uint32
maxRtt uint32
maxJitter float64
}
isValid bool
type snInfo struct {
hdrSize uint16
pktSize uint16
isPaddingOnly bool
marker bool
isOutOfOrder bool
startTime time.Time
extStartSN uint64
bytes uint64
headerBytes uint64
packetsPadding uint64
bytesPadding uint64
headerBytesPadding uint64
packetsDuplicate uint64
bytesDuplicate uint64
headerBytesDuplicate uint64
packetsOutOfOrder uint64
packetsLost uint64
frames uint32
nacks uint32
plis uint32
firs uint32
maxRtt uint32
maxJitter float64
}
type RTCPSenderReportData struct {
@@ -153,16 +148,13 @@ type rtpStatsBase struct {
packetsOutOfOrder uint64
packetsLost uint64
packetsLostOverridden uint64
packetsLost uint64
frames uint32
jitter float64
maxJitter float64
snInfos [cSnInfoSize]snInfo
gapHistogram [cGapHistogramNumBins]uint32
nacks uint32
@@ -189,7 +181,7 @@ type rtpStatsBase struct {
srNewest *RTCPSenderReportData
nextSnapshotID uint32
snapshots map[uint32]*snapshot
snapshots []snapshot
}
func newRTPStatsBase(params RTPStatsParams) *rtpStatsBase {
@@ -197,7 +189,7 @@ func newRTPStatsBase(params RTPStatsParams) *rtpStatsBase {
params: params,
logger: params.Logger,
nextSnapshotID: cFirstSnapshotID,
snapshots: make(map[uint32]*snapshot),
snapshots: make([]snapshot, 2),
}
}
@@ -235,8 +227,6 @@ func (r *rtpStatsBase) seed(from *rtpStatsBase) bool {
r.jitter = from.jitter
r.maxJitter = from.maxJitter
r.snInfos = from.snInfos
r.gapHistogram = from.gapHistogram
r.nacks = from.nacks
@@ -273,10 +263,8 @@ func (r *rtpStatsBase) seed(from *rtpStatsBase) bool {
}
r.nextSnapshotID = from.nextSnapshotID
for id, ss := range from.snapshots {
ssCopy := *ss
r.snapshots[id] = &ssCopy
}
r.snapshots = make([]snapshot, cap(from.snapshots))
copy(r.snapshots, from.snapshots)
return true
}
@@ -295,11 +283,14 @@ func (r *rtpStatsBase) newSnapshotID(extStartSN uint64) uint32 {
id := r.nextSnapshotID
r.nextSnapshotID++
if cap(r.snapshots) < int(r.nextSnapshotID-cFirstSnapshotID) {
snapshots := make([]snapshot, r.nextSnapshotID-cFirstSnapshotID)
copy(snapshots, r.snapshots)
r.snapshots = snapshots
}
if r.initialized {
r.snapshots[id] = &snapshot{
startTime: time.Now(),
extStartSN: extStartSN,
}
r.snapshots[id-cFirstSnapshotID] = r.initSnapshot(time.Now(), extStartSN)
}
return id
}
@@ -450,7 +441,8 @@ func (r *rtpStatsBase) UpdateRtt(rtt uint32) {
r.maxRtt = rtt
}
for _, s := range r.snapshots {
for i := uint32(0); i < r.nextSnapshotID-cFirstSnapshotID; i++ {
s := &r.snapshots[i]
if rtt > s.maxRtt {
s.maxRtt = rtt
}
@@ -528,7 +520,6 @@ func (r *rtpStatsBase) getTotalPacketsPrimary(extStartSN, extHighestSN uint64) u
func (r *rtpStatsBase) deltaInfo(snapshotID uint32, extStartSN uint64, extHighestSN uint64) *RTPDeltaInfo {
then, now := r.getAndResetSnapshot(snapshotID, extStartSN, extHighestSN)
if now == nil || then == nil {
return nil
}
@@ -551,21 +542,25 @@ func (r *rtpStatsBase) deltaInfo(snapshotID uint32, extStartSN uint64, extHighes
}
}
intervalStats := r.getIntervalStats(then.extStartSN, now.extStartSN, extHighestSN)
packetsLost := uint32(now.packetsLost - then.packetsLost)
if int32(packetsLost) < 0 {
packetsLost = 0
}
return &RTPDeltaInfo{
StartTime: startTime,
Duration: endTime.Sub(startTime),
Packets: uint32(packetsExpected - intervalStats.packetsPadding),
Bytes: intervalStats.bytes,
HeaderBytes: intervalStats.headerBytes,
Packets: uint32(packetsExpected - (now.packetsPadding - then.packetsPadding)),
Bytes: now.bytes - then.bytes,
HeaderBytes: now.headerBytes - then.headerBytes,
PacketsDuplicate: uint32(now.packetsDuplicate - then.packetsDuplicate),
BytesDuplicate: now.bytesDuplicate - then.bytesDuplicate,
HeaderBytesDuplicate: now.headerBytesDuplicate - then.headerBytesDuplicate,
PacketsPadding: uint32(intervalStats.packetsPadding),
BytesPadding: intervalStats.bytesPadding,
HeaderBytesPadding: intervalStats.headerBytesPadding,
PacketsLost: uint32(intervalStats.packetsLost),
Frames: intervalStats.frames,
PacketsPadding: uint32(now.packetsPadding - then.packetsPadding),
BytesPadding: now.bytesPadding - then.bytesPadding,
HeaderBytesPadding: now.headerBytesPadding - then.headerBytesPadding,
PacketsLost: packetsLost,
PacketsOutOfOrder: uint32(now.packetsOutOfOrder - then.packetsOutOfOrder),
Frames: now.frames - then.frames,
RttMax: then.maxRtt,
JitterMax: then.maxJitter / float64(r.params.ClockRate) * 1e6,
Nacks: now.nacks - then.nacks,
@@ -751,107 +746,6 @@ func (r *rtpStatsBase) toProto(
return p
}
func (r *rtpStatsBase) getSnInfoOutOfOrderSlot(esn uint64, ehsn uint64) int {
offset := int64(ehsn - esn)
if offset >= cSnInfoSize || offset < 0 {
// too old OR too new (i. e. ahead of highest)
return -1
}
return int(esn & cSnInfoMask)
}
func (r *rtpStatsBase) setSnInfo(esn uint64, ehsn uint64, pktSize uint16, hdrSize uint16, payloadSize uint16, marker bool, isOutOfOrder bool) {
var slot int
if int64(esn-ehsn) < 0 {
slot = r.getSnInfoOutOfOrderSlot(esn, ehsn)
if slot < 0 {
return
}
} else {
slot = int(esn & cSnInfoMask)
}
snInfo := &r.snInfos[slot]
snInfo.pktSize = pktSize
snInfo.hdrSize = hdrSize
snInfo.isPaddingOnly = payloadSize == 0
snInfo.marker = marker
snInfo.isOutOfOrder = isOutOfOrder
}
func (r *rtpStatsBase) clearSnInfos(extStartInclusive uint64, extEndExclusive uint64) {
if extEndExclusive <= extStartInclusive {
return
}
for esn := extStartInclusive; esn != extEndExclusive; esn++ {
snInfo := &r.snInfos[esn&cSnInfoMask]
snInfo.pktSize = 0
snInfo.hdrSize = 0
snInfo.isPaddingOnly = false
snInfo.marker = false
}
}
func (r *rtpStatsBase) isSnInfoLost(esn uint64, ehsn uint64) bool {
slot := r.getSnInfoOutOfOrderSlot(esn, ehsn)
if slot < 0 {
return false
}
return r.snInfos[slot].pktSize == 0
}
func (r *rtpStatsBase) getIntervalStats(extStartInclusive uint64, extEndExclusive uint64, ehsn uint64) (intervalStats intervalStats) {
packetsNotFound := uint32(0)
processESN := func(esn uint64, ehsn uint64) {
slot := r.getSnInfoOutOfOrderSlot(esn, ehsn)
if slot < 0 {
packetsNotFound++
return
}
snInfo := &r.snInfos[slot]
switch {
case snInfo.pktSize == 0:
intervalStats.packetsLost++
case snInfo.isPaddingOnly:
intervalStats.packetsPadding++
intervalStats.bytesPadding += uint64(snInfo.pktSize)
intervalStats.headerBytesPadding += uint64(snInfo.hdrSize)
default:
intervalStats.packets++
intervalStats.bytes += uint64(snInfo.pktSize)
intervalStats.headerBytes += uint64(snInfo.hdrSize)
if snInfo.isOutOfOrder {
intervalStats.packetsOutOfOrder++
}
}
if snInfo.marker {
intervalStats.frames++
}
}
for esn := extStartInclusive; esn != extEndExclusive; esn++ {
processESN(esn, ehsn)
}
if packetsNotFound != 0 {
r.logger.Errorw(
"could not find some packets", nil,
"start", extStartInclusive,
"end", extEndExclusive,
"count", packetsNotFound,
"highestSN", ehsn,
)
}
return
}
func (r *rtpStatsBase) updateJitter(ets uint64, packetTime time.Time) float64 {
// Do not update jitter on multiple packets of same frame.
// All packets of a frame have the same time stamp.
@@ -875,7 +769,8 @@ func (r *rtpStatsBase) updateJitter(ets uint64, packetTime time.Time) float64 {
r.maxJitter = r.jitter
}
for _, s := range r.snapshots {
for i := uint32(0); i < r.nextSnapshotID-cFirstSnapshotID; i++ {
s := &r.snapshots[i]
if r.jitter > s.maxJitter {
s.maxJitter = r.jitter
}
@@ -893,32 +788,17 @@ func (r *rtpStatsBase) getAndResetSnapshot(snapshotID uint32, extStartSN uint64,
return nil, nil
}
then := r.snapshots[snapshotID]
if then == nil {
then = &snapshot{
startTime: r.startTime,
extStartSN: extStartSN,
}
r.snapshots[snapshotID] = then
idx := snapshotID - cFirstSnapshotID
then := r.snapshots[idx]
if !then.isValid {
then = r.initSnapshot(r.startTime, extStartSN)
r.snapshots[idx] = then
}
// snapshot now
r.snapshots[snapshotID] = &snapshot{
startTime: time.Now(),
extStartSN: extHighestSN + 1,
packetsDuplicate: r.packetsDuplicate,
bytesDuplicate: r.bytesDuplicate,
headerBytesDuplicate: r.headerBytesDuplicate,
nacks: r.nacks,
plis: r.plis,
firs: r.firs,
maxJitter: r.jitter,
maxRtt: r.rtt,
}
// make a copy so that it can be used independently
now := *r.snapshots[snapshotID]
return then, &now
now := r.getSnapshot(time.Now(), extHighestSN+1)
r.snapshots[idx] = now
return &then, &now
}
func (r *rtpStatsBase) getDrift(extStartTS, extHighestTS uint64) (packetDrift *livekit.RTPDrift, reportDrift *livekit.RTPDrift) {
@@ -975,6 +855,38 @@ func (r *rtpStatsBase) updateGapHistogram(gap int) {
}
}
func (r *rtpStatsBase) initSnapshot(startTime time.Time, extStartSN uint64) snapshot {
return snapshot{
isValid: true,
startTime: startTime,
extStartSN: extStartSN,
}
}
func (r *rtpStatsBase) getSnapshot(startTime time.Time, extStartSN uint64) snapshot {
return snapshot{
isValid: true,
startTime: startTime,
extStartSN: extStartSN,
bytes: r.bytes,
headerBytes: r.headerBytes,
packetsPadding: r.packetsPadding,
bytesPadding: r.bytesPadding,
headerBytesPadding: r.headerBytesPadding,
packetsDuplicate: r.packetsDuplicate,
bytesDuplicate: r.bytesDuplicate,
headerBytesDuplicate: r.headerBytesDuplicate,
packetsLost: r.packetsLost,
packetsOutOfOrder: r.packetsOutOfOrder,
frames: r.frames,
nacks: r.nacks,
plis: r.plis,
firs: r.firs,
maxRtt: r.rtt,
maxJitter: r.jitter,
}
}
// ----------------------------------
func AggregateRTPStats(statsList []*livekit.RTPStats) *livekit.RTPStats {
+34 -19
View File
@@ -22,6 +22,11 @@ import (
"github.com/livekit/livekit-server/pkg/sfu/utils"
"github.com/livekit/protocol/livekit"
protoutils "github.com/livekit/protocol/utils"
)
const (
cHistorySize = 4096
)
type RTPFlowState struct {
@@ -47,6 +52,8 @@ type RTPStatsReceiver struct {
sequenceNumber *utils.WrapAround[uint16, uint64]
timestamp *utils.WrapAround[uint32, uint64]
history *protoutils.Bitmap[uint64]
}
func NewRTPStatsReceiver(params RTPStatsParams) *RTPStatsReceiver {
@@ -54,6 +61,7 @@ func NewRTPStatsReceiver(params RTPStatsParams) *RTPStatsReceiver {
rtpStatsBase: newRTPStatsBase(params),
sequenceNumber: utils.NewWrapAround[uint16, uint64](),
timestamp: utils.NewWrapAround[uint32, uint64](),
history: protoutils.NewBitmap[uint64](cHistorySize),
}
}
@@ -61,7 +69,7 @@ func (r *RTPStatsReceiver) NewSnapshotId() uint32 {
r.lock.Lock()
defer r.lock.Unlock()
return r.newSnapshotID(r.sequenceNumber.GetExtendedStart())
return r.newSnapshotID(r.sequenceNumber.GetExtendedHighest())
}
func (r *RTPStatsReceiver) Update(
@@ -106,11 +114,8 @@ func (r *RTPStatsReceiver) Update(
resTS = r.timestamp.Update(timestamp)
// initialize snapshots if any
for i := uint32(cFirstSnapshotID); i < r.nextSnapshotID; i++ {
r.snapshots[i] = &snapshot{
startTime: r.startTime,
extStartSN: r.sequenceNumber.GetExtendedStart(),
}
for i := uint32(0); i < r.nextSnapshotID-cFirstSnapshotID; i++ {
r.snapshots[i] = r.initSnapshot(r.startTime, r.sequenceNumber.GetExtendedStart())
}
r.logger.Debugw(
@@ -149,7 +154,8 @@ func (r *RTPStatsReceiver) Update(
r.packetsLost += resSN.PreExtendedStart - resSN.ExtendedVal
extStartSN := r.sequenceNumber.GetExtendedStart()
for _, s := range r.snapshots {
for i := uint32(0); i < r.nextSnapshotID-cFirstSnapshotID; i++ {
s := &r.snapshots[i]
if s.extStartSN == resSN.PreExtendedStart {
s.extStartSN = extStartSN
}
@@ -170,14 +176,16 @@ func (r *RTPStatsReceiver) Update(
)
}
if !r.isSnInfoLost(resSN.ExtendedVal, resSN.PreExtendedHighest) {
r.bytesDuplicate += pktSize
r.headerBytesDuplicate += uint64(hdrSize)
r.packetsDuplicate++
flowState.IsDuplicate = true
} else {
r.packetsLost--
r.setSnInfo(resSN.ExtendedVal, resSN.PreExtendedHighest, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), marker, true)
if r.isInRange(resSN.ExtendedVal, resSN.PreExtendedHighest) {
if r.history.IsSet(resSN.ExtendedVal) {
r.bytesDuplicate += pktSize
r.headerBytesDuplicate += uint64(hdrSize)
r.packetsDuplicate++
flowState.IsDuplicate = true
} else {
r.packetsLost--
r.history.Set(resSN.ExtendedVal)
}
}
flowState.IsOutOfOrder = true
@@ -188,10 +196,10 @@ func (r *RTPStatsReceiver) Update(
r.updateGapHistogram(int(gapSN))
// update missing sequence numbers
r.clearSnInfos(resSN.PreExtendedHighest+1, resSN.ExtendedVal)
r.history.ClearRange(resSN.PreExtendedHighest+1, resSN.ExtendedVal-1)
r.packetsLost += uint64(gapSN - 1)
r.setSnInfo(resSN.ExtendedVal, resSN.PreExtendedHighest, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), marker, false)
r.history.Set(resSN.ExtendedVal)
if timestamp != uint32(resTS.PreExtendedHighest) {
// update only on first packet as same timestamp could be in multiple packets.
@@ -409,8 +417,10 @@ func (r *RTPStatsReceiver) GetRtcpReceptionReport(ssrc uint32, proxyFracLost uin
return nil
}
intervalStats := r.getIntervalStats(then.extStartSN, now.extStartSN, extHighestSN)
packetsLost := intervalStats.packetsLost
packetsLost := uint32(now.packetsLost - then.packetsLost)
if int32(packetsLost) < 0 {
packetsLost = 0
}
lossRate := float32(packetsLost) / float32(packetsExpected)
fracLost := uint8(lossRate * 256.0)
if proxyFracLost > fracLost {
@@ -468,4 +478,9 @@ func (r *RTPStatsReceiver) ToProto() *livekit.RTPStats {
)
}
func (r *RTPStatsReceiver) isInRange(esn uint64, ehsn uint64) bool {
diff := int64(ehsn - esn)
return diff >= 0 && diff < cHistorySize
}
// ----------------------------------
+8 -57
View File
@@ -205,14 +205,8 @@ func Test_RTPStatsReceiver_Update(t *testing.T) {
require.Equal(t, uint64(3), r.packetsOutOfOrder)
require.Equal(t, uint64(1), r.packetsDuplicate)
require.Equal(t, uint64(16), r.packetsLost)
intervalStats := r.getIntervalStats(
r.sequenceNumber.GetExtendedStart(),
r.sequenceNumber.GetExtendedHighest()+1,
r.sequenceNumber.GetExtendedHighest(),
)
require.Equal(t, uint64(16), intervalStats.packetsLost)
// test sequence number cache
// test sequence number history
// with a gap
sequenceNumber += 2
timestamp += 6000
@@ -230,14 +224,7 @@ func Test_RTPStatsReceiver_Update(t *testing.T) {
require.Equal(t, uint64(sequenceNumber-1), flowState.LossStartInclusive)
require.Equal(t, uint64(sequenceNumber), flowState.LossEndExclusive)
require.Equal(t, uint64(17), r.packetsLost)
expectedSnInfo := snInfo{
hdrSize: 12,
pktSize: 1012,
isPaddingOnly: false,
marker: false,
isOutOfOrder: false,
}
require.Equal(t, expectedSnInfo, r.snInfos[sequenceNumber&cSnInfoMask])
require.False(t, r.history.IsSet(uint64(sequenceNumber)-1))
// out-of-order
sequenceNumber--
@@ -254,23 +241,8 @@ func Test_RTPStatsReceiver_Update(t *testing.T) {
)
require.False(t, flowState.HasLoss)
require.Equal(t, uint64(16), r.packetsLost)
expectedSnInfo = snInfo{
hdrSize: 12,
pktSize: 1011,
isPaddingOnly: false,
marker: false,
isOutOfOrder: true,
}
require.Equal(t, expectedSnInfo, r.snInfos[sequenceNumber&cSnInfoMask])
// check that last one is still fine
expectedSnInfo = snInfo{
hdrSize: 12,
pktSize: 1012,
isPaddingOnly: false,
marker: false,
isOutOfOrder: false,
}
require.Equal(t, expectedSnInfo, r.snInfos[(sequenceNumber+1)&cSnInfoMask])
require.Equal(t, uint64(4), r.packetsOutOfOrder)
require.True(t, r.history.IsSet(uint64(sequenceNumber)))
// padding only
sequenceNumber += 2
@@ -286,31 +258,10 @@ func Test_RTPStatsReceiver_Update(t *testing.T) {
)
require.False(t, flowState.HasLoss)
require.Equal(t, uint64(16), r.packetsLost)
expectedSnInfo = snInfo{
hdrSize: 12,
pktSize: 37,
isPaddingOnly: true,
marker: false,
isOutOfOrder: false,
}
require.Equal(t, expectedSnInfo, r.snInfos[sequenceNumber&cSnInfoMask])
// check that last two are still fine
expectedSnInfo = snInfo{
hdrSize: 12,
pktSize: 1011,
isPaddingOnly: false,
marker: false,
isOutOfOrder: true,
}
require.Equal(t, expectedSnInfo, r.snInfos[(sequenceNumber-2)&cSnInfoMask])
expectedSnInfo = snInfo{
hdrSize: 12,
pktSize: 1012,
isPaddingOnly: false,
marker: false,
isOutOfOrder: false,
}
require.Equal(t, expectedSnInfo, r.snInfos[(sequenceNumber-1)&cSnInfoMask])
require.Equal(t, uint64(4), r.packetsOutOfOrder)
require.True(t, r.history.IsSet(uint64(sequenceNumber)))
require.True(t, r.history.IsSet(uint64(sequenceNumber)-1))
require.True(t, r.history.IsSet(uint64(sequenceNumber)-2))
r.Stop()
}
+345 -129
View File
@@ -25,11 +25,91 @@ import (
"github.com/livekit/protocol/livekit"
)
const (
cSnInfoSize = 4096
cSnInfoMask = cSnInfoSize - 1
)
type snInfoFlag byte
const (
snInfoFlagMarker snInfoFlag = 1 << iota
snInfoFlagPadding
snInfoFlagOutOfOrder
)
type snInfo struct {
pktSize uint16
hdrSize uint8
flags snInfoFlag
}
// -------------------------------------------------------------------
type intervalStats struct {
packets uint64
bytes uint64
headerBytes uint64
packetsPadding uint64
bytesPadding uint64
headerBytesPadding uint64
packetsLost uint64
packetsOutOfOrder uint64
frames uint32
}
func (is *intervalStats) aggregate(other *intervalStats) {
if is == nil || other == nil {
return
}
is.packets += other.packets
is.bytes += other.bytes
is.headerBytes += other.headerBytes
is.packetsPadding += other.packetsPadding
is.bytesPadding += other.bytesPadding
is.headerBytesPadding += other.headerBytesPadding
is.packetsLost += other.packetsLost
is.packetsOutOfOrder += other.packetsOutOfOrder
is.frames += other.frames
}
// -------------------------------------------------------------------
type senderSnapshot struct {
snapshot
extStartSNFromRR uint64
packetsLostFromRR uint64
maxJitterFromRR float64
isValid bool
startTime time.Time
extStartSN uint64
bytes uint64
headerBytes uint64
packetsPadding uint64
bytesPadding uint64
headerBytesPadding uint64
packetsDuplicate uint64
bytesDuplicate uint64
headerBytesDuplicate uint64
packetsOutOfOrder uint64
packetsLostFeed uint64
packetsLost uint64
frames uint32
nacks uint32
plis uint32
firs uint32
maxRtt uint32
maxJitterFeed float64
maxJitter float64
extLastRRSN uint64
intervalStats intervalStats
}
type RTPStatsSender struct {
@@ -50,15 +130,17 @@ type RTPStatsSender struct {
jitterFromRR float64
maxJitterFromRR float64
snInfos [cSnInfoSize]snInfo
nextSenderSnapshotID uint32
senderSnapshots map[uint32]*senderSnapshot
senderSnapshots []senderSnapshot
}
func NewRTPStatsSender(params RTPStatsParams) *RTPStatsSender {
return &RTPStatsSender{
rtpStatsBase: newRTPStatsBase(params),
nextSenderSnapshotID: cFirstSnapshotID,
senderSnapshots: make(map[uint32]*senderSnapshot),
senderSnapshots: make([]senderSnapshot, 2),
}
}
@@ -85,18 +167,18 @@ func (r *RTPStatsSender) Seed(from *RTPStatsSender) {
r.jitterFromRR = from.jitterFromRR
r.maxJitterFromRR = from.maxJitterFromRR
r.snInfos = from.snInfos
r.nextSenderSnapshotID = from.nextSenderSnapshotID
for id, ss := range from.senderSnapshots {
ssCopy := *ss
r.senderSnapshots[id] = &ssCopy
}
r.senderSnapshots = make([]senderSnapshot, cap(from.senderSnapshots))
copy(r.senderSnapshots, from.senderSnapshots)
}
func (r *RTPStatsSender) NewSnapshotId() uint32 {
r.lock.Lock()
defer r.lock.Unlock()
return r.newSnapshotID(r.extStartSN)
return r.newSnapshotID(r.extHighestSN)
}
func (r *RTPStatsSender) NewSenderSnapshotId() uint32 {
@@ -104,14 +186,16 @@ func (r *RTPStatsSender) NewSenderSnapshotId() uint32 {
defer r.lock.Unlock()
id := r.nextSenderSnapshotID
r.nextSenderSnapshotID++
if cap(r.senderSnapshots) < int(r.nextSenderSnapshotID-cFirstSnapshotID) {
senderSnapshots := make([]senderSnapshot, r.nextSenderSnapshotID-cFirstSnapshotID)
copy(senderSnapshots, r.senderSnapshots)
r.senderSnapshots = senderSnapshots
}
if r.initialized {
r.senderSnapshots[id] = &senderSnapshot{
snapshot: snapshot{
startTime: time.Now(),
extStartSN: r.extStartSN,
},
extStartSNFromRR: r.extStartSN,
}
r.senderSnapshots[id-cFirstSnapshotID] = r.initSenderSnapshot(time.Now(), r.extHighestSN)
}
return id
}
@@ -152,20 +236,11 @@ func (r *RTPStatsSender) Update(
r.extHighestTS = extTimestamp
// initialize snapshots if any
for i := uint32(cFirstSnapshotID); i < r.nextSnapshotID; i++ {
r.snapshots[i] = &snapshot{
startTime: r.startTime,
extStartSN: r.extStartSN,
}
for i := uint32(0); i < r.nextSnapshotID-cFirstSnapshotID; i++ {
r.snapshots[i] = r.initSnapshot(r.startTime, r.extStartSN)
}
for i := uint32(cFirstSnapshotID); i < r.nextSenderSnapshotID; i++ {
r.senderSnapshots[i] = &senderSnapshot{
snapshot: snapshot{
startTime: r.startTime,
extStartSN: r.extStartSN,
},
extStartSNFromRR: r.extStartSN,
}
for i := uint32(0); i < r.nextSenderSnapshotID-cFirstSnapshotID; i++ {
r.senderSnapshots[i] = r.initSenderSnapshot(r.startTime, r.extStartSN)
}
r.logger.Debugw(
@@ -190,14 +265,19 @@ func (r *RTPStatsSender) Update(
r.packetsLost += r.extStartSN - extSequenceNumber
// adjust start of snapshots
for _, s := range r.snapshots {
for i := uint32(0); i < r.nextSnapshotID-cFirstSnapshotID; i++ {
s := &r.snapshots[i]
if s.extStartSN == r.extStartSN {
s.extStartSN = extSequenceNumber
}
}
for _, s := range r.senderSnapshots {
for i := uint32(0); i < r.nextSenderSnapshotID-cFirstSnapshotID; i++ {
s := &r.senderSnapshots[i]
if s.extStartSN == r.extStartSN {
s.extStartSN = extSequenceNumber
if s.extLastRRSN == (r.extStartSN - 1) {
s.extLastRRSN = extSequenceNumber - 1
}
}
}
@@ -219,7 +299,7 @@ func (r *RTPStatsSender) Update(
isDuplicate = true
} else {
r.packetsLost--
r.setSnInfo(extSequenceNumber, r.extHighestSN, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), marker, true)
r.setSnInfo(extSequenceNumber, r.extHighestSN, uint16(pktSize), uint8(hdrSize), uint16(payloadSize), marker, true)
}
} else { // in-order
// update gap histogram
@@ -229,7 +309,7 @@ func (r *RTPStatsSender) Update(
r.clearSnInfos(r.extHighestSN+1, extSequenceNumber)
r.packetsLost += uint64(gapSN - 1)
r.setSnInfo(extSequenceNumber, r.extHighestSN, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), marker, false)
r.setSnInfo(extSequenceNumber, r.extHighestSN, uint16(pktSize), uint8(hdrSize), uint16(payloadSize), marker, false)
if extTimestamp != r.extHighestTS {
// update only on first packet as same timestamp could be in multiple packets.
@@ -254,9 +334,10 @@ func (r *RTPStatsSender) Update(
}
jitter := r.updateJitter(extTimestamp, packetTime)
for _, s := range r.senderSnapshots {
if jitter > s.maxJitter {
s.maxJitter = jitter
for i := uint32(0); i < r.nextSenderSnapshotID-cFirstSnapshotID; i++ {
s := &r.senderSnapshots[i]
if jitter > s.maxJitterFeed {
s.maxJitterFeed = jitter
}
}
}
@@ -302,46 +383,7 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt
}
}
if r.lastRRTime.IsZero() || r.extHighestSNFromRR <= extHighestSNFromRR {
r.extHighestSNFromRR = extHighestSNFromRR
packetsLostFromRR := r.packetsLostFromRR&0xFFFF_FFFF_0000_0000 + uint64(rr.TotalLost)
if (rr.TotalLost-r.lastRR.TotalLost) < (1<<31) && rr.TotalLost < r.lastRR.TotalLost {
packetsLostFromRR += (1 << 32)
}
r.packetsLostFromRR = packetsLostFromRR
if isRttChanged {
r.rtt = rtt
if rtt > r.maxRtt {
r.maxRtt = rtt
}
}
r.jitterFromRR = float64(rr.Jitter)
if r.jitterFromRR > r.maxJitterFromRR {
r.maxJitterFromRR = r.jitterFromRR
}
// update snapshots
for _, s := range r.snapshots {
if isRttChanged && rtt > s.maxRtt {
s.maxRtt = rtt
}
}
for _, s := range r.senderSnapshots {
if isRttChanged && rtt > s.maxRtt {
s.maxRtt = rtt
}
if r.jitterFromRR > s.maxJitterFromRR {
s.maxJitterFromRR = r.jitterFromRR
}
}
r.lastRRTime = time.Now()
r.lastRR = rr
} else {
if !r.lastRRTime.IsZero() && r.extHighestSNFromRR > extHighestSNFromRR {
r.logger.Debugw(
fmt.Sprintf("receiver report potentially out of order, highestSN: existing: %d, received: %d", r.extHighestSNFromRR, extHighestSNFromRR),
"lastRRTime", r.lastRRTime,
@@ -349,7 +391,57 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt
"sinceLastRR", time.Since(r.lastRRTime),
"receivedRR", rr,
)
return
}
r.extHighestSNFromRR = extHighestSNFromRR
packetsLostFromRR := r.packetsLostFromRR&0xFFFF_FFFF_0000_0000 + uint64(rr.TotalLost)
if (rr.TotalLost-r.lastRR.TotalLost) < (1<<31) && rr.TotalLost < r.lastRR.TotalLost {
packetsLostFromRR += (1 << 32)
}
r.packetsLostFromRR = packetsLostFromRR
if isRttChanged {
r.rtt = rtt
if rtt > r.maxRtt {
r.maxRtt = rtt
}
}
r.jitterFromRR = float64(rr.Jitter)
if r.jitterFromRR > r.maxJitterFromRR {
r.maxJitterFromRR = r.jitterFromRR
}
// update snapshots
for i := uint32(0); i < r.nextSnapshotID-cFirstSnapshotID; i++ {
s := &r.snapshots[i]
if isRttChanged && rtt > s.maxRtt {
s.maxRtt = rtt
}
}
extLastRRSN := r.extHighestSNFromRR + (r.extStartSN & 0xFFFF_FFFF_FFFF_0000)
for i := uint32(0); i < r.nextSenderSnapshotID-cFirstSnapshotID; i++ {
s := &r.senderSnapshots[i]
if isRttChanged && rtt > s.maxRtt {
s.maxRtt = rtt
}
if r.jitterFromRR > s.maxJitter {
s.maxJitter = r.jitterFromRR
}
// on every RR, calculate delta since last RR using packet metadata cache
is := r.getIntervalStats(s.extLastRRSN+1, extLastRRSN+1, r.extHighestSN)
eis := &s.intervalStats
eis.aggregate(&is)
s.extLastRRSN = extLastRRSN
}
r.lastRRTime = time.Now()
r.lastRR = rr
return
}
@@ -487,11 +579,11 @@ func (r *RTPStatsSender) DeltaInfoSender(senderSnapshotID uint32) *RTPDeltaInfo
startTime := then.startTime
endTime := now.startTime
packetsExpected := now.extStartSNFromRR - then.extStartSNFromRR
packetsExpected := uint32(now.extStartSN - then.extStartSN)
if packetsExpected > cNumSequenceNumbers {
r.logger.Warnw(
"too many packets expected in delta (sender)",
fmt.Errorf("start: %d, end: %d, expected: %d", then.extStartSNFromRR, now.extStartSNFromRR, packetsExpected),
fmt.Errorf("start: %d, end: %d, expected: %d", then.extStartSN, now.extStartSN, packetsExpected),
)
return nil
}
@@ -500,29 +592,31 @@ func (r *RTPStatsSender) DeltaInfoSender(senderSnapshotID uint32) *RTPDeltaInfo
return nil
}
intervalStats := r.getIntervalStats(then.extStartSNFromRR, now.extStartSNFromRR, r.extHighestSN)
packetsLost := now.packetsLostFromRR - then.packetsLostFromRR
packetsLost := uint32(now.packetsLost - then.packetsLost)
if int32(packetsLost) < 0 {
packetsLost = 0
}
packetsLostFeed := uint32(now.packetsLostFeed - then.packetsLostFeed)
if int32(packetsLostFeed) < 0 {
packetsLostFeed = 0
}
if packetsLost > packetsExpected {
r.logger.Warnw(
"unexpected number of packets lost",
fmt.Errorf(
"start: %d, end: %d, expected: %d, lost: report: %d, interval: %d",
then.extStartSNFromRR,
now.extStartSNFromRR,
"start: %d, end: %d, expected: %d, lost: report: %d, feed: %d",
then.extStartSN,
now.extStartSN,
packetsExpected,
now.packetsLostFromRR-then.packetsLostFromRR,
intervalStats.packetsLost,
packetsLost,
packetsLostFeed,
),
)
packetsLost = packetsExpected
}
// discount jitter from publisher side + internal processing
maxJitter := then.maxJitterFromRR - then.maxJitter
maxJitter := then.maxJitter - then.maxJitterFeed
if maxJitter < 0.0 {
maxJitter = 0.0
}
@@ -531,19 +625,19 @@ func (r *RTPStatsSender) DeltaInfoSender(senderSnapshotID uint32) *RTPDeltaInfo
return &RTPDeltaInfo{
StartTime: startTime,
Duration: endTime.Sub(startTime),
Packets: uint32(packetsExpected - intervalStats.packetsPadding),
Bytes: intervalStats.bytes,
HeaderBytes: intervalStats.headerBytes,
Packets: packetsExpected - uint32(now.packetsPadding-then.packetsPadding),
Bytes: now.bytes - then.bytes,
HeaderBytes: now.headerBytes - then.headerBytes,
PacketsDuplicate: uint32(now.packetsDuplicate - then.packetsDuplicate),
BytesDuplicate: now.bytesDuplicate - then.bytesDuplicate,
HeaderBytesDuplicate: now.headerBytesDuplicate - then.headerBytesDuplicate,
PacketsPadding: uint32(intervalStats.packetsPadding),
BytesPadding: intervalStats.bytesPadding,
HeaderBytesPadding: intervalStats.headerBytesPadding,
PacketsLost: uint32(packetsLost),
PacketsMissing: uint32(intervalStats.packetsLost),
PacketsOutOfOrder: uint32(intervalStats.packetsOutOfOrder),
Frames: intervalStats.frames,
PacketsPadding: uint32(now.packetsPadding - then.packetsPadding),
BytesPadding: now.bytesPadding - then.bytesPadding,
HeaderBytesPadding: now.headerBytesPadding - then.headerBytesPadding,
PacketsLost: packetsLost,
PacketsMissing: packetsLostFeed,
PacketsOutOfOrder: uint32(now.packetsOutOfOrder - then.packetsOutOfOrder),
Frames: now.frames - then.frames,
RttMax: then.maxRtt,
JitterMax: maxJitterTime,
Nacks: now.nacks - then.nacks,
@@ -579,40 +673,162 @@ func (r *RTPStatsSender) getAndResetSenderSnapshot(senderSnapshotID uint32) (*se
return nil, nil
}
then := r.senderSnapshots[senderSnapshotID]
if then == nil {
then = &senderSnapshot{
snapshot: snapshot{
startTime: r.startTime,
extStartSN: r.extStartSN,
},
extStartSNFromRR: r.extStartSN,
}
r.senderSnapshots[senderSnapshotID] = then
idx := senderSnapshotID - cFirstSnapshotID
then := r.senderSnapshots[idx]
if !then.isValid {
then = r.initSenderSnapshot(r.startTime, r.extStartSN)
r.senderSnapshots[idx] = then
}
// snapshot now
r.senderSnapshots[senderSnapshotID] = &senderSnapshot{
snapshot: snapshot{
startTime: r.lastRRTime,
extStartSN: r.extHighestSN + 1,
packetsDuplicate: r.packetsDuplicate,
bytesDuplicate: r.bytesDuplicate,
headerBytesDuplicate: r.headerBytesDuplicate,
nacks: r.nacks,
plis: r.plis,
firs: r.firs,
maxJitter: r.jitter,
maxRtt: r.rtt,
},
extStartSNFromRR: r.extHighestSNFromRR + (r.extStartSN & 0xFFFF_FFFF_FFFF_0000) + 1,
packetsLostFromRR: r.packetsLostFromRR,
maxJitterFromRR: r.jitterFromRR,
}
// make a copy so that it can be used independently
now := *r.senderSnapshots[senderSnapshotID]
now := r.getSenderSnapshot(r.lastRRTime, &then)
r.senderSnapshots[idx] = now
return &then, &now
}
return then, &now
func (r *RTPStatsSender) initSenderSnapshot(startTime time.Time, extStartSN uint64) senderSnapshot {
return senderSnapshot{
isValid: true,
startTime: startTime,
extStartSN: extStartSN,
extLastRRSN: extStartSN - 1,
}
}
func (r *RTPStatsSender) getSenderSnapshot(startTime time.Time, s *senderSnapshot) senderSnapshot {
if s == nil {
return senderSnapshot{}
}
return senderSnapshot{
isValid: true,
startTime: startTime,
extStartSN: s.extLastRRSN + 1,
bytes: s.bytes + s.intervalStats.bytes,
headerBytes: s.headerBytes + s.intervalStats.headerBytes,
packetsPadding: s.packetsPadding + s.intervalStats.packetsPadding,
bytesPadding: s.bytesPadding + s.intervalStats.bytesPadding,
headerBytesPadding: s.headerBytesPadding + s.intervalStats.headerBytesPadding,
packetsDuplicate: r.packetsDuplicate,
bytesDuplicate: r.bytesDuplicate,
headerBytesDuplicate: r.headerBytesDuplicate,
packetsLostFeed: r.packetsLost,
packetsOutOfOrder: s.packetsOutOfOrder + s.intervalStats.packetsOutOfOrder,
frames: s.frames + s.intervalStats.frames,
nacks: r.nacks,
plis: r.plis,
firs: r.firs,
maxRtt: r.rtt,
maxJitterFeed: r.jitter,
maxJitter: r.jitterFromRR,
extLastRRSN: s.extLastRRSN,
}
}
func (r *RTPStatsSender) getSnInfoOutOfOrderSlot(esn uint64, ehsn uint64) int {
offset := int64(ehsn - esn)
if offset >= cSnInfoSize || offset < 0 {
// too old OR too new (i. e. ahead of highest)
return -1
}
return int(esn & cSnInfoMask)
}
func (r *RTPStatsSender) setSnInfo(esn uint64, ehsn uint64, pktSize uint16, hdrSize uint8, payloadSize uint16, marker bool, isOutOfOrder bool) {
var slot int
if int64(esn-ehsn) < 0 {
slot = r.getSnInfoOutOfOrderSlot(esn, ehsn)
if slot < 0 {
return
}
} else {
slot = int(esn & cSnInfoMask)
}
snInfo := &r.snInfos[slot]
snInfo.pktSize = pktSize
snInfo.hdrSize = hdrSize
if marker {
snInfo.flags |= snInfoFlagMarker
}
if payloadSize == 0 {
snInfo.flags |= snInfoFlagPadding
}
if isOutOfOrder {
snInfo.flags |= snInfoFlagOutOfOrder
}
}
func (r *RTPStatsSender) clearSnInfos(extStartInclusive uint64, extEndExclusive uint64) {
if extEndExclusive <= extStartInclusive {
return
}
for esn := extStartInclusive; esn != extEndExclusive; esn++ {
snInfo := &r.snInfos[esn&cSnInfoMask]
snInfo.pktSize = 0
snInfo.hdrSize = 0
snInfo.flags = 0
}
}
func (r *RTPStatsSender) isSnInfoLost(esn uint64, ehsn uint64) bool {
slot := r.getSnInfoOutOfOrderSlot(esn, ehsn)
if slot < 0 {
return false
}
return r.snInfos[slot].pktSize == 0
}
func (r *RTPStatsSender) getIntervalStats(extStartInclusive uint64, extEndExclusive uint64, ehsn uint64) (intervalStats intervalStats) {
packetsNotFound := uint32(0)
processESN := func(esn uint64, ehsn uint64) {
slot := r.getSnInfoOutOfOrderSlot(esn, ehsn)
if slot < 0 {
packetsNotFound++
return
}
snInfo := &r.snInfos[slot]
switch {
case snInfo.pktSize == 0:
intervalStats.packetsLost++
case snInfo.flags&snInfoFlagPadding != 0:
intervalStats.packetsPadding++
intervalStats.bytesPadding += uint64(snInfo.pktSize)
intervalStats.headerBytesPadding += uint64(snInfo.hdrSize)
default:
intervalStats.packets++
intervalStats.bytes += uint64(snInfo.pktSize)
intervalStats.headerBytes += uint64(snInfo.hdrSize)
if (snInfo.flags & snInfoFlagOutOfOrder) != 0 {
intervalStats.packetsOutOfOrder++
}
}
if (snInfo.flags & snInfoFlagMarker) != 0 {
intervalStats.frames++
}
}
for esn := extStartInclusive; esn != extEndExclusive; esn++ {
processESN(esn, ehsn)
}
if packetsNotFound != 0 {
r.logger.Errorw(
"could not find some packets", nil,
"start", extStartInclusive,
"end", extEndExclusive,
"count", packetsNotFound,
"highestSN", ehsn,
)
}
return
}
// -------------------------------------------------------------------
+6 -21
View File
@@ -121,7 +121,7 @@ func (r *RTPMunger) SetLastSnTs(extPkt *buffer.ExtPacket) {
r.extLastSN = extPkt.ExtSequenceNumber
r.extSecondLastSN = r.extLastSN - 1
r.updateSnOffset("init")
r.updateSnOffset()
r.extLastTS = extPkt.ExtTimestamp
}
@@ -130,7 +130,7 @@ func (r *RTPMunger) UpdateSnTsOffsets(extPkt *buffer.ExtPacket, snAdjust uint64,
r.extHighestIncomingSN = extPkt.ExtSequenceNumber - 1
r.snRangeMap.ClearAndResetValue(extPkt.ExtSequenceNumber - r.extLastSN - snAdjust)
r.updateSnOffset("switch")
r.updateSnOffset()
r.tsOffset = extPkt.ExtTimestamp - r.extLastTS - tsAdjust
}
@@ -156,7 +156,7 @@ func (r *RTPMunger) PacketDropped(extPkt *buffer.ExtPacket) {
}
r.extLastSN = r.extSecondLastSN
r.updateSnOffset("drop")
r.updateSnOffset()
}
func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationParamsRTP, error) {
@@ -197,15 +197,6 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara
if diff < 0 {
// out-of-order, look up sequence number offset cache
snOffset, err := r.snRangeMap.GetValue(extPkt.ExtSequenceNumber)
r.logger.Debugw(
"out-of-order packet",
"extHighestIncomingSN", r.extHighestIncomingSN,
"extLastSN", r.extLastSN,
"extSequenceNumber", extPkt.ExtSequenceNumber,
"snOffset", snOffset,
"error", err,
"outgoingSN", extPkt.ExtSequenceNumber-snOffset,
)
if err != nil {
return &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingOutOfOrder,
@@ -227,7 +218,7 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara
r.logger.Errorw("could not exclude range", err, "sn", r.extHighestIncomingSN)
}
r.updateSnOffset("pad-drop")
r.updateSnOffset()
return &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingContiguous,
@@ -298,7 +289,7 @@ func (r *RTPMunger) UpdateAndGetPaddingSnTs(num int, clockRate uint32, frameRate
r.extSecondLastSN = extLastSN - 1
r.extLastSN = extLastSN
r.snRangeMap.DecValue(r.extHighestIncomingSN, uint64(num))
r.updateSnOffset("pad")
r.updateSnOffset()
r.tsOffset -= extLastTS - r.extLastTS
r.extLastTS = extLastTS
@@ -314,16 +305,10 @@ func (r *RTPMunger) IsOnFrameBoundary() bool {
return r.lastMarker
}
func (r *RTPMunger) updateSnOffset(cause string) {
func (r *RTPMunger) updateSnOffset() {
snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN + 1)
if err != nil {
r.logger.Errorw("could not get sequence number offset", err)
}
r.snOffset = snOffset
r.logger.Debugw(
"updating sequence number offset",
"cause", cause,
"extHighestIncomingSN", r.extHighestIncomingSN,
"snOffset", r.snOffset,
)
}
+2 -3
View File
@@ -144,8 +144,8 @@ func (s *sequencer) push(
s.extHighestSN = extModifiedSN
} else {
if diff < -int64(s.size) {
s.logger.Debugw(
"old packet, cannot be sequenced",
s.logger.Warnw(
"old packet, cannot be sequenced", nil,
"extHighestSN", s.extHighestSN,
"extIncomingSN", extIncomingSN,
"extModifiedSN", extModifiedSN,
@@ -189,7 +189,6 @@ func (s *sequencer) pushPadding(extStartSNInclusive uint64, extEndSNInclusive ui
s.Lock()
defer s.Unlock()
s.logger.Debugw("sequencer padding", "extHighestSN", s.extHighestSN, "startSN", extStartSNInclusive, "endSN", extEndSNInclusive)
if s.snRangeMap == nil {
return
}