diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index cca0445b6..239e0de72 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -1,6 +1,7 @@ package rtc import ( + "errors" "sync" "time" @@ -118,6 +119,11 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error { return nil } + if t.receiver == nil { + // cannot add, no receiver + return errors.New("cannot subscribe without a receiver in place") + } + codec := t.receiver.Codec() // using DownTrack from ion-sfu @@ -192,15 +198,16 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error { // adds a new RTP receiver to the track, returns true if this is a new track func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.TrackRemote) { //rid := track.RID() - buff, rtcpReader := bufferFactory.GetBufferPair(uint32(track.SSRC())) buff.OnFeedback(func(fb []rtcp.Packet) { + // feedback for the source RTCP t.rtcpCh <- fb }) if t.Kind() == livekit.TrackType_AUDIO { // TODO: audio level stuff } else if t.Kind() == livekit.TrackType_VIDEO { + // TODO: handle twcc //if t.twcc == nil { // t.twcc = twcc.NewTransportWideCCResponder(uint32(track.SSRC())) // t.twcc.OnFeedback(func(p rtcp.RawPacket) { @@ -238,12 +245,17 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra t.receiver.SetRTCPCh(t.rtcpCh) t.receiver.OnCloseHandler(func() { t.lock.Lock() - defer t.lock.Unlock() // source track closed if t.Kind() == livekit.TrackType_AUDIO { // TODO: remove audio level observer } t.receiver = nil + onclose := t.onClose + t.lock.Unlock() + t.RemoveAllSubscribers() + if onclose != nil { + onclose() + } }) } t.receiver.AddUpTrack(track, buff) diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 6fc1a1057..5300e92ec 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -581,15 +581,7 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w p.handleTrackPublished(mt) } - // TODO: video tracks the current participant is subscribed to tends to freeze when the participant adds tracks - // to get around this, we'll trigger a resync on all tracks it's subscribed to - p.lock.RLock() - defer p.lock.RUnlock() - for _, tracks := range p.subscribedTracks { - for _, subTrack := range tracks { - subTrack.Resync() - } - } + p.resyncSubscriptions() } func (p *ParticipantImpl) onDataChannel(dc *webrtc.DataChannel) { @@ -653,6 +645,8 @@ func (p *ParticipantImpl) handleTrackPublished(track types.PublishedTrack) { if p.IsReady() && p.onTrackUpdated != nil { p.onTrackUpdated(p, track) } + track.OnClose(nil) + go p.resyncSubscriptions() }) if p.onTrackPublished != nil { @@ -660,6 +654,18 @@ func (p *ParticipantImpl) handleTrackPublished(track types.PublishedTrack) { } } +func (p *ParticipantImpl) resyncSubscriptions() { + // TODO: video tracks the current participant is subscribed to tends to freeze when the participant adds/removes tracks + // to get around this, we'll trigger a resync on all tracks it's subscribed to + p.lock.RLock() + defer p.lock.RUnlock() + for _, tracks := range p.subscribedTracks { + for _, subTrack := range tracks { + subTrack.Resync() + } + } +} + // downTracksRTCPWorker sends SenderReports periodically when the participant is subscribed to // other publishedTracks in the room. func (p *ParticipantImpl) downTracksRTCPWorker() { diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index f036a2e4e..dcac475eb 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -146,18 +146,19 @@ func (r *Room) Join(participant types.Participant) error { func (r *Room) RemoveParticipant(identity string) { r.lock.Lock() defer r.lock.Unlock() - - if p, ok := r.participants[identity]; ok { - // avoid blocking lock - go func() { - Recover() - // also stop connection if needed - p.Close() - }() + p, ok := r.participants[identity] + if !ok { + return } - delete(r.participants, identity) + p.OnTrackUpdated(nil) + p.OnTrackPublished(nil) + p.OnStateChange(nil) + + // close participant as well + go p.Close() + if len(r.participants) == 0 { r.leftAt.Store(time.Now().Unix()) }