Add better debug logs around negotiations

This commit is contained in:
David Zhao
2021-02-02 00:40:55 -08:00
parent 712d0da6cc
commit 70574deaf7
7 changed files with 186 additions and 31 deletions

View File

@@ -25,6 +25,7 @@ import (
type RTCClient struct {
id string
identity string
conn *websocket.Conn
PeerConn *webrtc.PeerConnection
// sid => track
@@ -173,13 +174,10 @@ func (c *RTCClient) Run() error {
})
// create a data channel, in order to work
dc, err := c.PeerConn.CreateDataChannel("_private", nil)
_, err := c.PeerConn.CreateDataChannel("_private", nil)
if err != nil {
return err
}
dc.OnOpen(func() {
logger.Debugw("data channel open")
})
// run the session
for {
@@ -190,6 +188,7 @@ func (c *RTCClient) Run() error {
switch msg := res.Message.(type) {
case *livekit.SignalResponse_Join:
c.id = msg.Join.Participant.Sid
c.identity = msg.Join.Participant.Identity
c.lock.Lock()
for _, p := range msg.Join.OtherParticipants {
@@ -310,6 +309,7 @@ func (c *RTCClient) ReadResponse() (*livekit.SignalResponse, error) {
}
}
// TODO: this function is not thread safe, need to cleanup
func (c *RTCClient) SubscribedTracks() map[string][]*webrtc.TrackRemote {
return c.subscribedTracks
}
@@ -517,11 +517,6 @@ func (c *RTCClient) negotiate() error {
})
}
type logEntry struct {
msg string
args []interface{}
}
func (c *RTCClient) processTrack(track *webrtc.TrackRemote) {
lastUpdate := time.Time{}
pId, trackId := rtc.UnpackTrackId(track.ID())

View File

@@ -103,6 +103,15 @@ func (t *MediaTrack) OnClose(f func()) {
// subscribes participant to current remoteTrack
// creates and add necessary forwarders and starts them
func (t *MediaTrack) AddSubscriber(participant types.Participant) error {
t.lock.RLock()
existingDt := t.downtracks[participant.ID()]
t.lock.RUnlock()
// don't subscribe to the same track multiple times
if existingDt != nil {
return nil
}
codec := t.codec
// pack ID to identify all publishedTracks
packedId := PackTrackId(t.participantId, t.id)
@@ -131,7 +140,7 @@ func (t *MediaTrack) AddSubscriber(participant types.Participant) error {
outTrack.OnBind(func() {
if rr := bufferFactory.GetOrNew(packetio.RTCPBufferPacket, outTrack.SSRC()).(*buffer.RTCPReader); rr != nil {
rr.OnPacket(func(pkt []byte) {
t.handleRTCP(outTrack, pkt)
t.handleRTCP(outTrack, participant.Identity(), pkt)
})
}
t.sendDownTrackBindingReports(participant.ID(), participant.RTCPChan())
@@ -152,7 +161,7 @@ func (t *MediaTrack) AddSubscriber(participant types.Participant) error {
if sender != nil {
logger.Debugw("removing peerconnection track",
"track", t.id,
"srcParticipant", t.participantId,
"participantId", t.participantId,
"destParticipant", participant.Identity())
if err := participant.PeerConnection().RemoveTrack(sender); err != nil {
if err == webrtc.ErrConnectionClosed {
@@ -161,7 +170,7 @@ func (t *MediaTrack) AddSubscriber(participant types.Participant) error {
}
if _, ok := err.(*rtcerr.InvalidStateError); !ok {
logger.Warnw("could not remove remoteTrack from forwarder",
"participant", participant.ID(),
"participant", participant.Identity(),
"err", err)
}
}
@@ -292,7 +301,7 @@ func (t *MediaTrack) forwardRTPWorker() {
}
}
func (t *MediaTrack) handleRTCP(dt *sfu.DownTrack, rtcpBuf []byte) {
func (t *MediaTrack) handleRTCP(dt *sfu.DownTrack, identity string, rtcpBuf []byte) {
defer Recover()
pkts, err := rtcp.Unmarshal(rtcpBuf)
if err != nil {
@@ -325,6 +334,7 @@ func (t *MediaTrack) handleRTCP(dt *sfu.DownTrack, rtcpBuf []byte) {
}
case *rtcp.TransportLayerNack:
logger.Debugw("forwarder got nack",
"participant", identity,
"packet", p)
var nackedPackets []uint16
for _, pair := range p.Nacks {

View File

@@ -10,6 +10,7 @@ import (
"github.com/pion/rtcp"
"github.com/pion/webrtc/v3"
"github.com/pkg/errors"
"github.com/thoas/go-funk"
"github.com/livekit/livekit-server/pkg/logger"
"github.com/livekit/livekit-server/pkg/routing"
@@ -121,7 +122,7 @@ func NewParticipant(identity string, pc types.PeerConnection, rs routing.Message
})
pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
logger.Debugw("ICE connection state changed", "state", state.String())
//logger.Debugw("ICE connection state changed", "state", state.String())
if state == webrtc.ICEConnectionStateConnected {
participant.updateState(livekit.ParticipantInfo_ACTIVE)
} else if state == webrtc.ICEConnectionStateDisconnected {
@@ -210,13 +211,15 @@ func (p *ParticipantImpl) OnClose(callback func(types.Participant)) {
// Answer an offer from remote participant, used when clients make the initial connection
func (p *ParticipantImpl) Answer(sdp webrtc.SessionDescription) (answer webrtc.SessionDescription, err error) {
logger.Debugw("answering", "state", p.State().String())
if p.State() != livekit.ParticipantInfo_JOINING && p.negotiationState != negotiationStateClient {
// not in a valid state to continue
err = ErrUnexpectedNegotiation
return
}
logger.Debugw("answering client offer", "state", p.State().String(),
"participant", p.Identity())
if err = p.peerConn.SetRemoteDescription(sdp); err != nil {
return
}
@@ -283,7 +286,8 @@ func (p *ParticipantImpl) HandleAnswer(sdp webrtc.SessionDescription) error {
if sdp.Type != webrtc.SDPTypeAnswer {
return ErrUnexpectedOffer
}
logger.Debugw("setting remote answer")
logger.Debugw("setting participant answer",
"participant", p.Identity())
if err := p.peerConn.SetRemoteDescription(sdp); err != nil {
return errors.Wrap(err, "could not set remote description")
}
@@ -298,6 +302,8 @@ func (p *ParticipantImpl) HandleAnswer(sdp webrtc.SessionDescription) error {
// client requested negotiation, when it's able to, send a signal to let it
func (p *ParticipantImpl) HandleClientNegotiation() {
logger.Debugw("participant requested negotiation",
"participant", p.Identity())
// wait until client is able to request negotiation
p.negotiationCond.L.Lock()
for p.negotiationState != negotiationStateNone {
@@ -305,6 +311,9 @@ func (p *ParticipantImpl) HandleClientNegotiation() {
}
p.negotiationState = negotiationStateClient
p.negotiationCond.L.Unlock()
logger.Debugw("allowing participant to negotiate",
"participant", p.Identity())
p.responseSink.WriteMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_Negotiate{
Negotiate: &livekit.NegotiationResponse{},
@@ -350,13 +359,14 @@ func (p *ParticipantImpl) Close() error {
// Subscribes otherPeer to all of the publishedTracks
func (p *ParticipantImpl) AddSubscriber(op types.Participant) error {
p.lock.RLock()
tracks := funk.Values(p.publishedTracks).([]types.PublishedTrack)
defer p.lock.RUnlock()
for _, track := range p.publishedTracks {
logger.Debugw("subscribing to remoteTrack",
"srcParticipant", p.Identity(),
"dstParticipant", op.Identity(),
"remoteTrack", track.ID())
logger.Debugw("subscribing new participant to tracks",
"srcParticipant", p.Identity(),
"dstParticipant", op.Identity())
for _, track := range tracks {
if err := track.AddSubscriber(op); err != nil {
return err
}
@@ -459,7 +469,7 @@ func (p *ParticipantImpl) negotiate() {
}
p.negotiationCond.L.Unlock()
logger.Debugw("starting negotiation", "participant", p.ID())
logger.Debugw("starting server negotiation", "participant", p.Identity())
offer, err := p.peerConn.CreateOffer(nil)
if err != nil {
logger.Errorw("could not create offer", "err", err)
@@ -478,7 +488,8 @@ func (p *ParticipantImpl) negotiate() {
return
}
logger.Debugw("sending available offer to participant")
logger.Debugw("sending offer to participant",
"participant", p.Identity())
err = p.responseSink.WriteMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_Offer{
Offer: ToProtoSessionDescription(offer),

View File

@@ -257,7 +257,7 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.Partici
return
}
case *livekit.SignalRequest_AddTrack:
logger.Debugw("publishing track", "participant", participant.ID(),
logger.Debugw("add track request", "participant", participant.Identity(),
"track", msg.AddTrack.Cid)
participant.AddTrack(msg.AddTrack.Cid, msg.AddTrack.Name, msg.AddTrack.Type)
case *livekit.SignalRequest_Answer:

View File

@@ -30,6 +30,7 @@ const (
nodeId1 = "node-1"
nodeId2 = "node-2"
syncDelay = 100 * time.Millisecond
connectTimeout = 10 * time.Second
// if there are deadlocks, it's helpful to set a short test timeout (i.e. go test -timeout=30s)
// let connection timeout happen
@@ -40,8 +41,11 @@ var (
roomClient livekit.RoomService
)
func setupSingleNodeTest(roomName string) *service.LivekitServer {
func init() {
logger.InitDevelopment("")
}
func setupSingleNodeTest(roomName string) *service.LivekitServer {
s := createSingleNodeServer()
go func() {
s.Start()
@@ -58,7 +62,6 @@ func setupSingleNodeTest(roomName string) *service.LivekitServer {
}
func setupMultiNodeTest() (*service.LivekitServer, *service.LivekitServer) {
logger.InitDevelopment("")
s1 := createMultiNodeServer(nodeId1, defaultServerPort)
s2 := createMultiNodeServer(nodeId2, secondServerPort)
go s1.Start()
@@ -100,21 +103,23 @@ func waitForServerToStart(s *service.LivekitServer) {
}
}
func withTimeout(t *testing.T, description string, f func() bool) {
func withTimeout(t *testing.T, description string, f func() bool) bool {
ctx, _ := context.WithTimeout(context.Background(), connectTimeout)
for {
select {
case <-ctx.Done():
t.Fatal("timed out: " + description)
return false
case <-time.After(10 * time.Millisecond):
if f() {
return
return true
}
}
}
}
func waitUntilConnected(t *testing.T, clients ...*client.RTCClient) {
logger.Infow("waiting for clients to become connected")
wg := sync.WaitGroup{}
for i := range clients {
c := clients[i]
@@ -232,6 +237,18 @@ func createRoomToken() string {
return t
}
func stopWriters(writers ...*client.TrackWriter) {
for _, w := range writers {
w.Stop()
}
}
func stopClients(clients ...*client.RTCClient) {
for _, c := range clients {
c.Stop()
}
}
type StaticKeyProvider struct {
}

View File

@@ -5,6 +5,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/livekit/livekit-server/pkg/logger"
"github.com/livekit/livekit-server/proto/livekit"
)
@@ -14,6 +15,9 @@ func TestMultiNodeRouting(t *testing.T) {
return
}
logger.Infow("---Starting TestMultiNodeRouting---")
defer logger.Infow("---Finishing TestMultiNodeRouting---")
s1, s2 := setupMultiNodeTest()
defer s1.Stop()
defer s2.Stop()
@@ -76,6 +80,9 @@ func TestConnectWithoutCreation(t *testing.T) {
t.SkipNow()
return
}
logger.Infow("---Starting TestConnectWithoutCreation---")
defer logger.Infow("---Finishing TestConnectWithoutCreation---")
s1, s2 := setupMultiNodeTest()
defer s1.Stop()
defer s2.Stop()
@@ -85,3 +92,21 @@ func TestConnectWithoutCreation(t *testing.T) {
c1.Stop()
}
// testing multiple scenarios rooms
func TestMultinodeScenarios(t *testing.T) {
if testing.Short() {
t.SkipNow()
return
}
logger.Infow("---Starting TestScenarios---")
defer logger.Infow("---Finishing TestScenarios---")
s1, s2 := setupMultiNodeTest()
scenarioPublishingUponJoining(t, defaultServerPort, secondServerPort)
defer s1.Stop()
defer s2.Stop()
}

View File

@@ -1,20 +1,117 @@
package test
import (
"math/rand"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/livekit/livekit-server/cmd/cli/client"
"github.com/livekit/livekit-server/pkg/logger"
)
// a scenario with lots of clients connecting, publishing, and leaving at random periods
func scenarioRandom(t *testing.T) {
func scenarioPublishingUponJoining(t *testing.T, ports ...int) {
c1 := createRTCClient("puj_1", ports[rand.Intn(len(ports))])
c2 := createRTCClient("puj_2", ports[rand.Intn(len(ports))])
c3 := createRTCClient("puj_3", ports[rand.Intn(len(ports))])
defer stopClients(c1, c2, c3)
waitUntilConnected(t, c1, c2, c3)
// c1 and c2 publishing, c3 just receiving
writers := publishTracksForClients(t, c1, c2)
defer stopWriters(writers...)
logger.Infow("waiting to receive tracks from c1 and c2")
success := withTimeout(t, "c3 should receive tracks from both clients", func() bool {
tracks := c3.SubscribedTracks()
if len(tracks[c1.ID()]) != 2 {
return false
}
if len(tracks[c2.ID()]) != 2 {
return false
}
return true
})
if !success {
t.FailNow()
}
// after a delay, c2 reconnects, then publishing
time.Sleep(syncDelay)
c2.Stop()
logger.Infow("waiting for c2 tracks to be gone")
success = withTimeout(t, "c2 tracks should be gone", func() bool {
tracks := c3.SubscribedTracks()
if len(tracks[c1.ID()]) != 2 {
return false
}
if len(tracks[c2.ID()]) != 0 {
return false
}
if len(c1.SubscribedTracks()[c2.ID()]) != 0 {
return false
}
return true
})
if !success {
t.FailNow()
}
logger.Infow("c2 reconnecting")
c2 = createRTCClient("puj_2", ports[rand.Intn(len(ports))])
defer c2.Stop()
waitUntilConnected(t, c2)
writers = publishTracksForClients(t, c2)
defer stopWriters(writers...)
logger.Infow("waiting for reconnected c2 tracks to publish")
success = withTimeout(t, "new c2 tracks should be published again", func() bool {
tracks := c3.SubscribedTracks()
if len(tracks[c2.ID()]) != 2 {
return false
}
if len(c1.SubscribedTracks()[c2.ID()]) != 2 {
return false
}
return true
})
if !success {
t.FailNow()
}
}
// websocket reconnects
func scenarioWSReconnect(t *testing.T) {
c1 := createRTCClient("wsr_c1", defaultServerPort)
c2 := createRTCClient("wsr_c2", defaultServerPort)
c1 := createRTCClient("wsr_1", defaultServerPort)
c2 := createRTCClient("wsr_2", defaultServerPort)
waitUntilConnected(t, c1, c2)
// c1 publishes track, but disconnects websockets and reconnects
}
func publishTracksForClients(t *testing.T, clients ...*client.RTCClient) []*client.TrackWriter {
logger.Infow("publishing tracks for clients")
var writers []*client.TrackWriter
for i, _ := range clients {
c := clients[i]
tw, err := c.AddStaticTrack("audio/opus", "audio", "webcam")
if !assert.NoError(t, err) {
t.FailNow()
return nil
}
writers = append(writers, tw)
tw, err = c.AddStaticTrack("video/vp8", "video", "webcam")
if !assert.NoError(t, err) {
t.FailNow()
return nil
}
writers = append(writers, tw)
}
return writers
}