diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index 9e1b86380..7603d793d 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -5,6 +5,7 @@ import ( "sync" "time" + "github.com/livekit/protocol/utils" "github.com/pion/ion-sfu/pkg/buffer" "github.com/pion/ion-sfu/pkg/sfu" "github.com/pion/ion-sfu/pkg/twcc" @@ -12,8 +13,6 @@ import ( "github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3/pkg/rtcerr" - "github.com/livekit/protocol/utils" - "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/logger" "github.com/livekit/livekit-server/pkg/rtc/types" @@ -178,34 +177,36 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error { t.params.Stats.SubSubscribedTrack(t.kind.String()) - // ignore if the subscribing sub is not connected - if sub.SubscriberPC().ConnectionState() == webrtc.PeerConnectionStateClosed { - return - } - - // if the source has been terminated, we'll need to terminate all of the subscribedtracks - // however, if the dest sub has disconnected, then we can skip - sender := transceiver.Sender() - if sender == nil { - return - } - logger.Debugw("removing peerconnection track", - "track", t.params.TrackID, - "participantId", t.params.ParticipantID, - "destParticipant", sub.Identity()) - if err := sub.SubscriberPC().RemoveTrack(sender); err != nil { - if err == webrtc.ErrConnectionClosed { - // sub closing, can skip removing subscribedtracks + go func() { + // ignore if the subscribing sub is not connected + if sub.SubscriberPC().ConnectionState() == webrtc.PeerConnectionStateClosed { return } - if _, ok := err.(*rtcerr.InvalidStateError); !ok { - logger.Warnw("could not remove remoteTrack from forwarder", err, - "sub", sub.Identity()) - } - } - sub.RemoveSubscribedTrack(t.params.ParticipantID, subTrack) - sub.Negotiate() + // if the source has been terminated, we'll need to terminate all of the subscribedtracks + // however, if the dest sub has disconnected, then we can skip + sender := transceiver.Sender() + if sender == nil { + return + } + logger.Debugw("removing peerconnection track", + "track", t.params.TrackID, + "participantId", t.params.ParticipantID, + "destParticipant", sub.Identity()) + if err := sub.SubscriberPC().RemoveTrack(sender); err != nil { + if err == webrtc.ErrConnectionClosed { + // sub closing, can skip removing subscribedtracks + return + } + if _, ok := err.(*rtcerr.InvalidStateError); !ok { + logger.Warnw("could not remove remoteTrack from forwarder", err, + "sub", sub.Identity()) + } + } + + sub.RemoveSubscribedTrack(t.params.ParticipantID, subTrack) + sub.Negotiate() + }() }) t.subscribedTracks[sub.ID()] = subTrack diff --git a/test/integration_helpers.go b/test/integration_helpers.go index a8a2e38f7..3c05b95f2 100644 --- a/test/integration_helpers.go +++ b/test/integration_helpers.go @@ -135,6 +135,7 @@ func createSingleNodeServer() *service.LivekitServer { if err != nil { panic(fmt.Sprintf("could not create config: %v", err)) } + conf.Development = true currentNode, err := routing.NewLocalNode(conf) currentNode.Id = utils.NewGuid(nodeId1) @@ -165,6 +166,7 @@ func createMultiNodeServer(nodeId string, port uint32) *service.LivekitServer { conf.RTC.UDPPort = port + 1 conf.RTC.TCPPort = port + 2 conf.Redis.Address = "localhost:6379" + conf.Development = true currentNode, err := routing.NewLocalNode(conf) if err != nil {