From 29edf4e8d91d95ed8ce30447ac09f0f99f64d28a Mon Sep 17 00:00:00 2001 From: David Zhao Date: Fri, 13 Aug 2021 15:49:25 -0700 Subject: [PATCH] Improve dynamic simulcast, properly cleanup after pendingTracks (#84) --- go.mod | 2 +- go.sum | 4 ++-- magefile.go | 29 +++++++++++++++++++++++++++++ pkg/rtc/mediatrack.go | 27 +++++++++++++-------------- pkg/rtc/participant.go | 19 ++++++++++--------- pkg/rtc/types/interfaces.go | 1 - pkg/service/roommanager.go | 11 +---------- 7 files changed, 56 insertions(+), 37 deletions(-) diff --git a/go.mod b/go.mod index 0a87b48ff..a51c9446f 100644 --- a/go.mod +++ b/go.mod @@ -44,4 +44,4 @@ require ( gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b ) -replace github.com/pion/ion-sfu => github.com/livekit/ion-sfu v1.20.6 +replace github.com/pion/ion-sfu => github.com/livekit/ion-sfu v1.20.7 diff --git a/go.sum b/go.sum index e371d0681..8009c296d 100644 --- a/go.sum +++ b/go.sum @@ -235,8 +235,8 @@ github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-b github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4= github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY/mKrcvA= github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= -github.com/livekit/ion-sfu v1.20.6 h1:vA98RfuW3sSidV1rfK+/szGWgHFgki4Q4pomxsJS0i0= -github.com/livekit/ion-sfu v1.20.6/go.mod h1:dEdOG4KSqIftr5HxxqciNKBIdu0v3OD0ZYL7A3J09KA= +github.com/livekit/ion-sfu v1.20.7 h1:aAkdDC/cL7oGAfhhqltTecARdEnyUYhdDlfyX4QESB0= +github.com/livekit/ion-sfu v1.20.7/go.mod h1:dEdOG4KSqIftr5HxxqciNKBIdu0v3OD0ZYL7A3J09KA= github.com/livekit/protocol v0.7.2 h1:4qwCkIFKhDYeyzp79lwb09/nwkjyjql3/o/Viifnyig= github.com/livekit/protocol v0.7.2/go.mod h1:Vk04t1uIJa+U2L5SeANEmDl6ebjc9tKVi4kk3CpqW74= github.com/lucsky/cuid v1.0.2 h1:z4XlExeoderxoPj2/dxKOyPxe9RCOu7yNq9/XWxIUMQ= diff --git a/magefile.go b/magefile.go index 7a20ea406..953b4a28a 100644 --- a/magefile.go +++ b/magefile.go @@ -160,6 +160,35 @@ func Build() error { return nil } +// builds binary that runs on linux amd64 +func BuildLinux() error { + mg.Deps(Proto, generateWire) + if !checksummer.IsChanged() { + fmt.Println("up to date") + return nil + } + + fmt.Println("building...") + if err := os.MkdirAll("bin", 0755); err != nil { + return err + } + cmd := exec.Command("go", "build", "-o", "../../bin/livekit-server-amd64") + cmd.Env = []string{ + "GOOS=linux", + "GOARCH=amd64", + "HOME=" + os.Getenv("HOME"), + "GOPATH=" + os.Getenv("GOPATH"), + } + cmd.Dir = "cmd/server" + connectStd(cmd) + if err := cmd.Run(); err != nil { + return err + } + + checksummer.WriteChecksum() + return nil +} + // builds docker image for LiveKit server func Docker() error { mg.Deps(Proto, generateWire) diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index ea864d94c..2eeff2aa4 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -3,6 +3,7 @@ package rtc import ( "errors" "sync" + "sync/atomic" "time" "github.com/livekit/protocol/utils" @@ -12,7 +13,6 @@ import ( "github.com/pion/rtcp" "github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3/pkg/rtcerr" - "github.com/thoas/go-funk" "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/logger" @@ -37,6 +37,7 @@ type MediaTrack struct { kind livekit.TrackType codec webrtc.RTPCodecParameters muted utils.AtomicFlag + numUpTracks uint32 simulcasted bool // channel to send RTCP packets to the source @@ -98,25 +99,17 @@ func (t *MediaTrack) IsMuted() bool { func (t *MediaTrack) SetMuted(muted bool) { t.muted.TrySet(muted) - // mute all of the subscribedtracks t.lock.RLock() + if t.receiver != nil { + t.receiver.SetUpTrackPaused(muted) + } + // mute all of the subscribedtracks for _, st := range t.subscribedTracks { st.SetPublisherMuted(muted) } t.lock.RUnlock() } -func (t *MediaTrack) SetSimulcastLayers(layers []livekit.VideoQuality) { - t.lock.RLock() - defer t.lock.RUnlock() - if t.receiver != nil { - layers16 := funk.Map(layers, func(l livekit.VideoQuality) uint16 { - return uint16(spatialLayerForQuality(l)) - }).([]uint16) - t.receiver.SetAvailableLayers(layers16) - } -} - func (t *MediaTrack) OnClose(f func()) { t.onClose = f } @@ -239,6 +232,10 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error { return nil } +func (t *MediaTrack) NumUpTracks() uint32 { + return atomic.LoadUint32(&t.numUpTracks) +} + // AddReceiver adds a new RTP receiver to the track func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.TrackRemote, twcc *twcc.Responder) { t.lock.Lock() @@ -286,7 +283,8 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra if t.receiver == nil { t.receiver = sfu.NewWebRTCReceiver(receiver, track, t.params.ParticipantID, sfu.WithPliThrottle(0), - sfu.WithLoadBalanceThreshold(20)) + sfu.WithLoadBalanceThreshold(20), + sfu.WithStreamTrackers()) t.receiver.SetRTCPCh(t.params.RTCPChan) t.receiver.OnCloseHandler(func() { t.lock.Lock() @@ -304,6 +302,7 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra t.receiver.AddUpTrack(track, buff, t.shouldStartWithBestQuality()) // when RID is set, track is simulcasted t.simulcasted = track.RID() != "" + atomic.AddUint32(&t.numUpTracks, 1) buff.Bind(receiver.GetParameters(), buffer.Options{ MaxBitRate: t.params.ReceiverConfig.maxBitrate, diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 727e8d29d..f7910e1d3 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -731,15 +731,13 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w return } - // delete pending track if it's not simulcasting - // TODO: we should delete it after adding three tracks - ti := p.getPendingTrack(track.ID(), ToProtoTrackKind(track.Kind()), track.RID() == "") - if ti == nil { - return - } - // use existing mediatrack to handle simulcast p.lock.Lock() + ti := p.getPendingTrack(track.ID(), ToProtoTrackKind(track.Kind()), false) + if ti == nil { + p.lock.Unlock() + return + } ptrack := p.publishedTracks[ti.Sid] var mt *MediaTrack @@ -772,6 +770,11 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w }) } mt.AddReceiver(rtpReceiver, track, p.twcc) + + // cleanup pendingTracks + if !mt.simulcasted || mt.NumUpTracks() == 3 { + _ = p.getPendingTrack(track.ID(), ToProtoTrackKind(track.Kind()), true) + } p.lock.Unlock() if newTrack { @@ -800,8 +803,6 @@ func (p *ParticipantImpl) onDataChannel(dc *webrtc.DataChannel) { } func (p *ParticipantImpl) getPendingTrack(clientId string, kind livekit.TrackType, deleteAfter bool) *livekit.TrackInfo { - p.lock.Lock() - defer p.lock.Unlock() ti := p.pendingTracks[clientId] // then find the first one that matches type. with MediaStreamTrack, it's possible for the client id to diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index ba08fb63c..a6b0e5151 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -93,7 +93,6 @@ type PublishedTrack interface { Name() string IsMuted() bool SetMuted(muted bool) - SetSimulcastLayers(layers []livekit.VideoQuality) AddSubscriber(participant Participant) error RemoveSubscriber(participantId string) IsSubscriber(subId string) bool diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 25fbc9ec2..379c1a017 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -476,16 +476,7 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.Partici case *livekit.SignalRequest_Leave: _ = participant.Close() case *livekit.SignalRequest_Simulcast: - for _, track := range participant.GetPublishedTracks() { - if track.ID() == msg.Simulcast.TrackSid { - logger.Debugw("updating simulcast layers", - "participant", participant.Identity(), - "pID", participant.ID(), - "track", track.ID(), - "layers", msg.Simulcast.Layers) - track.SetSimulcastLayers(msg.Simulcast.Layers) - } - } + // deprecated } } }