mirror of
https://github.com/livekit/livekit.git
synced 2026-05-18 19:55:44 +00:00
cleaner resync
This commit is contained in:
+14
-2
@@ -1,6 +1,7 @@
|
||||
package rtc
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -118,6 +119,11 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
if t.receiver == nil {
|
||||
// cannot add, no receiver
|
||||
return errors.New("cannot subscribe without a receiver in place")
|
||||
}
|
||||
|
||||
codec := t.receiver.Codec()
|
||||
|
||||
// using DownTrack from ion-sfu
|
||||
@@ -192,15 +198,16 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error {
|
||||
// adds a new RTP receiver to the track, returns true if this is a new track
|
||||
func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.TrackRemote) {
|
||||
//rid := track.RID()
|
||||
|
||||
buff, rtcpReader := bufferFactory.GetBufferPair(uint32(track.SSRC()))
|
||||
buff.OnFeedback(func(fb []rtcp.Packet) {
|
||||
// feedback for the source RTCP
|
||||
t.rtcpCh <- fb
|
||||
})
|
||||
|
||||
if t.Kind() == livekit.TrackType_AUDIO {
|
||||
// TODO: audio level stuff
|
||||
} else if t.Kind() == livekit.TrackType_VIDEO {
|
||||
// TODO: handle twcc
|
||||
//if t.twcc == nil {
|
||||
// t.twcc = twcc.NewTransportWideCCResponder(uint32(track.SSRC()))
|
||||
// t.twcc.OnFeedback(func(p rtcp.RawPacket) {
|
||||
@@ -238,12 +245,17 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra
|
||||
t.receiver.SetRTCPCh(t.rtcpCh)
|
||||
t.receiver.OnCloseHandler(func() {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
// source track closed
|
||||
if t.Kind() == livekit.TrackType_AUDIO {
|
||||
// TODO: remove audio level observer
|
||||
}
|
||||
t.receiver = nil
|
||||
onclose := t.onClose
|
||||
t.lock.Unlock()
|
||||
t.RemoveAllSubscribers()
|
||||
if onclose != nil {
|
||||
onclose()
|
||||
}
|
||||
})
|
||||
}
|
||||
t.receiver.AddUpTrack(track, buff)
|
||||
|
||||
+15
-9
@@ -581,15 +581,7 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w
|
||||
p.handleTrackPublished(mt)
|
||||
}
|
||||
|
||||
// TODO: video tracks the current participant is subscribed to tends to freeze when the participant adds tracks
|
||||
// to get around this, we'll trigger a resync on all tracks it's subscribed to
|
||||
p.lock.RLock()
|
||||
defer p.lock.RUnlock()
|
||||
for _, tracks := range p.subscribedTracks {
|
||||
for _, subTrack := range tracks {
|
||||
subTrack.Resync()
|
||||
}
|
||||
}
|
||||
p.resyncSubscriptions()
|
||||
}
|
||||
|
||||
func (p *ParticipantImpl) onDataChannel(dc *webrtc.DataChannel) {
|
||||
@@ -653,6 +645,8 @@ func (p *ParticipantImpl) handleTrackPublished(track types.PublishedTrack) {
|
||||
if p.IsReady() && p.onTrackUpdated != nil {
|
||||
p.onTrackUpdated(p, track)
|
||||
}
|
||||
track.OnClose(nil)
|
||||
go p.resyncSubscriptions()
|
||||
})
|
||||
|
||||
if p.onTrackPublished != nil {
|
||||
@@ -660,6 +654,18 @@ func (p *ParticipantImpl) handleTrackPublished(track types.PublishedTrack) {
|
||||
}
|
||||
}
|
||||
|
||||
func (p *ParticipantImpl) resyncSubscriptions() {
|
||||
// TODO: video tracks the current participant is subscribed to tends to freeze when the participant adds/removes tracks
|
||||
// to get around this, we'll trigger a resync on all tracks it's subscribed to
|
||||
p.lock.RLock()
|
||||
defer p.lock.RUnlock()
|
||||
for _, tracks := range p.subscribedTracks {
|
||||
for _, subTrack := range tracks {
|
||||
subTrack.Resync()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// downTracksRTCPWorker sends SenderReports periodically when the participant is subscribed to
|
||||
// other publishedTracks in the room.
|
||||
func (p *ParticipantImpl) downTracksRTCPWorker() {
|
||||
|
||||
+10
-9
@@ -146,18 +146,19 @@ func (r *Room) Join(participant types.Participant) error {
|
||||
func (r *Room) RemoveParticipant(identity string) {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
|
||||
if p, ok := r.participants[identity]; ok {
|
||||
// avoid blocking lock
|
||||
go func() {
|
||||
Recover()
|
||||
// also stop connection if needed
|
||||
p.Close()
|
||||
}()
|
||||
p, ok := r.participants[identity]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
delete(r.participants, identity)
|
||||
|
||||
p.OnTrackUpdated(nil)
|
||||
p.OnTrackPublished(nil)
|
||||
p.OnStateChange(nil)
|
||||
|
||||
// close participant as well
|
||||
go p.Close()
|
||||
|
||||
if len(r.participants) == 0 {
|
||||
r.leftAt.Store(time.Now().Unix())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user