diff --git a/go.mod b/go.mod index fff0ea1dc..8418df750 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/hashicorp/golang-lru/v2 v2.0.2 github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 - github.com/livekit/mediatransportutil v0.0.0-20230326055817-ed569ca13d26 + github.com/livekit/mediatransportutil v0.0.0-20230511025422-058ebf6b48c9 github.com/livekit/protocol v1.5.7-0.20230510002113-cadccd54108e github.com/livekit/psrpc v0.3.1-0.20230502152150-df9dd21fba11 github.com/mackerelio/go-osstat v0.2.4 @@ -31,7 +31,6 @@ require ( github.com/pion/rtcp v1.2.10 github.com/pion/rtp v1.7.13 github.com/pion/sdp/v3 v3.0.6 - github.com/pion/stun v0.4.0 github.com/pion/transport/v2 v2.2.0 github.com/pion/turn/v2 v2.1.0 github.com/pion/webrtc/v3 v3.2.1 @@ -82,6 +81,7 @@ require ( github.com/pion/randutil v0.1.0 // indirect github.com/pion/sctp v1.8.7 // indirect github.com/pion/srtp/v2 v2.0.12 // indirect + github.com/pion/stun v0.4.0 // indirect github.com/pion/udp/v2 v2.0.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.3.0 // indirect diff --git a/go.sum b/go.sum index e52434e66..c2fb05eda 100644 --- a/go.sum +++ b/go.sum @@ -119,8 +119,8 @@ github.com/lithammer/shortuuid/v4 v4.0.0 h1:QRbbVkfgNippHOS8PXDkti4NaWeyYfcBTHtw github.com/lithammer/shortuuid/v4 v4.0.0/go.mod h1:Zs8puNcrvf2rV9rTH51ZLLcj7ZXqQI3lv67aw4KiB1Y= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkDaKb5iXdynYrzB84ErPPO4LbRASk58= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= -github.com/livekit/mediatransportutil v0.0.0-20230326055817-ed569ca13d26 h1:QlQFyMwCDgjyySsrgmrMcVbEBA6KZcyTzvK+z346tUA= -github.com/livekit/mediatransportutil v0.0.0-20230326055817-ed569ca13d26/go.mod h1:eDA41kiySZoG+wy4Etsjb3w0jjLx69i/vAmSjG4bteA= +github.com/livekit/mediatransportutil v0.0.0-20230511025422-058ebf6b48c9 h1:aqivx5Tal2Fa6z1ZQrrBk/vYShosQx3ecl1aMwcQRV0= +github.com/livekit/mediatransportutil v0.0.0-20230511025422-058ebf6b48c9/go.mod h1:MRc0zSOSzXuFt0X218SgabzlaKevkvCckPgBEoHYc34= github.com/livekit/protocol v1.5.7-0.20230510002113-cadccd54108e h1:0F3RjOkUS71P2ODHI5ZW09Cbrr6A0Pg8vCFFy6gcqkk= github.com/livekit/protocol v1.5.7-0.20230510002113-cadccd54108e/go.mod h1:vjGsR1YxXnN5BLS0yr/YjGnJOPrS0ymddCF3JwxSHGM= github.com/livekit/psrpc v0.3.1-0.20230502152150-df9dd21fba11 h1:VS23iVQu/TNiLEM5XjbBSY28+B6nSewjKWPDbieg0Ho= diff --git a/pkg/config/config.go b/pkg/config/config.go index b379140f6..a9f61aad4 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -13,16 +13,12 @@ import ( "github.com/urfave/cli/v2" "gopkg.in/yaml.v3" + "github.com/livekit/mediatransportutil/pkg/rtcconfig" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/logger/pionlogger" redisLiveKit "github.com/livekit/protocol/redis" ) -var DefaultStunServers = []string{ - "stun.l.google.com:19302", - "stun1.l.google.com:19302", -} - type CongestionControlProbeMode string type StreamTrackerType string @@ -71,21 +67,9 @@ type Config struct { } type RTCConfig struct { - UDPPort uint32 `yaml:"udp_port,omitempty"` - TCPPort uint32 `yaml:"tcp_port,omitempty"` - ICEPortRangeStart uint32 `yaml:"port_range_start,omitempty"` - ICEPortRangeEnd uint32 `yaml:"port_range_end,omitempty"` - NodeIP string `yaml:"node_ip,omitempty"` - NodeIPAutoGenerated bool `yaml:"-"` - STUNServers []string `yaml:"stun_servers,omitempty"` - TURNServers []TURNServer `yaml:"turn_servers,omitempty"` - UseExternalIP bool `yaml:"use_external_ip"` - UseICELite bool `yaml:"use_ice_lite,omitempty"` - Interfaces InterfacesConfig `yaml:"interfaces,omitempty"` - IPs IPsConfig `yaml:"ips,omitempty"` - EnableLoopbackCandidate bool `yaml:"enable_loopback_candidate"` - UseMDNS bool `yaml:"use_mdns,omitempty"` - StrictACKs bool `yaml:"strict_acks,omitempty"` + rtcconfig.RTCConfig `yaml:",inline"` + + StrictACKs bool `yaml:"strict_acks,omitempty"` // Number of packets to buffer for NACK PacketBufferSize int `yaml:"packet_buffer_size,omitempty"` @@ -98,9 +82,6 @@ type RTCConfig struct { // allow TCP and TURN/TLS fallback AllowTCPFallback *bool `yaml:"allow_tcp_fallback,omitempty"` - // for testing, disable UDP - ForceTCP bool `yaml:"force_tcp,omitempty"` - // force a reconnect on a publication error ReconnectOnPublicationError *bool `yaml:"reconnect_on_publication_error,omitempty"` @@ -111,14 +92,6 @@ type RTCConfig struct { AllowTimestampAdjustment *bool `yaml:"allow_timestamp_adjustment,omitempty"` } -type TURNServer struct { - Host string `yaml:"host"` - Port int `yaml:"port"` - Protocol string `yaml:"protocol"` - Username string `yaml:"username,omitempty"` - Credential string `yaml:"credential,omitempty"` -} - type PLIThrottleConfig struct { LowQuality time.Duration `yaml:"low_quality,omitempty"` MidQuality time.Duration `yaml:"mid_quality,omitempty"` @@ -133,16 +106,6 @@ type CongestionControlConfig struct { MinChannelCapacity int64 `yaml:"min_channel_capacity,omitempty"` } -type InterfacesConfig struct { - Includes []string `yaml:"includes,omitempty"` - Excludes []string `yaml:"excludes,omitempty"` -} - -type IPsConfig struct { - Includes []string `yaml:"includes,omitempty"` - Excludes []string `yaml:"excludes,omitempty"` -} - type AudioConfig struct { // minimum level to be considered active, 0-127, where 0 is loudest ActiveLevel uint8 `yaml:"active_level,omitempty"` @@ -281,14 +244,16 @@ func NewConfig(confString string, strictMode bool, c *cli.Context, baseFlags []c conf := &Config{ Port: 7880, RTC: RTCConfig{ - UseExternalIP: false, - TCPPort: 7881, - UDPPort: 0, - ICEPortRangeStart: 0, - ICEPortRangeEnd: 0, - STUNServers: []string{}, - PacketBufferSize: 500, - StrictACKs: true, + RTCConfig: rtcconfig.RTCConfig{ + UseExternalIP: false, + TCPPort: 7881, + UDPPort: 0, + ICEPortRangeStart: 0, + ICEPortRangeEnd: 0, + STUNServers: []string{}, + }, + PacketBufferSize: 500, + StrictACKs: true, PLIThrottle: PLIThrottleConfig{ LowQuality: 500 * time.Millisecond, MidQuality: time.Second, @@ -426,6 +391,10 @@ func NewConfig(confString string, strictMode bool, c *cli.Context, baseFlags []c } } + if err := conf.RTC.Validate(conf.Development); err != nil { + return nil, fmt.Errorf("could not validate RTC config: %v", err) + } + if c != nil { if err := conf.updateFromCLI(c, baseFlags); err != nil { return nil, err @@ -439,17 +408,6 @@ func NewConfig(confString string, strictMode bool, c *cli.Context, baseFlags []c } conf.KeyFile = file - // set defaults for ports if none are set - if conf.RTC.UDPPort == 0 && conf.RTC.ICEPortRangeStart == 0 { - // to make it easier to run in dev mode/docker, default to single port - if conf.Development { - conf.RTC.UDPPort = 7882 - } else { - conf.RTC.ICEPortRangeStart = 50000 - conf.RTC.ICEPortRangeEnd = 60000 - } - } - // set defaults for Turn relay if none are set if conf.TURN.RelayPortRangeStart == 0 || conf.TURN.RelayPortRangeEnd == 0 { // to make it easier to run in dev mode/docker, default to two ports @@ -462,14 +420,6 @@ func NewConfig(confString string, strictMode bool, c *cli.Context, baseFlags []c } } - if conf.RTC.NodeIP == "" { - conf.RTC.NodeIP, err = conf.determineIP() - if err != nil { - return nil, err - } - conf.RTC.NodeIPAutoGenerated = true - } - if conf.LogLevel != "" { conf.Logging.Level = conf.LogLevel } @@ -517,13 +467,22 @@ func (conf *Config) ToCLIFlagNames(existingFlags []cli.Flag) map[string]reflect. for i := 0; i < currNode.TypeNode.NumField(); i++ { // inspect yaml tag from struct field to get path field := currNode.TypeNode.Type().Field(i) - yamlTag := strings.SplitN(field.Tag.Get("yaml"), ",", 2)[0] - if yamlTag == "" || yamlTag == "-" { + yamlTagArray := strings.SplitN(field.Tag.Get("yaml"), ",", 2) + yamlTag := yamlTagArray[0] + isInline := false + if len(yamlTagArray) > 1 && yamlTagArray[1] == "inline" { + isInline = true + } + if (yamlTag == "" && (!isInline || currNode.TagPrefix == "")) || yamlTag == "-" { continue } yamlPath := yamlTag if currNode.TagPrefix != "" { - yamlPath = fmt.Sprintf("%s.%s", currNode.TagPrefix, yamlTag) + if isInline { + yamlPath = currNode.TagPrefix + } else { + yamlPath = fmt.Sprintf("%s.%s", currNode.TagPrefix, yamlTag) + } } if existingFlagNames[yamlPath] { continue diff --git a/pkg/config/ip.go b/pkg/config/ip.go deleted file mode 100644 index e854c8e2d..000000000 --- a/pkg/config/ip.go +++ /dev/null @@ -1,200 +0,0 @@ -package config - -import ( - "context" - "fmt" - "net" - "time" - - "github.com/pion/stun" - "github.com/pkg/errors" - - "github.com/livekit/protocol/logger" -) - -func (conf *Config) determineIP() (string, error) { - if conf.RTC.UseExternalIP { - stunServers := conf.RTC.STUNServers - if len(stunServers) == 0 { - stunServers = DefaultStunServers - } - var err error - for i := 0; i < 3; i++ { - var ip string - ip, err = GetExternalIP(context.Background(), stunServers, nil) - if err == nil { - return ip, nil - } else { - time.Sleep(500 * time.Millisecond) - } - } - return "", errors.Errorf("could not resolve external IP: %v", err) - } - - // use local ip instead - addresses, err := GetLocalIPAddresses(false) - if len(addresses) > 0 { - return addresses[0], err - } - return "", err -} - -func GetLocalIPAddresses(includeLoopback bool) ([]string, error) { - ifaces, err := net.Interfaces() - if err != nil { - return nil, err - } - loopBacks := make([]string, 0) - addresses := make([]string, 0) - for _, iface := range ifaces { - addrs, err := iface.Addrs() - if err != nil { - continue - } - for _, addr := range addrs { - var ip net.IP - switch typedAddr := addr.(type) { - case *net.IPNet: - ip = typedAddr.IP.To4() - case *net.IPAddr: - ip = typedAddr.IP.To4() - default: - continue - } - if ip == nil { - continue - } - if ip.IsLoopback() { - loopBacks = append(loopBacks, ip.String()) - } else { - addresses = append(addresses, ip.String()) - } - } - } - - if includeLoopback { - addresses = append(addresses, loopBacks...) - } - - if len(addresses) > 0 { - return addresses, nil - } - if len(loopBacks) > 0 { - return loopBacks, nil - } - return nil, fmt.Errorf("could not find local IP address") -} - -// GetExternalIP return external IP for localAddr from stun server. If localAddr is nil, a local address is chosen automatically, -// else the address will be used to validate the external IP is accessible from the outside. -func GetExternalIP(ctx context.Context, stunServers []string, localAddr net.Addr) (string, error) { - if len(stunServers) == 0 { - return "", errors.New("STUN servers are required but not defined") - } - dialer := &net.Dialer{ - LocalAddr: localAddr, - } - conn, err := dialer.Dial("udp4", stunServers[0]) - if err != nil { - return "", err - } - c, err := stun.NewClient(conn) - if err != nil { - return "", err - } - defer c.Close() - - message, err := stun.Build(stun.TransactionID, stun.BindingRequest) - if err != nil { - return "", err - } - - var stunErr error - // sufficiently large buffer to not block it - ipChan := make(chan string, 20) - err = c.Start(message, func(res stun.Event) { - if res.Error != nil { - stunErr = res.Error - return - } - - var xorAddr stun.XORMappedAddress - if err := xorAddr.GetFrom(res.Message); err != nil { - stunErr = err - return - } - ip := xorAddr.IP.To4() - if ip != nil { - ipChan <- ip.String() - } - }) - if err != nil { - return "", err - } - - ctx1, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - select { - case nodeIP := <-ipChan: - if localAddr == nil { - return nodeIP, nil - } - _ = c.Close() - return nodeIP, validateExternalIP(ctx1, nodeIP, localAddr.(*net.UDPAddr)) - case <-ctx1.Done(): - msg := "could not determine public IP" - if stunErr != nil { - return "", errors.Wrap(stunErr, msg) - } else { - return "", fmt.Errorf(msg) - } - } -} - -// validateExternalIP validates that the external IP is accessible from the outside by listen the local address, -// it will send a magic string to the external IP and check the string is received by the local address. -func validateExternalIP(ctx context.Context, nodeIP string, addr *net.UDPAddr) error { - srv, err := net.ListenUDP("udp", addr) - if err != nil { - return err - } - defer srv.Close() - - magicString := "9#B8D2Nvg2xg5P$ZRwJ+f)*^Nne6*W3WamGY" - - validCh := make(chan struct{}) - go func() { - buf := make([]byte, 1024) - for { - n, err := srv.Read(buf) - if err != nil { - logger.Debugw("error reading from UDP socket", "err", err) - return - } - if string(buf[:n]) == magicString { - close(validCh) - return - } - } - }() - - cli, err := net.DialUDP("udp", nil, &net.UDPAddr{IP: net.ParseIP(nodeIP), Port: srv.LocalAddr().(*net.UDPAddr).Port}) - if err != nil { - return err - } - defer cli.Close() - - if _, err = cli.Write([]byte(magicString)); err != nil { - return err - } - - ctx1, cancel := context.WithTimeout(ctx, 1*time.Second) - defer cancel() - select { - case <-validCh: - return nil - case <-ctx1.Done(): - break - } - return fmt.Errorf("could not validate external IP") -} diff --git a/pkg/rtc/config.go b/pkg/rtc/config.go index b87ed857f..f3dbc2402 100644 --- a/pkg/rtc/config.go +++ b/pkg/rtc/config.go @@ -1,43 +1,26 @@ package rtc import ( - "context" - "errors" - "fmt" - "math/rand" - "net" - "strings" - "sync" - "time" - - "github.com/pion/ice/v2" "github.com/pion/sdp/v3" "github.com/pion/webrtc/v3" "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/sfu/buffer" dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor" - "github.com/livekit/protocol/logger" - "github.com/livekit/protocol/logger/pionlogger" + "github.com/livekit/mediatransportutil/pkg/rtcconfig" ) const ( - minUDPBufferSize = 5_000_000 - defaultUDPBufferSize = 16_777_216 - frameMarking = "urn:ietf:params:rtp-hdrext:framemarking" + frameMarking = "urn:ietf:params:rtp-hdrext:framemarking" ) type WebRTCConfig struct { - Configuration webrtc.Configuration - SettingEngine webrtc.SettingEngine - Receiver ReceiverConfig - BufferFactory *buffer.Factory - UDPMux ice.UDPMux - TCPMuxListener *net.TCPListener - Publisher DirectionConfig - Subscriber DirectionConfig - NAT1To1IPs []string - UseMDNS bool + rtcconfig.WebRTCConfig + + BufferFactory *buffer.Factory + Receiver ReceiverConfig + Publisher DirectionConfig + Subscriber DirectionConfig } type ReceiverConfig struct { @@ -60,138 +43,18 @@ type DirectionConfig struct { StrictACKs bool } -const ( - // number of packets to buffer up - readBufferSize = 50 - - writeBufferSizeInBytes = 4 * 1024 * 1024 -) - -func NewWebRTCConfig(conf *config.Config, externalIP string) (*WebRTCConfig, error) { +func NewWebRTCConfig(conf *config.Config) (*WebRTCConfig, error) { rtcConf := conf.RTC - c := webrtc.Configuration{ - SDPSemantics: webrtc.SDPSemanticsUnifiedPlan, - } - s := webrtc.SettingEngine{ - LoggerFactory: pionlogger.NewLoggerFactory(logger.GetLogger()), - } - var ifFilter func(string) bool - if len(rtcConf.Interfaces.Includes) != 0 || len(rtcConf.Interfaces.Excludes) != 0 { - ifFilter = InterfaceFilterFromConf(rtcConf.Interfaces) - s.SetInterfaceFilter(ifFilter) - } - - var ipFilter func(net.IP) bool - if len(rtcConf.IPs.Includes) != 0 || len(rtcConf.IPs.Excludes) != 0 { - filter, err := IPFilterFromConf(rtcConf.IPs) - if err != nil { - return nil, err - } - ipFilter = filter - s.SetIPFilter(filter) - } - - if !rtcConf.UseMDNS { - s.SetICEMulticastDNSMode(ice.MulticastDNSModeDisabled) - } - - var nat1to1IPs []string - // force it to the node IPs that the user has set - if externalIP != "" && (conf.RTC.UseExternalIP || (conf.RTC.NodeIP != "" && !conf.RTC.NodeIPAutoGenerated)) { - if conf.RTC.UseExternalIP { - ips, err := getNAT1to1IPsForConf(conf, ipFilter) - if err != nil { - return nil, err - } - if len(ips) == 0 { - logger.Infow("no external IPs found, using node IP for NAT1To1Ips", "ip", externalIP) - s.SetNAT1To1IPs([]string{externalIP}, webrtc.ICECandidateTypeHost) - } else { - logger.Infow("using external IPs", "ips", ips) - s.SetNAT1To1IPs(ips, webrtc.ICECandidateTypeHost) - } - nat1to1IPs = ips - } else { - s.SetNAT1To1IPs([]string{externalIP}, webrtc.ICECandidateTypeHost) - } + webRTCConfig, err := rtcconfig.NewWebRTCConfig(&rtcConf.RTCConfig, conf.Development) + if err != nil { + return nil, err } if rtcConf.PacketBufferSize == 0 { rtcConf.PacketBufferSize = 500 } - var udpMux ice.UDPMux - var err error - networkTypes := make([]webrtc.NetworkType, 0, 4) - - if !rtcConf.ForceTCP { - networkTypes = append(networkTypes, - webrtc.NetworkTypeUDP4, webrtc.NetworkTypeUDP6, - ) - if rtcConf.ICEPortRangeStart != 0 && rtcConf.ICEPortRangeEnd != 0 { - if err := s.SetEphemeralUDPPortRange(uint16(rtcConf.ICEPortRangeStart), uint16(rtcConf.ICEPortRangeEnd)); err != nil { - return nil, err - } - } else if rtcConf.UDPPort != 0 { - opts := []ice.UDPMuxFromPortOption{ - ice.UDPMuxFromPortWithReadBufferSize(defaultUDPBufferSize), - ice.UDPMuxFromPortWithWriteBufferSize(defaultUDPBufferSize), - ice.UDPMuxFromPortWithLogger(s.LoggerFactory.NewLogger("udp_mux")), - } - if rtcConf.EnableLoopbackCandidate { - opts = append(opts, ice.UDPMuxFromPortWithLoopback()) - } - if ipFilter != nil { - opts = append(opts, ice.UDPMuxFromPortWithIPFilter(ipFilter)) - } - if ifFilter != nil { - opts = append(opts, ice.UDPMuxFromPortWithInterfaceFilter(ifFilter)) - } - udpMux, err := ice.NewMultiUDPMuxFromPort(int(rtcConf.UDPPort), opts...) - if err != nil { - return nil, err - } - - s.SetICEUDPMux(udpMux) - if !conf.Development { - checkUDPReadBuffer() - } - } - } - - // use TCP mux when it's set - var tcpListener *net.TCPListener - if rtcConf.TCPPort != 0 { - networkTypes = append(networkTypes, - webrtc.NetworkTypeTCP4, webrtc.NetworkTypeTCP6, - ) - tcpListener, err = net.ListenTCP("tcp", &net.TCPAddr{ - Port: int(rtcConf.TCPPort), - }) - if err != nil { - return nil, err - } - - tcpMux := ice.NewTCPMuxDefault(ice.TCPMuxParams{ - Logger: s.LoggerFactory.NewLogger("tcp_mux"), - Listener: tcpListener, - ReadBufferSize: readBufferSize, - WriteBufferSize: writeBufferSizeInBytes, - }) - - s.SetICETCPMux(tcpMux) - } - - if len(networkTypes) == 0 { - return nil, errors.New("TCP is forced but not configured") - } - s.SetNetworkTypes(networkTypes) - - if rtcConf.EnableLoopbackCandidate { - s.SetIncludeLoopbackCandidate(true) - } - // publisher configuration publisherConfig := DirectionConfig{ StrictACKs: true, // publisher is dialed, and will always reply with ACK @@ -244,32 +107,13 @@ func NewWebRTCConfig(conf *config.Config, externalIP string) (*WebRTCConfig, err subscriberConfig.RTCPFeedback.Video = append(subscriberConfig.RTCPFeedback.Video, webrtc.RTCPFeedback{Type: webrtc.TypeRTCPFBGoogREMB}) } - if rtcConf.UseICELite { - s.SetLite(true) - } else if rtcConf.NodeIP == "" && !rtcConf.UseExternalIP { - // use STUN servers for server to support NAT - // when deployed in production, we expect UseExternalIP to be used, and ports accessible - // this is not compatible with ICE Lite - // Do not automatically add STUN servers if nodeIP is set - if len(rtcConf.STUNServers) > 0 { - c.ICEServers = []webrtc.ICEServer{iceServerForStunServers(rtcConf.STUNServers)} - } else { - c.ICEServers = []webrtc.ICEServer{iceServerForStunServers(config.DefaultStunServers)} - } - } - return &WebRTCConfig{ - Configuration: c, - SettingEngine: s, + WebRTCConfig: *webRTCConfig, Receiver: ReceiverConfig{ PacketBufferSize: rtcConf.PacketBufferSize, }, - UDPMux: udpMux, - TCPMuxListener: tcpListener, - Publisher: publisherConfig, - Subscriber: subscriberConfig, - NAT1To1IPs: nat1to1IPs, - UseMDNS: rtcConf.UseMDNS, + Publisher: publisherConfig, + Subscriber: subscriberConfig, }, nil } @@ -277,190 +121,3 @@ func (c *WebRTCConfig) SetBufferFactory(factory *buffer.Factory) { c.BufferFactory = factory c.SettingEngine.BufferFactory = factory.GetOrNew } - -func iceServerForStunServers(servers []string) webrtc.ICEServer { - iceServer := webrtc.ICEServer{} - for _, stunServer := range servers { - iceServer.URLs = append(iceServer.URLs, fmt.Sprintf("stun:%s", stunServer)) - } - return iceServer -} - -func getNAT1to1IPsForConf(conf *config.Config, ipFilter func(net.IP) bool) ([]string, error) { - stunServers := conf.RTC.STUNServers - if len(stunServers) == 0 { - stunServers = config.DefaultStunServers - } - localIPs, err := config.GetLocalIPAddresses(conf.RTC.EnableLoopbackCandidate) - if err != nil { - return nil, err - } - type ipmapping struct { - externalIP string - localIP string - } - addrCh := make(chan ipmapping, len(localIPs)) - - var udpPorts []int - if conf.RTC.ICEPortRangeStart != 0 && conf.RTC.ICEPortRangeEnd != 0 { - portRangeStart, portRangeEnd := uint16(conf.RTC.ICEPortRangeStart), uint16(conf.RTC.ICEPortRangeEnd) - for i := 0; i < 5; i++ { - udpPorts = append(udpPorts, rand.Intn(int(portRangeEnd-portRangeStart))+int(portRangeStart)) - } - } else if conf.RTC.UDPPort != 0 { - udpPorts = append(udpPorts, int(conf.RTC.UDPPort)) - } else { - udpPorts = append(udpPorts, 0) - } - - var wg sync.WaitGroup - ctx, cancel := context.WithCancel(context.Background()) - for _, ip := range localIPs { - if ipFilter != nil && !ipFilter(net.ParseIP(ip)) { - continue - } - - wg.Add(1) - go func(localIP string) { - defer wg.Done() - for _, port := range udpPorts { - addr, err := config.GetExternalIP(ctx, stunServers, &net.UDPAddr{IP: net.ParseIP(localIP), Port: port}) - if err != nil { - if strings.Contains(err.Error(), "address already in use") { - logger.Debugw("failed to get external ip, address already in use", "local", localIP, "port", port) - continue - } - logger.Infow("failed to get external ip", "local", localIP, "err", err) - return - } - addrCh <- ipmapping{externalIP: addr, localIP: localIP} - return - } - logger.Infow("failed to get external ip after all ports tried", "local", localIP, "ports", udpPorts) - }(ip) - } - - var firstResolved bool - natMapping := make(map[string]string) - timeout := time.NewTimer(5 * time.Second) - defer timeout.Stop() - -done: - for { - select { - case mapping := <-addrCh: - if !firstResolved { - firstResolved = true - timeout.Reset(1 * time.Second) - } - if local, ok := natMapping[mapping.externalIP]; ok { - logger.Infow("external ip already solved, ignore duplicate", - "external", mapping.externalIP, - "local", local, - "ignore", mapping.localIP) - } else { - natMapping[mapping.externalIP] = mapping.localIP - } - - case <-timeout.C: - break done - } - } - cancel() - wg.Wait() - - if len(natMapping) == 0 { - // no external ip resolved - return nil, nil - } - - // mapping unresolved local ip to itself - for _, local := range localIPs { - var found bool - for _, localIPMapping := range natMapping { - if local == localIPMapping { - found = true - break - } - } - if !found { - natMapping[local] = local - } - } - - nat1to1IPs := make([]string, 0, len(natMapping)) - for external, local := range natMapping { - nat1to1IPs = append(nat1to1IPs, fmt.Sprintf("%s/%s", external, local)) - } - return nat1to1IPs, nil -} - -func InterfaceFilterFromConf(ifs config.InterfacesConfig) func(string) bool { - includes := ifs.Includes - excludes := ifs.Excludes - return func(s string) bool { - // filter by include interfaces - if len(includes) > 0 { - for _, iface := range includes { - if iface == s { - return true - } - } - return false - } - - // filter by exclude interfaces - if len(excludes) > 0 { - for _, iface := range excludes { - if iface == s { - return false - } - } - } - return true - } -} - -func IPFilterFromConf(ips config.IPsConfig) (func(ip net.IP) bool, error) { - var ipnets [2][]*net.IPNet - var err error - for i, ips := range [][]string{ips.Includes, ips.Excludes} { - ipnets[i], err = func(fromIPs []string) ([]*net.IPNet, error) { - var toNets []*net.IPNet - for _, ip := range fromIPs { - _, ipnet, err := net.ParseCIDR(ip) - if err != nil { - return nil, err - } - toNets = append(toNets, ipnet) - } - return toNets, nil - }(ips) - - if err != nil { - return nil, err - } - } - - includes, excludes := ipnets[0], ipnets[1] - - return func(ip net.IP) bool { - if len(includes) > 0 { - for _, ipn := range includes { - if ipn.Contains(ip) { - return true - } - } - return false - } - - if len(excludes) > 0 { - for _, ipn := range excludes { - if ipn.Contains(ip) { - return false - } - } - } - return true - }, nil -} diff --git a/pkg/rtc/participant_internal_test.go b/pkg/rtc/participant_internal_test.go index 1b2dadefb..adbe5862c 100644 --- a/pkg/rtc/participant_internal_test.go +++ b/pkg/rtc/participant_internal_test.go @@ -649,7 +649,7 @@ func newParticipantForTestWithOpts(identity livekit.ParticipantIdentity, opts *p // disable mux, it doesn't play too well with unit test conf.RTC.UDPPort = 0 conf.RTC.TCPPort = 0 - rtcConf, err := NewWebRTCConfig(conf, "") + rtcConf, err := NewWebRTCConfig(conf) if err != nil { panic(err) } diff --git a/pkg/rtc/rtc_unix.go b/pkg/rtc/rtc_unix.go deleted file mode 100644 index 3b357a21f..000000000 --- a/pkg/rtc/rtc_unix.go +++ /dev/null @@ -1,40 +0,0 @@ -//go:build !windows -// +build !windows - -package rtc - -import ( - "net" - "syscall" - - "github.com/livekit/protocol/logger" -) - -func checkUDPReadBuffer() { - val, err := getUDPReadBuffer() - if err == nil { - if val < minUDPBufferSize { - logger.Warnw("UDP receive buffer is too small for a production set-up", nil, - "current", val, - "suggested", minUDPBufferSize) - } else { - logger.Debugw("UDP receive buffer size", "current", val) - } - } -} - -func getUDPReadBuffer() (int, error) { - conn, err := net.ListenUDP("udp4", nil) - if err != nil { - return 0, err - } - defer func() { _ = conn.Close() }() - _ = conn.SetReadBuffer(defaultUDPBufferSize) - fd, err := conn.File() - if err != nil { - return 0, nil - } - defer func() { _ = fd.Close() }() - - return syscall.GetsockoptInt(int(fd.Fd()), syscall.SOL_SOCKET, syscall.SO_RCVBUF) -} diff --git a/pkg/rtc/rtc_windows.go b/pkg/rtc/rtc_windows.go deleted file mode 100644 index beeab22cf..000000000 --- a/pkg/rtc/rtc_windows.go +++ /dev/null @@ -1,7 +0,0 @@ -//go:build windows -// +build windows - -package rtc - -func checkUDPReadBuffer() { -} diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index c9986e92a..061446b00 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -11,6 +11,7 @@ import ( "github.com/livekit/livekit-server/pkg/telemetry/prometheus" "github.com/livekit/livekit-server/version" + "github.com/livekit/mediatransportutil/pkg/rtcconfig" "github.com/livekit/protocol/auth" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" @@ -67,7 +68,7 @@ func NewLocalRoomManager( egressLauncher rtc.EgressLauncher, versionGenerator utils.TimedVersionGenerator, ) (*RoomManager, error) { - rtcConf, err := rtc.NewWebRTCConfig(conf, currentNode.Ip) + rtcConf, err := rtc.NewWebRTCConfig(conf) if err != nil { return nil, err } @@ -687,7 +688,7 @@ func (r *RoomManager) iceServersForRoom(ri *livekit.Room, tlsOnly bool) []*livek } if !hasSTUN { - iceServers = append(iceServers, iceServerForStunServers(config.DefaultStunServers)) + iceServers = append(iceServers, iceServerForStunServers(rtcconfig.DefaultStunServers)) } return iceServers } diff --git a/test/client/client.go b/test/client/client.go index 76a347e87..384cdff79 100644 --- a/test/client/client.go +++ b/test/client/client.go @@ -19,6 +19,7 @@ import ( "go.uber.org/atomic" "google.golang.org/protobuf/proto" + "github.com/livekit/mediatransportutil/pkg/rtcconfig" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" @@ -120,7 +121,9 @@ func NewRTCClient(conn *websocket.Conn) (*RTCClient, error) { c.ctx, c.cancel = context.WithCancel(context.Background()) conf := rtc.WebRTCConfig{ - Configuration: rtcConf, + WebRTCConfig: rtcconfig.WebRTCConfig{ + Configuration: rtcConf, + }, } conf.SettingEngine.SetLite(false) conf.SettingEngine.SetAnsweringDTLSRole(webrtc.DTLSRoleClient)