From b85ff8f063703c8fa6b6561d49a7dd179781a097 Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Mon, 4 Sep 2023 12:39:14 +0800 Subject: [PATCH 01/13] Support non-SVC AV1 track publishing (#2030) --- pkg/sfu/downtrack.go | 5 ++++- pkg/sfu/forwarder.go | 38 +++++++++++++++++++++++++------------- 2 files changed, 29 insertions(+), 14 deletions(-) diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 44414d7c8..f126847bc 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -677,7 +677,10 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { return err } - extensions := []pacer.ExtensionData{{ID: uint8(d.dependencyDescriptorExtID), Payload: tp.ddBytes}} + var extensions []pacer.ExtensionData + if tp.ddBytes != nil { + extensions = []pacer.ExtensionData{{ID: uint8(d.dependencyDescriptorExtID), Payload: tp.ddBytes}} + } if d.playoutDelayExtID != 0 && !d.playoudDelayAcked.Load() { if val := d.playoutDelayBytes.Load(); val != nil { extensions = append(extensions, pacer.ExtensionData{ID: uint8(d.playoutDelayExtID), Payload: val.([]byte)}) diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 9421f238c..567465ce9 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -273,6 +273,15 @@ func (f *Forwarder) DetermineCodec(codec webrtc.RTPCodecCapability, extensions [ } f.codec = codec + ddAvailable := func(exts []webrtc.RTPHeaderExtensionParameter) bool { + for _, ext := range exts { + if ext.URI == dd.ExtensionURI { + return true + } + } + return false + } + switch strings.ToLower(codec.MimeType) { case "video/vp8": f.codecMunger = codecmunger.NewVP8FromNull(f.codecMunger, f.logger) @@ -289,15 +298,8 @@ func (f *Forwarder) DetermineCodec(codec webrtc.RTPCodecCapability, extensions [ f.vls = videolayerselector.NewSimulcast(f.logger) } case "video/vp9": - isDDAvailable := false - searchDone: - for _, ext := range extensions { - switch ext.URI { - case dd.ExtensionURI: - isDDAvailable = true - break searchDone - } - } + isDDAvailable := ddAvailable(extensions) + if isDDAvailable { if f.vls != nil { f.vls = videolayerselector.NewDependencyDescriptorFromNull(f.vls) @@ -314,12 +316,22 @@ func (f *Forwarder) DetermineCodec(codec webrtc.RTPCodecCapability, extensions [ // SVC-TODO: Support for VP9 simulcast. When DD is not available, have to pick selector based on VP9 SVC or Simulcast case "video/av1": // DD-TODO : we only enable dd layer selector for av1/vp9 now, in the future we can enable it for vp8 too - if f.vls != nil { - f.vls = videolayerselector.NewDependencyDescriptorFromNull(f.vls) + + isDDAvailable := ddAvailable(extensions) + if isDDAvailable { + if f.vls != nil { + f.vls = videolayerselector.NewDependencyDescriptorFromNull(f.vls) + } else { + f.vls = videolayerselector.NewDependencyDescriptor(f.logger) + } } else { - f.vls = videolayerselector.NewDependencyDescriptor(f.logger) + if f.vls != nil { + f.vls = videolayerselector.NewSimulcastFromNull(f.vls) + } else { + f.vls = videolayerselector.NewSimulcast(f.logger) + } } - // SVC-TODO: Support for AV1 Simulcast or just single spatial layer - won't have DD in that case + // SVC-TODO: Support for AV1 Simulcast } } From e922ae5f2319b6d99d9333946aa6927dd26308f3 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Mon, 4 Sep 2023 22:01:07 +0530 Subject: [PATCH 02/13] Filter out noisy error (#2032) --- pkg/rtc/transport.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index 47464e6b9..b671649cd 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -29,6 +29,7 @@ import ( "github.com/pion/interceptor/pkg/gcc" "github.com/pion/interceptor/pkg/twcc" "github.com/pion/rtcp" + "github.com/pion/sctp" "github.com/pion/sdp/v3" "github.com/pion/webrtc/v3" "github.com/pkg/errors" @@ -831,7 +832,9 @@ func (t *PCTransport) CreateDataChannel(label string, dci *webrtc.DataChannelIni } dcErrorHandler := func(err error) { - t.params.Logger.Errorw(dc.Label()+" data channel error", err) + if !errors.Is(err, sctp.ErrResetPacketInStateNotExist) { + t.params.Logger.Errorw(dc.Label()+" data channel error", err) + } } t.lock.Lock() From 95edad6aab321ebaf6bac4d62ea90fe230144043 Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Mon, 4 Sep 2023 21:16:25 -0700 Subject: [PATCH 03/13] Update actions/checkout action to v4 (#2031) Generated by renovateBot Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> --- .github/workflows/buildtest.yaml | 2 +- .github/workflows/docker.yaml | 2 +- .github/workflows/release.yaml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/buildtest.yaml b/.github/workflows/buildtest.yaml index 9694df071..192750b6a 100644 --- a/.github/workflows/buildtest.yaml +++ b/.github/workflows/buildtest.yaml @@ -25,7 +25,7 @@ jobs: test: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - uses: shogo82148/actions-setup-redis@v1 with: redis-version: "6.x" diff --git a/.github/workflows/docker.yaml b/.github/workflows/docker.yaml index e08c7f1ad..920157bfa 100644 --- a/.github/workflows/docker.yaml +++ b/.github/workflows/docker.yaml @@ -25,7 +25,7 @@ jobs: docker: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Docker meta id: meta uses: docker/metadata-action@v4 diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 7e0110d0a..4da1bfecc 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -28,7 +28,7 @@ jobs: release: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Fetch all tags run: git fetch --force --tags From 1590b966869e52f38c992346f95bfad5331e10ea Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 5 Sep 2023 12:00:00 +0530 Subject: [PATCH 04/13] Need to set reference layer when starting with dummy packets. (#2034) Dummy packets are used at start to trigger Pion's OnTrack. --- pkg/sfu/buffer/rtpstats.go | 4 +--- pkg/sfu/downtrack.go | 7 +++++-- pkg/sfu/forwarder.go | 9 +++++++++ 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index 357b50902..9bacf08f7 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -644,8 +644,6 @@ func (r *RTPStats) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt uint32 if extHighestSNOverridden < r.sequenceNumber.GetExtendedStart() { // it is possible that the `LastSequenceNumber` in the receiver report is before the starting // sequence number when dummy packets are used to trigger Pion's OnTrack path. - r.lastRRTime = time.Now() - r.lastRR = rr return } @@ -1211,7 +1209,7 @@ func (r *RTPStats) DeltaInfo(snapshotId uint32) *RTPDeltaInfo { } func (r *RTPStats) DeltaInfoOverridden(snapshotId uint32) *RTPDeltaInfo { - if !r.params.IsReceiverReportDriven { + if !r.params.IsReceiverReportDriven || r.lastRRTime.IsZero() { return nil } diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index f126847bc..53e6f327e 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -1786,8 +1786,11 @@ func (d *DownTrack) sendSilentFrameOnMuteForOpus() { AbsSendTimeExtID: uint8(d.absSendTimeExtID), TransportWideExtID: uint8(d.transportWideExtID), WriteStream: d.writeStream, - Metadata: sendPacketMetadata{}, - OnSent: d.packetSent, + Metadata: sendPacketMetadata{ + // although this is using empty frames, mark as padding as these are used to trigger Pion OnTrack only + isPadding: true, + }, + OnSent: d.packetSent, }) } diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 567465ce9..b64824ca9 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -1454,6 +1454,15 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e "referenceLayerSpatial", f.referenceLayerSpatial, ) return nil + } else if f.referenceLayerSpatial == buffer.InvalidLayerSpatial { + f.referenceLayerSpatial = layer + f.logger.Debugw( + "catch up forwarding", + "sequenceNumber", extPkt.Packet.SequenceNumber, + "timestamp", extPkt.Packet.Timestamp, + "layer", layer, + "referenceLayerSpatial", f.referenceLayerSpatial, + ) } logTransition := func(message string, extExpectedTS, extRefTS, extLastTS uint64, diffSeconds float64) { From 9b9298b9270d558c9b7cca9cf7d4faf540f4be25 Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Wed, 6 Sep 2023 10:18:20 +0800 Subject: [PATCH 05/13] Add batch i/o to improve throughput (#2033) * Add batch i/o to improve throughput * remove empty line * Solve comments * Change rtcconfig.UDPPort to PortRange * Fix test --- cmd/server/commands.go | 5 +++-- cmd/server/main.go | 3 ++- config-sample.yaml | 11 ++++++++--- go.mod | 2 +- go.sum | 4 ++-- pkg/config/config.go | 3 +-- pkg/rtc/participant_internal_test.go | 1 - pkg/service/roommanager.go | 2 +- pkg/service/server.go | 2 +- test/integration_helpers.go | 3 ++- 10 files changed, 21 insertions(+), 15 deletions(-) diff --git a/cmd/server/commands.go b/cmd/server/commands.go index 99499a50f..624af6345 100644 --- a/cmd/server/commands.go +++ b/cmd/server/commands.go @@ -55,8 +55,9 @@ func printPorts(c *cli.Context) error { if conf.RTC.TCPPort != 0 { tcpPorts = append(tcpPorts, fmt.Sprintf("%d - ICE/TCP", conf.RTC.TCPPort)) } - if conf.RTC.UDPPort != 0 { - udpPorts = append(udpPorts, fmt.Sprintf("%d - ICE/UDP", conf.RTC.UDPPort)) + if conf.RTC.UDPPort.Valid() { + portStr, _ := conf.RTC.UDPPort.MarshalYAML() + udpPorts = append(udpPorts, fmt.Sprintf("%s - ICE/UDP", portStr)) } else { udpPorts = append(udpPorts, fmt.Sprintf("%d-%d - ICE/UDP range", conf.RTC.ICEPortRangeStart, conf.RTC.ICEPortRangeEnd)) } diff --git a/cmd/server/main.go b/cmd/server/main.go index 4d37142fa..149d4cb23 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -29,6 +29,7 @@ import ( "github.com/livekit/livekit-server/pkg/rtc" "github.com/livekit/livekit-server/pkg/telemetry/prometheus" + "github.com/livekit/mediatransportutil/pkg/rtcconfig" "github.com/livekit/protocol/logger" "github.com/livekit/livekit-server/pkg/config" @@ -206,7 +207,7 @@ func getConfig(c *cli.Context) (*config.Config, error) { if c.String("config") == "" && c.String("config-body") == "" && conf.Development { // use single port UDP when no config is provided - conf.RTC.UDPPort = 7882 + conf.RTC.UDPPort = rtcconfig.PortRange{Start: 7882} conf.RTC.ICEPortRangeStart = 0 conf.RTC.ICEPortRangeEnd = 0 logger.Infow("starting in development mode") diff --git a/config-sample.yaml b/config-sample.yaml index c800abd16..0b44b11bf 100644 --- a/config-sample.yaml +++ b/config-sample.yaml @@ -43,10 +43,10 @@ rtc: # that maps to an external one use_external_ip: true # # when set, LiveKit will attempt to use a UDP mux so all UDP traffic goes through - # # a single port. This simplifies deployment, but mux will become an overhead for - # # highly trafficked deployments. + # # listed port(s). To maximize system performance, we recommend using a range of ports + # # greater or equal to the number of vCPUs on the machine. # # port_range_start & end must not be set for this config to take effect - # udp_port: 7882 + # udp_port: 7882-7892 # # when set to true, server will use a lite ice agent, that will speed up ice connection, but # # might cause connect issue if server running behind NAT. # use_ice_lite: true @@ -107,6 +107,11 @@ rtc: # # are disabled and clients don't ACK opened peer connections, only reliable, ordered delivery # # will be available. # strict_acks: true + # # enable batch write to merge network write system calls to reduce cpu usage. Outgoing packets + # # will be queued until length of queue equal to `batch_size` or time elapsed since last write exceeds `max_flush_interval`. + # batch_io: + # batch_size: 128 + # max_flush_interval: 2ms # when enabled, LiveKit will expose prometheus metrics on :6789/metrics # prometheus_port: 6789 diff --git a/go.mod b/go.mod index 6150a1681..b0fb0813d 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/hashicorp/golang-lru/v2 v2.0.5 github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 - github.com/livekit/mediatransportutil v0.0.0-20230823131232-12f579dc9af0 + github.com/livekit/mediatransportutil v0.0.0-20230905070227-1decac7a3c61 github.com/livekit/protocol v1.6.2-0.20230828184341-dfb5162c7c86 github.com/livekit/psrpc v0.3.3 github.com/mackerelio/go-osstat v0.2.4 diff --git a/go.sum b/go.sum index 071770a9f..1b551c5ea 100644 --- a/go.sum +++ b/go.sum @@ -123,8 +123,8 @@ github.com/lithammer/shortuuid/v4 v4.0.0 h1:QRbbVkfgNippHOS8PXDkti4NaWeyYfcBTHtw github.com/lithammer/shortuuid/v4 v4.0.0/go.mod h1:Zs8puNcrvf2rV9rTH51ZLLcj7ZXqQI3lv67aw4KiB1Y= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkDaKb5iXdynYrzB84ErPPO4LbRASk58= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= -github.com/livekit/mediatransportutil v0.0.0-20230823131232-12f579dc9af0 h1:cHNvPzn6VHFcsHx8ZC9LwU/4jj22mW3LILrNg/y5A6I= -github.com/livekit/mediatransportutil v0.0.0-20230823131232-12f579dc9af0/go.mod h1:xirUXW8xnLGmfCwUeAv/nj1VGo1OO1BmgxrYP7jK/14= +github.com/livekit/mediatransportutil v0.0.0-20230905070227-1decac7a3c61 h1:B03f6QRPP8ESgthh4FdV6RTrW+ukEePMi4uDdAnJxKQ= +github.com/livekit/mediatransportutil v0.0.0-20230905070227-1decac7a3c61/go.mod h1:njHoM3uTstzLF5N/FyzZ96+TYdJkgeyRbCWBWBiyS2s= github.com/livekit/protocol v1.6.2-0.20230828184341-dfb5162c7c86 h1:QEzGhfIOmGdRw17xIldbYzb1MTsYuVfXSqz8FTyfjWQ= github.com/livekit/protocol v1.6.2-0.20230828184341-dfb5162c7c86/go.mod h1:/JuO+G/btZ5gNwX2+901L6za3UvVO6DHRXHsv8kkLsU= github.com/livekit/psrpc v0.3.3 h1:+lltbuN39IdaynXhLLxRShgYqYsRMWeeXKzv60oqyWo= diff --git a/pkg/config/config.go b/pkg/config/config.go index b8dc6044e..e2f165cf8 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -308,7 +308,6 @@ var DefaultConfig = Config{ RTCConfig: rtcconfig.RTCConfig{ UseExternalIP: false, TCPPort: 7881, - UDPPort: 0, ICEPortRangeStart: 0, ICEPortRangeEnd: 0, STUNServers: []string{}, @@ -802,7 +801,7 @@ func (conf *Config) updateFromCLI(c *cli.Context, baseFlags []cli.Flag) error { conf.RTC.NodeIP = c.String("node-ip") } if c.IsSet("udp-port") { - conf.RTC.UDPPort = uint32(c.Int("udp-port")) + conf.RTC.UDPPort = rtcconfig.PortRange{Start: (c.Int("udp-port"))} } if c.IsSet("bind") { conf.BindAddresses = c.StringSlice("bind") diff --git a/pkg/rtc/participant_internal_test.go b/pkg/rtc/participant_internal_test.go index 55f773879..1454a27d6 100644 --- a/pkg/rtc/participant_internal_test.go +++ b/pkg/rtc/participant_internal_test.go @@ -661,7 +661,6 @@ func newParticipantForTestWithOpts(identity livekit.ParticipantIdentity, opts *p } conf, _ := config.NewConfig("", true, nil, nil) // disable mux, it doesn't play too well with unit test - conf.RTC.UDPPort = 0 conf.RTC.TCPPort = 0 rtcConf, err := NewWebRTCConfig(conf) if err != nil { diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 6393e55be..9f26d1abf 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -556,7 +556,7 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.LocalPa _ = r.refreshToken(participant) tokenTicker := time.NewTicker(tokenRefreshInterval) defer tokenTicker.Stop() - stateCheckTicker := time.NewTicker(time.Millisecond * 50) + stateCheckTicker := time.NewTicker(time.Millisecond * 500) defer stateCheckTicker.Stop() for { select { diff --git a/pkg/service/server.go b/pkg/service/server.go index 693315994..c67555614 100644 --- a/pkg/service/server.go +++ b/pkg/service/server.go @@ -220,7 +220,7 @@ func (s *LivekitServer) Start() error { if s.config.RTC.TCPPort != 0 { values = append(values, "rtc.portTCP", s.config.RTC.TCPPort) } - if !s.config.RTC.ForceTCP && s.config.RTC.UDPPort != 0 { + if !s.config.RTC.ForceTCP && s.config.RTC.UDPPort.Valid() { values = append(values, "rtc.portUDP", s.config.RTC.UDPPort) } else { values = append(values, diff --git a/test/integration_helpers.go b/test/integration_helpers.go index 633c6a934..7a11b353e 100644 --- a/test/integration_helpers.go +++ b/test/integration_helpers.go @@ -31,6 +31,7 @@ import ( "github.com/livekit/livekit-server/pkg/telemetry/prometheus" "github.com/livekit/livekit-server/pkg/testutils" testclient "github.com/livekit/livekit-server/test/client" + "github.com/livekit/mediatransportutil/pkg/rtcconfig" "github.com/livekit/protocol/auth" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" @@ -178,7 +179,7 @@ func createMultiNodeServer(nodeID string, port uint32) *service.LivekitServer { panic(fmt.Sprintf("could not create config: %v", err)) } conf.Port = port - conf.RTC.UDPPort = port + 1 + conf.RTC.UDPPort = rtcconfig.PortRange{Start: int(port) + 1} conf.RTC.TCPPort = port + 2 conf.Redis.Address = "localhost:6379" conf.Keys = map[string]string{testApiKey: testApiSecret} From c122c20f49d3ff6cf0d3bce6cfdcc6646c6090c9 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Wed, 6 Sep 2023 08:27:16 +0530 Subject: [PATCH 06/13] Do not re-pause a paused track. (#2037) --- pkg/sfu/forwarder.go | 10 ++++++++-- pkg/sfu/streamallocator/streamallocator.go | 4 ++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index b64824ca9..0b7320991 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -811,7 +811,7 @@ func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot b if f.provisional.muted || f.provisional.pubMuted { f.provisional.allocatedLayer = buffer.InvalidLayer return VideoTransition{ - From: f.vls.GetTarget(), + From: existingTargetLayer, To: f.provisional.allocatedLayer, BandwidthDelta: -getBandwidthNeeded(f.provisional.Bitrates, existingTargetLayer, f.lastAllocation.BandwidthRequested), } @@ -1300,12 +1300,18 @@ func (f *Forwarder) Pause(availableLayers []int32, brs Bitrates) VideoAllocation f.lock.Lock() defer f.lock.Unlock() + existingTargetLayer := f.vls.GetTarget() + if !existingTargetLayer.IsValid() { + // already paused + return f.lastAllocation + } + maxLayer := f.vls.GetMax() maxSeenLayer := f.vls.GetMaxSeen() optimalBandwidthNeeded := getOptimalBandwidthNeeded(f.muted, f.pubMuted, maxSeenLayer.Spatial, brs, maxLayer) alloc := VideoAllocation{ BandwidthRequested: 0, - BandwidthDelta: 0 - getBandwidthNeeded(brs, f.vls.GetTarget(), f.lastAllocation.BandwidthRequested), + BandwidthDelta: 0 - getBandwidthNeeded(brs, existingTargetLayer, f.lastAllocation.BandwidthRequested), Bitrates: brs, BandwidthNeeded: optimalBandwidthNeeded, TargetLayer: buffer.InvalidLayer, diff --git a/pkg/sfu/streamallocator/streamallocator.go b/pkg/sfu/streamallocator/streamallocator.go index f9baf7e5c..91ad018cc 100644 --- a/pkg/sfu/streamallocator/streamallocator.go +++ b/pkg/sfu/streamallocator/streamallocator.go @@ -883,8 +883,8 @@ func (s *StreamAllocator) allocateTrack(track *Track) { return } - // already streaming at some layer and transition is not requesting any change, i. e. BandwidthDelta == 0 - if transition.From.IsValid() && transition.BandwidthDelta == 0 { + // a no-op transition + if transition.From == transition.To { return } From b38b51dad8bebfa81cb267a268d087c9fd04dd6b Mon Sep 17 00:00:00 2001 From: Trey Hakanson Date: Tue, 5 Sep 2023 20:07:52 -0700 Subject: [PATCH 07/13] Integrate updated TWCC responder (#2038) Integrates the updated TWCC responder based on `pion/interceptor` from https://github.com/livekit/mediatransportutil/pull/25 --- go.mod | 6 +++--- go.sum | 10 ++++++---- pkg/rtc/participant.go | 4 ++-- 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index b0fb0813d..dc465c9d7 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/hashicorp/golang-lru/v2 v2.0.5 github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 - github.com/livekit/mediatransportutil v0.0.0-20230905070227-1decac7a3c61 + github.com/livekit/mediatransportutil v0.0.0-20230905085142-e1fcf8eae216 github.com/livekit/protocol v1.6.2-0.20230828184341-dfb5162c7c86 github.com/livekit/psrpc v0.3.3 github.com/mackerelio/go-osstat v0.2.4 @@ -27,7 +27,7 @@ require ( github.com/olekukonko/tablewriter v0.0.5 github.com/pion/dtls/v2 v2.2.7 github.com/pion/ice/v2 v2.3.11 - github.com/pion/interceptor v0.1.17 + github.com/pion/interceptor v0.1.18 github.com/pion/rtcp v1.2.10 github.com/pion/rtp v1.8.1 github.com/pion/sctp v1.8.7 @@ -97,7 +97,7 @@ require ( golang.org/x/crypto v0.12.0 // indirect golang.org/x/mod v0.12.0 // indirect golang.org/x/net v0.14.0 // indirect - golang.org/x/sys v0.11.0 // indirect + golang.org/x/sys v0.12.0 // indirect golang.org/x/text v0.12.0 // indirect golang.org/x/tools v0.12.1-0.20230815132531-74c255bcf846 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230815205213-6bfd019c3878 // indirect diff --git a/go.sum b/go.sum index 1b551c5ea..40045977d 100644 --- a/go.sum +++ b/go.sum @@ -123,8 +123,8 @@ github.com/lithammer/shortuuid/v4 v4.0.0 h1:QRbbVkfgNippHOS8PXDkti4NaWeyYfcBTHtw github.com/lithammer/shortuuid/v4 v4.0.0/go.mod h1:Zs8puNcrvf2rV9rTH51ZLLcj7ZXqQI3lv67aw4KiB1Y= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkDaKb5iXdynYrzB84ErPPO4LbRASk58= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= -github.com/livekit/mediatransportutil v0.0.0-20230905070227-1decac7a3c61 h1:B03f6QRPP8ESgthh4FdV6RTrW+ukEePMi4uDdAnJxKQ= -github.com/livekit/mediatransportutil v0.0.0-20230905070227-1decac7a3c61/go.mod h1:njHoM3uTstzLF5N/FyzZ96+TYdJkgeyRbCWBWBiyS2s= +github.com/livekit/mediatransportutil v0.0.0-20230905085142-e1fcf8eae216 h1:gwGhhhx+vUXR1dZqpKBautkx7qJAXvgCdQxgluBiUqc= +github.com/livekit/mediatransportutil v0.0.0-20230905085142-e1fcf8eae216/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY= github.com/livekit/protocol v1.6.2-0.20230828184341-dfb5162c7c86 h1:QEzGhfIOmGdRw17xIldbYzb1MTsYuVfXSqz8FTyfjWQ= github.com/livekit/protocol v1.6.2-0.20230828184341-dfb5162c7c86/go.mod h1:/JuO+G/btZ5gNwX2+901L6za3UvVO6DHRXHsv8kkLsU= github.com/livekit/psrpc v0.3.3 h1:+lltbuN39IdaynXhLLxRShgYqYsRMWeeXKzv60oqyWo= @@ -188,8 +188,9 @@ github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ github.com/pion/ice/v2 v2.3.10/go.mod h1:hHGCibDfmXGqukayQw979xEctASp2Pe5Oe0iDU8pRus= github.com/pion/ice/v2 v2.3.11 h1:rZjVmUwyT55cmN8ySMpL7rsS8KYsJERsrxJLLxpKhdw= github.com/pion/ice/v2 v2.3.11/go.mod h1:hPcLC3kxMa+JGRzMHqQzjoSj3xtE9F+eoncmXLlCL4E= -github.com/pion/interceptor v0.1.17 h1:prJtgwFh/gB8zMqGZoOgJPHivOwVAp61i2aG61Du/1w= github.com/pion/interceptor v0.1.17/go.mod h1:SY8kpmfVBvrbUzvj2bsXz7OJt5JvmVNZ+4Kjq7FcwrI= +github.com/pion/interceptor v0.1.18 h1:Hk26334NUQeUcJNR27YHYKT+sWNhhegQ9KFz5Nn6yMQ= +github.com/pion/interceptor v0.1.18/go.mod h1:tpvvF4cPM6NGxFA1DUMbhabzQBxdWMATDGEUYOR9x6I= github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= github.com/pion/mdns v0.0.7/go.mod h1:4iP2UbeFhLI/vWju/bw6ZfwjJzk0z8DNValjGxR/dD8= @@ -382,8 +383,9 @@ golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 005ff5141..6a7ccf919 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -1713,8 +1713,8 @@ func (p *ParticipantImpl) mediaTrackReceived(track *webrtc.TrackRemote, rtpRecei ssrc := uint32(track.SSRC()) if p.twcc == nil { p.twcc = twcc.NewTransportWideCCResponder(ssrc) - p.twcc.OnFeedback(func(pkt rtcp.RawPacket) { - p.postRtcp([]rtcp.Packet{&pkt}) + p.twcc.OnFeedback(func(pkts []rtcp.Packet) { + p.postRtcp(pkts) }) } p.pendingTracksLock.Unlock() From 75ffb25d772440a8f64fe94836d330cb2e9cbbfd Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Wed, 6 Sep 2023 14:17:49 +0800 Subject: [PATCH 08/13] Parse PortRange of UDPPort from cli flag (#2039) --- cmd/server/main.go | 4 ++-- go.mod | 2 +- go.sum | 2 ++ pkg/config/config.go | 2 +- 4 files changed, 6 insertions(+), 4 deletions(-) diff --git a/cmd/server/main.go b/cmd/server/main.go index 149d4cb23..c53f01640 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -71,9 +71,9 @@ var baseFlags = []cli.Flag{ Usage: "IP address of the current node, used to advertise to clients. Automatically determined by default", EnvVars: []string{"NODE_IP"}, }, - &cli.IntFlag{ + &cli.StringFlag{ Name: "udp-port", - Usage: "Single UDP port to use for WebRTC traffic", + Usage: "UDP port(s) to use for WebRTC traffic", EnvVars: []string{"UDP_PORT"}, }, &cli.StringFlag{ diff --git a/go.mod b/go.mod index dc465c9d7..69a3398cd 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/hashicorp/golang-lru/v2 v2.0.5 github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 - github.com/livekit/mediatransportutil v0.0.0-20230905085142-e1fcf8eae216 + github.com/livekit/mediatransportutil v0.0.0-20230906055425-e81fd5f6fb3f github.com/livekit/protocol v1.6.2-0.20230828184341-dfb5162c7c86 github.com/livekit/psrpc v0.3.3 github.com/mackerelio/go-osstat v0.2.4 diff --git a/go.sum b/go.sum index 40045977d..b43ba3bb2 100644 --- a/go.sum +++ b/go.sum @@ -125,6 +125,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20230905085142-e1fcf8eae216 h1:gwGhhhx+vUXR1dZqpKBautkx7qJAXvgCdQxgluBiUqc= github.com/livekit/mediatransportutil v0.0.0-20230905085142-e1fcf8eae216/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY= +github.com/livekit/mediatransportutil v0.0.0-20230906055425-e81fd5f6fb3f h1:b4ri7hQESRSzJWzXXcmANG2hJ4HTj5LM01Ekm8lnQmg= +github.com/livekit/mediatransportutil v0.0.0-20230906055425-e81fd5f6fb3f/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY= github.com/livekit/protocol v1.6.2-0.20230828184341-dfb5162c7c86 h1:QEzGhfIOmGdRw17xIldbYzb1MTsYuVfXSqz8FTyfjWQ= github.com/livekit/protocol v1.6.2-0.20230828184341-dfb5162c7c86/go.mod h1:/JuO+G/btZ5gNwX2+901L6za3UvVO6DHRXHsv8kkLsU= github.com/livekit/psrpc v0.3.3 h1:+lltbuN39IdaynXhLLxRShgYqYsRMWeeXKzv60oqyWo= diff --git a/pkg/config/config.go b/pkg/config/config.go index e2f165cf8..1f3ab2aba 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -801,7 +801,7 @@ func (conf *Config) updateFromCLI(c *cli.Context, baseFlags []cli.Flag) error { conf.RTC.NodeIP = c.String("node-ip") } if c.IsSet("udp-port") { - conf.RTC.UDPPort = rtcconfig.PortRange{Start: (c.Int("udp-port"))} + conf.RTC.UDPPort.UnmarshalString(c.String("udp-port")) } if c.IsSet("bind") { conf.BindAddresses = c.StringSlice("bind") From ce4554beb1d0a53c763e754c9b590e16ab46aff9 Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Tue, 5 Sep 2023 23:19:42 -0700 Subject: [PATCH 09/13] Update go deps (#1991) Generated by renovateBot Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> --- go.mod | 17 +++++++++-------- go.sum | 31 ++++++++++++++++++------------- 2 files changed, 27 insertions(+), 21 deletions(-) diff --git a/go.mod b/go.mod index 69a3398cd..c4c021f8d 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/google/wire v0.5.0 github.com/gorilla/websocket v1.5.0 github.com/hashicorp/go-version v1.6.0 - github.com/hashicorp/golang-lru/v2 v2.0.5 + github.com/hashicorp/golang-lru/v2 v2.0.6 github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20230906055425-e81fd5f6fb3f @@ -38,15 +38,15 @@ require ( github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.16.0 github.com/redis/go-redis/v9 v9.1.0 - github.com/rs/cors v1.9.0 + github.com/rs/cors v1.10.0 github.com/stretchr/testify v1.8.4 github.com/thoas/go-funk v0.9.3 github.com/twitchtv/twirp v8.1.3+incompatible - github.com/ua-parser/uap-go v0.0.0-20230823192112-f8d2018c34f3 + github.com/ua-parser/uap-go v0.0.0-20230823213814-f77b3e91e9dc github.com/urfave/cli/v2 v2.25.7 github.com/urfave/negroni/v3 v3.0.0 go.uber.org/atomic v1.11.0 - golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63 + golang.org/x/exp v0.0.0-20230905200255-921286631fa9 golang.org/x/sync v0.3.0 google.golang.org/protobuf v1.31.0 gopkg.in/yaml.v3 v3.0.1 @@ -68,6 +68,7 @@ require ( github.com/google/uuid v1.3.1 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/hashicorp/go-retryablehttp v0.7.4 // indirect + github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/josharian/native v1.1.0 // indirect github.com/klauspost/compress v1.16.7 // indirect github.com/klauspost/cpuid/v2 v2.0.9 // indirect @@ -94,12 +95,12 @@ require ( github.com/zeebo/xxh3 v1.0.2 // indirect go.uber.org/multierr v1.10.0 // indirect go.uber.org/zap v1.25.0 // indirect - golang.org/x/crypto v0.12.0 // indirect + golang.org/x/crypto v0.13.0 // indirect golang.org/x/mod v0.12.0 // indirect - golang.org/x/net v0.14.0 // indirect + golang.org/x/net v0.15.0 // indirect golang.org/x/sys v0.12.0 // indirect - golang.org/x/text v0.12.0 // indirect - golang.org/x/tools v0.12.1-0.20230815132531-74c255bcf846 // indirect + golang.org/x/text v0.13.0 // indirect + golang.org/x/tools v0.13.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230815205213-6bfd019c3878 // indirect google.golang.org/grpc v1.57.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index b43ba3bb2..b9c1c5afd 100644 --- a/go.sum +++ b/go.sum @@ -88,8 +88,10 @@ github.com/hashicorp/go-retryablehttp v0.7.4 h1:ZQgVdpTdAL7WpMIwLzCfbalOcSUdkDZn github.com/hashicorp/go-retryablehttp v0.7.4/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8= github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek= github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= -github.com/hashicorp/golang-lru/v2 v2.0.5 h1:wW7h1TG88eUIJ2i69gaE3uNVtEPIagzhGvHgwfx2Vm4= -github.com/hashicorp/golang-lru/v2 v2.0.5/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= +github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= +github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= +github.com/hashicorp/golang-lru/v2 v2.0.6 h1:3xi/Cafd1NaoEnS/yDssIiuVeDVywU0QdFGl3aQaQHM= +github.com/hashicorp/golang-lru/v2 v2.0.6/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/josharian/native v0.0.0-20200817173448-b6b71def0850/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w= github.com/josharian/native v1.0.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w= @@ -244,8 +246,8 @@ github.com/redis/go-redis/v9 v9.1.0 h1:137FnGdk+EQdCbye1FW+qOEcY5S+SpY9T0Niuqvtf github.com/redis/go-redis/v9 v9.1.0/go.mod h1:urWj3He21Dj5k4TK1y59xH8Uj6ATueP8AH1cY3lZl4c= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= -github.com/rs/cors v1.9.0 h1:l9HGsTsHJcvW14Nk7J9KFz8bzeAWXn3CG6bgt7LsrAE= -github.com/rs/cors v1.9.0/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= +github.com/rs/cors v1.10.0 h1:62NOS1h+r8p1mW6FM0FSB0exioXLhd/sh15KpjWBZ+8= +github.com/rs/cors v1.10.0/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/sclevine/agouti v3.0.0+incompatible/go.mod h1:b4WX9W9L1sfQKXeJf1mUTLZKJ48R1S7H23Ji7oFO5Bw= @@ -268,8 +270,8 @@ github.com/thoas/go-funk v0.9.3 h1:7+nAEx3kn5ZJcnDm2Bh23N2yOtweO14bi//dvRtgLpw= github.com/thoas/go-funk v0.9.3/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q= github.com/twitchtv/twirp v8.1.3+incompatible h1:+F4TdErPgSUbMZMwp13Q/KgDVuI7HJXP61mNV3/7iuU= github.com/twitchtv/twirp v8.1.3+incompatible/go.mod h1:RRJoFSAmTEh2weEqWtpPE3vFK5YBhA6bqp2l1kfCC5A= -github.com/ua-parser/uap-go v0.0.0-20230823192112-f8d2018c34f3 h1:YsXCA7ZdgFMgwDpNpYj4y2WPRVrOVVDAkQlFc477T54= -github.com/ua-parser/uap-go v0.0.0-20230823192112-f8d2018c34f3/go.mod h1:OBcG9bn7sHtXgarhUEb3OfCnNsgtGnkVf41ilSZ3K3E= +github.com/ua-parser/uap-go v0.0.0-20230823213814-f77b3e91e9dc h1:iT5lwxf894PiMq7cnMMQg/7VOD1pxmu//gQuHWAFy4s= +github.com/ua-parser/uap-go v0.0.0-20230823213814-f77b3e91e9dc/go.mod h1:BUbeWZiieNxAuuADTBNb3/aeje6on3DhU3rpWsQSB1E= github.com/urfave/cli/v2 v2.25.7 h1:VAzn5oq403l5pHjc4OhD54+XGO9cdKVL/7lDjF+iKUs= github.com/urfave/cli/v2 v2.25.7/go.mod h1:8qnjx1vcq5s2/wpsqoZFndg2CE5tNFyrTvS6SinrnYQ= github.com/urfave/negroni/v3 v3.0.0 h1:Vo8CeZfu1lFR9gW8GnAb6dOGCJyijfil9j/jKKc/JhU= @@ -296,10 +298,11 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE= golang.org/x/crypto v0.10.0/go.mod h1:o4eNf7Ede1fv+hwOwZsTHl9EsPFO6q6ZvYR8vYfY45I= golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio= -golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk= golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw= -golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63 h1:m64FZMko/V45gv0bNmrNYoDEq8U5YUhetc9cBWKS1TQ= -golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63/go.mod h1:0v4NqG35kSWCMzLaMeX+IQrlSnVE/bqGSyC2cz/9Le8= +golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck= +golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc= +golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g= +golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= @@ -333,8 +336,9 @@ golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.11.0/go.mod h1:2L/ixqYpgIVXmeoSA/4Lu7BzTG4KIyPIryS4IsOd1oQ= golang.org/x/net v0.13.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= -golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14= golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= +golang.org/x/net v0.15.0 h1:ugBLEUaxABaB5AJqW9enI0ACdci2RUd4eP51NTBvuJ8= +golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -410,8 +414,9 @@ golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.10.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= -golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc= golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190422233926-fe54fb35175b/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= @@ -419,8 +424,8 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= -golang.org/x/tools v0.12.1-0.20230815132531-74c255bcf846 h1:Vve/L0v7CXXuxUmaMGIEK/dEeq7uiqb5qBgQrZzIE7E= -golang.org/x/tools v0.12.1-0.20230815132531-74c255bcf846/go.mod h1:Sc0INKfu04TlqNoRA1hgpFZbhYXHPr4V5DzpSBTPqQM= +golang.org/x/tools v0.13.0 h1:Iey4qkscZuv0VvIt8E0neZjtPVQFSc870HQ448QgEmQ= +golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= From 0fffaf32825dae53d46f9d6dddf29d8d53c46960 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Thu, 7 Sep 2023 13:25:09 +0530 Subject: [PATCH 10/13] Some small optimisations (#2042) * WIP commit * WIP commit * WIP commit * Revert unintended delete --- pkg/sfu/downtrack.go | 23 +++++---- pkg/sfu/forwarder.go | 6 +-- pkg/sfu/rtpmunger.go | 108 ++++++++++++++++++++++++------------------- 3 files changed, 77 insertions(+), 60 deletions(-) diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 53e6f327e..d48b98460 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -237,6 +237,7 @@ type DownTrack struct { isClosed atomic.Bool connected atomic.Bool bindAndConnectedOnce atomic.Bool + writable atomic.Bool rtpStats *buffer.RTPStats @@ -420,7 +421,7 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, d.forwarder.DetermineCodec(d.codec, d.params.Receiver.HeaderExtensions()) d.params.Logger.Debugw("downtrack bound") - d.onBindAndConnected() + d.onBindAndConnectedChange() return codec, nil } @@ -429,6 +430,7 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, // because a track has been stopped. func (d *DownTrack) Unbind(_ webrtc.TrackLocalContext) error { d.bound.Store(false) + d.onBindAndConnectedChange() return nil } @@ -600,7 +602,7 @@ func (d *DownTrack) keyFrameRequester(generation uint32, layer int32) { return } - if d.connected.Load() { + if d.writable.Load() { d.params.Logger.Debugw("sending PLI for layer lock", "generation", generation, "layer", layer) d.params.Receiver.SendPLI(layer, false) d.rtpStats.UpdateLayerLockPliAndTime(1) @@ -608,7 +610,7 @@ func (d *DownTrack) keyFrameRequester(generation uint32, layer int32) { <-ticker.C - if generation != d.keyFrameRequestGeneration.Load() || !d.bound.Load() { + if generation != d.keyFrameRequestGeneration.Load() || !d.writable.Load() { return } } @@ -643,7 +645,7 @@ func (d *DownTrack) maxLayerNotifierWorker() { // WriteRTP writes an RTP Packet to the DownTrack func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { - if !d.bound.Load() || !d.connected.Load() { + if !d.writable.Load() { return nil } @@ -720,6 +722,10 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { // WritePaddingRTP tries to write as many padding only RTP packets as necessary // to satisfy given size to the DownTrack func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool, forceMarker bool) int { + if !d.writable.Load() { + return 0 + } + if !d.rtpStats.IsActive() && !paddingOnMute { return 0 } @@ -1224,8 +1230,8 @@ func (d *DownTrack) CreateSenderReport() *rtcp.SenderReport { func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan struct{} { done := make(chan struct{}) go func() { - // don't send if nothing has been sent - if !d.rtpStats.IsActive() { + // don't send if not writable OR nothing has been sent + if !d.writable.Load() || !d.rtpStats.IsActive() { close(done) return } @@ -1492,7 +1498,7 @@ func (d *DownTrack) handleRTCP(bytes []byte) { func (d *DownTrack) SetConnected() { if !d.connected.Swap(true) { - d.onBindAndConnected() + d.onBindAndConnectedChange() } } @@ -1710,7 +1716,7 @@ func (d *DownTrack) GetAndResetBytesSent() (uint32, uint32) { return d.bytesSent.Swap(0), d.bytesRetransmitted.Swap(0) } -func (d *DownTrack) onBindAndConnected() { +func (d *DownTrack) onBindAndConnectedChange() { if d.connected.Load() && d.bound.Load() && !d.bindAndConnectedOnce.Swap(true) { if d.kind == webrtc.RTPCodecTypeVideo { _, layer := d.forwarder.CheckSync() @@ -1723,6 +1729,7 @@ func (d *DownTrack) onBindAndConnected() { go d.sendPaddingOnMute() } } + d.writable.Store(d.connected.Load() && d.bound.Load()) } func (d *DownTrack) sendPaddingOnMute() { diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 0b7320991..49c56bc2f 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -1622,9 +1622,6 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e // should be called with lock held func (f *Forwarder) getTranslationParamsCommon(extPkt *buffer.ExtPacket, layer int32, tp *TranslationParams) (*TranslationParams, error) { - if tp == nil { - tp = &TranslationParams{} - } if f.lastSSRC != extPkt.Packet.SSRC { if err := f.processSourceSwitch(extPkt, layer); err != nil { tp.shouldDrop = true @@ -1649,7 +1646,7 @@ func (f *Forwarder) getTranslationParamsCommon(extPkt *buffer.ExtPacket, layer i // should be called with lock held func (f *Forwarder) getTranslationParamsAudio(extPkt *buffer.ExtPacket, layer int32) (*TranslationParams, error) { - return f.getTranslationParamsCommon(extPkt, layer, nil) + return f.getTranslationParamsCommon(extPkt, layer, &TranslationParams{}) } // should be called with lock held @@ -1661,7 +1658,6 @@ func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer in } tp := &TranslationParams{} - if !f.vls.GetTarget().IsValid() { // stream is paused by streamallocator tp.shouldDrop = true diff --git a/pkg/sfu/rtpmunger.go b/pkg/sfu/rtpmunger.go index c0ce6ae6a..8176cfe81 100644 --- a/pkg/sfu/rtpmunger.go +++ b/pkg/sfu/rtpmunger.go @@ -72,9 +72,12 @@ type RTPMunger struct { extLastSN uint64 extSecondLastSN uint64 - extLastTS uint64 - tsOffset uint64 - lastMarker bool + snOffset uint64 + + extLastTS uint64 + tsOffset uint64 + + lastMarker bool extRtxGateSn uint64 isInRtxGateRegion bool @@ -88,12 +91,11 @@ func NewRTPMunger(logger logger.Logger) *RTPMunger { } func (r *RTPMunger) DebugInfo() map[string]interface{} { - snOffset, _ := r.snRangeMap.GetValue(r.extHighestIncomingSN + 1) return map[string]interface{}{ "ExtHighestIncomingSN": r.extHighestIncomingSN, "ExtLastSN": r.extLastSN, "ExtSecondLastSN": r.extSecondLastSN, - "SNOffset": snOffset, + "SNOffset": r.snOffset, "ExtLastTS": r.extLastTS, "TSOffset": r.tsOffset, "LastMarker": r.lastMarker, @@ -116,14 +118,20 @@ func (r *RTPMunger) SeedLast(state RTPMungerState) { func (r *RTPMunger) SetLastSnTs(extPkt *buffer.ExtPacket) { r.extHighestIncomingSN = extPkt.ExtSequenceNumber - 1 + r.extLastSN = extPkt.ExtSequenceNumber r.extSecondLastSN = r.extLastSN - 1 + r.updateSnOffset() + r.extLastTS = extPkt.ExtTimestamp } func (r *RTPMunger) UpdateSnTsOffsets(extPkt *buffer.ExtPacket, snAdjust uint64, tsAdjust uint64) { r.extHighestIncomingSN = extPkt.ExtSequenceNumber - 1 + r.snRangeMap.ClearAndResetValue(extPkt.ExtSequenceNumber - r.extLastSN - snAdjust) + r.updateSnOffset() + r.tsOffset = extPkt.ExtTimestamp - r.extLastTS - tsAdjust } @@ -148,16 +156,42 @@ func (r *RTPMunger) PacketDropped(extPkt *buffer.ExtPacket) { } r.extLastSN = r.extSecondLastSN + r.updateSnOffset() } func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationParamsRTP, error) { diff := int64(extPkt.ExtSequenceNumber - r.extHighestIncomingSN) + if (diff == 1 && len(extPkt.Packet.Payload) != 0) || diff > 1 { + // in-order - either contiguous packet with payload OR packet following a gap, may or may not have payload + r.extHighestIncomingSN = extPkt.ExtSequenceNumber + + ordering := SequenceNumberOrderingContiguous + if diff > 1 { + ordering = SequenceNumberOrderingGap + } + + extMungedSN := extPkt.ExtSequenceNumber - r.snOffset + extMungedTS := extPkt.ExtTimestamp - r.tsOffset + + r.extSecondLastSN = r.extLastSN + r.extLastSN = extMungedSN + r.extLastTS = extMungedTS + r.lastMarker = extPkt.Packet.Marker + + if extPkt.KeyFrame { + r.extRtxGateSn = extMungedSN + r.isInRtxGateRegion = true + } + + if r.isInRtxGateRegion && (extMungedSN-r.extRtxGateSn) > RtxGateWindow { + r.isInRtxGateRegion = false + } - // can get duplicate packet due to FEC - if diff == 0 { return &TranslationParamsRTP{ - snOrdering: SequenceNumberOrderingDuplicate, - }, ErrDuplicatePacket + snOrdering: ordering, + sequenceNumber: uint16(extMungedSN), + timestamp: uint32(extMungedTS), + }, nil } if diff < 0 { @@ -176,53 +210,25 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara }, nil } - ordering := SequenceNumberOrderingContiguous - if diff > 1 { - ordering = SequenceNumberOrderingGap - } - - r.extHighestIncomingSN = extPkt.ExtSequenceNumber - // if padding only packet, can be dropped and sequence number adjusted, if contiguous - if diff == 1 && len(extPkt.Packet.Payload) == 0 { + if diff == 1 { + r.extHighestIncomingSN = extPkt.ExtSequenceNumber + if err := r.snRangeMap.ExcludeRange(r.extHighestIncomingSN, r.extHighestIncomingSN+1); err != nil { r.logger.Errorw("could not exclude range", err, "sn", r.extHighestIncomingSN) } + + r.updateSnOffset() + return &TranslationParamsRTP{ - snOrdering: ordering, + snOrdering: SequenceNumberOrderingContiguous, }, ErrPaddingOnlyPacket } - snOffset, err := r.snRangeMap.GetValue(extPkt.ExtSequenceNumber) - if err != nil { - r.logger.Errorw("could not get sequence number adjustment", err, "sn", extPkt.ExtSequenceNumber, "payloadSize", len(extPkt.Packet.Payload)) - return &TranslationParamsRTP{ - snOrdering: ordering, - }, ErrSequenceNumberOffsetNotFound - } - - extMungedSN := extPkt.ExtSequenceNumber - snOffset - extMungedTS := extPkt.ExtTimestamp - r.tsOffset - - r.extSecondLastSN = r.extLastSN - r.extLastSN = extMungedSN - r.extLastTS = extMungedTS - r.lastMarker = extPkt.Packet.Marker - - if extPkt.KeyFrame { - r.extRtxGateSn = extMungedSN - r.isInRtxGateRegion = true - } - - if r.isInRtxGateRegion && (extMungedSN-r.extRtxGateSn) > RtxGateWindow { - r.isInRtxGateRegion = false - } - + // can get duplicate packet due to FEC return &TranslationParamsRTP{ - snOrdering: ordering, - sequenceNumber: uint16(extMungedSN), - timestamp: uint32(extMungedTS), - }, nil + snOrdering: SequenceNumberOrderingDuplicate, + }, ErrDuplicatePacket } func (r *RTPMunger) FilterRTX(nacks []uint16) []uint16 { @@ -297,3 +303,11 @@ func (r *RTPMunger) UpdateAndGetPaddingSnTs(num int, clockRate uint32, frameRate func (r *RTPMunger) IsOnFrameBoundary() bool { return r.lastMarker } + +func (r *RTPMunger) updateSnOffset() { + snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN + 1) + if err != nil { + r.logger.Errorw("could not get SN offset", err) + } + r.snOffset = snOffset +} From 6c947194871d938718c5b19da67915f46daf8b27 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Thu, 7 Sep 2023 14:59:22 +0530 Subject: [PATCH 11/13] Cache extended highest. (#2043) * Cache extended highest. Prevents calculating extended highest on every update to populate PreExtendedHighest in the result. * remove incorrect comment --- pkg/sfu/utils/wraparound.go | 37 +++++++++++++++++++++++++------------ 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/pkg/sfu/utils/wraparound.go b/pkg/sfu/utils/wraparound.go index 5dd2b1e93..1b5502c62 100644 --- a/pkg/sfu/utils/wraparound.go +++ b/pkg/sfu/utils/wraparound.go @@ -29,10 +29,11 @@ type extendedNumber interface { type WrapAround[T number, ET extendedNumber] struct { fullRange ET - initialized bool - start T - highest T - cycles ET + initialized bool + start T + highest T + cycles ET + extendedHighest ET } func NewWrapAround[T number, ET extendedNumber]() *WrapAround[T, ET] { @@ -47,6 +48,7 @@ func (w *WrapAround[T, ET]) Seed(from *WrapAround[T, ET]) { w.start = from.start w.highest = from.highest w.cycles = from.cycles + w.updateExtendedHighest() } type WrapAroundUpdateResult[ET extendedNumber] struct { @@ -63,15 +65,16 @@ func (w *WrapAround[T, ET]) Update(val T) (result WrapAroundUpdateResult[ET]) { w.start = val w.highest = val + w.updateExtendedHighest() w.initialized = true return } - result.PreExtendedHighest = w.GetExtendedHighest() + result.PreExtendedHighest = w.extendedHighest gap := val - w.highest - if gap == 0 || gap > T(w.fullRange>>1) { - // duplicate OR out-of-order + if gap > T(w.fullRange>>1) { + // out-of-order result.IsRestart, result.PreExtendedStart, result.ExtendedVal = w.maybeAdjustStart(val) return } @@ -82,13 +85,15 @@ func (w *WrapAround[T, ET]) Update(val T) (result WrapAroundUpdateResult[ET]) { } w.highest = val - result.ExtendedVal = w.getExtendedHighest(w.cycles, val) + w.updateExtendedHighest() + result.ExtendedVal = w.extendedHighest return } func (w *WrapAround[T, ET]) RollbackRestart(ev ET) { if w.isWrapBack(w.start, T(ev)) { w.cycles -= w.fullRange + w.updateExtendedHighest() } w.start = T(ev) } @@ -96,6 +101,7 @@ func (w *WrapAround[T, ET]) RollbackRestart(ev ET) { func (w *WrapAround[T, ET]) ResetHighest(ev ET) { w.highest = T(ev) w.cycles = ev & ^(w.fullRange - 1) + w.updateExtendedHighest() } func (w *WrapAround[T, ET]) GetStart() T { @@ -111,7 +117,11 @@ func (w *WrapAround[T, ET]) GetHighest() T { } func (w *WrapAround[T, ET]) GetExtendedHighest() ET { - return w.getExtendedHighest(w.cycles, w.highest) + return w.extendedHighest +} + +func (w *WrapAround[T, ET]) updateExtendedHighest() { + w.extendedHighest = getExtendedHighest(w.cycles, w.highest) } func (w *WrapAround[T, ET]) maybeAdjustStart(val T) (isRestart bool, preExtendedStart ET, extendedVal ET) { @@ -125,7 +135,7 @@ func (w *WrapAround[T, ET]) maybeAdjustStart(val T) (isRestart bool, preExtended if w.isWrapBack(val, w.highest) { cycles -= w.fullRange } - extendedVal = w.getExtendedHighest(cycles, val) + extendedVal = getExtendedHighest(cycles, val) return } @@ -136,6 +146,7 @@ func (w *WrapAround[T, ET]) maybeAdjustStart(val T) (isRestart bool, preExtended if w.isWrapBack(val, w.highest) { w.cycles = w.fullRange + w.updateExtendedHighest() cycles = 0 } w.start = val @@ -144,7 +155,7 @@ func (w *WrapAround[T, ET]) maybeAdjustStart(val T) (isRestart bool, preExtended cycles -= w.fullRange } } - extendedVal = w.getExtendedHighest(cycles, val) + extendedVal = getExtendedHighest(cycles, val) return } @@ -152,6 +163,8 @@ func (w *WrapAround[T, ET]) isWrapBack(earlier T, later T) bool { return ET(later) < (w.fullRange>>1) && ET(earlier) >= (w.fullRange>>1) } -func (w *WrapAround[T, ET]) getExtendedHighest(cycles ET, val T) ET { +// ------------------------------------ + +func getExtendedHighest[T number, ET extendedNumber](cycles ET, val T) ET { return cycles + ET(val) } From 3babb432c6fd94a07d6d67384c694184565850ba Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Thu, 7 Sep 2023 15:58:42 +0530 Subject: [PATCH 12/13] Missed an SN offset update in the previous round (#2044) --- pkg/sfu/rtpmunger.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/sfu/rtpmunger.go b/pkg/sfu/rtpmunger.go index 8176cfe81..1d902aaf3 100644 --- a/pkg/sfu/rtpmunger.go +++ b/pkg/sfu/rtpmunger.go @@ -289,6 +289,7 @@ func (r *RTPMunger) UpdateAndGetPaddingSnTs(num int, clockRate uint32, frameRate r.extSecondLastSN = extLastSN - 1 r.extLastSN = extLastSN r.snRangeMap.DecValue(uint64(num)) + r.updateSnOffset() r.tsOffset -= extLastTS - r.extLastTS r.extLastTS = extLastTS From b95670f56b1bfe40136629121b09659c22f80775 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Thu, 7 Sep 2023 22:22:00 +0530 Subject: [PATCH 13/13] Removing one snapshot in down track. (#2047) Profiling showed updating jitter going through the snapshot maps. With the reduction of one, there should only be one snapshot and hopefully that should gain some cycles back. --- pkg/sfu/buffer/rtpstats.go | 2 +- pkg/sfu/connectionquality/connectionstats.go | 49 +++++++++----------- pkg/sfu/downtrack.go | 36 +++++--------- 3 files changed, 35 insertions(+), 52 deletions(-) diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index 9bacf08f7..1f0f2694e 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -705,7 +705,7 @@ func (r *RTPStats) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt uint32 return } -func (r *RTPStats) LastReceiverReport() time.Time { +func (r *RTPStats) LastReceiverReportTime() time.Time { r.lock.RLock() defer r.lock.RUnlock() diff --git a/pkg/sfu/connectionquality/connectionstats.go b/pkg/sfu/connectionquality/connectionstats.go index 565eccabb..641290fa8 100644 --- a/pkg/sfu/connectionquality/connectionstats.go +++ b/pkg/sfu/connectionquality/connectionstats.go @@ -42,6 +42,7 @@ type ConnectionStatsParams struct { GetDeltaStats func() map[uint32]*buffer.StreamStatsWithLayers GetDeltaStatsOverridden func() map[uint32]*buffer.StreamStatsWithLayers GetLastReceiverReportTime func() time.Time + GetTotalPacketsSent func() uint64 Logger logger.Logger } @@ -54,6 +55,7 @@ type ConnectionStats struct { onStatsUpdate func(cs *ConnectionStats, stat *livekit.AnalyticsStat) lock sync.RWMutex + packetsSent uint64 streamingStartedAt time.Time scorer *qualityScorer @@ -213,13 +215,11 @@ func (cs *ConnectionStats) updateScoreWithAggregate(agg *buffer.RTPDeltaInfo, at } func (cs *ConnectionStats) updateScoreFromReceiverReport(at time.Time) (float32, map[uint32]*buffer.StreamStatsWithLayers) { - if cs.params.GetDeltaStatsOverridden == nil || cs.params.GetLastReceiverReportTime == nil { + if cs.params.GetDeltaStatsOverridden == nil || cs.params.GetLastReceiverReportTime == nil || cs.params.GetTotalPacketsSent == nil { return MinMOS, nil } - cs.lock.RLock() - streamingStartedAt := cs.streamingStartedAt - cs.lock.RUnlock() + streamingStartedAt := cs.updateStreamingStart(at) if streamingStartedAt.IsZero() { // not streaming, just return current score mos, _ := cs.scorer.GetMOSAndQuality() @@ -260,6 +260,11 @@ func (cs *ConnectionStats) updateScoreFromReceiverReport(at time.Time) (float32, } func (cs *ConnectionStats) updateScoreAt(at time.Time) (float32, map[uint32]*buffer.StreamStatsWithLayers) { + if cs.params.GetDeltaStatsOverridden != nil { + // receiver report based quality scoring, use stats from receiver report for scoring + return cs.updateScoreFromReceiverReport(at) + } + if cs.params.GetDeltaStats == nil { return MinMOS, nil } @@ -275,33 +280,25 @@ func (cs *ConnectionStats) updateScoreAt(at time.Time) (float32, map[uint32]*buf deltaInfoList = append(deltaInfoList, s.RTPStats) } agg := buffer.AggregateRTPDeltaInfo(deltaInfoList) - if agg != nil && agg.Packets > 0 { - // not very accurate as streaming could have started part way in the window, but don't need accurate time - cs.maybeSetStreamingStart(agg.StartTime) - } else { - cs.clearStreamingStart() - } - - if cs.params.GetDeltaStatsOverridden != nil { - // receiver report based quality scoring, use stats from receiver report for scoring - return cs.updateScoreFromReceiverReport(at) - } - return cs.updateScoreWithAggregate(agg, at), streams } -func (cs *ConnectionStats) maybeSetStreamingStart(at time.Time) { +func (cs *ConnectionStats) updateStreamingStart(at time.Time) time.Time { cs.lock.Lock() - if cs.streamingStartedAt.IsZero() { - cs.streamingStartedAt = at - } - cs.lock.Unlock() -} + defer cs.lock.Unlock() -func (cs *ConnectionStats) clearStreamingStart() { - cs.lock.Lock() - cs.streamingStartedAt = time.Time{} - cs.lock.Unlock() + packetsSent := cs.params.GetTotalPacketsSent() + if packetsSent > cs.packetsSent { + if cs.streamingStartedAt.IsZero() { + // the start could be anywhere after last update, but using `at` as this is not required to be accurate + cs.streamingStartedAt = at + } + } else { + cs.streamingStartedAt = time.Time{} + } + cs.packetsSent = packetsSent + + return cs.streamingStartedAt } func (cs *ConnectionStats) getStat() { diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index d48b98460..6216fa0d8 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -129,14 +129,13 @@ var ( type DownTrackState struct { RTPStats *buffer.RTPStats - DeltaStatsSnapshotId uint32 DeltaStatsOverriddenSnapshotId uint32 ForwarderState ForwarderState } func (d DownTrackState) String() string { - return fmt.Sprintf("DownTrackState{rtpStats: %s, delta: %d, deltaOverridden: %d, forwarder: %s}", - d.RTPStats.ToString(), d.DeltaStatsSnapshotId, d.DeltaStatsOverriddenSnapshotId, d.ForwarderState.String()) + return fmt.Sprintf("DownTrackState{rtpStats: %s, deltaOverridden: %d, forwarder: %s}", + d.RTPStats.ToString(), d.DeltaStatsOverriddenSnapshotId, d.ForwarderState.String()) } // ------------------------------------------------------------------- @@ -248,7 +247,6 @@ type DownTrack struct { blankFramesGeneration atomic.Uint32 connectionStats *connectionquality.ConnectionStats - deltaStatsSnapshotId uint32 deltaStatsOverriddenSnapshotId uint32 isNACKThrottled atomic.Bool @@ -310,15 +308,14 @@ func NewDownTrack(params DowntrackParams) (*DownTrack, error) { IsReceiverReportDriven: true, Logger: params.Logger, }) - d.deltaStatsSnapshotId = d.rtpStats.NewSnapshotId() d.deltaStatsOverriddenSnapshotId = d.rtpStats.NewSnapshotId() d.connectionStats = connectionquality.NewConnectionStats(connectionquality.ConnectionStatsParams{ MimeType: codecs[0].MimeType, // LK-TODO have to notify on codec change IsFECEnabled: strings.EqualFold(codecs[0].MimeType, webrtc.MimeTypeOpus) && strings.Contains(strings.ToLower(codecs[0].SDPFmtpLine), "fec"), - GetDeltaStats: d.getDeltaStats, GetDeltaStatsOverridden: d.getDeltaStatsOverridden, - GetLastReceiverReportTime: func() time.Time { return d.rtpStats.LastReceiverReport() }, + GetLastReceiverReportTime: func() time.Time { return d.rtpStats.LastReceiverReportTime() }, + GetTotalPacketsSent: func() uint64 { return d.rtpStats.GetTotalPacketsPrimary() }, Logger: params.Logger.WithValues("direction", "down"), }) d.connectionStats.OnStatsUpdate(func(_cs *connectionquality.ConnectionStats, stat *livekit.AnalyticsStat) { @@ -328,7 +325,6 @@ func NewDownTrack(params DowntrackParams) (*DownTrack, error) { }) // set initial playout delay to minimum value - if d.params.PlayoutDelayLimit.GetEnabled() && d.params.PlayoutDelayLimit.GetMin() > 0 { delay := rtpextension.PlayoutDelayFromValue( uint16(d.params.PlayoutDelayLimit.GetMin()), @@ -730,13 +726,11 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool, forceMa return 0 } - // LK-TODO-START // Ideally should look at header extensions negotiated for // track and decide if padding can be sent. But, browsers behave // in unexpected ways when using audio for bandwidth estimation and // padding is mainly used to probe for excess available bandwidth. // So, to be safe, limit to video tracks - // LK-TODO-END if d.kind == webrtc.RTPCodecTypeAudio { return 0 } @@ -750,6 +744,12 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool, forceMa return 0 } + // Hold sending padding packets till first RTCP-RR is received for this RTP stream. + // That is definitive proof that the remote side knows about this RTP stream. + if d.rtpStats.LastReceiverReportTime().IsZero() && !paddingOnMute { + return 0 + } + // RTP padding maximum is 255 bytes. Break it up. // Use 20 byte as estimate of RTP header size (12 byte header + 8 byte extension) num := (bytesToSend + RTPPaddingMaxPayloadSize + RTPPaddingEstimatedHeaderSize - 1) / (RTPPaddingMaxPayloadSize + RTPPaddingEstimatedHeaderSize) @@ -762,16 +762,8 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool, forceMa return 0 } - // LK-TODO Look at load balancing a la sfu.Receiver to spread across available CPUs bytesSent := 0 for i := 0; i < len(snts); i++ { - // LK-TODO-START - // Hold sending padding packets till first RTCP-RR is received for this RTP stream. - // That is definitive proof that the remote side knows about this RTP stream. - // The packet count check at the beginning of this function gates sending padding - // on as yet unstarted streams which is a reasonable check. - // LK-TODO-END - hdr := rtp.Header{ Version: 2, Padding: true, @@ -812,7 +804,7 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool, forceMa bytesSent += hdr.MarshalSize() + len(payload) } - // STREAM_ALLOCATOR-TODO: change this to pull this counter from stream allocator so that counter can be update in pacer callback + // STREAM_ALLOCATOR-TODO: change this to pull this counter from stream allocator so that counter can be updated in pacer callback return bytesSent } @@ -979,7 +971,6 @@ func (d *DownTrack) MaxLayer() buffer.VideoLayer { func (d *DownTrack) GetState() DownTrackState { dts := DownTrackState{ RTPStats: d.rtpStats, - DeltaStatsSnapshotId: d.deltaStatsSnapshotId, DeltaStatsOverriddenSnapshotId: d.deltaStatsOverriddenSnapshotId, ForwarderState: d.forwarder.GetState(), } @@ -988,7 +979,6 @@ func (d *DownTrack) GetState() DownTrackState { func (d *DownTrack) SeedState(state DownTrackState) { d.rtpStats.Seed(state.RTPStats) - d.deltaStatsSnapshotId = state.DeltaStatsSnapshotId d.deltaStatsOverriddenSnapshotId = state.DeltaStatsOverriddenSnapshotId d.forwarder.SeedState(state.ForwarderState) } @@ -1698,10 +1688,6 @@ func (d *DownTrack) deltaStats(ds *buffer.RTPDeltaInfo) map[uint32]*buffer.Strea return streamStats } -func (d *DownTrack) getDeltaStats() map[uint32]*buffer.StreamStatsWithLayers { - return d.deltaStats(d.rtpStats.DeltaInfo(d.deltaStatsSnapshotId)) -} - func (d *DownTrack) getDeltaStatsOverridden() map[uint32]*buffer.StreamStatsWithLayers { return d.deltaStats(d.rtpStats.DeltaInfoOverridden(d.deltaStatsOverriddenSnapshotId)) }