collect inbound and outbound packet stats

This commit is contained in:
David Zhao
2021-05-11 00:31:18 -07:00
parent 23238898ed
commit 9616149a88
9 changed files with 345 additions and 104 deletions
+2
View File
@@ -16,12 +16,14 @@ require (
github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0
github.com/mitchellh/go-homedir v1.1.0
github.com/pion/ice/v2 v2.1.7
github.com/pion/interceptor v0.0.12
github.com/pion/ion-sfu v1.10.0
github.com/pion/logging v0.2.2
github.com/pion/rtcp v1.2.6
github.com/pion/rtp v1.6.5
github.com/pion/sdp/v3 v3.0.4
github.com/pion/stun v0.3.5
github.com/pion/transport v0.12.3
github.com/pion/turn/v2 v2.0.5
github.com/pion/webrtc/v3 v3.0.29
github.com/pkg/errors v0.9.1
+41 -39
View File
@@ -29,44 +29,43 @@ var (
// MediaTrack represents a WebRTC track that needs to be forwarded
// Implements the PublishedTrack interface
type MediaTrack struct {
id string
participantId string
muted utils.AtomicFlag
ssrc webrtc.SSRC
name string
streamID string
kind livekit.TrackType
codec webrtc.RTPCodecParameters
bufferFactory *buffer.Factory
receiverConf ReceiverConfig
audioConf config.AudioConfig
onClose func()
params MediaTrackParams
ssrc webrtc.SSRC
name string
streamID string
kind livekit.TrackType
codec webrtc.RTPCodecParameters
muted utils.AtomicFlag
// channel to send RTCP packets to the source
rtcpCh chan []rtcp.Packet
lock sync.RWMutex
lock sync.RWMutex
// map of target participantId -> *SubscribedTrack
subscribedTracks map[string]*SubscribedTrack
twcc *twcc.Responder
audioLevel *AudioLevel
receiver sfu.Receiver
//lastNack int64
lastPLI time.Time
lastPLI time.Time
onClose func()
}
func NewMediaTrack(trackId string, pId string, rtcpCh chan []rtcp.Packet, track *webrtc.TrackRemote, bufferFactory *buffer.Factory, rc ReceiverConfig, ac config.AudioConfig) *MediaTrack {
type MediaTrackParams struct {
TrackID string
ParticipantID string
RTCPChan chan []rtcp.Packet
BufferFactory *buffer.Factory
ReceiverConfig ReceiverConfig
AudioConfig config.AudioConfig
Stats *StatsReporter
}
func NewMediaTrack(track *webrtc.TrackRemote, params MediaTrackParams) *MediaTrack {
t := &MediaTrack{
id: trackId,
participantId: pId,
params: params,
ssrc: track.SSRC(),
streamID: track.StreamID(),
kind: ToProtoTrackKind(track.Kind()),
codec: track.Codec(),
bufferFactory: bufferFactory,
receiverConf: rc,
audioConf: ac,
rtcpCh: rtcpCh,
lock: sync.RWMutex{},
subscribedTracks: make(map[string]*SubscribedTrack),
}
@@ -78,7 +77,7 @@ func (t *MediaTrack) Start() {
}
func (t *MediaTrack) ID() string {
return t.id
return t.params.TrackID
}
func (t *MediaTrack) Kind() livekit.TrackType {
@@ -130,11 +129,11 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error {
}
// using DownTrack from ion-sfu
streamId := t.participantId
streamId := t.params.ParticipantID
if sub.ProtocolVersion().SupportsPackedStreamId() {
// when possible, pack both IDs in streamID to allow new streams to be generated
// react-native-webrtc still uses stream based APIs and require this
streamId = PackStreamID(t.participantId, t.ID())
streamId = PackStreamID(t.params.ParticipantID, t.ID())
}
receiver := NewWrappedReceiver(t.receiver, t.ID(), streamId)
downTrack, err := sfu.NewDownTrack(webrtc.RTPCodecCapability{
@@ -143,7 +142,7 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error {
Channels: codec.Channels,
SDPFmtpLine: codec.SDPFmtpLine,
RTCPFeedback: feedbackTypes,
}, receiver, t.bufferFactory, sub.ID(), t.receiverConf.packetBufferSize)
}, receiver, t.params.BufferFactory, sub.ID(), t.params.ReceiverConfig.packetBufferSize)
if err != nil {
return err
}
@@ -179,8 +178,8 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error {
return
}
logger.Debugw("removing peerconnection track",
"track", t.id,
"participantId", t.participantId,
"track", t.params.TrackID,
"participantId", t.params.ParticipantID,
"destParticipant", sub.Identity())
if err := sub.SubscriberPC().RemoveTrack(sender); err != nil {
if err == webrtc.ErrConnectionClosed {
@@ -194,14 +193,14 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error {
}
}
sub.RemoveSubscribedTrack(t.participantId, subTrack)
sub.RemoveSubscribedTrack(t.params.ParticipantID, subTrack)
sub.Negotiate()
})
t.subscribedTracks[sub.ID()] = subTrack
t.receiver.AddDownTrack(downTrack, true)
sub.AddSubscribedTrack(t.participantId, subTrack)
sub.AddSubscribedTrack(t.params.ParticipantID, subTrack)
sub.Negotiate()
return nil
@@ -213,14 +212,17 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra
defer t.lock.Unlock()
//rid := track.RID()
buff, rtcpReader := t.bufferFactory.GetBufferPair(uint32(track.SSRC()))
buff, rtcpReader := t.params.BufferFactory.GetBufferPair(uint32(track.SSRC()))
buff.OnFeedback(func(fb []rtcp.Packet) {
if t.params.Stats != nil {
t.params.Stats.incoming.HandleRTCP(fb)
}
// feedback for the source RTCP
t.rtcpCh <- fb
t.params.RTCPChan <- fb
})
if t.Kind() == livekit.TrackType_AUDIO {
t.audioLevel = NewAudioLevel(t.audioConf.ActiveLevel, t.audioConf.MinPercentile)
t.audioLevel = NewAudioLevel(t.params.AudioConfig.ActiveLevel, t.params.AudioConfig.MinPercentile)
buff.OnAudioLevel(func(level uint8) {
t.audioLevel.Observe(level)
})
@@ -250,8 +252,8 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra
})
if t.receiver == nil {
t.receiver = sfu.NewWebRTCReceiver(receiver, track, t.participantId)
t.receiver.SetRTCPCh(t.rtcpCh)
t.receiver = sfu.NewWebRTCReceiver(receiver, track, t.params.ParticipantID)
t.receiver.SetRTCPCh(t.params.RTCPChan)
t.receiver.OnCloseHandler(func() {
t.lock.Lock()
t.receiver = nil
@@ -266,7 +268,7 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra
t.receiver.AddUpTrack(track, buff, true)
buff.Bind(receiver.GetParameters(), buffer.Options{
MaxBitRate: t.receiverConf.maxBitrate,
MaxBitRate: t.params.ReceiverConfig.maxBitrate,
})
}
@@ -282,7 +284,7 @@ func (t *MediaTrack) RemoveSubscriber(participantId string) {
}
func (t *MediaTrack) RemoveAllSubscribers() {
logger.Debugw("removing all subscribers", "track", t.id)
logger.Debugw("removing all subscribers", "track", t.params.TrackID)
t.lock.RLock()
defer t.lock.RUnlock()
for _, subTrack := range t.subscribedTracks {
+63 -37
View File
@@ -29,19 +29,25 @@ const (
sdBatchSize = 15
)
type ParticipantParams struct {
Identity string
Config *WebRTCConfig
Sink routing.MessageSink
AudioConfig config.AudioConfig
ProtocolVersion types.ProtocolVersion
Stats *StatsReporter
}
type ParticipantImpl struct {
id string
publisher *PCTransport
subscriber *PCTransport
responseSink routing.MessageSink
audioConfig config.AudioConfig
isClosed utils.AtomicFlag
conf *WebRTCConfig
identity string
permission *livekit.ParticipantPermission
state atomic.Value // livekit.ParticipantInfo_State
rtcpCh chan []rtcp.Packet
protocolVersion types.ProtocolVersion
params ParticipantParams
id string
publisher *PCTransport
subscriber *PCTransport
isClosed utils.AtomicFlag
permission *livekit.ParticipantPermission
state atomic.Value // livekit.ParticipantInfo_State
rtcpCh chan []rtcp.Packet
// reliable and unreliable data channels
reliableDC *webrtc.DataChannel
lossyDC *webrtc.DataChannel
@@ -74,16 +80,12 @@ type ParticipantImpl struct {
onClose func(types.Participant)
}
func NewParticipant(identity string, conf *WebRTCConfig, rs routing.MessageSink, ac config.AudioConfig, pv types.ProtocolVersion) (*ParticipantImpl, error) {
func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
// TODO: check to ensure params are valid, id and identity can't be empty
p := &ParticipantImpl{
params: params,
id: utils.NewGuid(utils.ParticipantPrefix),
identity: identity,
responseSink: rs,
audioConfig: ac,
conf: conf,
protocolVersion: pv,
rtcpCh: make(chan []rtcp.Packet, 50),
subscribedTracks: make(map[string][]types.SubscribedTrack),
lock: sync.RWMutex{},
@@ -94,11 +96,19 @@ func NewParticipant(identity string, conf *WebRTCConfig, rs routing.MessageSink,
p.state.Store(livekit.ParticipantInfo_JOINING)
var err error
p.publisher, err = NewPCTransport(livekit.SignalTarget_PUBLISHER, conf)
p.publisher, err = NewPCTransport(TransportParams{
Target: livekit.SignalTarget_PUBLISHER,
Config: params.Config,
Stats: p.params.Stats,
})
if err != nil {
return nil, err
}
p.subscriber, err = NewPCTransport(livekit.SignalTarget_SUBSCRIBER, conf)
p.subscriber, err = NewPCTransport(TransportParams{
Target: livekit.SignalTarget_SUBSCRIBER,
Config: params.Config,
Stats: p.params.Stats,
})
if err != nil {
return nil, err
}
@@ -130,7 +140,7 @@ func (p *ParticipantImpl) ID() string {
}
func (p *ParticipantImpl) Identity() string {
return p.identity
return p.params.Identity
}
func (p *ParticipantImpl) State() livekit.ParticipantInfo_State {
@@ -138,7 +148,7 @@ func (p *ParticipantImpl) State() livekit.ParticipantInfo_State {
}
func (p *ParticipantImpl) ProtocolVersion() types.ProtocolVersion {
return p.protocolVersion
return p.params.ProtocolVersion
}
func (p *ParticipantImpl) IsReady() bool {
@@ -170,7 +180,7 @@ func (p *ParticipantImpl) RTCPChan() chan []rtcp.Packet {
func (p *ParticipantImpl) ToProto() *livekit.ParticipantInfo {
info := &livekit.ParticipantInfo{
Sid: p.id,
Identity: p.identity,
Identity: p.params.Identity,
Metadata: p.metadata,
State: p.State(),
JoinedAt: p.ConnectedAt().Unix(),
@@ -185,11 +195,11 @@ func (p *ParticipantImpl) ToProto() *livekit.ParticipantInfo {
}
func (p *ParticipantImpl) GetResponseSink() routing.MessageSink {
return p.responseSink
return p.params.Sink
}
func (p *ParticipantImpl) SetResponseSink(sink routing.MessageSink) {
p.responseSink = sink
p.params.Sink = sink
}
func (p *ParticipantImpl) SubscriberMediaEngine() *webrtc.MediaEngine {
@@ -197,6 +207,7 @@ func (p *ParticipantImpl) SubscriberMediaEngine() *webrtc.MediaEngine {
}
// callbacks for clients
func (p *ParticipantImpl) OnTrackPublished(callback func(types.Participant, types.PublishedTrack)) {
p.onTrackPublished = callback
}
@@ -247,11 +258,14 @@ func (p *ParticipantImpl) HandleOffer(sdp webrtc.SessionDescription) (answer web
"participant", p.Identity(),
//"sdp", sdp.SDP,
)
p.writeMessage(&livekit.SignalResponse{
err = p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_Answer{
Answer: ToProtoSessionDescription(answer),
},
})
if err != nil {
return
}
if p.State() == livekit.ParticipantInfo_JOINING {
p.updateState(livekit.ParticipantInfo_JOINED)
@@ -259,7 +273,8 @@ func (p *ParticipantImpl) HandleOffer(sdp webrtc.SessionDescription) (answer web
return
}
// client intends to publish track, this function records track details and schedules negotiation
// AddTrack is called when client intends to publish track.
// records track details and lets client know it's ok to proceed
func (p *ParticipantImpl) AddTrack(clientId, name string, trackType livekit.TrackType) {
p.lock.Lock()
defer p.lock.Unlock()
@@ -276,7 +291,7 @@ func (p *ParticipantImpl) AddTrack(clientId, name string, trackType livekit.Trac
}
p.pendingTracks[clientId] = ti
p.writeMessage(&livekit.SignalResponse{
_ = p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_TrackPublished{
TrackPublished: &livekit.TrackPublishedResponse{
Cid: clientId,
@@ -361,7 +376,7 @@ func (p *ParticipantImpl) Close() error {
p.publisher.pc.OnICECandidate(nil)
// ensure this is synchronized
p.lock.RLock()
p.responseSink.Close()
p.params.Sink.Close()
onClose := p.onClose
p.lock.RUnlock()
if onClose != nil {
@@ -422,6 +437,7 @@ func (p *ParticipantImpl) RemoveSubscriber(participantId string) {
}
// signal connection methods
func (p *ParticipantImpl) SendJoinResponse(roomInfo *livekit.Room, otherParticipants []types.Participant, iceServers []*livekit.ICEServer) error {
// send Join response
return p.writeMessage(&livekit.SignalResponse{
@@ -500,7 +516,7 @@ func (p *ParticipantImpl) SetTrackMuted(trackId string, muted bool) {
if currentMuted != track.IsMuted() && p.onTrackUpdated != nil {
logger.Debugw("mute status changed",
"participant", p.identity,
"participant", p.Identity(),
"track", trackId,
"muted", track.IsMuted())
p.onTrackUpdated(p, track)
@@ -552,7 +568,7 @@ func (p *ParticipantImpl) GetSubscribedTracks() []types.SubscribedTrack {
return subscribed
}
// add a track to the participant's subscribed list
// AddSubscribedTrack adds a track to the participant's subscribed list
func (p *ParticipantImpl) AddSubscribedTrack(pubId string, subTrack types.SubscribedTrack) {
logger.Debugw("added subscribedTrack", "srcParticipant", pubId,
"participant", p.Identity())
@@ -561,7 +577,7 @@ func (p *ParticipantImpl) AddSubscribedTrack(pubId string, subTrack types.Subscr
p.lock.Unlock()
}
// remove a track to the participant's subscribed list
// RemoveSubscribedTrack removes a track to the participant's subscribed list
func (p *ParticipantImpl) RemoveSubscribedTrack(pubId string, subTrack types.SubscribedTrack) {
logger.Debugw("removed subscribedTrack", "srcParticipant", pubId,
"participant", p.Identity())
@@ -611,13 +627,13 @@ func (p *ParticipantImpl) writeMessage(msg *livekit.SignalResponse) error {
if p.State() == livekit.ParticipantInfo_DISCONNECTED {
return nil
}
sink := p.responseSink
sink := p.params.Sink
err := sink.WriteMessage(msg)
if err != nil {
logger.Warnw("could not send message to participant",
"error", err,
"id", p.ID(),
"participant", p.identity,
"participant", p.Identity(),
"message", fmt.Sprintf("%T", msg.Message))
return err
}
@@ -672,7 +688,15 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w
if trk, ok := ptrack.(*MediaTrack); ok {
mt = trk
} else {
mt = NewMediaTrack(ti.Sid, p.id, p.rtcpCh, track, p.conf.BufferFactory, p.conf.Receiver, p.audioConfig)
mt = NewMediaTrack(track, MediaTrackParams{
TrackID: ti.Sid,
ParticipantID: p.id,
RTCPChan: p.rtcpCh,
BufferFactory: p.params.Config.BufferFactory,
ReceiverConfig: p.params.Config.Receiver,
AudioConfig: p.params.AudioConfig,
Stats: p.params.Stats,
})
mt.name = ti.Name
newTrack = true
}
@@ -680,7 +704,7 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w
if p.twcc == nil {
p.twcc = twcc.NewTransportWideCCResponder(uint32(track.SSRC()))
p.twcc.OnFeedback(func(pkt rtcp.RawPacket) {
p.publisher.pc.WriteRTCP([]rtcp.Packet{&pkt})
_ = p.publisher.pc.WriteRTCP([]rtcp.Packet{&pkt})
})
}
mt.AddReceiver(rtpReceiver, track, p.twcc)
@@ -806,7 +830,9 @@ func (p *ParticipantImpl) handlePublisherICEStateChange(state webrtc.ICEConnecti
if state == webrtc.ICEConnectionStateConnected {
p.updateState(livekit.ParticipantInfo_ACTIVE)
} else if state == webrtc.ICEConnectionStateDisconnected || state == webrtc.ICEConnectionStateFailed {
go p.Close()
go func() {
_ = p.Close()
}()
}
}
+9 -9
View File
@@ -95,7 +95,7 @@ func TestTrackPublishing(t *testing.T) {
p := newParticipantForTest("test")
//track := &typesfakes.FakePublishedTrack{}
//track.IDReturns("id")
sink := p.responseSink.(*routingfakes.FakeMessageSink)
sink := p.params.Sink.(*routingfakes.FakeMessageSink)
p.AddTrack("cid", "webcam", livekit.TrackType_VIDEO)
assert.Equal(t, 1, sink.WriteMessageCallCount())
res := sink.WriteMessageArgsForCall(0).(*livekit.SignalResponse)
@@ -110,7 +110,7 @@ func TestTrackPublishing(t *testing.T) {
p := newParticipantForTest("test")
//track := &typesfakes.FakePublishedTrack{}
//track.IDReturns("id")
sink := p.responseSink.(*routingfakes.FakeMessageSink)
sink := p.params.Sink.(*routingfakes.FakeMessageSink)
p.AddTrack("cid", "webcam", livekit.TrackType_VIDEO)
p.AddTrack("cid", "duplicate", livekit.TrackType_AUDIO)
@@ -123,7 +123,7 @@ func TestDisconnectTiming(t *testing.T) {
t.Run("Negotiate doesn't panic after channel closed", func(t *testing.T) {
p := newParticipantForTest("test")
msg := routing.NewMessageChannel()
p.responseSink = msg
p.params.Sink = msg
go func() {
for msg := range msg.ReadChan() {
t.Log("received message from chan", msg)
@@ -153,11 +153,11 @@ func newParticipantForTest(identity string) *ParticipantImpl {
if err != nil {
panic(err)
}
p, _ := NewParticipant(
identity,
rtcConf,
&routingfakes.FakeMessageSink{},
config.AudioConfig{},
0)
p, _ := NewParticipant(ParticipantParams{
Identity: identity,
Config: rtcConf,
Sink: &routingfakes.FakeMessageSink{},
ProtocolVersion: 0,
})
return p
}
+33 -5
View File
@@ -13,7 +13,8 @@ import (
)
const (
DefaultEmptyTimeout = 5 * 60 // 5 mins
DefaultEmptyTimeout = 5 * 60 // 5m
DefaultRoomDepartureGrace = 20
)
type Room struct {
@@ -33,6 +34,10 @@ type Room struct {
audioUpdateInterval uint32
lastActiveSpeakers []*livekit.SpeakerInfo
// aggregate stats
incomingStats PacketStats
outgoingStats PacketStats
onParticipantChanged func(p types.Participant)
onClose func()
}
@@ -199,6 +204,12 @@ func (r *Room) RemoveParticipant(identity string) {
// close participant as well
_ = p.Close()
// add participant stats
if pi, ok := p.(*ParticipantImpl); ok && pi.params.Stats != nil {
r.incomingStats.addFrom(pi.params.Stats.incoming)
r.outgoingStats.addFrom(pi.params.Stats.outgoing)
}
if len(r.participants) == 0 {
r.leftAt.Store(time.Now().Unix())
}
@@ -252,25 +263,42 @@ func (r *Room) CloseIfEmpty() {
return
}
timeout := r.EmptyTimeout
var elapsed int64
if r.FirstJoinedAt() > 0 {
// compute elasped from last departure
// exit 20s after
elapsed = time.Now().Unix() - r.LastLeftAt()
if timeout > DefaultRoomDepartureGrace {
timeout = DefaultRoomDepartureGrace
}
} else {
elapsed = time.Now().Unix() - r.CreationTime
}
if elapsed >= int64(r.EmptyTimeout) {
if elapsed >= int64(timeout) {
r.Close()
}
}
func (r *Room) Close() {
if !r.isClosed.TrySet(true) {
return
}
logger.Infow("closing room", "room", r.Sid, "name", r.Name)
if r.isClosed.TrySet(true) && r.onClose != nil {
if r.onClose != nil {
r.onClose()
}
}
func (r *Room) GetIncomingStats() PacketStats {
return r.incomingStats
}
func (r *Room) GetOutgoingStats() PacketStats {
return r.outgoingStats
}
func (r *Room) OnClose(f func()) {
r.onClose = f
}
@@ -316,7 +344,7 @@ func (r *Room) onTrackAdded(participant types.Participant, track types.Published
}
}
func (r *Room) onTrackUpdated(p types.Participant, track types.PublishedTrack) {
func (r *Room) onTrackUpdated(p types.Participant, _ types.PublishedTrack) {
// send track updates to everyone, especially if track was updated by admin
r.broadcastParticipantState(p, false)
if r.onParticipantChanged != nil {
+138
View File
@@ -0,0 +1,138 @@
package rtc
import (
"io"
"sync/atomic"
"github.com/pion/interceptor"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/pion/transport/packetio"
)
type PacketStats struct {
roomName string
identity string
kind string // incoming or outgoing
PacketCount uint64 `json:"packetCount"`
NackCount uint64 `json:"nackCount"`
PLICount uint64 `json:"pliCount"`
FIRCount uint64 `json:"firCount"`
}
func newPacketStats(room, identity, kind string) *PacketStats {
return &PacketStats{
roomName: room,
identity: identity,
kind: kind,
}
}
func (s *PacketStats) IncrementPackets(count uint64) {
atomic.AddUint64(&s.PacketCount, count)
}
func (s *PacketStats) IncrementNack(count uint64) {
atomic.AddUint64(&s.NackCount, count)
}
func (s *PacketStats) IncrementPLI(count uint64) {
atomic.AddUint64(&s.PLICount, count)
}
func (s *PacketStats) IncrementFIR(count uint64) {
atomic.AddUint64(&s.FIRCount, count)
}
func (s *PacketStats) HandleRTCP(pkts []rtcp.Packet) {
for _, rtcpPacket := range pkts {
switch rtcpPacket.(type) {
case *rtcp.TransportLayerNack:
s.IncrementNack(1)
case *rtcp.PictureLossIndication:
s.IncrementPLI(1)
case *rtcp.FullIntraRequest:
s.IncrementFIR(1)
}
}
}
func (s *PacketStats) addFrom(o *PacketStats) {
s.PacketCount += atomic.LoadUint64(&o.PacketCount)
s.NackCount += atomic.LoadUint64(&o.NackCount)
s.PLICount += atomic.LoadUint64(&o.PLICount)
s.FIRCount += atomic.LoadUint64(&o.FIRCount)
}
type StatsReporter struct {
incoming *PacketStats
outgoing *PacketStats
}
func NewStatsReporter(roomName, identity string) *StatsReporter {
return &StatsReporter{
incoming: newPacketStats(roomName, identity, "incoming"),
outgoing: newPacketStats(roomName, identity, "outgoing"),
}
}
// StatsBufferWrapper wraps a buffer factory so we could get information on
// incoming packets
type StatsBufferWrapper struct {
createBufferFunc func(packetType packetio.BufferPacketType, ssrc uint32) io.ReadWriteCloser
stats *PacketStats
}
func (w *StatsBufferWrapper) CreateBuffer(packetType packetio.BufferPacketType, ssrc uint32) io.ReadWriteCloser {
writer := w.createBufferFunc(packetType, ssrc)
if packetType == packetio.RTPBufferPacket {
// wrap this in a counter class
return &rtpReporterWriter{
ReadWriteCloser: writer,
stats: w.stats,
}
}
return writer
}
type rtpReporterWriter struct {
io.ReadWriteCloser
stats *PacketStats
}
func (w *rtpReporterWriter) Write(p []byte) (n int, err error) {
w.stats.IncrementPackets(1)
return w.ReadWriteCloser.Write(p)
}
// StatsInterceptor is created for each participant to keep of track of outgoing stats
// it adheres to Pion interceptor interface
type StatsInterceptor struct {
interceptor.NoOp
reporter *StatsReporter
}
func NewStatsInterceptor(reporter *StatsReporter) *StatsInterceptor {
return &StatsInterceptor{
reporter: reporter,
}
}
// BindRTCPWriter lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method
// will be called once per packet batch.
func (s *StatsInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter {
return interceptor.RTCPWriterFunc(func(pkts []rtcp.Packet, attributes interceptor.Attributes) (int, error) {
s.reporter.outgoing.HandleRTCP(pkts)
return writer.Write(pkts, attributes)
})
}
// BindLocalStream lets you modify any outgoing RTP packets. It is called once for per LocalStream. The returned method
// will be called once per rtp packet.
func (s *StatsInterceptor) BindLocalStream(_ *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter {
return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
s.reporter.outgoing.IncrementPackets(1)
return writer.Write(header, payload, attributes)
})
}
+29 -7
View File
@@ -7,6 +7,7 @@ import (
"github.com/bep/debounce"
"github.com/livekit/livekit-server/pkg/logger"
"github.com/pion/interceptor"
"github.com/pion/webrtc/v3"
livekit "github.com/livekit/livekit-server/proto"
@@ -37,10 +38,16 @@ type PCTransport struct {
negotiationState atomic.Value
}
func newPeerConnection(target livekit.SignalTarget, conf *WebRTCConfig) (*webrtc.PeerConnection, *webrtc.MediaEngine, error) {
type TransportParams struct {
Target livekit.SignalTarget
Config *WebRTCConfig
Stats *StatsReporter
}
func newPeerConnection(params TransportParams) (*webrtc.PeerConnection, *webrtc.MediaEngine, error) {
var me *webrtc.MediaEngine
var err error
if target == livekit.SignalTarget_PUBLISHER {
if params.Target == livekit.SignalTarget_PUBLISHER {
me, err = createPubMediaEngine()
} else {
me, err = createSubMediaEngine()
@@ -48,16 +55,31 @@ func newPeerConnection(target livekit.SignalTarget, conf *WebRTCConfig) (*webrtc
if err != nil {
return nil, nil, err
}
se := conf.SettingEngine
se := params.Config.SettingEngine
se.DisableMediaEngineCopy(true)
if params.Stats != nil && se.BufferFactory != nil {
wrapper := &StatsBufferWrapper{
createBufferFunc: se.BufferFactory,
stats: params.Stats.incoming,
}
se.BufferFactory = wrapper.CreateBuffer
}
api := webrtc.NewAPI(webrtc.WithMediaEngine(me), webrtc.WithSettingEngine(se))
pc, err := api.NewPeerConnection(conf.Configuration)
ir := &interceptor.Registry{}
if params.Stats != nil {
ir.Add(NewStatsInterceptor(params.Stats))
}
api := webrtc.NewAPI(
webrtc.WithMediaEngine(me),
webrtc.WithSettingEngine(se),
webrtc.WithInterceptorRegistry(ir),
)
pc, err := api.NewPeerConnection(params.Config.Configuration)
return pc, me, err
}
func NewPCTransport(target livekit.SignalTarget, conf *WebRTCConfig) (*PCTransport, error) {
pc, me, err := newPeerConnection(target, conf)
func NewPCTransport(params TransportParams) (*PCTransport, error) {
pc, me, err := newPeerConnection(params)
if err != nil {
return nil, err
}
+22 -5
View File
@@ -233,7 +233,14 @@ func (r *RoomManager) StartSession(roomName string, pi routing.ParticipantInit,
if pi.UsePlanB {
rtcConf.Configuration.SDPSemantics = webrtc.SDPSemanticsPlanB
}
participant, err = rtc.NewParticipant(pi.Identity, &rtcConf, responseSink, r.config.Audio, pv)
participant, err = rtc.NewParticipant(rtc.ParticipantParams{
Identity: pi.Identity,
Config: &rtcConf,
Sink: responseSink,
AudioConfig: r.config.Audio,
ProtocolVersion: pv,
Stats: rtc.NewStatsReporter(roomName, pi.Identity),
})
if err != nil {
logger.Errorw("could not create participant", "error", err)
return
@@ -277,6 +284,11 @@ func (r *RoomManager) getOrCreateRoom(roomName string) (*rtc.Room, error) {
if err := r.DeleteRoom(roomName); err != nil {
logger.Errorw("could not delete room", "error", err)
}
// print stats
logger.Infow("room closed",
"incomingStats", room.GetIncomingStats(),
"outgoingStats", room.GetOutgoingStats(),
)
})
room.OnParticipantChanged(func(p types.Participant) {
var err error
@@ -303,7 +315,7 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.Partici
"participant", participant.Identity(),
"room", room.Name,
)
participant.Close()
_ = participant.Close()
}()
defer rtc.Recover()
@@ -356,9 +368,14 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.Partici
participant.SetTrackMuted(msg.Mute.Sid, msg.Mute.Muted)
case *livekit.SignalRequest_Subscription:
if participant.CanSubscribe() {
room.UpdateSubscriptions(participant, msg.Subscription)
if err := room.UpdateSubscriptions(participant, msg.Subscription); err != nil {
logger.Warnw("could not update subscription",
"participant", participant.Identity(),
"tracks", msg.Subscription.TrackSids,
"subscribe", msg.Subscription.Subscribe)
}
} else {
logger.Warnw("rejected participant subscription",
logger.Infow("rejected participant subscription",
"participant", participant.Identity(),
"tracks", msg.Subscription.TrackSids)
}
@@ -373,7 +390,7 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.Partici
}
}
case *livekit.SignalRequest_Leave:
participant.Close()
_ = participant.Close()
}
}
}
+8 -2
View File
@@ -103,12 +103,18 @@ func NewRTCClient(conn *websocket.Conn) (*RTCClient, error) {
conf := rtc.WebRTCConfig{
Configuration: rtcConf,
}
c.publisher, err = rtc.NewPCTransport(livekit.SignalTarget_PUBLISHER, &conf)
c.publisher, err = rtc.NewPCTransport(rtc.TransportParams{
Target: livekit.SignalTarget_PUBLISHER,
Config: &conf,
})
if err != nil {
return nil, err
}
// intentionally use publisher transport to have codecs pre-registered
c.subscriber, err = rtc.NewPCTransport(livekit.SignalTarget_PUBLISHER, &conf)
c.subscriber, err = rtc.NewPCTransport(rtc.TransportParams{
Target: livekit.SignalTarget_PUBLISHER,
Config: &conf,
})
if err != nil {
return nil, err
}