Files
livekit/pkg/service/server.go
T
Raja Subramanian bb0ee1139c Drive-by misc changes. (#3399)
While working on a different service, noticed delegation of setting up
routes to the service itself. So, making that change and making some
methods internal only as there is no need to export them outside the
service.
2025-02-04 12:32:05 +05:30

400 lines
10 KiB
Go

// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package service
import (
"context"
"encoding/json"
"errors"
"fmt"
"net"
"net/http"
_ "net/http/pprof"
"runtime"
"runtime/pprof"
"strconv"
"time"
"github.com/pion/turn/v4"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/cors"
"github.com/twitchtv/twirp"
"github.com/urfave/negroni/v3"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/version"
"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
)
// LivekitServer is the top-level server object: it owns the HTTP and
// Prometheus listeners, the signaling/RTC services, routing, and room
// management, and coordinates their startup and shutdown.
type LivekitServer struct {
	config       *config.Config
	ioService    *IOInfoService
	rtcService   *RTCService
	agentService *AgentService
	httpServer   *http.Server
	// promServer is nil unless a Prometheus port is configured
	promServer   *http.Server
	router       routing.Router
	roomManager  *RoomManager
	signalServer *SignalServer
	// turnServer may be nil when TURN is disabled
	turnServer  *turn.Server
	currentNode routing.LocalNode
	// running flips true once Start has finished bringing up listeners
	running atomic.Bool
	// doneChan is created by Start and closed by Stop to request shutdown
	doneChan chan struct{}
	// closedChan is closed by Start once shutdown has fully completed
	closedChan chan struct{}
}
// NewLivekitServer wires together the configured services, routing, HTTP mux,
// and middleware chain into a LivekitServer. The returned server is not yet
// running; call Start to bring up its listeners.
func NewLivekitServer(conf *config.Config,
	roomService livekit.RoomService,
	agentDispatchService *AgentDispatchService,
	egressService *EgressService,
	ingressService *IngressService,
	sipService *SIPService,
	ioService *IOInfoService,
	rtcService *RTCService,
	agentService *AgentService,
	keyProvider auth.KeyProvider,
	router routing.Router,
	roomManager *RoomManager,
	signalServer *SignalServer,
	turnServer *turn.Server,
	currentNode routing.LocalNode,
) (s *LivekitServer, err error) {
	s = &LivekitServer{
		config:       conf,
		ioService:    ioService,
		rtcService:   rtcService,
		agentService: agentService,
		router:       router,
		roomManager:  roomManager,
		signalServer: signalServer,
		// turn server starts automatically
		turnServer:  turnServer,
		currentNode: currentNode,
		closedChan:  make(chan struct{}),
	}

	middlewares := []negroni.Handler{
		// always first
		negroni.NewRecovery(),
		// CORS is allowed, we rely on token authentication to prevent improper use
		cors.New(cors.Options{
			AllowOriginFunc: func(origin string) bool {
				return true
			},
			AllowedHeaders: []string{"*"},
			// allow preflight to be cached for a day
			MaxAge: 86400,
		}),
		negroni.HandlerFunc(RemoveDoubleSlashes),
	}
	if keyProvider != nil {
		middlewares = append(middlewares, NewAPIKeyAuthMiddleware(keyProvider))
	}

	serverOptions := []interface{}{
		twirp.WithServerHooks(twirp.ChainHooks(
			TwirpLogger(),
			TwirpRequestStatusReporter(),
		)),
	}
	roomServer := livekit.NewRoomServiceServer(roomService, serverOptions...)
	agentDispatchServer := livekit.NewAgentDispatchServiceServer(agentDispatchService, serverOptions...)
	egressServer := livekit.NewEgressServer(egressService, serverOptions...)
	ingressServer := livekit.NewIngressServer(ingressService, serverOptions...)
	sipServer := livekit.NewSIPServer(sipService, serverOptions...)

	mux := http.NewServeMux()
	if conf.Development {
		// pprof handlers are registered onto DefaultServeMux
		mux = http.DefaultServeMux
		mux.HandleFunc("/debug/goroutine", s.debugGoroutines)
		mux.HandleFunc("/debug/rooms", s.debugInfo)
	}
	mux.Handle(roomServer.PathPrefix(), roomServer)
	mux.Handle(agentDispatchServer.PathPrefix(), agentDispatchServer)
	mux.Handle(egressServer.PathPrefix(), egressServer)
	mux.Handle(ingressServer.PathPrefix(), ingressServer)
	mux.Handle(sipServer.PathPrefix(), sipServer)
	mux.Handle("/rtc", rtcService)
	rtcService.SetupRoutes(mux)
	mux.Handle("/agent", agentService)
	mux.HandleFunc("/", s.defaultHandler)

	s.httpServer = &http.Server{
		Handler: configureMiddlewares(mux, middlewares...),
	}

	if conf.PrometheusPort > 0 {
		logger.Warnw("prometheus_port is deprecated, please switch to prometheus.port instead", nil)
		conf.Prometheus.Port = conf.PrometheusPort
	}
	if conf.Prometheus.Port > 0 {
		promHandler := promhttp.Handler()
		// optionally protect the metrics endpoint with basic auth
		if conf.Prometheus.Username != "" && conf.Prometheus.Password != "" {
			protectedHandler := negroni.New()
			protectedHandler.Use(negroni.HandlerFunc(GenBasicAuthMiddleware(conf.Prometheus.Username, conf.Prometheus.Password)))
			protectedHandler.UseHandler(promHandler)
			promHandler = protectedHandler
		}
		s.promServer = &http.Server{
			Handler: promHandler,
		}
	}

	// clean up stale nodes on startup
	if err = router.RemoveDeadNodes(); err != nil {
		return nil, err
	}

	return s, nil
}
// Node returns a snapshot copy of this server's node descriptor.
func (s *LivekitServer) Node() *livekit.Node {
	node := s.currentNode.Clone()
	return node
}
// HTTPPort returns the configured primary HTTP listen port.
func (s *LivekitServer) HTTPPort() int {
	port := s.config.Port
	return int(port)
}
// IsRunning reports whether Start has completed and shutdown has not begun.
func (s *LivekitServer) IsRunning() bool {
	isUp := s.running.Load()
	return isUp
}
// Start registers the node, brings up routing, signaling, and the HTTP
// (and optional Prometheus) listeners, then blocks until Stop is called.
// On return the server has fully shut down. Returns an error if the server
// is already running or any dependency fails to start.
func (s *LivekitServer) Start() error {
	if s.running.Load() {
		return errors.New("already running")
	}
	s.doneChan = make(chan struct{})

	if err := s.router.RegisterNode(); err != nil {
		return err
	}
	defer func() {
		if err := s.router.UnregisterNode(); err != nil {
			logger.Errorw("could not unregister node", err)
		}
	}()

	if err := s.router.Start(); err != nil {
		return err
	}

	if err := s.ioService.Start(); err != nil {
		return err
	}

	addresses := s.config.BindAddresses
	if addresses == nil {
		// empty host binds all interfaces
		addresses = []string{""}
	}

	// ensure we could listen before logging that we are starting
	listeners := make([]net.Listener, 0)
	promListeners := make([]net.Listener, 0)
	for _, addr := range addresses {
		ln, err := net.Listen("tcp", net.JoinHostPort(addr, strconv.Itoa(int(s.config.Port))))
		if err != nil {
			return err
		}
		listeners = append(listeners, ln)

		if s.promServer != nil {
			ln, err = net.Listen("tcp", net.JoinHostPort(addr, strconv.Itoa(int(s.config.Prometheus.Port))))
			if err != nil {
				return err
			}
			promListeners = append(promListeners, ln)
		}
	}

	// assemble structured startup log fields
	values := []interface{}{
		"portHttp", s.config.Port,
		"nodeID", s.currentNode.NodeID(),
		"nodeIP", s.currentNode.NodeIP(),
		"version", version.Version,
	}
	if s.config.BindAddresses != nil {
		values = append(values, "bindAddresses", s.config.BindAddresses)
	}
	if s.config.RTC.TCPPort != 0 {
		values = append(values, "rtc.portTCP", s.config.RTC.TCPPort)
	}
	if !s.config.RTC.ForceTCP && s.config.RTC.UDPPort.Valid() {
		values = append(values, "rtc.portUDP", s.config.RTC.UDPPort)
	} else {
		values = append(values,
			"rtc.portICERange", []uint32{s.config.RTC.ICEPortRangeStart, s.config.RTC.ICEPortRangeEnd},
		)
	}
	if s.config.Prometheus.Port != 0 {
		values = append(values, "portPrometheus", s.config.Prometheus.Port)
	}
	if s.config.Region != "" {
		values = append(values, "region", s.config.Region)
	}
	logger.Infow("starting LiveKit server", values...)
	if runtime.GOOS == "windows" {
		logger.Infow("Windows detected, capacity management is unavailable")
	}

	for _, promLn := range promListeners {
		go s.promServer.Serve(promLn)
	}

	if err := s.signalServer.Start(); err != nil {
		return err
	}

	httpGroup := &errgroup.Group{}
	for _, ln := range listeners {
		l := ln
		httpGroup.Go(func() error {
			return s.httpServer.Serve(l)
		})
	}
	go func() {
		// Serve returns http.ErrServerClosed on graceful Shutdown;
		// any other error means the server could not run
		if err := httpGroup.Wait(); !errors.Is(err, http.ErrServerClosed) {
			logger.Errorw("could not start server", err)
			s.Stop(true)
		}
	}()

	go s.backgroundWorker()

	// give time for Serve goroutine to start
	time.Sleep(100 * time.Millisecond)
	s.running.Store(true)

	// block here until Stop closes doneChan
	<-s.doneChan

	// wait for shutdown
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()
	_ = s.httpServer.Shutdown(ctx)

	if s.turnServer != nil {
		_ = s.turnServer.Close()
	}

	s.roomManager.Stop()
	s.signalServer.Stop()
	s.ioService.Stop()

	// signal Stop that teardown is complete
	close(s.closedChan)
	return nil
}
// Stop initiates shutdown. Unless force is true, it first drains the node
// and polls every five seconds until all participants have left, then it
// unblocks Start and waits for teardown to finish. A second call after the
// server has stopped is a no-op.
func (s *LivekitServer) Stop(force bool) {
	// wait for all participants to exit
	s.router.Drain()
	ticker := time.NewTicker(5 * time.Second)
	for !force && s.roomManager.HasParticipants() {
		<-ticker.C
		logger.Infow("waiting for participants to exit")
	}
	ticker.Stop()

	// only the first caller proceeds with teardown
	if !s.running.Swap(false) {
		return
	}

	s.router.Stop()
	close(s.doneChan)

	// wait for fully closed
	<-s.closedChan
}
// RoomManager exposes the server's room manager.
func (s *LivekitServer) RoomManager() *RoomManager {
	rm := s.roomManager
	return rm
}
// debugGoroutines dumps stack traces of all goroutines (development only).
func (s *LivekitServer) debugGoroutines(w http.ResponseWriter, _ *http.Request) {
	profile := pprof.Lookup("goroutine")
	// debug=2 prints full, unaggregated stacks, one per goroutine
	_ = profile.WriteTo(w, 2)
}
// debugInfo serves a JSON array of per-room debug state (development only).
func (s *LivekitServer) debugInfo(w http.ResponseWriter, _ *http.Request) {
	// snapshot room debug info under the room manager's read lock
	s.roomManager.lock.RLock()
	info := make([]map[string]interface{}, 0, len(s.roomManager.rooms))
	for _, room := range s.roomManager.rooms {
		info = append(info, room.DebugInfo())
	}
	s.roomManager.lock.RUnlock()

	b, err := json.Marshal(info)
	if err != nil {
		// marshal failure is a server-side problem, not a client error
		w.WriteHeader(http.StatusInternalServerError)
		_, _ = w.Write([]byte(err.Error()))
		return
	}
	w.Header().Set("Content-Type", "application/json")
	_, _ = w.Write(b)
}
// defaultHandler serves the health check at the root path and 404s
// every other unmatched path.
func (s *LivekitServer) defaultHandler(w http.ResponseWriter, r *http.Request) {
	if r.URL.Path != "/" {
		http.NotFound(w, r)
		return
	}
	s.healthCheck(w, r)
}
// healthCheck reports node readiness: 200 OK when node stats were updated
// within the last 4 seconds, 406 Not Acceptable otherwise.
func (s *LivekitServer) healthCheck(w http.ResponseWriter, _ *http.Request) {
	// take a single snapshot; each Node() call clones the node, and two
	// separate calls could observe different stats
	node := s.Node()
	var updatedAt time.Time
	if node.Stats != nil {
		updatedAt = time.Unix(node.Stats.UpdatedAt, 0)
	}
	if time.Since(updatedAt) > 4*time.Second {
		w.WriteHeader(http.StatusNotAcceptable)
		_, _ = w.Write([]byte(fmt.Sprintf("Not Ready\nNode Updated At %s", updatedAt)))
		return
	}

	w.WriteHeader(http.StatusOK)
	_, _ = w.Write([]byte("OK"))
}
// backgroundWorker performs periodic per-node housekeeping (closing idle
// rooms once per second) until the server begins shutting down.
func (s *LivekitServer) backgroundWorker() {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			s.roomManager.CloseIdleRooms()
		case <-s.doneChan:
			return
		}
	}
}
// configureMiddlewares builds a negroni chain that runs each middleware in
// order and terminates at the given handler.
func configureMiddlewares(handler http.Handler, middlewares ...negroni.Handler) *negroni.Negroni {
	chain := negroni.New(middlewares...)
	chain.UseHandler(handler)
	return chain
}