diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index 4514972e4..66a7ec613 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -199,7 +199,7 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error { // // AddTrack will create a new transceiver or re-use an unused one // if the attributes match. This prevents SDP from bloating - // because of dormant transceivers buidling up. + // because of dormant transceivers building up. // sender, err = sub.SubscriberPC().AddTrack(downTrack) if err != nil { diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 19142314a..5d68beef8 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -472,12 +472,7 @@ func (p *ParticipantImpl) ICERestart() error { // AddSubscriber subscribes op to all publishedTracks func (p *ParticipantImpl) AddSubscriber(op types.Participant) (int, error) { - p.lock.RLock() - tracks := make([]types.PublishedTrack, 0, len(p.publishedTracks)) - for _, t := range p.publishedTracks { - tracks = append(tracks, t) - } - p.lock.RUnlock() + tracks := p.GetPublishedTracks() if len(tracks) == 0 { return 0, nil diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 6384229dc..21ccc3e39 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -1,7 +1,8 @@ // Code generated by Wire. DO NOT EDIT. //go:generate go run github.com/google/wire/cmd/wire -//+build !wireinject +//go:build !wireinject +// +build !wireinject package service diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index d9a33b314..d0d80e685 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -461,8 +461,9 @@ func (w *WebRTCReceiver) DeleteDownTrack(peerID string) { func (w *WebRTCReceiver) SendRTCP(p []rtcp.Packet) { if _, ok := p[0].(*rtcp.PictureLossIndication); ok { w.rtcpMu.Lock() - defer w.rtcpMu.Unlock() - if time.Now().UnixNano()-w.lastPli.get() < w.pliThrottle { + throttled := time.Now().UnixNano()-w.lastPli.get() < w.pliThrottle + w.rtcpMu.Unlock() + if throttled { return } w.lastPli.set(time.Now().UnixNano()) @@ -568,9 +569,12 @@ func (w *WebRTCReceiver) forwardRTP(layer int32) { } w.downTrackMu.RLock() - if w.lbThreshold == 0 || len(w.downTracks)-len(w.free) < w.lbThreshold { + downTracks := w.downTracks + free := w.free + w.downTrackMu.RUnlock() + if w.lbThreshold == 0 || len(downTracks)-len(free) < w.lbThreshold { // serial - not enough down tracks for parallelization to outweigh overhead - for _, dt := range w.downTracks { + for _, dt := range downTracks { if dt != nil { w.writeRTP(layer, dt, pkt, pli) } @@ -578,7 +582,7 @@ func (w *WebRTCReceiver) forwardRTP(layer int32) { } else { // parallel - enables much more efficient multi-core utilization start := uint64(0) - end := uint64(len(w.downTracks)) + end := uint64(len(downTracks)) // 100µs is enough to amortize the overhead and provide sufficient load balancing. // WriteRTP takes about 50µs on average, so we write to 2 down tracks per loop. @@ -596,7 +600,7 @@ func (w *WebRTCReceiver) forwardRTP(layer int32) { } for i := n - step; i < n && i < end; i++ { - if dt := w.downTracks[i]; dt != nil { + if dt := downTracks[i]; dt != nil { w.writeRTP(layer, dt, pkt, pli) } } @@ -605,7 +609,6 @@ func (w *WebRTCReceiver) forwardRTP(layer int32) { } wg.Wait() } - w.downTrackMu.RUnlock() } }