From 70574deaf71d5d0d96f9d08e6ec18cbc21dd66da Mon Sep 17 00:00:00 2001 From: David Zhao Date: Tue, 2 Feb 2021 00:40:55 -0800 Subject: [PATCH] better debug logs around negotiations --- cmd/cli/client/client.go | 13 ++--- pkg/rtc/mediatrack.go | 18 +++++-- pkg/rtc/participant.go | 31 +++++++---- pkg/service/roommanager.go | 2 +- test/integration_helpers.go | 25 +++++++-- test/multinode_test.go | 25 +++++++++ test/scenarios.go | 103 ++++++++++++++++++++++++++++++++++-- 7 files changed, 186 insertions(+), 31 deletions(-) diff --git a/cmd/cli/client/client.go b/cmd/cli/client/client.go index 9157a6bd3..a31ef2be6 100644 --- a/cmd/cli/client/client.go +++ b/cmd/cli/client/client.go @@ -25,6 +25,7 @@ import ( type RTCClient struct { id string + identity string conn *websocket.Conn PeerConn *webrtc.PeerConnection // sid => track @@ -173,13 +174,10 @@ func (c *RTCClient) Run() error { }) // create a data channel, in order to work - dc, err := c.PeerConn.CreateDataChannel("_private", nil) + _, err := c.PeerConn.CreateDataChannel("_private", nil) if err != nil { return err } - dc.OnOpen(func() { - logger.Debugw("data channel open") - }) // run the session for { @@ -190,6 +188,7 @@ func (c *RTCClient) Run() error { switch msg := res.Message.(type) { case *livekit.SignalResponse_Join: c.id = msg.Join.Participant.Sid + c.identity = msg.Join.Participant.Identity c.lock.Lock() for _, p := range msg.Join.OtherParticipants { @@ -310,6 +309,7 @@ func (c *RTCClient) ReadResponse() (*livekit.SignalResponse, error) { } } +// TODO: this function is not thread safe, need to cleanup func (c *RTCClient) SubscribedTracks() map[string][]*webrtc.TrackRemote { return c.subscribedTracks } @@ -517,11 +517,6 @@ func (c *RTCClient) negotiate() error { }) } -type logEntry struct { - msg string - args []interface{} -} - func (c *RTCClient) processTrack(track *webrtc.TrackRemote) { lastUpdate := time.Time{} pId, trackId := rtc.UnpackTrackId(track.ID()) diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index f45185879..afa2af05f 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -103,6 +103,15 @@ func (t *MediaTrack) OnClose(f func()) { // subscribes participant to current remoteTrack // creates and add necessary forwarders and starts them func (t *MediaTrack) AddSubscriber(participant types.Participant) error { + t.lock.RLock() + existingDt := t.downtracks[participant.ID()] + t.lock.RUnlock() + + // don't subscribe to the same track multiple times + if existingDt != nil { + return nil + } + codec := t.codec // pack ID to identify all publishedTracks packedId := PackTrackId(t.participantId, t.id) @@ -131,7 +140,7 @@ func (t *MediaTrack) AddSubscriber(participant types.Participant) error { outTrack.OnBind(func() { if rr := bufferFactory.GetOrNew(packetio.RTCPBufferPacket, outTrack.SSRC()).(*buffer.RTCPReader); rr != nil { rr.OnPacket(func(pkt []byte) { - t.handleRTCP(outTrack, pkt) + t.handleRTCP(outTrack, participant.Identity(), pkt) }) } t.sendDownTrackBindingReports(participant.ID(), participant.RTCPChan()) @@ -152,7 +161,7 @@ func (t *MediaTrack) AddSubscriber(participant types.Participant) error { if sender != nil { logger.Debugw("removing peerconnection track", "track", t.id, - "srcParticipant", t.participantId, + "participantId", t.participantId, "destParticipant", participant.Identity()) if err := participant.PeerConnection().RemoveTrack(sender); err != nil { if err == webrtc.ErrConnectionClosed { @@ -161,7 +170,7 @@ func (t *MediaTrack) AddSubscriber(participant types.Participant) error { } if _, ok := err.(*rtcerr.InvalidStateError); !ok { logger.Warnw("could not remove remoteTrack from forwarder", - "participant", participant.ID(), + "participant", participant.Identity(), "err", err) } } @@ -292,7 +301,7 @@ func (t *MediaTrack) forwardRTPWorker() { } } -func (t *MediaTrack) handleRTCP(dt *sfu.DownTrack, rtcpBuf []byte) { +func (t *MediaTrack) handleRTCP(dt *sfu.DownTrack, identity string, rtcpBuf []byte) { defer Recover() pkts, err := rtcp.Unmarshal(rtcpBuf) if err != nil { @@ -325,6 +334,7 @@ func (t *MediaTrack) handleRTCP(dt *sfu.DownTrack, rtcpBuf []byte) { } case *rtcp.TransportLayerNack: logger.Debugw("forwarder got nack", + "participant", identity, "packet", p) var nackedPackets []uint16 for _, pair := range p.Nacks { diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 462fa2755..a6ff4cf3d 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -10,6 +10,7 @@ import ( "github.com/pion/rtcp" "github.com/pion/webrtc/v3" "github.com/pkg/errors" + "github.com/thoas/go-funk" "github.com/livekit/livekit-server/pkg/logger" "github.com/livekit/livekit-server/pkg/routing" @@ -121,7 +122,7 @@ func NewParticipant(identity string, pc types.PeerConnection, rs routing.Message }) pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) { - logger.Debugw("ICE connection state changed", "state", state.String()) + //logger.Debugw("ICE connection state changed", "state", state.String()) if state == webrtc.ICEConnectionStateConnected { participant.updateState(livekit.ParticipantInfo_ACTIVE) } else if state == webrtc.ICEConnectionStateDisconnected { @@ -210,13 +211,15 @@ func (p *ParticipantImpl) OnClose(callback func(types.Participant)) { // Answer an offer from remote participant, used when clients make the initial connection func (p *ParticipantImpl) Answer(sdp webrtc.SessionDescription) (answer webrtc.SessionDescription, err error) { - logger.Debugw("answering", "state", p.State().String()) if p.State() != livekit.ParticipantInfo_JOINING && p.negotiationState != negotiationStateClient { // not in a valid state to continue err = ErrUnexpectedNegotiation return } + logger.Debugw("answering client offer", "state", p.State().String(), + "participant", p.Identity()) + if err = p.peerConn.SetRemoteDescription(sdp); err != nil { return } @@ -283,7 +286,8 @@ func (p *ParticipantImpl) HandleAnswer(sdp webrtc.SessionDescription) error { if sdp.Type != webrtc.SDPTypeAnswer { return ErrUnexpectedOffer } - logger.Debugw("setting remote answer") + logger.Debugw("setting participant answer", + "participant", p.Identity()) if err := p.peerConn.SetRemoteDescription(sdp); err != nil { return errors.Wrap(err, "could not set remote description") } @@ -298,6 +302,8 @@ func (p *ParticipantImpl) HandleAnswer(sdp webrtc.SessionDescription) error { // client requested negotiation, when it's able to, send a signal to let it func (p *ParticipantImpl) HandleClientNegotiation() { + logger.Debugw("participant requested negotiation", + "participant", p.Identity()) // wait until client is able to request negotiation p.negotiationCond.L.Lock() for p.negotiationState != negotiationStateNone { @@ -305,6 +311,9 @@ func (p *ParticipantImpl) HandleClientNegotiation() { } p.negotiationState = negotiationStateClient p.negotiationCond.L.Unlock() + + logger.Debugw("allowing participant to negotiate", + "participant", p.Identity()) p.responseSink.WriteMessage(&livekit.SignalResponse{ Message: &livekit.SignalResponse_Negotiate{ Negotiate: &livekit.NegotiationResponse{}, @@ -350,13 +359,14 @@ func (p *ParticipantImpl) Close() error { // Subscribes otherPeer to all of the publishedTracks func (p *ParticipantImpl) AddSubscriber(op types.Participant) error { p.lock.RLock() + tracks := funk.Values(p.publishedTracks).([]types.PublishedTrack) defer p.lock.RUnlock() - for _, track := range p.publishedTracks { - logger.Debugw("subscribing to remoteTrack", - "srcParticipant", p.Identity(), - "dstParticipant", op.Identity(), - "remoteTrack", track.ID()) + logger.Debugw("subscribing new participant to tracks", + "srcParticipant", p.Identity(), + "dstParticipant", op.Identity()) + + for _, track := range tracks { if err := track.AddSubscriber(op); err != nil { return err } @@ -459,7 +469,7 @@ func (p *ParticipantImpl) negotiate() { } p.negotiationCond.L.Unlock() - logger.Debugw("starting negotiation", "participant", p.ID()) + logger.Debugw("starting server negotiation", "participant", p.Identity()) offer, err := p.peerConn.CreateOffer(nil) if err != nil { logger.Errorw("could not create offer", "err", err) @@ -478,7 +488,8 @@ func (p *ParticipantImpl) negotiate() { return } - logger.Debugw("sending available offer to participant") + logger.Debugw("sending offer to participant", + "participant", p.Identity()) err = p.responseSink.WriteMessage(&livekit.SignalResponse{ Message: &livekit.SignalResponse_Offer{ Offer: ToProtoSessionDescription(offer), diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 7bc22b2ba..3d007e7be 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -257,7 +257,7 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.Partici return } case *livekit.SignalRequest_AddTrack: - logger.Debugw("publishing track", "participant", participant.ID(), + logger.Debugw("add track request", "participant", participant.Identity(), "track", msg.AddTrack.Cid) participant.AddTrack(msg.AddTrack.Cid, msg.AddTrack.Name, msg.AddTrack.Type) case *livekit.SignalRequest_Answer: diff --git a/test/integration_helpers.go b/test/integration_helpers.go index 45e4fc1a3..88abfbb16 100644 --- a/test/integration_helpers.go +++ b/test/integration_helpers.go @@ -30,6 +30,7 @@ const ( nodeId1 = "node-1" nodeId2 = "node-2" + syncDelay = 100 * time.Millisecond connectTimeout = 10 * time.Second // if there are deadlocks, it's helpful to set a short test timeout (i.e. go test -timeout=30s) // let connection timeout happen @@ -40,8 +41,11 @@ var ( roomClient livekit.RoomService ) -func setupSingleNodeTest(roomName string) *service.LivekitServer { +func init() { logger.InitDevelopment("") +} + +func setupSingleNodeTest(roomName string) *service.LivekitServer { s := createSingleNodeServer() go func() { s.Start() @@ -58,7 +62,6 @@ func setupSingleNodeTest(roomName string) *service.LivekitServer { } func setupMultiNodeTest() (*service.LivekitServer, *service.LivekitServer) { - logger.InitDevelopment("") s1 := createMultiNodeServer(nodeId1, defaultServerPort) s2 := createMultiNodeServer(nodeId2, secondServerPort) go s1.Start() @@ -100,21 +103,23 @@ func waitForServerToStart(s *service.LivekitServer) { } } -func withTimeout(t *testing.T, description string, f func() bool) { +func withTimeout(t *testing.T, description string, f func() bool) bool { ctx, _ := context.WithTimeout(context.Background(), connectTimeout) for { select { case <-ctx.Done(): t.Fatal("timed out: " + description) + return false case <-time.After(10 * time.Millisecond): if f() { - return + return true } } } } func waitUntilConnected(t *testing.T, clients ...*client.RTCClient) { + logger.Infow("waiting for clients to become connected") wg := sync.WaitGroup{} for i := range clients { c := clients[i] @@ -232,6 +237,18 @@ func createRoomToken() string { return t } +func stopWriters(writers ...*client.TrackWriter) { + for _, w := range writers { + w.Stop() + } +} + +func stopClients(clients ...*client.RTCClient) { + for _, c := range clients { + c.Stop() + } +} + type StaticKeyProvider struct { } diff --git a/test/multinode_test.go b/test/multinode_test.go index 03b5ca094..7640b1560 100644 --- a/test/multinode_test.go +++ b/test/multinode_test.go @@ -5,6 +5,7 @@ import ( "github.com/stretchr/testify/assert" + "github.com/livekit/livekit-server/pkg/logger" "github.com/livekit/livekit-server/proto/livekit" ) @@ -14,6 +15,9 @@ func TestMultiNodeRouting(t *testing.T) { return } + logger.Infow("---Starting TestMultiNodeRouting---") + defer logger.Infow("---Finishing TestMultiNodeRouting---") + s1, s2 := setupMultiNodeTest() defer s1.Stop() defer s2.Stop() @@ -76,6 +80,9 @@ func TestConnectWithoutCreation(t *testing.T) { t.SkipNow() return } + logger.Infow("---Starting TestConnectWithoutCreation---") + defer logger.Infow("---Finishing TestConnectWithoutCreation---") + s1, s2 := setupMultiNodeTest() defer s1.Stop() defer s2.Stop() @@ -85,3 +92,21 @@ func TestConnectWithoutCreation(t *testing.T) { c1.Stop() } + +// testing multiple scenarios rooms +func TestMultinodeScenarios(t *testing.T) { + if testing.Short() { + t.SkipNow() + return + } + + logger.Infow("---Starting TestScenarios---") + defer logger.Infow("---Finishing TestScenarios---") + + s1, s2 := setupMultiNodeTest() + + scenarioPublishingUponJoining(t, defaultServerPort, secondServerPort) + + defer s1.Stop() + defer s2.Stop() +} diff --git a/test/scenarios.go b/test/scenarios.go index ac7b0cd1d..7d8668b01 100644 --- a/test/scenarios.go +++ b/test/scenarios.go @@ -1,20 +1,117 @@ package test import ( + "math/rand" "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/livekit/livekit-server/cmd/cli/client" + "github.com/livekit/livekit-server/pkg/logger" ) // a scenario with lots of clients connecting, publishing, and leaving at random periods -func scenarioRandom(t *testing.T) { +func scenarioPublishingUponJoining(t *testing.T, ports ...int) { + c1 := createRTCClient("puj_1", ports[rand.Intn(len(ports))]) + c2 := createRTCClient("puj_2", ports[rand.Intn(len(ports))]) + c3 := createRTCClient("puj_3", ports[rand.Intn(len(ports))]) + defer stopClients(c1, c2, c3) + waitUntilConnected(t, c1, c2, c3) + + // c1 and c2 publishing, c3 just receiving + writers := publishTracksForClients(t, c1, c2) + defer stopWriters(writers...) + + logger.Infow("waiting to receive tracks from c1 and c2") + success := withTimeout(t, "c3 should receive tracks from both clients", func() bool { + tracks := c3.SubscribedTracks() + if len(tracks[c1.ID()]) != 2 { + return false + } + if len(tracks[c2.ID()]) != 2 { + return false + } + return true + }) + + if !success { + t.FailNow() + } + + // after a delay, c2 reconnects, then publishing + time.Sleep(syncDelay) + c2.Stop() + + logger.Infow("waiting for c2 tracks to be gone") + success = withTimeout(t, "c2 tracks should be gone", func() bool { + tracks := c3.SubscribedTracks() + if len(tracks[c1.ID()]) != 2 { + return false + } + if len(tracks[c2.ID()]) != 0 { + return false + } + if len(c1.SubscribedTracks()[c2.ID()]) != 0 { + return false + } + return true + }) + if !success { + t.FailNow() + } + + logger.Infow("c2 reconnecting") + c2 = createRTCClient("puj_2", ports[rand.Intn(len(ports))]) + defer c2.Stop() + waitUntilConnected(t, c2) + writers = publishTracksForClients(t, c2) + defer stopWriters(writers...) + + logger.Infow("waiting for reconnected c2 tracks to publish") + success = withTimeout(t, "new c2 tracks should be published again", func() bool { + tracks := c3.SubscribedTracks() + if len(tracks[c2.ID()]) != 2 { + return false + } + if len(c1.SubscribedTracks()[c2.ID()]) != 2 { + return false + } + return true + }) + if !success { + t.FailNow() + } } // websocket reconnects func scenarioWSReconnect(t *testing.T) { - c1 := createRTCClient("wsr_c1", defaultServerPort) - c2 := createRTCClient("wsr_c2", defaultServerPort) + c1 := createRTCClient("wsr_1", defaultServerPort) + c2 := createRTCClient("wsr_2", defaultServerPort) waitUntilConnected(t, c1, c2) // c1 publishes track, but disconnects websockets and reconnects } + +func publishTracksForClients(t *testing.T, clients ...*client.RTCClient) []*client.TrackWriter { + logger.Infow("publishing tracks for clients") + var writers []*client.TrackWriter + for i, _ := range clients { + c := clients[i] + tw, err := c.AddStaticTrack("audio/opus", "audio", "webcam") + if !assert.NoError(t, err) { + t.FailNow() + return nil + } + writers = append(writers, tw) + tw, err = c.AddStaticTrack("video/vp8", "video", "webcam") + if !assert.NoError(t, err) { + t.FailNow() + return nil + } + writers = append(writers, tw) + } + return writers +}