From 8a5f26b0648609054e72c03a5af053d7637c4ebf Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Wed, 19 Jan 2022 10:24:29 +0800 Subject: [PATCH] refine reuse transceiver (#348) --- pkg/rtc/mediatracksubscriptions.go | 78 +++++++++++++++--------------- pkg/rtc/participant.go | 8 +-- 2 files changed, 43 insertions(+), 43 deletions(-) diff --git a/pkg/rtc/mediatracksubscriptions.go b/pkg/rtc/mediatracksubscriptions.go index 2042ca2f0..535637429 100644 --- a/pkg/rtc/mediatracksubscriptions.go +++ b/pkg/rtc/mediatracksubscriptions.go @@ -188,51 +188,49 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, code }) downTrack.OnCloseHandler(func() { - go func() { - t.subscribedTracksMu.Lock() - delete(t.subscribedTracks, subscriberID) - t.subscribedTracksMu.Unlock() + t.subscribedTracksMu.Lock() + delete(t.subscribedTracks, subscriberID) + t.subscribedTracksMu.Unlock() - t.maybeNotifyNoSubscribers() + t.maybeNotifyNoSubscribers() - t.params.Telemetry.TrackUnsubscribed(context.Background(), subscriberID, t.params.MediaTrack.ToProto()) + t.params.Telemetry.TrackUnsubscribed(context.Background(), subscriberID, t.params.MediaTrack.ToProto()) - // ignore if the subscribing sub is not connected - if sub.SubscriberPC().ConnectionState() == webrtc.PeerConnectionStateClosed { + // ignore if the subscribing sub is not connected + if sub.SubscriberPC().ConnectionState() == webrtc.PeerConnectionStateClosed { + return + } + + // if the source has been terminated, we'll need to terminate all of the subscribedtracks + // however, if the dest sub has disconnected, then we can skip + if sender == nil { + return + } + t.params.Logger.Debugw("removing peerconnection track", + "track", t.params.MediaTrack.ID(), + "subscriber", sub.Identity(), + "subscriberID", subscriberID, + "kind", t.params.MediaTrack.Kind(), + ) + if err := sub.SubscriberPC().RemoveTrack(sender); err != nil { + if err == webrtc.ErrConnectionClosed { + // sub closing, can skip removing subscribedtracks return } - - // if the source has been terminated, we'll need to terminate all of the subscribedtracks - // however, if the dest sub has disconnected, then we can skip - if sender == nil { - return - } - t.params.Logger.Debugw("removing peerconnection track", - "track", t.params.MediaTrack.ID(), - "subscriber", sub.Identity(), - "subscriberID", subscriberID, - "kind", t.params.MediaTrack.Kind(), - ) - if err := sub.SubscriberPC().RemoveTrack(sender); err != nil { - if err == webrtc.ErrConnectionClosed { - // sub closing, can skip removing subscribedtracks - return - } - if _, ok := err.(*rtcerr.InvalidStateError); !ok { - // most of these are safe to ignore, since the track state might have already - // been set to Inactive - t.params.Logger.Debugw("could not remove remoteTrack from forwarder", - "error", err, - "subscriber", sub.Identity(), - "subscriberID", subscriberID, - ) - } + if _, ok := err.(*rtcerr.InvalidStateError); !ok { + // most of these are safe to ignore, since the track state might have already + // been set to Inactive + t.params.Logger.Debugw("could not remove remoteTrack from forwarder", + "error", err, + "subscriber", sub.Identity(), + "subscriberID", subscriberID, + ) } + } - t.NotifySubscriberMaxQuality(subscriberID, livekit.VideoQuality_OFF) - sub.RemoveSubscribedTrack(subTrack) - sub.Negotiate() - }() + t.NotifySubscriberMaxQuality(subscriberID, livekit.VideoQuality_OFF) + sub.RemoveSubscribedTrack(subTrack) + sub.Negotiate() }) t.subscribedTracks[subscriberID] = subTrack @@ -259,7 +257,7 @@ func (t *MediaTrackSubscriptions) RemoveSubscriber(participantID livekit.Partici t.subscribedTracksMu.Unlock() if subTrack != nil { - go subTrack.DownTrack().Close() + subTrack.DownTrack().Close() } t.maybeNotifyNoSubscribers() @@ -274,7 +272,7 @@ func (t *MediaTrackSubscriptions) RemoveAllSubscribers() { t.subscribedTracksMu.Unlock() for _, subTrack := range subscribedTracks { - go subTrack.DownTrack().Close() + subTrack.DownTrack().Close() } t.maybeNotifyNoSubscribers() diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 605b0e199..58aa0c65d 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -1489,15 +1489,17 @@ func (p *ParticipantImpl) rtcpSendWorker() { fwdPkts := make([]rtcp.Packet, 0, len(pkts)) for _, pkt := range pkts { - switch pkt.(type) { + switch packet := pkt.(type) { case *rtcp.PictureLossIndication: - mediaSSRC := pkt.(*rtcp.PictureLossIndication).MediaSSRC + mediaSSRC := packet.MediaSSRC if p.pliThrottle.canSend(mediaSSRC) { + p.params.Logger.Debugw("send pli", "ssrc", mediaSSRC) fwdPkts = append(fwdPkts, pkt) } case *rtcp.FullIntraRequest: - mediaSSRC := pkt.(*rtcp.FullIntraRequest).MediaSSRC + mediaSSRC := packet.MediaSSRC if p.pliThrottle.canSend(mediaSSRC) { + p.params.Logger.Debugw("send fir", "ssrc", mediaSSRC) fwdPkts = append(fwdPkts, pkt) } default: