diff --git a/go.mod b/go.mod index db5771db6..b46c58542 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20230130133657-96cfb115473a - github.com/livekit/protocol v1.3.3-0.20230202034647-c71216774a62 + github.com/livekit/protocol v1.3.3-0.20230206022348-f6d32e15b011 github.com/livekit/psrpc v0.2.5 github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995 github.com/mackerelio/go-osstat v0.2.3 diff --git a/go.sum b/go.sum index 8552da1c8..853439e8d 100644 --- a/go.sum +++ b/go.sum @@ -234,8 +234,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20230130133657-96cfb115473a h1:5UkGQpskXp7HcBmyrCwWtO7ygDWbqtjN09Yva4l/nyE= github.com/livekit/mediatransportutil v0.0.0-20230130133657-96cfb115473a/go.mod h1:1Dlx20JPoIKGP45eo+yuj0HjeE25zmyeX/EWHiPCjFw= -github.com/livekit/protocol v1.3.3-0.20230202034647-c71216774a62 h1:wLkf7jiWtA0q+3y192KEkWIdKUrh+cXz/pBwVMZBwq4= -github.com/livekit/protocol v1.3.3-0.20230202034647-c71216774a62/go.mod h1:gwCG03nKlHlC9hTjL4pXQpn783ALhmbyhq65UZxqbb8= +github.com/livekit/protocol v1.3.3-0.20230206022348-f6d32e15b011 h1:KUuxp1D8ok4tgQ5gzxfUSj0o0GFMy1zwveru1r5kk9w= +github.com/livekit/protocol v1.3.3-0.20230206022348-f6d32e15b011/go.mod h1:gwCG03nKlHlC9hTjL4pXQpn783ALhmbyhq65UZxqbb8= github.com/livekit/psrpc v0.2.5 h1:+EZS78MGdBZxzCUwinDQ6pOeqPDURisrGtfyyqwUDSI= github.com/livekit/psrpc v0.2.5/go.mod h1:DyphtRRWvcIuCaldYg9VGpwGhu/HiKmNcysgpN6xKrM= github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995 h1:vOaY2qvfLihDyeZtnGGN1Law9wRrw8BMGCr1TygTvMw= diff --git a/pkg/routing/interfaces.go b/pkg/routing/interfaces.go index b7cd5b406..9db0eb9ac 100644 --- a/pkg/routing/interfaces.go +++ b/pkg/routing/interfaces.go @@ -31,15 +31,16 @@ type MessageSource interface { } type ParticipantInit struct { - Identity livekit.ParticipantIdentity - Name livekit.ParticipantName - Reconnect bool - AutoSubscribe bool - Client *livekit.ClientInfo - Grants *auth.ClaimGrants - Region string - AdaptiveStream bool - ID livekit.ParticipantID + Identity livekit.ParticipantIdentity + Name livekit.ParticipantName + Reconnect bool + ReconnectReason livekit.ReconnectReason + AutoSubscribe bool + Client *livekit.ClientInfo + Grants *auth.ClaimGrants + Region string + AdaptiveStream bool + ID livekit.ParticipantID } type NewParticipantCallback func( @@ -116,13 +117,14 @@ func (pi *ParticipantInit) ToStartSession(roomName livekit.RoomName, connectionI Identity: string(pi.Identity), Name: string(pi.Name), // connection id is to allow the RTC node to identify where to route the message back to - ConnectionId: string(connectionID), - Reconnect: pi.Reconnect, - AutoSubscribe: pi.AutoSubscribe, - Client: pi.Client, - GrantsJson: string(claims), - AdaptiveStream: pi.AdaptiveStream, - ParticipantId: string(pi.ID), + ConnectionId: string(connectionID), + Reconnect: pi.Reconnect, + ReconnectReason: pi.ReconnectReason, + AutoSubscribe: pi.AutoSubscribe, + Client: pi.Client, + GrantsJson: string(claims), + AdaptiveStream: pi.AdaptiveStream, + ParticipantId: string(pi.ID), }, nil } @@ -133,14 +135,15 @@ func ParticipantInitFromStartSession(ss *livekit.StartSession, region string) (* } return &ParticipantInit{ - Identity: livekit.ParticipantIdentity(ss.Identity), - Name: livekit.ParticipantName(ss.Name), - Reconnect: ss.Reconnect, - Client: ss.Client, - AutoSubscribe: ss.AutoSubscribe, - Grants: claims, - Region: region, - AdaptiveStream: ss.AdaptiveStream, - ID: livekit.ParticipantID(ss.ParticipantId), + Identity: livekit.ParticipantIdentity(ss.Identity), + Name: livekit.ParticipantName(ss.Name), + Reconnect: ss.Reconnect, + ReconnectReason: ss.ReconnectReason, + Client: ss.Client, + AutoSubscribe: ss.AutoSubscribe, + Grants: claims, + Region: region, + AdaptiveStream: ss.AdaptiveStream, + ID: livekit.ParticipantID(ss.ParticipantId), }, nil } diff --git a/pkg/rtc/mediatracksubscriptions.go b/pkg/rtc/mediatracksubscriptions.go index 13dc49221..1b7ebc259 100644 --- a/pkg/rtc/mediatracksubscriptions.go +++ b/pkg/rtc/mediatracksubscriptions.go @@ -158,7 +158,7 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr * }) downTrack.OnRttUpdate(func(_ *sfu.DownTrack, rtt uint32) { - go sub.UpdateRTT(rtt) + go sub.UpdateMediaRTT(rtt) }) downTrack.AddReceiverReportListener(func(dt *sfu.DownTrack, report *rtcp.ReceiverReport) { diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 409386e3d..5ae250241 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -472,7 +472,7 @@ func (p *ParticipantImpl) HandleAnswer(answer webrtc.SessionDescription) { * 2. client send answer */ signalConnCost := time.Since(p.ConnectedAt()).Milliseconds() - p.TransportManager.UpdateRTT(uint32(signalConnCost), false) + p.TransportManager.UpdateSignalingRTT(uint32(signalConnCost)) p.TransportManager.HandleAnswer(answer) } @@ -750,7 +750,7 @@ func (p *ParticipantImpl) MigrateState() types.MigrateState { } // ICERestart restarts subscriber ICE connections -func (p *ParticipantImpl) ICERestart(iceConfig *livekit.ICEConfig) { +func (p *ParticipantImpl) ICERestart(iceConfig *livekit.ICEConfig, reason livekit.ReconnectReason) { p.clearDisconnectTimer() p.clearMigrationTimer() @@ -758,7 +758,7 @@ func (p *ParticipantImpl) ICERestart(iceConfig *livekit.ICEConfig) { t.(types.LocalMediaTrack).Restart() } - p.TransportManager.ICERestart(iceConfig) + p.TransportManager.ICERestart(iceConfig, reason == livekit.ReconnectReason_RR_PUBLISHER_FAILED || reason == livekit.ReconnectReason_RR_SUBSCRIBER_FAILED) } func (p *ParticipantImpl) OnICEConfigChanged(f func(participant types.LocalParticipant, iceConfig *livekit.ICEConfig)) { @@ -919,7 +919,7 @@ func (p *ParticipantImpl) SubscriptionPermissionUpdate(publisherID livekit.Parti } } -func (p *ParticipantImpl) UpdateRTT(rtt uint32) { +func (p *ParticipantImpl) UpdateMediaRTT(rtt uint32) { now := time.Now() p.lock.Lock() if now.Sub(p.rttUpdatedAt) < rttUpdateInterval || p.lastRTT == rtt { @@ -929,7 +929,7 @@ func (p *ParticipantImpl) UpdateRTT(rtt uint32) { p.rttUpdatedAt = now p.lastRTT = rtt p.lock.Unlock() - p.TransportManager.UpdateRTT(rtt, true) + p.TransportManager.UpdateMediaRTT(rtt) for _, pt := range p.GetPublishedTracks() { pt.(types.LocalMediaTrack).SetRTT(rtt) diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 78cefa9fc..2d6bb01e5 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -367,7 +367,7 @@ func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions return nil } -func (r *Room) ResumeParticipant(p types.LocalParticipant, responseSink routing.MessageSink, iceServers []*livekit.ICEServer) error { +func (r *Room) ResumeParticipant(p types.LocalParticipant, responseSink routing.MessageSink, iceServers []*livekit.ICEServer, reason livekit.ReconnectReason) error { // close previous sink, and link to new one p.CloseSignalConnection() p.SetResponseSink(responseSink) @@ -384,7 +384,7 @@ func (r *Room) ResumeParticipant(p types.LocalParticipant, responseSink routing. return err } - p.ICERestart(nil) + p.ICERestart(nil, reason) return nil } @@ -701,7 +701,7 @@ func (r *Room) SimulateScenario(participant types.LocalParticipant, simulateScen participant.ICERestart(&livekit.ICEConfig{ PreferenceSubscriber: livekit.ICECandidateType(scenario.SwitchCandidateProtocol), PreferencePublisher: livekit.ICECandidateType(scenario.SwitchCandidateProtocol), - }) + }, livekit.ReconnectReason_RR_SWITCH_CANDIDATE) } return nil } diff --git a/pkg/rtc/signalhandler.go b/pkg/rtc/signalhandler.go index 163ee5ce6..dc49ade12 100644 --- a/pkg/rtc/signalhandler.go +++ b/pkg/rtc/signalhandler.go @@ -68,6 +68,11 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant pLogger.Warnw("could not simulate scenario", err, "simulate", msg.Simulate) } + + case *livekit.SignalRequest_PingReq: + if msg.PingReq.Rtt > 0 { + participant.UpdateSignalingRTT(uint32(msg.PingReq.Rtt)) + } } return nil } diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index 75f4210e5..c197cc3af 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -155,10 +155,12 @@ type PCTransport struct { lossyDCOpened bool onDataPacket func(kind livekit.DataPacket_Kind, data []byte) - iceStartedAt time.Time - iceConnectedAt time.Time - connectedAt time.Time - connectAfterICETimer *time.Timer // timer to wait for pc to connect after ice connected + iceStartedAt time.Time + iceConnectedAt time.Time + firstConnectedAt time.Time + connectedAt time.Time + connectAfterICETimer *time.Timer // timer to wait for pc to connect after ice connected + resetShortConnOnICERestart atomic.Bool onFullyEstablished func() @@ -419,19 +421,19 @@ func (t *PCTransport) setICEConnectedAt(at time.Time) { t.iceConnectedAt = at // set failure timer for dtls handshake - iceCost := at.Sub(t.iceStartedAt) - connTimeoutAfterICE := 3 * iceCost + iceDuration := at.Sub(t.iceStartedAt) + connTimeoutAfterICE := 3 * iceDuration if connTimeoutAfterICE < minConnectTimeoutAfterICE { connTimeoutAfterICE = minConnectTimeoutAfterICE } else if connTimeoutAfterICE > maxConnectTimeoutAfterICE { connTimeoutAfterICE = maxConnectTimeoutAfterICE } - t.params.Logger.Debugw("setting connection timer after ice connected", "timeout", connTimeoutAfterICE, "iceCost", iceCost) + t.params.Logger.Debugw("setting connection timer after ice connected", "timeout", connTimeoutAfterICE, "iceDuration", iceDuration) t.connectAfterICETimer = time.AfterFunc(connTimeoutAfterICE, func() { state := t.pc.ConnectionState() // if pc is still checking or connected but not fully established after timeout, then fire connection fail if state != webrtc.PeerConnectionStateClosed && state != webrtc.PeerConnectionStateFailed && !t.isFullyEstablished() { - t.params.Logger.Infow("connect timeout after ICE connected", "timeout", connTimeoutAfterICE, "iceCost", iceCost) + t.params.Logger.Infow("connect timeout after ICE connected", "timeout", connTimeoutAfterICE, "iceDuration", iceDuration) t.handleConnectionFailed() } }) @@ -439,6 +441,19 @@ func (t *PCTransport) setICEConnectedAt(at time.Time) { t.lock.Unlock() } +func (t *PCTransport) resetShortConn() { + t.params.Logger.Infow("resetting short connection on ICE restart") + t.lock.Lock() + t.iceStartedAt = time.Time{} + t.iceConnectedAt = time.Time{} + t.connectedAt = time.Time{} + if t.connectAfterICETimer != nil { + t.connectAfterICETimer.Stop() + t.connectAfterICETimer = nil + } + t.lock.Unlock() +} + func (t *PCTransport) isShortConnection(at time.Time) (bool, time.Duration) { t.lock.RLock() defer t.lock.RUnlock() @@ -478,12 +493,13 @@ func (t *PCTransport) logICECandidates() { func (t *PCTransport) setConnectedAt(at time.Time) bool { t.lock.Lock() - if !t.connectedAt.IsZero() { + t.connectedAt = at + if !t.firstConnectedAt.IsZero() { t.lock.Unlock() return false } - t.connectedAt = at + t.firstConnectedAt = at prometheus.ServiceOperationCounter.WithLabelValues("peer_connection", "success", "").Add(1) t.lock.Unlock() return true @@ -781,7 +797,7 @@ func (t *PCTransport) HasEverConnected() bool { t.lock.RLock() defer t.lock.RUnlock() - return !t.connectedAt.IsZero() + return !t.firstConnectedAt.IsZero() } func (t *PCTransport) WriteRTCP(pkts []rtcp.Packet) error { @@ -1002,6 +1018,10 @@ func (t *PCTransport) ICERestart() { }) } +func (t *PCTransport) ResetShortConnOnICERestart() { + t.resetShortConnOnICERestart.Store(true) +} + func (t *PCTransport) OnStreamStateChange(f func(update *sfu.StreamStateUpdate) error) { if t.streamAllocator == nil { return @@ -1724,6 +1744,10 @@ func (t *PCTransport) handleRemoteOfferReceived(sd *webrtc.SessionDescription) e return nil } + if offerRestartICE && t.resetShortConnOnICERestart.CompareAndSwap(true, false) { + t.resetShortConn() + } + if err := t.setRemoteDescription(*sd); err != nil { return err } @@ -1766,6 +1790,10 @@ func (t *PCTransport) doICERestart() error { return nil } + if t.resetShortConnOnICERestart.CompareAndSwap(true, false) { + t.resetShortConn() + } + if t.negotiationState == NegotiationStateNone { return t.createAndSendOffer(&webrtc.OfferOptions{ICERestart: true}) } diff --git a/pkg/rtc/transportmanager.go b/pkg/rtc/transportmanager.go index 7c4a1842b..ce4c5cc48 100644 --- a/pkg/rtc/transportmanager.go +++ b/pkg/rtc/transportmanager.go @@ -70,7 +70,7 @@ type TransportManager struct { mediaLossProxy *MediaLossProxy udpLossUnstableCount uint32 - tcpRTT, udpRTT uint32 + signalingRTT, udpRTT uint32 onPublisherInitialConnected func() onSubscriberInitialConnected func() @@ -439,11 +439,15 @@ func (t *TransportManager) NegotiateSubscriber(force bool) { t.subscriber.Negotiate(force) } -func (t *TransportManager) ICERestart(iceConfig *livekit.ICEConfig) { +func (t *TransportManager) ICERestart(iceConfig *livekit.ICEConfig, resetShortConnection bool) { if iceConfig != nil { t.SetICEConfig(iceConfig) } + if resetShortConnection { + t.publisher.ResetShortConnOnICERestart() + t.subscriber.ResetShortConnOnICERestart() + } t.subscriber.ICERestart() } @@ -662,11 +666,11 @@ func (t *TransportManager) onMediaLossUpdate(loss uint8) { if loss >= uint8(255*udpLossFracUnstable/100) { t.udpLossUnstableCount |= 1 if bits.OnesCount32(t.udpLossUnstableCount) >= udpLossUnstableCountThreshold { - if t.udpRTT > 0 && t.tcpRTT < uint32(float32(t.udpRTT)*1.3) && t.tcpRTT < tcpGoodRTT { + if t.udpRTT > 0 && t.signalingRTT < uint32(float32(t.udpRTT)*1.3) && t.signalingRTT < tcpGoodRTT && time.Since(t.lastSignalAt) < iceFailedTimeout { t.udpLossUnstableCount = 0 t.lock.Unlock() - t.params.Logger.Infow("udp connection unstable, switch to tcp") + t.params.Logger.Infow("udp connection unstable, switch to tcp", "signalingRTT", t.signalingRTT) t.handleConnectionFailed(true) if t.onAnyTransportFailed != nil { t.onAnyTransportFailed() @@ -678,19 +682,19 @@ func (t *TransportManager) onMediaLossUpdate(loss uint8) { t.lock.Unlock() } -func (t *TransportManager) UpdateRTT(rtt uint32, isUDP bool) { - if isUDP { - if t.udpRTT == 0 { - t.udpRTT = rtt - } else { - t.udpRTT = uint32(int(t.udpRTT) + (int(rtt)-int(t.udpRTT))/2) - } - } else { - t.tcpRTT = rtt +func (t *TransportManager) UpdateSignalingRTT(rtt uint32) { + t.signalingRTT = rtt - // TODO: considering using tcp rtt to calculate ice connection cost, if ice connection can't be established - // within 5 * tcp rtt(at least 5s), means udp traffic might be block/dropped, switch to tcp. - // Currently, most cases reported is that ice connected but subsequent connection, so left the thinking for now. + // TODO: considering using tcp rtt to calculate ice connection cost, if ice connection can't be established + // within 5 * tcp rtt(at least 5s), means udp traffic might be block/dropped, switch to tcp. + // Currently, most cases reported is that ice connected but subsequent connection, so left the thinking for now. +} + +func (t *TransportManager) UpdateMediaRTT(rtt uint32) { + if t.udpRTT == 0 { + t.udpRTT = rtt + } else { + t.udpRTT = uint32(int(t.udpRTT) + (int(rtt)-int(t.udpRTT))/2) } } diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 4eee66f8b..10654ed4c 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -262,7 +262,7 @@ type LocalParticipant interface { HandleAnswer(sdp webrtc.SessionDescription) Negotiate(force bool) - ICERestart(iceConfig *livekit.ICEConfig) + ICERestart(iceConfig *livekit.ICEConfig, reason livekit.ReconnectReason) AddTrackToSubscriber(trackLocal webrtc.TrackLocal, params AddTrackParams) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error) AddTransceiverFromTrackToSubscriber(trackLocal webrtc.TrackLocal, params AddTrackParams) (*webrtc.RTPSender, *webrtc.RTPTransceiver, error) RemoveTrackFromSubscriber(sender *webrtc.RTPSender) error @@ -317,7 +317,8 @@ type LocalParticipant interface { MigrateState() MigrateState SetMigrateInfo(previousOffer, previousAnswer *webrtc.SessionDescription, mediaTracks []*livekit.TrackPublishedResponse, dataChannels []*livekit.DataChannelInfo) - UpdateRTT(rtt uint32) + UpdateMediaRTT(rtt uint32) + UpdateSignalingRTT(rtt uint32) CacheDownTrack(trackID livekit.TrackID, rtpTransceiver *webrtc.RTPTransceiver, downTrackState sfu.DownTrackState) UncacheDownTrack(rtpTransceiver *webrtc.RTPTransceiver) diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index d7355a871..98ae3f24d 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -301,10 +301,11 @@ type FakeLocalParticipant struct { hiddenReturnsOnCall map[int]struct { result1 bool } - ICERestartStub func(*livekit.ICEConfig) + ICERestartStub func(*livekit.ICEConfig, livekit.ReconnectReason) iCERestartMutex sync.RWMutex iCERestartArgsForCall []struct { arg1 *livekit.ICEConfig + arg2 livekit.ReconnectReason } IDStub func() livekit.ParticipantID iDMutex sync.RWMutex @@ -735,9 +736,14 @@ type FakeLocalParticipant struct { updateMediaLossReturnsOnCall map[int]struct { result1 error } - UpdateRTTStub func(uint32) - updateRTTMutex sync.RWMutex - updateRTTArgsForCall []struct { + UpdateMediaRTTStub func(uint32) + updateMediaRTTMutex sync.RWMutex + updateMediaRTTArgsForCall []struct { + arg1 uint32 + } + UpdateSignalingRTTStub func(uint32) + updateSignalingRTTMutex sync.RWMutex + updateSignalingRTTArgsForCall []struct { arg1 uint32 } UpdateSubscribedQualityStub func(livekit.NodeID, livekit.TrackID, []types.SubscribedCodecQuality) error @@ -2281,16 +2287,17 @@ func (fake *FakeLocalParticipant) HiddenReturnsOnCall(i int, result1 bool) { }{result1} } -func (fake *FakeLocalParticipant) ICERestart(arg1 *livekit.ICEConfig) { +func (fake *FakeLocalParticipant) ICERestart(arg1 *livekit.ICEConfig, arg2 livekit.ReconnectReason) { fake.iCERestartMutex.Lock() fake.iCERestartArgsForCall = append(fake.iCERestartArgsForCall, struct { arg1 *livekit.ICEConfig - }{arg1}) + arg2 livekit.ReconnectReason + }{arg1, arg2}) stub := fake.ICERestartStub - fake.recordInvocation("ICERestart", []interface{}{arg1}) + fake.recordInvocation("ICERestart", []interface{}{arg1, arg2}) fake.iCERestartMutex.Unlock() if stub != nil { - fake.ICERestartStub(arg1) + fake.ICERestartStub(arg1, arg2) } } @@ -2300,17 +2307,17 @@ func (fake *FakeLocalParticipant) ICERestartCallCount() int { return len(fake.iCERestartArgsForCall) } -func (fake *FakeLocalParticipant) ICERestartCalls(stub func(*livekit.ICEConfig)) { +func (fake *FakeLocalParticipant) ICERestartCalls(stub func(*livekit.ICEConfig, livekit.ReconnectReason)) { fake.iCERestartMutex.Lock() defer fake.iCERestartMutex.Unlock() fake.ICERestartStub = stub } -func (fake *FakeLocalParticipant) ICERestartArgsForCall(i int) *livekit.ICEConfig { +func (fake *FakeLocalParticipant) ICERestartArgsForCall(i int) (*livekit.ICEConfig, livekit.ReconnectReason) { fake.iCERestartMutex.RLock() defer fake.iCERestartMutex.RUnlock() argsForCall := fake.iCERestartArgsForCall[i] - return argsForCall.arg1 + return argsForCall.arg1, argsForCall.arg2 } func (fake *FakeLocalParticipant) ID() livekit.ParticipantID { @@ -4708,35 +4715,67 @@ func (fake *FakeLocalParticipant) UpdateMediaLossReturnsOnCall(i int, result1 er }{result1} } -func (fake *FakeLocalParticipant) UpdateRTT(arg1 uint32) { - fake.updateRTTMutex.Lock() - fake.updateRTTArgsForCall = append(fake.updateRTTArgsForCall, struct { +func (fake *FakeLocalParticipant) UpdateMediaRTT(arg1 uint32) { + fake.updateMediaRTTMutex.Lock() + fake.updateMediaRTTArgsForCall = append(fake.updateMediaRTTArgsForCall, struct { arg1 uint32 }{arg1}) - stub := fake.UpdateRTTStub - fake.recordInvocation("UpdateRTT", []interface{}{arg1}) - fake.updateRTTMutex.Unlock() + stub := fake.UpdateMediaRTTStub + fake.recordInvocation("UpdateMediaRTT", []interface{}{arg1}) + fake.updateMediaRTTMutex.Unlock() if stub != nil { - fake.UpdateRTTStub(arg1) + fake.UpdateMediaRTTStub(arg1) } } -func (fake *FakeLocalParticipant) UpdateRTTCallCount() int { - fake.updateRTTMutex.RLock() - defer fake.updateRTTMutex.RUnlock() - return len(fake.updateRTTArgsForCall) +func (fake *FakeLocalParticipant) UpdateMediaRTTCallCount() int { + fake.updateMediaRTTMutex.RLock() + defer fake.updateMediaRTTMutex.RUnlock() + return len(fake.updateMediaRTTArgsForCall) } -func (fake *FakeLocalParticipant) UpdateRTTCalls(stub func(uint32)) { - fake.updateRTTMutex.Lock() - defer fake.updateRTTMutex.Unlock() - fake.UpdateRTTStub = stub +func (fake *FakeLocalParticipant) UpdateMediaRTTCalls(stub func(uint32)) { + fake.updateMediaRTTMutex.Lock() + defer fake.updateMediaRTTMutex.Unlock() + fake.UpdateMediaRTTStub = stub } -func (fake *FakeLocalParticipant) UpdateRTTArgsForCall(i int) uint32 { - fake.updateRTTMutex.RLock() - defer fake.updateRTTMutex.RUnlock() - argsForCall := fake.updateRTTArgsForCall[i] +func (fake *FakeLocalParticipant) UpdateMediaRTTArgsForCall(i int) uint32 { + fake.updateMediaRTTMutex.RLock() + defer fake.updateMediaRTTMutex.RUnlock() + argsForCall := fake.updateMediaRTTArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeLocalParticipant) UpdateSignalingRTT(arg1 uint32) { + fake.updateSignalingRTTMutex.Lock() + fake.updateSignalingRTTArgsForCall = append(fake.updateSignalingRTTArgsForCall, struct { + arg1 uint32 + }{arg1}) + stub := fake.UpdateSignalingRTTStub + fake.recordInvocation("UpdateSignalingRTT", []interface{}{arg1}) + fake.updateSignalingRTTMutex.Unlock() + if stub != nil { + fake.UpdateSignalingRTTStub(arg1) + } +} + +func (fake *FakeLocalParticipant) UpdateSignalingRTTCallCount() int { + fake.updateSignalingRTTMutex.RLock() + defer fake.updateSignalingRTTMutex.RUnlock() + return len(fake.updateSignalingRTTArgsForCall) +} + +func (fake *FakeLocalParticipant) UpdateSignalingRTTCalls(stub func(uint32)) { + fake.updateSignalingRTTMutex.Lock() + defer fake.updateSignalingRTTMutex.Unlock() + fake.UpdateSignalingRTTStub = stub +} + +func (fake *FakeLocalParticipant) UpdateSignalingRTTArgsForCall(i int) uint32 { + fake.updateSignalingRTTMutex.RLock() + defer fake.updateSignalingRTTMutex.RUnlock() + argsForCall := fake.updateSignalingRTTArgsForCall[i] return argsForCall.arg1 } @@ -5229,8 +5268,10 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.updateLastSeenSignalMutex.RUnlock() fake.updateMediaLossMutex.RLock() defer fake.updateMediaLossMutex.RUnlock() - fake.updateRTTMutex.RLock() - defer fake.updateRTTMutex.RUnlock() + fake.updateMediaRTTMutex.RLock() + defer fake.updateMediaRTTMutex.RUnlock() + fake.updateSignalingRTTMutex.RLock() + defer fake.updateSignalingRTTMutex.RUnlock() fake.updateSubscribedQualityMutex.RLock() defer fake.updateSubscribedQualityMutex.RUnlock() fake.updateSubscribedTrackSettingsMutex.RLock() diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 4715df802..b89dc8ae8 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -237,6 +237,7 @@ func (r *RoomManager) StartSession( "room", roomName, "nodeID", r.currentNode.Id, "participant", pi.Identity, + "reason", pi.ReconnectReason, ) iceConfig := r.getIceConfig(participant) if iceConfig == nil { @@ -244,11 +245,11 @@ func (r *RoomManager) StartSession( } if err = room.ResumeParticipant(participant, responseSink, r.iceServersForRoom(protoRoom, iceConfig.PreferenceSubscriber == livekit.ICECandidateType_ICT_TLS), - ); err != nil { + pi.ReconnectReason); err != nil { logger.Warnw("could not resume participant", err, "participant", pi.Identity) return err } - r.telemetry.ParticipantResumed(ctx, room.ToProto(), participant.ToProto(), livekit.NodeID(r.currentNode.Id)) + r.telemetry.ParticipantResumed(ctx, room.ToProto(), participant.ToProto(), livekit.NodeID(r.currentNode.Id), pi.ReconnectReason) go r.rtcSessionWorker(room, participant, requestSource) return nil } else { diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index 4fda2596f..fb52611dd 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -98,6 +98,7 @@ func (s *RTCService) validate(r *http.Request) (livekit.RoomName, routing.Partic roomName := livekit.RoomName(r.FormValue("room")) reconnectParam := r.FormValue("reconnect") + reconnectReason, _ := strconv.Atoi(r.FormValue("reconnect_reason")) // 0 means unknown reason autoSubParam := r.FormValue("auto_subscribe") publishParam := r.FormValue("publish") adaptiveStreamParam := r.FormValue("adaptive_stream") @@ -139,13 +140,14 @@ func (s *RTCService) validate(r *http.Request) (livekit.RoomName, routing.Partic } pi = routing.ParticipantInit{ - Reconnect: boolValue(reconnectParam), - Identity: livekit.ParticipantIdentity(claims.Identity), - Name: livekit.ParticipantName(claims.Name), - AutoSubscribe: true, - Client: s.ParseClientInfo(r), - Grants: claims, - Region: region, + Reconnect: boolValue(reconnectParam), + ReconnectReason: livekit.ReconnectReason(reconnectReason), + Identity: livekit.ParticipantIdentity(claims.Identity), + Name: livekit.ParticipantName(claims.Name), + AutoSubscribe: true, + Client: s.ParseClientInfo(r), + Grants: claims, + Region: region, } if pi.Reconnect { pi.ID = livekit.ParticipantID(participantID) @@ -320,7 +322,9 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { if signalStats != nil { signalStats.AddBytes(uint64(count), false) } - if _, ok := req.Message.(*livekit.SignalRequest_Ping); ok { + + switch m := req.Message.(type) { + case *livekit.SignalRequest_Ping: count, perr := sigConn.WriteResponse(&livekit.SignalResponse{ Message: &livekit.SignalResponse_Pong{ // @@ -333,6 +337,18 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { if perr == nil && signalStats != nil { signalStats.AddBytes(uint64(count), true) } + case *livekit.SignalRequest_PingReq: + count, perr := sigConn.WriteResponse(&livekit.SignalResponse{ + Message: &livekit.SignalResponse_PongResp{ + PongResp: &livekit.Pong{ + LastPingTimestamp: m.PingReq.Timestamp, + Timestamp: time.Now().UnixMilli(), + }, + }, + }) + if perr == nil && signalStats != nil { + signalStats.AddBytes(uint64(count), true) + } } switch m := req.Message.(type) { diff --git a/pkg/telemetry/events.go b/pkg/telemetry/events.go index 0017a146c..47f4744a0 100644 --- a/pkg/telemetry/events.go +++ b/pkg/telemetry/events.go @@ -130,11 +130,13 @@ func (t *telemetryService) ParticipantResumed( room *livekit.Room, participant *livekit.ParticipantInfo, nodeID livekit.NodeID, + reason livekit.ReconnectReason, ) { t.enqueue(func() { ev := newParticipantEvent(livekit.AnalyticsEventType_PARTICIPANT_RESUMED, room, participant) ev.ClientMeta = &livekit.AnalyticsClientMeta{ - Node: string(nodeID), + Node: string(nodeID), + ReconnectReason: reason, } t.SendEvent(ctx, ev) }) diff --git a/pkg/telemetry/telemetryfakes/fake_telemetry_service.go b/pkg/telemetry/telemetryfakes/fake_telemetry_service.go index a5121bc10..666a46d46 100644 --- a/pkg/telemetry/telemetryfakes/fake_telemetry_service.go +++ b/pkg/telemetry/telemetryfakes/fake_telemetry_service.go @@ -58,13 +58,14 @@ type FakeTelemetryService struct { arg3 *livekit.ParticipantInfo arg4 bool } - ParticipantResumedStub func(context.Context, *livekit.Room, *livekit.ParticipantInfo, livekit.NodeID) + ParticipantResumedStub func(context.Context, *livekit.Room, *livekit.ParticipantInfo, livekit.NodeID, livekit.ReconnectReason) participantResumedMutex sync.RWMutex participantResumedArgsForCall []struct { arg1 context.Context arg2 *livekit.Room arg3 *livekit.ParticipantInfo arg4 livekit.NodeID + arg5 livekit.ReconnectReason } RoomEndedStub func(context.Context, *livekit.Room) roomEndedMutex sync.RWMutex @@ -419,19 +420,20 @@ func (fake *FakeTelemetryService) ParticipantLeftArgsForCall(i int) (context.Con return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4 } -func (fake *FakeTelemetryService) ParticipantResumed(arg1 context.Context, arg2 *livekit.Room, arg3 *livekit.ParticipantInfo, arg4 livekit.NodeID) { +func (fake *FakeTelemetryService) ParticipantResumed(arg1 context.Context, arg2 *livekit.Room, arg3 *livekit.ParticipantInfo, arg4 livekit.NodeID, arg5 livekit.ReconnectReason) { fake.participantResumedMutex.Lock() fake.participantResumedArgsForCall = append(fake.participantResumedArgsForCall, struct { arg1 context.Context arg2 *livekit.Room arg3 *livekit.ParticipantInfo arg4 livekit.NodeID - }{arg1, arg2, arg3, arg4}) + arg5 livekit.ReconnectReason + }{arg1, arg2, arg3, arg4, arg5}) stub := fake.ParticipantResumedStub - fake.recordInvocation("ParticipantResumed", []interface{}{arg1, arg2, arg3, arg4}) + fake.recordInvocation("ParticipantResumed", []interface{}{arg1, arg2, arg3, arg4, arg5}) fake.participantResumedMutex.Unlock() if stub != nil { - fake.ParticipantResumedStub(arg1, arg2, arg3, arg4) + fake.ParticipantResumedStub(arg1, arg2, arg3, arg4, arg5) } } @@ -441,17 +443,17 @@ func (fake *FakeTelemetryService) ParticipantResumedCallCount() int { return len(fake.participantResumedArgsForCall) } -func (fake *FakeTelemetryService) ParticipantResumedCalls(stub func(context.Context, *livekit.Room, *livekit.ParticipantInfo, livekit.NodeID)) { +func (fake *FakeTelemetryService) ParticipantResumedCalls(stub func(context.Context, *livekit.Room, *livekit.ParticipantInfo, livekit.NodeID, livekit.ReconnectReason)) { fake.participantResumedMutex.Lock() defer fake.participantResumedMutex.Unlock() fake.ParticipantResumedStub = stub } -func (fake *FakeTelemetryService) ParticipantResumedArgsForCall(i int) (context.Context, *livekit.Room, *livekit.ParticipantInfo, livekit.NodeID) { +func (fake *FakeTelemetryService) ParticipantResumedArgsForCall(i int) (context.Context, *livekit.Room, *livekit.ParticipantInfo, livekit.NodeID, livekit.ReconnectReason) { fake.participantResumedMutex.RLock() defer fake.participantResumedMutex.RUnlock() argsForCall := fake.participantResumedArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4 + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4, argsForCall.arg5 } func (fake *FakeTelemetryService) RoomEnded(arg1 context.Context, arg2 *livekit.Room) { diff --git a/pkg/telemetry/telemetryservice.go b/pkg/telemetry/telemetryservice.go index 85b2d26d5..57d9e05c1 100644 --- a/pkg/telemetry/telemetryservice.go +++ b/pkg/telemetry/telemetryservice.go @@ -26,7 +26,7 @@ type TelemetryService interface { // ParticipantActive - a participant establishes media connection ParticipantActive(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, clientMeta *livekit.AnalyticsClientMeta) // ParticipantResumed - there has been an ICE restart or connection resume attempt - ParticipantResumed(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, nodeID livekit.NodeID) + ParticipantResumed(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, nodeID livekit.NodeID, reason livekit.ReconnectReason) // ParticipantLeft - the participant leaves the room, only sent if ParticipantActive has been called before ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo, shouldSendEvent bool) // TrackPublishRequested - a publication attempt has been received