diff --git a/pkg/rtc/wrappedreceiver.go b/pkg/rtc/wrappedreceiver.go index 20945edf4..f035361e8 100644 --- a/pkg/rtc/wrappedreceiver.go +++ b/pkg/rtc/wrappedreceiver.go @@ -41,6 +41,8 @@ type WrappedReceiverParams struct { } type WrappedReceiver struct { + lock sync.Mutex + sfu.TrackReceiver params WrappedReceiverParams receivers []sfu.TrackReceiver @@ -91,33 +93,44 @@ func (r *WrappedReceiver) StreamID() string { // DetermineReceiver determines the receiver of negotiated codec and return ready state of the receiver func (r *WrappedReceiver) DetermineReceiver(codec webrtc.RTPCodecCapability) bool { + r.lock.Lock() r.determinedCodec = codec + + var trackReceiver sfu.TrackReceiver for _, receiver := range r.receivers { if c := receiver.Codec(); strings.EqualFold(c.MimeType, codec.MimeType) { - r.TrackReceiver = receiver + trackReceiver = receiver break } else if strings.EqualFold(c.MimeType, sfu.MimeTypeAudioRed) && strings.EqualFold(codec.MimeType, webrtc.MimeTypeOpus) { // audio opus/red can match opus only - r.TrackReceiver = receiver.GetPrimaryReceiverForRed() + trackReceiver = receiver.GetPrimaryReceiverForRed() break } else if strings.EqualFold(c.MimeType, webrtc.MimeTypeOpus) && strings.EqualFold(codec.MimeType, sfu.MimeTypeAudioRed) { - r.TrackReceiver = receiver.GetRedReceiver() + trackReceiver = receiver.GetRedReceiver() break } } - if r.TrackReceiver == nil { + if trackReceiver == nil { r.params.Logger.Errorw("can't determine receiver for codec", nil, "codec", codec.MimeType) if len(r.receivers) > 0 { - r.TrackReceiver = r.receivers[0] + trackReceiver = r.receivers[0] } } - if r.TrackReceiver != nil { - for _, f := range r.onReadyCallbacks { - r.TrackReceiver.AddOnReady(f) - } - r.onReadyCallbacks = nil + r.TrackReceiver = trackReceiver - if d, ok := r.TrackReceiver.(*DummyReceiver); ok { + var onReadyCallbacks []func() + if trackReceiver != nil { + onReadyCallbacks = r.onReadyCallbacks + r.onReadyCallbacks = nil + } + r.lock.Unlock() + + if trackReceiver != nil { + for _, f := range onReadyCallbacks { + trackReceiver.AddOnReady(f) + } + + if d, ok := trackReceiver.(*DummyReceiver); ok { return d.IsReady() } return true @@ -130,17 +143,26 @@ func (r *WrappedReceiver) Codecs() []webrtc.RTPCodecParameters { } func (r *WrappedReceiver) DeleteDownTrack(participantID livekit.ParticipantID) { - if r.TrackReceiver != nil { - r.TrackReceiver.DeleteDownTrack(participantID) + r.lock.Lock() + trackReceiver := r.TrackReceiver + r.lock.Unlock() + + if trackReceiver != nil { + trackReceiver.DeleteDownTrack(participantID) } } func (r *WrappedReceiver) AddOnReady(f func()) { - if r.TrackReceiver != nil { - r.TrackReceiver.AddOnReady(f) - } else { + r.lock.Lock() + trackReceiver := r.TrackReceiver + if trackReceiver == nil { r.onReadyCallbacks = append(r.onReadyCallbacks, f) + r.lock.Unlock() + return } + r.lock.Unlock() + + trackReceiver.AddOnReady(f) } // --------------------------------------------