fix: avoid deadlocks when participant closing, adapt to ion-sfu changes

This commit is contained in:
David Zhao
2021-06-09 14:16:48 -07:00
parent db7d5fff52
commit b10d903d3e
2 changed files with 30 additions and 27 deletions
+28 -27
View File
@@ -5,6 +5,7 @@ import (
"sync"
"time"
"github.com/livekit/protocol/utils"
"github.com/pion/ion-sfu/pkg/buffer"
"github.com/pion/ion-sfu/pkg/sfu"
"github.com/pion/ion-sfu/pkg/twcc"
@@ -12,8 +13,6 @@ import (
"github.com/pion/webrtc/v3"
"github.com/pion/webrtc/v3/pkg/rtcerr"
"github.com/livekit/protocol/utils"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/logger"
"github.com/livekit/livekit-server/pkg/rtc/types"
@@ -178,34 +177,36 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error {
t.params.Stats.SubSubscribedTrack(t.kind.String())
// ignore if the subscribing sub is not connected
if sub.SubscriberPC().ConnectionState() == webrtc.PeerConnectionStateClosed {
return
}
// if the source has been terminated, we'll need to terminate all of the subscribedtracks
// however, if the dest sub has disconnected, then we can skip
sender := transceiver.Sender()
if sender == nil {
return
}
logger.Debugw("removing peerconnection track",
"track", t.params.TrackID,
"participantId", t.params.ParticipantID,
"destParticipant", sub.Identity())
if err := sub.SubscriberPC().RemoveTrack(sender); err != nil {
if err == webrtc.ErrConnectionClosed {
// sub closing, can skip removing subscribedtracks
go func() {
// ignore if the subscribing sub is not connected
if sub.SubscriberPC().ConnectionState() == webrtc.PeerConnectionStateClosed {
return
}
if _, ok := err.(*rtcerr.InvalidStateError); !ok {
logger.Warnw("could not remove remoteTrack from forwarder", err,
"sub", sub.Identity())
}
}
sub.RemoveSubscribedTrack(t.params.ParticipantID, subTrack)
sub.Negotiate()
// if the source has been terminated, we'll need to terminate all of the subscribedtracks
// however, if the dest sub has disconnected, then we can skip
sender := transceiver.Sender()
if sender == nil {
return
}
logger.Debugw("removing peerconnection track",
"track", t.params.TrackID,
"participantId", t.params.ParticipantID,
"destParticipant", sub.Identity())
if err := sub.SubscriberPC().RemoveTrack(sender); err != nil {
if err == webrtc.ErrConnectionClosed {
// sub closing, can skip removing subscribedtracks
return
}
if _, ok := err.(*rtcerr.InvalidStateError); !ok {
logger.Warnw("could not remove remoteTrack from forwarder", err,
"sub", sub.Identity())
}
}
sub.RemoveSubscribedTrack(t.params.ParticipantID, subTrack)
sub.Negotiate()
}()
})
t.subscribedTracks[sub.ID()] = subTrack
+2
View File
@@ -135,6 +135,7 @@ func createSingleNodeServer() *service.LivekitServer {
if err != nil {
panic(fmt.Sprintf("could not create config: %v", err))
}
conf.Development = true
currentNode, err := routing.NewLocalNode(conf)
currentNode.Id = utils.NewGuid(nodeId1)
@@ -165,6 +166,7 @@ func createMultiNodeServer(nodeId string, port uint32) *service.LivekitServer {
conf.RTC.UDPPort = port + 1
conf.RTC.TCPPort = port + 2
conf.Redis.Address = "localhost:6379"
conf.Development = true
currentNode, err := routing.NewLocalNode(conf)
if err != nil {