Add reconnect reason and signal rtt calculation (#1381)

* Add reconnect reason and signaling RTT calculation

* Update protocol

* Resolve review comments
This commit is contained in:
cnderrauber
2023-02-06 11:12:25 +08:00
committed by GitHub
parent d67cdb6141
commit 8b6dab780c
16 changed files with 221 additions and 118 deletions
+1 -1
View File
@@ -18,7 +18,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20230130133657-96cfb115473a
github.com/livekit/protocol v1.3.3-0.20230202034647-c71216774a62
github.com/livekit/protocol v1.3.3-0.20230206022348-f6d32e15b011
github.com/livekit/psrpc v0.2.5
github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995
github.com/mackerelio/go-osstat v0.2.3
+2 -2
View File
@@ -234,8 +234,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20230130133657-96cfb115473a h1:5UkGQpskXp7HcBmyrCwWtO7ygDWbqtjN09Yva4l/nyE=
github.com/livekit/mediatransportutil v0.0.0-20230130133657-96cfb115473a/go.mod h1:1Dlx20JPoIKGP45eo+yuj0HjeE25zmyeX/EWHiPCjFw=
github.com/livekit/protocol v1.3.3-0.20230202034647-c71216774a62 h1:wLkf7jiWtA0q+3y192KEkWIdKUrh+cXz/pBwVMZBwq4=
github.com/livekit/protocol v1.3.3-0.20230202034647-c71216774a62/go.mod h1:gwCG03nKlHlC9hTjL4pXQpn783ALhmbyhq65UZxqbb8=
github.com/livekit/protocol v1.3.3-0.20230206022348-f6d32e15b011 h1:KUuxp1D8ok4tgQ5gzxfUSj0o0GFMy1zwveru1r5kk9w=
github.com/livekit/protocol v1.3.3-0.20230206022348-f6d32e15b011/go.mod h1:gwCG03nKlHlC9hTjL4pXQpn783ALhmbyhq65UZxqbb8=
github.com/livekit/psrpc v0.2.5 h1:+EZS78MGdBZxzCUwinDQ6pOeqPDURisrGtfyyqwUDSI=
github.com/livekit/psrpc v0.2.5/go.mod h1:DyphtRRWvcIuCaldYg9VGpwGhu/HiKmNcysgpN6xKrM=
github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995 h1:vOaY2qvfLihDyeZtnGGN1Law9wRrw8BMGCr1TygTvMw=
+28 -25
View File
@@ -31,15 +31,16 @@ type MessageSource interface {
}
type ParticipantInit struct {
Identity livekit.ParticipantIdentity
Name livekit.ParticipantName
Reconnect bool
AutoSubscribe bool
Client *livekit.ClientInfo
Grants *auth.ClaimGrants
Region string
AdaptiveStream bool
ID livekit.ParticipantID
Identity livekit.ParticipantIdentity
Name livekit.ParticipantName
Reconnect bool
ReconnectReason livekit.ReconnectReason
AutoSubscribe bool
Client *livekit.ClientInfo
Grants *auth.ClaimGrants
Region string
AdaptiveStream bool
ID livekit.ParticipantID
}
type NewParticipantCallback func(
@@ -116,13 +117,14 @@ func (pi *ParticipantInit) ToStartSession(roomName livekit.RoomName, connectionI
Identity: string(pi.Identity),
Name: string(pi.Name),
// connection id is to allow the RTC node to identify where to route the message back to
ConnectionId: string(connectionID),
Reconnect: pi.Reconnect,
AutoSubscribe: pi.AutoSubscribe,
Client: pi.Client,
GrantsJson: string(claims),
AdaptiveStream: pi.AdaptiveStream,
ParticipantId: string(pi.ID),
ConnectionId: string(connectionID),
Reconnect: pi.Reconnect,
ReconnectReason: pi.ReconnectReason,
AutoSubscribe: pi.AutoSubscribe,
Client: pi.Client,
GrantsJson: string(claims),
AdaptiveStream: pi.AdaptiveStream,
ParticipantId: string(pi.ID),
}, nil
}
@@ -133,14 +135,15 @@ func ParticipantInitFromStartSession(ss *livekit.StartSession, region string) (*
}
return &ParticipantInit{
Identity: livekit.ParticipantIdentity(ss.Identity),
Name: livekit.ParticipantName(ss.Name),
Reconnect: ss.Reconnect,
Client: ss.Client,
AutoSubscribe: ss.AutoSubscribe,
Grants: claims,
Region: region,
AdaptiveStream: ss.AdaptiveStream,
ID: livekit.ParticipantID(ss.ParticipantId),
Identity: livekit.ParticipantIdentity(ss.Identity),
Name: livekit.ParticipantName(ss.Name),
Reconnect: ss.Reconnect,
ReconnectReason: ss.ReconnectReason,
Client: ss.Client,
AutoSubscribe: ss.AutoSubscribe,
Grants: claims,
Region: region,
AdaptiveStream: ss.AdaptiveStream,
ID: livekit.ParticipantID(ss.ParticipantId),
}, nil
}
+1 -1
View File
@@ -158,7 +158,7 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr *
})
downTrack.OnRttUpdate(func(_ *sfu.DownTrack, rtt uint32) {
go sub.UpdateRTT(rtt)
go sub.UpdateMediaRTT(rtt)
})
downTrack.AddReceiverReportListener(func(dt *sfu.DownTrack, report *rtcp.ReceiverReport) {
+5 -5
View File
@@ -472,7 +472,7 @@ func (p *ParticipantImpl) HandleAnswer(answer webrtc.SessionDescription) {
* 2. client send answer
*/
signalConnCost := time.Since(p.ConnectedAt()).Milliseconds()
p.TransportManager.UpdateRTT(uint32(signalConnCost), false)
p.TransportManager.UpdateSignalingRTT(uint32(signalConnCost))
p.TransportManager.HandleAnswer(answer)
}
@@ -750,7 +750,7 @@ func (p *ParticipantImpl) MigrateState() types.MigrateState {
}
// ICERestart restarts subscriber ICE connections
func (p *ParticipantImpl) ICERestart(iceConfig *livekit.ICEConfig) {
func (p *ParticipantImpl) ICERestart(iceConfig *livekit.ICEConfig, reason livekit.ReconnectReason) {
p.clearDisconnectTimer()
p.clearMigrationTimer()
@@ -758,7 +758,7 @@ func (p *ParticipantImpl) ICERestart(iceConfig *livekit.ICEConfig) {
t.(types.LocalMediaTrack).Restart()
}
p.TransportManager.ICERestart(iceConfig)
p.TransportManager.ICERestart(iceConfig, reason == livekit.ReconnectReason_RR_PUBLISHER_FAILED || reason == livekit.ReconnectReason_RR_SUBSCRIBER_FAILED)
}
func (p *ParticipantImpl) OnICEConfigChanged(f func(participant types.LocalParticipant, iceConfig *livekit.ICEConfig)) {
@@ -919,7 +919,7 @@ func (p *ParticipantImpl) SubscriptionPermissionUpdate(publisherID livekit.Parti
}
}
func (p *ParticipantImpl) UpdateRTT(rtt uint32) {
func (p *ParticipantImpl) UpdateMediaRTT(rtt uint32) {
now := time.Now()
p.lock.Lock()
if now.Sub(p.rttUpdatedAt) < rttUpdateInterval || p.lastRTT == rtt {
@@ -929,7 +929,7 @@ func (p *ParticipantImpl) UpdateRTT(rtt uint32) {
p.rttUpdatedAt = now
p.lastRTT = rtt
p.lock.Unlock()
p.TransportManager.UpdateRTT(rtt, true)
p.TransportManager.UpdateMediaRTT(rtt)
for _, pt := range p.GetPublishedTracks() {
pt.(types.LocalMediaTrack).SetRTT(rtt)
+3 -3
View File
@@ -367,7 +367,7 @@ func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions
return nil
}
func (r *Room) ResumeParticipant(p types.LocalParticipant, responseSink routing.MessageSink, iceServers []*livekit.ICEServer) error {
func (r *Room) ResumeParticipant(p types.LocalParticipant, responseSink routing.MessageSink, iceServers []*livekit.ICEServer, reason livekit.ReconnectReason) error {
// close previous sink, and link to new one
p.CloseSignalConnection()
p.SetResponseSink(responseSink)
@@ -384,7 +384,7 @@ func (r *Room) ResumeParticipant(p types.LocalParticipant, responseSink routing.
return err
}
p.ICERestart(nil)
p.ICERestart(nil, reason)
return nil
}
@@ -701,7 +701,7 @@ func (r *Room) SimulateScenario(participant types.LocalParticipant, simulateScen
participant.ICERestart(&livekit.ICEConfig{
PreferenceSubscriber: livekit.ICECandidateType(scenario.SwitchCandidateProtocol),
PreferencePublisher: livekit.ICECandidateType(scenario.SwitchCandidateProtocol),
})
}, livekit.ReconnectReason_RR_SWITCH_CANDIDATE)
}
return nil
}
+5
View File
@@ -68,6 +68,11 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant
pLogger.Warnw("could not simulate scenario", err,
"simulate", msg.Simulate)
}
case *livekit.SignalRequest_PingReq:
if msg.PingReq.Rtt > 0 {
participant.UpdateSignalingRTT(uint32(msg.PingReq.Rtt))
}
}
return nil
}
+39 -11
View File
@@ -155,10 +155,12 @@ type PCTransport struct {
lossyDCOpened bool
onDataPacket func(kind livekit.DataPacket_Kind, data []byte)
iceStartedAt time.Time
iceConnectedAt time.Time
connectedAt time.Time
connectAfterICETimer *time.Timer // timer to wait for pc to connect after ice connected
iceStartedAt time.Time
iceConnectedAt time.Time
firstConnectedAt time.Time
connectedAt time.Time
connectAfterICETimer *time.Timer // timer to wait for pc to connect after ice connected
resetShortConnOnICERestart atomic.Bool
onFullyEstablished func()
@@ -419,19 +421,19 @@ func (t *PCTransport) setICEConnectedAt(at time.Time) {
t.iceConnectedAt = at
// set failure timer for dtls handshake
iceCost := at.Sub(t.iceStartedAt)
connTimeoutAfterICE := 3 * iceCost
iceDuration := at.Sub(t.iceStartedAt)
connTimeoutAfterICE := 3 * iceDuration
if connTimeoutAfterICE < minConnectTimeoutAfterICE {
connTimeoutAfterICE = minConnectTimeoutAfterICE
} else if connTimeoutAfterICE > maxConnectTimeoutAfterICE {
connTimeoutAfterICE = maxConnectTimeoutAfterICE
}
t.params.Logger.Debugw("setting connection timer after ice connected", "timeout", connTimeoutAfterICE, "iceCost", iceCost)
t.params.Logger.Debugw("setting connection timer after ice connected", "timeout", connTimeoutAfterICE, "iceDuration", iceDuration)
t.connectAfterICETimer = time.AfterFunc(connTimeoutAfterICE, func() {
state := t.pc.ConnectionState()
// if pc is still checking or connected but not fully established after timeout, then fire connection fail
if state != webrtc.PeerConnectionStateClosed && state != webrtc.PeerConnectionStateFailed && !t.isFullyEstablished() {
t.params.Logger.Infow("connect timeout after ICE connected", "timeout", connTimeoutAfterICE, "iceCost", iceCost)
t.params.Logger.Infow("connect timeout after ICE connected", "timeout", connTimeoutAfterICE, "iceDuration", iceDuration)
t.handleConnectionFailed()
}
})
@@ -439,6 +441,19 @@ func (t *PCTransport) setICEConnectedAt(at time.Time) {
t.lock.Unlock()
}
func (t *PCTransport) resetShortConn() {
t.params.Logger.Infow("resetting short connection on ICE restart")
t.lock.Lock()
t.iceStartedAt = time.Time{}
t.iceConnectedAt = time.Time{}
t.connectedAt = time.Time{}
if t.connectAfterICETimer != nil {
t.connectAfterICETimer.Stop()
t.connectAfterICETimer = nil
}
t.lock.Unlock()
}
func (t *PCTransport) isShortConnection(at time.Time) (bool, time.Duration) {
t.lock.RLock()
defer t.lock.RUnlock()
@@ -478,12 +493,13 @@ func (t *PCTransport) logICECandidates() {
func (t *PCTransport) setConnectedAt(at time.Time) bool {
t.lock.Lock()
if !t.connectedAt.IsZero() {
t.connectedAt = at
if !t.firstConnectedAt.IsZero() {
t.lock.Unlock()
return false
}
t.connectedAt = at
t.firstConnectedAt = at
prometheus.ServiceOperationCounter.WithLabelValues("peer_connection", "success", "").Add(1)
t.lock.Unlock()
return true
@@ -781,7 +797,7 @@ func (t *PCTransport) HasEverConnected() bool {
t.lock.RLock()
defer t.lock.RUnlock()
return !t.connectedAt.IsZero()
return !t.firstConnectedAt.IsZero()
}
func (t *PCTransport) WriteRTCP(pkts []rtcp.Packet) error {
@@ -1002,6 +1018,10 @@ func (t *PCTransport) ICERestart() {
})
}
func (t *PCTransport) ResetShortConnOnICERestart() {
t.resetShortConnOnICERestart.Store(true)
}
func (t *PCTransport) OnStreamStateChange(f func(update *sfu.StreamStateUpdate) error) {
if t.streamAllocator == nil {
return
@@ -1724,6 +1744,10 @@ func (t *PCTransport) handleRemoteOfferReceived(sd *webrtc.SessionDescription) e
return nil
}
if offerRestartICE && t.resetShortConnOnICERestart.CompareAndSwap(true, false) {
t.resetShortConn()
}
if err := t.setRemoteDescription(*sd); err != nil {
return err
}
@@ -1766,6 +1790,10 @@ func (t *PCTransport) doICERestart() error {
return nil
}
if t.resetShortConnOnICERestart.CompareAndSwap(true, false) {
t.resetShortConn()
}
if t.negotiationState == NegotiationStateNone {
return t.createAndSendOffer(&webrtc.OfferOptions{ICERestart: true})
}
+20 -16
View File
@@ -70,7 +70,7 @@ type TransportManager struct {
mediaLossProxy *MediaLossProxy
udpLossUnstableCount uint32
tcpRTT, udpRTT uint32
signalingRTT, udpRTT uint32
onPublisherInitialConnected func()
onSubscriberInitialConnected func()
@@ -439,11 +439,15 @@ func (t *TransportManager) NegotiateSubscriber(force bool) {
t.subscriber.Negotiate(force)
}
func (t *TransportManager) ICERestart(iceConfig *livekit.ICEConfig) {
func (t *TransportManager) ICERestart(iceConfig *livekit.ICEConfig, resetShortConnection bool) {
if iceConfig != nil {
t.SetICEConfig(iceConfig)
}
if resetShortConnection {
t.publisher.ResetShortConnOnICERestart()
t.subscriber.ResetShortConnOnICERestart()
}
t.subscriber.ICERestart()
}
@@ -662,11 +666,11 @@ func (t *TransportManager) onMediaLossUpdate(loss uint8) {
if loss >= uint8(255*udpLossFracUnstable/100) {
t.udpLossUnstableCount |= 1
if bits.OnesCount32(t.udpLossUnstableCount) >= udpLossUnstableCountThreshold {
if t.udpRTT > 0 && t.tcpRTT < uint32(float32(t.udpRTT)*1.3) && t.tcpRTT < tcpGoodRTT {
if t.udpRTT > 0 && t.signalingRTT < uint32(float32(t.udpRTT)*1.3) && t.signalingRTT < tcpGoodRTT && time.Since(t.lastSignalAt) < iceFailedTimeout {
t.udpLossUnstableCount = 0
t.lock.Unlock()
t.params.Logger.Infow("udp connection unstable, switch to tcp")
t.params.Logger.Infow("udp connection unstable, switch to tcp", "signalingRTT", t.signalingRTT)
t.handleConnectionFailed(true)
if t.onAnyTransportFailed != nil {
t.onAnyTransportFailed()
@@ -678,19 +682,19 @@ func (t *TransportManager) onMediaLossUpdate(loss uint8) {
t.lock.Unlock()
}
func (t *TransportManager) UpdateRTT(rtt uint32, isUDP bool) {
if isUDP {
if t.udpRTT == 0 {
t.udpRTT = rtt
} else {
t.udpRTT = uint32(int(t.udpRTT) + (int(rtt)-int(t.udpRTT))/2)
}
} else {
t.tcpRTT = rtt
func (t *TransportManager) UpdateSignalingRTT(rtt uint32) {
t.signalingRTT = rtt
// TODO: considering using tcp rtt to calculate ice connection cost, if ice connection can't be established
// within 5 * tcp rtt(at least 5s), means udp traffic might be block/dropped, switch to tcp.
// Currently, most cases reported is that ice connected but subsequent connection, so left the thinking for now.
// TODO: considering using tcp rtt to calculate ice connection cost, if ice connection can't be established
// within 5 * tcp rtt(at least 5s), means udp traffic might be block/dropped, switch to tcp.
// Currently, most cases reported is that ice connected but subsequent connection, so left the thinking for now.
}
func (t *TransportManager) UpdateMediaRTT(rtt uint32) {
if t.udpRTT == 0 {
t.udpRTT = rtt
} else {
t.udpRTT = uint32(int(t.udpRTT) + (int(rtt)-int(t.udpRTT))/2)
}
}
+3 -2
View File
@@ -262,7 +262,7 @@ type LocalParticipant interface {
HandleAnswer(sdp webrtc.SessionDescription)
Negotiate(force bool)
ICERestart(iceConfig *livekit.ICEConfig)
ICERestart(iceConfig *livekit.ICEConfig, reason livekit.ReconnectReason)
AddTrackToSubscriber(trackLocal webrtc.TrackLocal, params AddTrackParams) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error)
AddTransceiverFromTrackToSubscriber(trackLocal webrtc.TrackLocal, params AddTrackParams) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error)
RemoveTrackFromSubscriber(sender *webrtc.RTPSender) error
@@ -317,7 +317,8 @@ type LocalParticipant interface {
MigrateState() MigrateState
SetMigrateInfo(previousOffer, previousAnswer *webrtc.SessionDescription, mediaTracks []*livekit.TrackPublishedResponse, dataChannels []*livekit.DataChannelInfo)
UpdateRTT(rtt uint32)
UpdateMediaRTT(rtt uint32)
UpdateSignalingRTT(rtt uint32)
CacheDownTrack(trackID livekit.TrackID, rtpTransceiver *webrtc.RTPTransceiver, downTrackState sfu.DownTrackState)
UncacheDownTrack(rtpTransceiver *webrtc.RTPTransceiver)
@@ -301,10 +301,11 @@ type FakeLocalParticipant struct {
hiddenReturnsOnCall map[int]struct {
result1 bool
}
ICERestartStub func(*livekit.ICEConfig)
ICERestartStub func(*livekit.ICEConfig, livekit.ReconnectReason)
iCERestartMutex sync.RWMutex
iCERestartArgsForCall []struct {
arg1 *livekit.ICEConfig
arg2 livekit.ReconnectReason
}
IDStub func() livekit.ParticipantID
iDMutex sync.RWMutex
@@ -735,9 +736,14 @@ type FakeLocalParticipant struct {
updateMediaLossReturnsOnCall map[int]struct {
result1 error
}
UpdateRTTStub func(uint32)
updateRTTMutex sync.RWMutex
updateRTTArgsForCall []struct {
UpdateMediaRTTStub func(uint32)
updateMediaRTTMutex sync.RWMutex
updateMediaRTTArgsForCall []struct {
arg1 uint32
}
UpdateSignalingRTTStub func(uint32)
updateSignalingRTTMutex sync.RWMutex
updateSignalingRTTArgsForCall []struct {
arg1 uint32
}
UpdateSubscribedQualityStub func(livekit.NodeID, livekit.TrackID, []types.SubscribedCodecQuality) error
@@ -2281,16 +2287,17 @@ func (fake *FakeLocalParticipant) HiddenReturnsOnCall(i int, result1 bool) {
}{result1}
}
func (fake *FakeLocalParticipant) ICERestart(arg1 *livekit.ICEConfig) {
func (fake *FakeLocalParticipant) ICERestart(arg1 *livekit.ICEConfig, arg2 livekit.ReconnectReason) {
fake.iCERestartMutex.Lock()
fake.iCERestartArgsForCall = append(fake.iCERestartArgsForCall, struct {
arg1 *livekit.ICEConfig
}{arg1})
arg2 livekit.ReconnectReason
}{arg1, arg2})
stub := fake.ICERestartStub
fake.recordInvocation("ICERestart", []interface{}{arg1})
fake.recordInvocation("ICERestart", []interface{}{arg1, arg2})
fake.iCERestartMutex.Unlock()
if stub != nil {
fake.ICERestartStub(arg1)
fake.ICERestartStub(arg1, arg2)
}
}
@@ -2300,17 +2307,17 @@ func (fake *FakeLocalParticipant) ICERestartCallCount() int {
return len(fake.iCERestartArgsForCall)
}
func (fake *FakeLocalParticipant) ICERestartCalls(stub func(*livekit.ICEConfig)) {
func (fake *FakeLocalParticipant) ICERestartCalls(stub func(*livekit.ICEConfig, livekit.ReconnectReason)) {
fake.iCERestartMutex.Lock()
defer fake.iCERestartMutex.Unlock()
fake.ICERestartStub = stub
}
func (fake *FakeLocalParticipant) ICERestartArgsForCall(i int) *livekit.ICEConfig {
func (fake *FakeLocalParticipant) ICERestartArgsForCall(i int) (*livekit.ICEConfig, livekit.ReconnectReason) {
fake.iCERestartMutex.RLock()
defer fake.iCERestartMutex.RUnlock()
argsForCall := fake.iCERestartArgsForCall[i]
return argsForCall.arg1
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeLocalParticipant) ID() livekit.ParticipantID {
@@ -4708,35 +4715,67 @@ func (fake *FakeLocalParticipant) UpdateMediaLossReturnsOnCall(i int, result1 er
}{result1}
}
func (fake *FakeLocalParticipant) UpdateRTT(arg1 uint32) {
fake.updateRTTMutex.Lock()
fake.updateRTTArgsForCall = append(fake.updateRTTArgsForCall, struct {
func (fake *FakeLocalParticipant) UpdateMediaRTT(arg1 uint32) {
fake.updateMediaRTTMutex.Lock()
fake.updateMediaRTTArgsForCall = append(fake.updateMediaRTTArgsForCall, struct {
arg1 uint32
}{arg1})
stub := fake.UpdateRTTStub
fake.recordInvocation("UpdateRTT", []interface{}{arg1})
fake.updateRTTMutex.Unlock()
stub := fake.UpdateMediaRTTStub
fake.recordInvocation("UpdateMediaRTT", []interface{}{arg1})
fake.updateMediaRTTMutex.Unlock()
if stub != nil {
fake.UpdateRTTStub(arg1)
fake.UpdateMediaRTTStub(arg1)
}
}
func (fake *FakeLocalParticipant) UpdateRTTCallCount() int {
fake.updateRTTMutex.RLock()
defer fake.updateRTTMutex.RUnlock()
return len(fake.updateRTTArgsForCall)
func (fake *FakeLocalParticipant) UpdateMediaRTTCallCount() int {
fake.updateMediaRTTMutex.RLock()
defer fake.updateMediaRTTMutex.RUnlock()
return len(fake.updateMediaRTTArgsForCall)
}
func (fake *FakeLocalParticipant) UpdateRTTCalls(stub func(uint32)) {
fake.updateRTTMutex.Lock()
defer fake.updateRTTMutex.Unlock()
fake.UpdateRTTStub = stub
func (fake *FakeLocalParticipant) UpdateMediaRTTCalls(stub func(uint32)) {
fake.updateMediaRTTMutex.Lock()
defer fake.updateMediaRTTMutex.Unlock()
fake.UpdateMediaRTTStub = stub
}
func (fake *FakeLocalParticipant) UpdateRTTArgsForCall(i int) uint32 {
fake.updateRTTMutex.RLock()
defer fake.updateRTTMutex.RUnlock()
argsForCall := fake.updateRTTArgsForCall[i]
func (fake *FakeLocalParticipant) UpdateMediaRTTArgsForCall(i int) uint32 {
fake.updateMediaRTTMutex.RLock()
defer fake.updateMediaRTTMutex.RUnlock()
argsForCall := fake.updateMediaRTTArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeLocalParticipant) UpdateSignalingRTT(arg1 uint32) {
fake.updateSignalingRTTMutex.Lock()
fake.updateSignalingRTTArgsForCall = append(fake.updateSignalingRTTArgsForCall, struct {
arg1 uint32
}{arg1})
stub := fake.UpdateSignalingRTTStub
fake.recordInvocation("UpdateSignalingRTT", []interface{}{arg1})
fake.updateSignalingRTTMutex.Unlock()
if stub != nil {
fake.UpdateSignalingRTTStub(arg1)
}
}
func (fake *FakeLocalParticipant) UpdateSignalingRTTCallCount() int {
fake.updateSignalingRTTMutex.RLock()
defer fake.updateSignalingRTTMutex.RUnlock()
return len(fake.updateSignalingRTTArgsForCall)
}
func (fake *FakeLocalParticipant) UpdateSignalingRTTCalls(stub func(uint32)) {
fake.updateSignalingRTTMutex.Lock()
defer fake.updateSignalingRTTMutex.Unlock()
fake.UpdateSignalingRTTStub = stub
}
func (fake *FakeLocalParticipant) UpdateSignalingRTTArgsForCall(i int) uint32 {
fake.updateSignalingRTTMutex.RLock()
defer fake.updateSignalingRTTMutex.RUnlock()
argsForCall := fake.updateSignalingRTTArgsForCall[i]
return argsForCall.arg1
}
@@ -5229,8 +5268,10 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.updateLastSeenSignalMutex.RUnlock()
fake.updateMediaLossMutex.RLock()
defer fake.updateMediaLossMutex.RUnlock()
fake.updateRTTMutex.RLock()
defer fake.updateRTTMutex.RUnlock()
fake.updateMediaRTTMutex.RLock()
defer fake.updateMediaRTTMutex.RUnlock()
fake.updateSignalingRTTMutex.RLock()
defer fake.updateSignalingRTTMutex.RUnlock()
fake.updateSubscribedQualityMutex.RLock()
defer fake.updateSubscribedQualityMutex.RUnlock()
fake.updateSubscribedTrackSettingsMutex.RLock()
+3 -2
View File
@@ -237,6 +237,7 @@ func (r *RoomManager) StartSession(
"room", roomName,
"nodeID", r.currentNode.Id,
"participant", pi.Identity,
"reason", pi.ReconnectReason,
)
iceConfig := r.getIceConfig(participant)
if iceConfig == nil {
@@ -244,11 +245,11 @@ func (r *RoomManager) StartSession(
}
if err = room.ResumeParticipant(participant, responseSink,
r.iceServersForRoom(protoRoom, iceConfig.PreferenceSubscriber == livekit.ICECandidateType_ICT_TLS),
); err != nil {
pi.ReconnectReason); err != nil {
logger.Warnw("could not resume participant", err, "participant", pi.Identity)
return err
}
r.telemetry.ParticipantResumed(ctx, room.ToProto(), participant.ToProto(), livekit.NodeID(r.currentNode.Id))
r.telemetry.ParticipantResumed(ctx, room.ToProto(), participant.ToProto(), livekit.NodeID(r.currentNode.Id), pi.ReconnectReason)
go r.rtcSessionWorker(room, participant, requestSource)
return nil
} else {
+24 -8
View File
@@ -98,6 +98,7 @@ func (s *RTCService) validate(r *http.Request) (livekit.RoomName, routing.Partic
roomName := livekit.RoomName(r.FormValue("room"))
reconnectParam := r.FormValue("reconnect")
reconnectReason, _ := strconv.Atoi(r.FormValue("reconnect_reason")) // 0 means unknown reason
autoSubParam := r.FormValue("auto_subscribe")
publishParam := r.FormValue("publish")
adaptiveStreamParam := r.FormValue("adaptive_stream")
@@ -139,13 +140,14 @@ func (s *RTCService) validate(r *http.Request) (livekit.RoomName, routing.Partic
}
pi = routing.ParticipantInit{
Reconnect: boolValue(reconnectParam),
Identity: livekit.ParticipantIdentity(claims.Identity),
Name: livekit.ParticipantName(claims.Name),
AutoSubscribe: true,
Client: s.ParseClientInfo(r),
Grants: claims,
Region: region,
Reconnect: boolValue(reconnectParam),
ReconnectReason: livekit.ReconnectReason(reconnectReason),
Identity: livekit.ParticipantIdentity(claims.Identity),
Name: livekit.ParticipantName(claims.Name),
AutoSubscribe: true,
Client: s.ParseClientInfo(r),
Grants: claims,
Region: region,
}
if pi.Reconnect {
pi.ID = livekit.ParticipantID(participantID)
@@ -320,7 +322,9 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if signalStats != nil {
signalStats.AddBytes(uint64(count), false)
}
if _, ok := req.Message.(*livekit.SignalRequest_Ping); ok {
switch m := req.Message.(type) {
case *livekit.SignalRequest_Ping:
count, perr := sigConn.WriteResponse(&livekit.SignalResponse{
Message: &livekit.SignalResponse_Pong{
//
@@ -333,6 +337,18 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if perr == nil && signalStats != nil {
signalStats.AddBytes(uint64(count), true)
}
case *livekit.SignalRequest_PingReq:
count, perr := sigConn.WriteResponse(&livekit.SignalResponse{
Message: &livekit.SignalResponse_PongResp{
PongResp: &livekit.Pong{
LastPingTimestamp: m.PingReq.Timestamp,
Timestamp: time.Now().UnixMilli(),
},
},
})
if perr == nil && signalStats != nil {
signalStats.AddBytes(uint64(count), true)
}
}
switch m := req.Message.(type) {
+3 -1
View File
@@ -130,11 +130,13 @@ func (t *telemetryService) ParticipantResumed(
room *livekit.Room,
participant *livekit.ParticipantInfo,
nodeID livekit.NodeID,
reason livekit.ReconnectReason,
) {
t.enqueue(func() {
ev := newParticipantEvent(livekit.AnalyticsEventType_PARTICIPANT_RESUMED, room, participant)
ev.ClientMeta = &livekit.AnalyticsClientMeta{
Node: string(nodeID),
Node: string(nodeID),
ReconnectReason: reason,
}
t.SendEvent(ctx, ev)
})
@@ -58,13 +58,14 @@ type FakeTelemetryService struct {
arg3 *livekit.ParticipantInfo
arg4 bool
}
ParticipantResumedStub func(context.Context, *livekit.Room, *livekit.ParticipantInfo, livekit.NodeID)
ParticipantResumedStub func(context.Context, *livekit.Room, *livekit.ParticipantInfo, livekit.NodeID, livekit.ReconnectReason)
participantResumedMutex sync.RWMutex
participantResumedArgsForCall []struct {
arg1 context.Context
arg2 *livekit.Room
arg3 *livekit.ParticipantInfo
arg4 livekit.NodeID
arg5 livekit.ReconnectReason
}
RoomEndedStub func(context.Context, *livekit.Room)
roomEndedMutex sync.RWMutex
@@ -419,19 +420,20 @@ func (fake *FakeTelemetryService) ParticipantLeftArgsForCall(i int) (context.Con
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4
}
func (fake *FakeTelemetryService) ParticipantResumed(arg1 context.Context, arg2 *livekit.Room, arg3 *livekit.ParticipantInfo, arg4 livekit.NodeID) {
func (fake *FakeTelemetryService) ParticipantResumed(arg1 context.Context, arg2 *livekit.Room, arg3 *livekit.ParticipantInfo, arg4 livekit.NodeID, arg5 livekit.ReconnectReason) {
fake.participantResumedMutex.Lock()
fake.participantResumedArgsForCall = append(fake.participantResumedArgsForCall, struct {
arg1 context.Context
arg2 *livekit.Room
arg3 *livekit.ParticipantInfo
arg4 livekit.NodeID
}{arg1, arg2, arg3, arg4})
arg5 livekit.ReconnectReason
}{arg1, arg2, arg3, arg4, arg5})
stub := fake.ParticipantResumedStub
fake.recordInvocation("ParticipantResumed", []interface{}{arg1, arg2, arg3, arg4})
fake.recordInvocation("ParticipantResumed", []interface{}{arg1, arg2, arg3, arg4, arg5})
fake.participantResumedMutex.Unlock()
if stub != nil {
fake.ParticipantResumedStub(arg1, arg2, arg3, arg4)
fake.ParticipantResumedStub(arg1, arg2, arg3, arg4, arg5)
}
}
@@ -441,17 +443,17 @@ func (fake *FakeTelemetryService) ParticipantResumedCallCount() int {
return len(fake.participantResumedArgsForCall)
}
func (fake *FakeTelemetryService) ParticipantResumedCalls(stub func(context.Context, *livekit.Room, *livekit.ParticipantInfo, livekit.NodeID)) {
func (fake *FakeTelemetryService) ParticipantResumedCalls(stub func(context.Context, *livekit.Room, *livekit.ParticipantInfo, livekit.NodeID, livekit.ReconnectReason)) {
fake.participantResumedMutex.Lock()
defer fake.participantResumedMutex.Unlock()
fake.ParticipantResumedStub = stub
}
func (fake *FakeTelemetryService) ParticipantResumedArgsForCall(i int) (context.Context, *livekit.Room, *livekit.ParticipantInfo, livekit.NodeID) {
func (fake *FakeTelemetryService) ParticipantResumedArgsForCall(i int) (context.Context, *livekit.Room, *livekit.ParticipantInfo, livekit.NodeID, livekit.ReconnectReason) {
fake.participantResumedMutex.RLock()
defer fake.participantResumedMutex.RUnlock()
argsForCall := fake.participantResumedArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4, argsForCall.arg5
}
func (fake *FakeTelemetryService) RoomEnded(arg1 context.Context, arg2 *livekit.Room) {
+1 -1
View File
@@ -26,7 +26,7 @@ type TelemetryService interface {
// ParticipantActive - a participant establishes media connection
ParticipantActive(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientMeta *livekit.AnalyticsClientMeta)
// ParticipantResumed - there has been an ICE restart or connection resume attempt
ParticipantResumed(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, nodeID livekit.NodeID)
ParticipantResumed(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, nodeID livekit.NodeID, reason livekit.ReconnectReason)
// ParticipantLeft - the participant leaves the room, only sent if ParticipantActive has been called before
ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, shouldSendEvent bool)
// TrackPublishRequested - a publication attempt has been received