mirror of
https://github.com/livekit/livekit.git
synced 2026-05-14 11:55:14 +00:00
WIP
This commit is contained in:
@@ -71,6 +71,11 @@ func NewSelectionForwarder(params SelectionForwarderParams) *SelectionForwarder
|
||||
}
|
||||
|
||||
func (f *SelectionForwarder) Start() {
|
||||
f.lock.Lock()
|
||||
for _, dt := range f.downtracks {
|
||||
dt.SetConnected()
|
||||
}
|
||||
f.lock.Unlock()
|
||||
go f.process()
|
||||
}
|
||||
|
||||
@@ -189,7 +194,6 @@ func (f *SelectionForwarder) updateForward() {
|
||||
}
|
||||
|
||||
func (f *SelectionForwarder) activeSource(source *sourceInfo) bool {
|
||||
f.params.Logger.Debugw("activating source", "trackID", source.receiver.TrackID())
|
||||
if len(f.idleDowntracks) == 0 {
|
||||
if len(f.downtracks) < f.params.ActiveDowntracks {
|
||||
dt := f.params.RequestDownTrack(source.receiver)
|
||||
@@ -205,6 +209,7 @@ func (f *SelectionForwarder) activeSource(source *sourceInfo) bool {
|
||||
f.idleDowntracks = f.idleDowntracks[1:]
|
||||
source.downtrack.ResetReceiver(source.receiver)
|
||||
source.receiver.AddDownTrack(source.downtrack)
|
||||
f.params.Logger.Debugw("activating source", "trackID", source.receiver.TrackID(), "downtrack", source.downtrack.ID())
|
||||
return true
|
||||
}
|
||||
|
||||
|
||||
@@ -149,6 +149,7 @@ type DownTrack struct {
|
||||
rtpHeaderExtensions []webrtc.RTPHeaderExtensionParameter
|
||||
absSendTimeID int
|
||||
dependencyDescriptorID int
|
||||
receiverLock sync.RWMutex
|
||||
receiver TrackReceiver
|
||||
transceiver *webrtc.RTPTransceiver
|
||||
writeStream webrtc.TrackLocalWriter
|
||||
@@ -240,7 +241,9 @@ func NewDownTrack(
|
||||
kind: kind,
|
||||
codec: codecs[0].RTPCodecCapability,
|
||||
}
|
||||
d.receiverLock.Lock()
|
||||
d.receiver = r
|
||||
d.receiverLock.Unlock()
|
||||
d.forwarder = NewForwarder(d.kind, d.logger)
|
||||
|
||||
d.connectionStats = connectionquality.NewConnectionStats(connectionquality.ConnectionStatsParams{
|
||||
@@ -272,15 +275,15 @@ func NewDownTrack(
|
||||
}
|
||||
|
||||
func (d *DownTrack) ResetReceiver(r TrackReceiver) {
|
||||
d.bindLock.Lock()
|
||||
d.receiverLock.Lock()
|
||||
d.receiver = r
|
||||
d.bindLock.Unlock()
|
||||
d.receiverLock.Unlock()
|
||||
// TODO: log stats
|
||||
}
|
||||
|
||||
func (d *DownTrack) getReceiver() TrackReceiver {
|
||||
d.bindLock.Lock()
|
||||
defer d.bindLock.Unlock()
|
||||
d.receiverLock.RLock()
|
||||
defer d.receiverLock.RUnlock()
|
||||
return d.receiver
|
||||
}
|
||||
|
||||
@@ -727,7 +730,7 @@ func (d *DownTrack) CloseWithFlush(flush bool) {
|
||||
|
||||
d.bound.Store(false)
|
||||
d.logger.Debugw("closing sender", "kind", d.kind)
|
||||
d.receiver.DeleteDownTrack(d.subscriberID)
|
||||
d.getReceiver().DeleteDownTrack(d.subscriberID)
|
||||
|
||||
if d.rtcpReader != nil {
|
||||
logger.Infow("downtrack close rtcp reader")
|
||||
|
||||
@@ -366,6 +366,7 @@ func (w *WebRTCReceiver) AddDownTrack(track TrackSender) error {
|
||||
track.TrackInfoAvailable()
|
||||
|
||||
w.downTrackSpreader.Store(track)
|
||||
w.logger.Debugw("added downtrack", "downtrack", track.ID())
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user