Improve dynamic simulcast, properly cleanup after pendingTracks (#84)

This commit is contained in:
David Zhao
2021-08-13 15:49:25 -07:00
committed by GitHub
parent 5713731363
commit 29edf4e8d9
7 changed files with 56 additions and 37 deletions

2
go.mod
View File

@@ -44,4 +44,4 @@ require (
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
)
replace github.com/pion/ion-sfu => github.com/livekit/ion-sfu v1.20.6
replace github.com/pion/ion-sfu => github.com/livekit/ion-sfu v1.20.7

4
go.sum
View File

@@ -235,8 +235,8 @@ github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-b
github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4=
github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY/mKrcvA=
github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/livekit/ion-sfu v1.20.6 h1:vA98RfuW3sSidV1rfK+/szGWgHFgki4Q4pomxsJS0i0=
github.com/livekit/ion-sfu v1.20.6/go.mod h1:dEdOG4KSqIftr5HxxqciNKBIdu0v3OD0ZYL7A3J09KA=
github.com/livekit/ion-sfu v1.20.7 h1:aAkdDC/cL7oGAfhhqltTecARdEnyUYhdDlfyX4QESB0=
github.com/livekit/ion-sfu v1.20.7/go.mod h1:dEdOG4KSqIftr5HxxqciNKBIdu0v3OD0ZYL7A3J09KA=
github.com/livekit/protocol v0.7.2 h1:4qwCkIFKhDYeyzp79lwb09/nwkjyjql3/o/Viifnyig=
github.com/livekit/protocol v0.7.2/go.mod h1:Vk04t1uIJa+U2L5SeANEmDl6ebjc9tKVi4kk3CpqW74=
github.com/lucsky/cuid v1.0.2 h1:z4XlExeoderxoPj2/dxKOyPxe9RCOu7yNq9/XWxIUMQ=

View File

@@ -160,6 +160,35 @@ func Build() error {
return nil
}
// builds binary that runs on linux amd64
func BuildLinux() error {
mg.Deps(Proto, generateWire)
if !checksummer.IsChanged() {
fmt.Println("up to date")
return nil
}
fmt.Println("building...")
if err := os.MkdirAll("bin", 0755); err != nil {
return err
}
cmd := exec.Command("go", "build", "-o", "../../bin/livekit-server-amd64")
cmd.Env = []string{
"GOOS=linux",
"GOARCH=amd64",
"HOME=" + os.Getenv("HOME"),
"GOPATH=" + os.Getenv("GOPATH"),
}
cmd.Dir = "cmd/server"
connectStd(cmd)
if err := cmd.Run(); err != nil {
return err
}
checksummer.WriteChecksum()
return nil
}
// builds docker image for LiveKit server
func Docker() error {
mg.Deps(Proto, generateWire)

View File

@@ -3,6 +3,7 @@ package rtc
import (
"errors"
"sync"
"sync/atomic"
"time"
"github.com/livekit/protocol/utils"
@@ -12,7 +13,6 @@ import (
"github.com/pion/rtcp"
"github.com/pion/webrtc/v3"
"github.com/pion/webrtc/v3/pkg/rtcerr"
"github.com/thoas/go-funk"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/logger"
@@ -37,6 +37,7 @@ type MediaTrack struct {
kind livekit.TrackType
codec webrtc.RTPCodecParameters
muted utils.AtomicFlag
numUpTracks uint32
simulcasted bool
// channel to send RTCP packets to the source
@@ -98,25 +99,17 @@ func (t *MediaTrack) IsMuted() bool {
func (t *MediaTrack) SetMuted(muted bool) {
t.muted.TrySet(muted)
// mute all of the subscribedtracks
t.lock.RLock()
if t.receiver != nil {
t.receiver.SetUpTrackPaused(muted)
}
// mute all of the subscribedtracks
for _, st := range t.subscribedTracks {
st.SetPublisherMuted(muted)
}
t.lock.RUnlock()
}
func (t *MediaTrack) SetSimulcastLayers(layers []livekit.VideoQuality) {
t.lock.RLock()
defer t.lock.RUnlock()
if t.receiver != nil {
layers16 := funk.Map(layers, func(l livekit.VideoQuality) uint16 {
return uint16(spatialLayerForQuality(l))
}).([]uint16)
t.receiver.SetAvailableLayers(layers16)
}
}
func (t *MediaTrack) OnClose(f func()) {
t.onClose = f
}
@@ -239,6 +232,10 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error {
return nil
}
func (t *MediaTrack) NumUpTracks() uint32 {
return atomic.LoadUint32(&t.numUpTracks)
}
// AddReceiver adds a new RTP receiver to the track
func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.TrackRemote, twcc *twcc.Responder) {
t.lock.Lock()
@@ -286,7 +283,8 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra
if t.receiver == nil {
t.receiver = sfu.NewWebRTCReceiver(receiver, track, t.params.ParticipantID,
sfu.WithPliThrottle(0),
sfu.WithLoadBalanceThreshold(20))
sfu.WithLoadBalanceThreshold(20),
sfu.WithStreamTrackers())
t.receiver.SetRTCPCh(t.params.RTCPChan)
t.receiver.OnCloseHandler(func() {
t.lock.Lock()
@@ -304,6 +302,7 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra
t.receiver.AddUpTrack(track, buff, t.shouldStartWithBestQuality())
// when RID is set, track is simulcasted
t.simulcasted = track.RID() != ""
atomic.AddUint32(&t.numUpTracks, 1)
buff.Bind(receiver.GetParameters(), buffer.Options{
MaxBitRate: t.params.ReceiverConfig.maxBitrate,

View File

@@ -731,15 +731,13 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w
return
}
// delete pending track if it's not simulcasting
// TODO: we should delete it after adding three tracks
ti := p.getPendingTrack(track.ID(), ToProtoTrackKind(track.Kind()), track.RID() == "")
if ti == nil {
return
}
// use existing mediatrack to handle simulcast
p.lock.Lock()
ti := p.getPendingTrack(track.ID(), ToProtoTrackKind(track.Kind()), false)
if ti == nil {
p.lock.Unlock()
return
}
ptrack := p.publishedTracks[ti.Sid]
var mt *MediaTrack
@@ -772,6 +770,11 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w
})
}
mt.AddReceiver(rtpReceiver, track, p.twcc)
// cleanup pendingTracks
if !mt.simulcasted || mt.NumUpTracks() == 3 {
_ = p.getPendingTrack(track.ID(), ToProtoTrackKind(track.Kind()), true)
}
p.lock.Unlock()
if newTrack {
@@ -800,8 +803,6 @@ func (p *ParticipantImpl) onDataChannel(dc *webrtc.DataChannel) {
}
func (p *ParticipantImpl) getPendingTrack(clientId string, kind livekit.TrackType, deleteAfter bool) *livekit.TrackInfo {
p.lock.Lock()
defer p.lock.Unlock()
ti := p.pendingTracks[clientId]
// then find the first one that matches type. with MediaStreamTrack, it's possible for the client id to

View File

@@ -93,7 +93,6 @@ type PublishedTrack interface {
Name() string
IsMuted() bool
SetMuted(muted bool)
SetSimulcastLayers(layers []livekit.VideoQuality)
AddSubscriber(participant Participant) error
RemoveSubscriber(participantId string)
IsSubscriber(subId string) bool

View File

@@ -476,16 +476,7 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.Partici
case *livekit.SignalRequest_Leave:
_ = participant.Close()
case *livekit.SignalRequest_Simulcast:
for _, track := range participant.GetPublishedTracks() {
if track.ID() == msg.Simulcast.TrackSid {
logger.Debugw("updating simulcast layers",
"participant", participant.Identity(),
"pID", participant.ID(),
"track", track.ID(),
"layers", msg.Simulcast.Layers)
track.SetSimulcastLayers(msg.Simulcast.Layers)
}
}
// deprecated
}
}
}