diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 44f68fc69..cbf0e25b8 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -909,13 +909,17 @@ func (p *ParticipantImpl) UpdateSubscribedTrackSettings(trackID livekit.TrackID, } func (p *ParticipantImpl) VerifySubscribeParticipantInfo(pID livekit.ParticipantID, version uint32) { + if !p.IsReady() { + // we have not sent a JoinResponse yet. metadata would be covered in JoinResponse + return + } if v, ok := p.updateCache.Get(pID); ok && v >= version { return } if f := p.params.GetParticipantInfo; f != nil { if info := f(pID); info != nil { - p.SendParticipantUpdate([]*livekit.ParticipantInfo{info}) + _ = p.SendParticipantUpdate([]*livekit.ParticipantInfo{info}) } } } @@ -1187,11 +1191,6 @@ func (p *ParticipantImpl) updateState(state livekit.ParticipantInfo_State) { // when the server has an offer for participant func (p *ParticipantImpl) onSubscriberOffer(offer webrtc.SessionDescription) error { - if p.State() == livekit.ParticipantInfo_DISCONNECTED { - // skip when disconnected - return nil - } - p.params.Logger.Infow("sending offer", "transport", livekit.SignalTarget_SUBSCRIBER) return p.writeMessage(&livekit.SignalResponse{ Message: &livekit.SignalResponse_Offer{ diff --git a/pkg/rtc/participant_internal_test.go b/pkg/rtc/participant_internal_test.go index 37c661d65..9f3b4ff96 100644 --- a/pkg/rtc/participant_internal_test.go +++ b/pkg/rtc/participant_internal_test.go @@ -58,7 +58,6 @@ func TestIsReady(t *testing.T) { func TestTrackPublishing(t *testing.T) { t.Run("should send the correct events", func(t *testing.T) { p := newParticipantForTest("test") - p.state.Store(livekit.ParticipantInfo_ACTIVE) track := &typesfakes.FakeMediaTrack{} track.IDReturns("id") published := false @@ -184,6 +183,7 @@ func TestTrackPublishing(t *testing.T) { func TestOutOfOrderUpdates(t *testing.T) { p := newParticipantForTest("test") + p.updateState(livekit.ParticipantInfo_JOINED) p.SetMetadata("initial 
metadata") sink := p.getResponseSink().(*routingfakes.FakeMessageSink) pi1 := p.ToProto() @@ -483,6 +483,7 @@ func newParticipantForTestWithOpts(identity livekit.ParticipantIdentity, opts *p Logger: LoggerWithParticipant(logger.GetLogger(), identity, sid, false), }) p.isPublisher.Store(opts.publisher) + p.updateState(livekit.ParticipantInfo_ACTIVE) return p } diff --git a/pkg/rtc/participant_signal.go b/pkg/rtc/participant_signal.go index dc960e45f..37f0587e3 100644 --- a/pkg/rtc/participant_signal.go +++ b/pkg/rtc/participant_signal.go @@ -30,6 +30,7 @@ func (p *ParticipantImpl) SetResponseSink(sink routing.MessageSink) { } func (p *ParticipantImpl) SendJoinResponse(joinResponse *livekit.JoinResponse) error { + // update state prior to sending message, or the message would not be sent if p.State() == livekit.ParticipantInfo_JOINING { p.updateState(livekit.ParticipantInfo_JOINED) } @@ -43,6 +44,10 @@ func (p *ParticipantImpl) SendJoinResponse(joinResponse *livekit.JoinResponse) e } func (p *ParticipantImpl) SendParticipantUpdate(participantsToUpdate []*livekit.ParticipantInfo) error { + if !p.IsReady() { + // avoid manipulating cache before it's ready + return nil + } p.updateLock.Lock() validUpdates := make([]*livekit.ParticipantInfo, 0, len(participantsToUpdate)) for _, pi := range participantsToUpdate { @@ -173,7 +178,7 @@ func (p *ParticipantImpl) sendTrackUnpublished(trackID livekit.TrackID) { } func (p *ParticipantImpl) writeMessage(msg *livekit.SignalResponse) error { - if p.State() == livekit.ParticipantInfo_DISCONNECTED { + if !p.IsReady() { return nil } diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 8da91bed4..5cee4ca38 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -678,10 +678,10 @@ func (d *DownTrack) Close() { // Close track, flush used to indicate whether send blank frame to flush // decoder of client. -// 1. 
When transceiver is reused by other participant's video track, -// set flush=true to avoid previous video shows before previous stream is displayed. -// 2. in case of session migration, participant migrate from other node, video track should -// be resumed with same participant, set flush=false since we don't need to flush decoder. +// 1. When transceiver is reused by other participant's video track, +// set flush=true to avoid the previous video showing before the new stream is displayed. +// 2. in case of session migration, when a participant migrates from another node, the video track should +// be resumed with the same participant, set flush=false since we don't need to flush the decoder. func (d *DownTrack) CloseWithFlush(flush bool) { if d.isClosed.Swap(true) { // already closed diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 255d189a7..018e25d6f 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -15,9 +15,7 @@ import ( dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor" ) -// // Forwarder -// const ( FlagPauseOnDowngrade = true FlagFilterRTX = true diff --git a/pkg/sfu/prober.go b/pkg/sfu/prober.go index 8eb50710f..cde3dc49a 100644 --- a/pkg/sfu/prober.go +++ b/pkg/sfu/prober.go @@ -1,4 +1,3 @@ -// // Design of Prober // // Probing is used to check for existence of excess channel capacity. @@ -61,12 +60,12 @@ // estimated channel capacity + probing rate when actively probing). // // But, there a few significant challenges -// 1. Pacer will require buffering of forwarded packets. That means -// more memory, more CPU (have to make copy of packets) and -// more latency in the media stream. -// 2. Scalability concern as SFU may be handling hundreds of -// subscriber peer connections and each one processing the pacing -// loop at 5ms interval will add up. +// 1. Pacer will require buffering of forwarded packets. That means -// more memory, more CPU (have to make copy of packets) and -// more latency in the media stream. +// 2. 
Scalability concern as SFU may be handling hundreds of +// subscriber peer connections and each one processing the pacing +// loop at 5ms interval will add up. // // So, this module assumes that pacing is inherently provided by the // publishers for media streams. That is a reasonable assumption given @@ -85,25 +84,25 @@ // For example, probing at 5 Mbps for 1/2 second and sending 1000 byte // probe per iteration will wake up every 1.6 ms. That is very high, // but should last for 1/2 second or so. -// 5 Mbps over 1/2 second = 2.5 Mbps -// 2.5 Mbps = 312500 bytes = 313 probes at 1000 byte probes -// 313 probes over 1/2 second = 1.6 ms between probes +// +// 5 Mbps over 1/2 second = 2.5 Mbps +// 2.5 Mbps = 312500 bytes = 313 probes at 1000 byte probes +// 313 probes over 1/2 second = 1.6 ms between probes // // A few things to note -// 1. When a probe cluster is added, the expected media rate is provided. -// So, the wake-up interval takes that into account. For example, -// if probing at 5 Mbps for 1/2 second and if 4 Mbps of it is expected -// to be provided by media traffic, the wake-up interval becomes 8 ms. -// 2. The amount of probing should actually be capped at some value to -// avoid too much self-induced congestion. It maybe something like 500 kbps. -// That will increase the wake-up interval to 16 ms in the above example. -// 3. In practice, the probing interval may also be shorter. Typically, -// it can be run for 2 - 3 RTTs to get a good measurement. For -// the longest hauls, RTT could be 250 ms or so leading to the probing -// window being long(ish). But, RTT should be much shorter especially if -// the subscriber peer connection of the client is able to connect to -// the nearest data center. -// +// 1. When a probe cluster is added, the expected media rate is provided. +// So, the wake-up interval takes that into account. 
For example, +// if probing at 5 Mbps for 1/2 second and if 4 Mbps of it is expected +// to be provided by media traffic, the wake-up interval becomes 8 ms. +// 2. The amount of probing should actually be capped at some value to +// avoid too much self-induced congestion. It maybe something like 500 kbps. +// That will increase the wake-up interval to 16 ms in the above example. +// 3. In practice, the probing interval may also be shorter. Typically, +// it can be run for 2 - 3 RTTs to get a good measurement. For +// the longest hauls, RTT could be 250 ms or so leading to the probing +// window being long(ish). But, RTT should be much shorter especially if +// the subscriber peer connection of the client is able to connect to +// the nearest data center. package sfu import (