Files
livekit/pkg/service/server.go
cnderrauber 8ae564065e generate & log egressID for start egress request (#4303)
* generate & log egressID for start egress request

* separate method

* use ctx value instead of metadata

* comment
2026-02-10 09:34:56 +08:00

411 lines
10 KiB
Go

// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package service
import (
"context"
"encoding/json"
"errors"
"fmt"
"net"
"net/http"
_ "net/http/pprof"
"runtime"
"runtime/pprof"
"strconv"
"time"
"github.com/pion/turn/v4"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/cors"
"github.com/twitchtv/twirp"
"github.com/urfave/negroni/v3"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"
"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils/xtwirp"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/version"
)
// LivekitServer bundles the services, HTTP servers and lifecycle state that
// make up a single LiveKit server node. Construct it with NewLivekitServer,
// run it with Start and shut it down with Stop.
type LivekitServer struct {
	config       *config.Config
	ioService    *IOInfoService
	rtcService   *RTCService
	whipService  *WHIPService
	agentService *AgentService
	// httpServer serves the main API/signalling routes assembled in NewLivekitServer.
	httpServer *http.Server
	// promServer serves Prometheus metrics; nil unless a Prometheus port is configured.
	promServer   *http.Server
	router       routing.Router
	roomManager  *RoomManager
	signalServer *SignalServer
	// turnServer may be nil; Start only closes it when it is set.
	turnServer  *turn.Server
	currentNode routing.LocalNode
	// running is set by Start once serving has begun and cleared by Stop.
	running atomic.Bool
	// doneChan is created by Start and closed by Stop to request shutdown.
	doneChan chan struct{}
	// closedChan is closed by Start after shutdown has completed; Stop waits on it.
	closedChan chan struct{}
}
// NewLivekitServer wires the supplied services into a LivekitServer.
//
// It assembles the HTTP middleware chain (panic recovery, permissive CORS,
// double-slash removal and, when keyProvider is non-nil, API key auth),
// registers the Twirp services together with the RTC, WHIP, agent and
// health-check routes on a mux, and prepares the optional Prometheus server.
// No listener is opened here; that happens in Start.
//
// As its last step it asks the router to remove dead nodes. The constructed
// server is returned together with whatever error that call produced.
func NewLivekitServer(conf *config.Config,
	roomService livekit.RoomService,
	agentDispatchService *AgentDispatchService,
	egressService *EgressService,
	ingressService *IngressService,
	sipService *SIPService,
	ioService *IOInfoService,
	rtcService *RTCService,
	whipService *WHIPService,
	agentService *AgentService,
	keyProvider auth.KeyProvider,
	router routing.Router,
	roomManager *RoomManager,
	signalServer *SignalServer,
	turnServer *turn.Server,
	currentNode routing.LocalNode,
) (s *LivekitServer, err error) {
	s = &LivekitServer{
		config:       conf,
		router:       router,
		currentNode:  currentNode,
		roomManager:  roomManager,
		signalServer: signalServer,
		ioService:    ioService,
		rtcService:   rtcService,
		whipService:  whipService,
		agentService: agentService,
		// the TURN server is already running by the time it is handed to us
		turnServer: turnServer,
		closedChan: make(chan struct{}),
	}

	// --- HTTP middleware chain -------------------------------------------
	recovery := negroni.NewRecovery()
	// Every origin is accepted on purpose: token authentication, not CORS,
	// is what protects the API from improper use.
	openCORS := cors.New(cors.Options{
		AllowOriginFunc: func(string) bool { return true },
		AllowedMethods:  []string{"OPTIONS", "HEAD", "GET", "POST", "PATCH", "DELETE"},
		AllowedHeaders:  []string{"*"},
		ExposedHeaders:  []string{"*"},
		// preflight responses may be cached for one day
		MaxAge: 86400,
	})
	chain := []negroni.Handler{
		recovery, // recovery must stay first so it wraps everything else
		openCORS,
		negroni.HandlerFunc(RemoveDoubleSlashes),
	}
	if keyProvider != nil {
		chain = append(chain, NewAPIKeyAuthMiddleware(keyProvider))
	}

	// --- Twirp services --------------------------------------------------
	hooks := twirp.ChainHooks(
		TwirpLogger(),
		TwirpEgressID(),
		TwirpRequestStatusReporter(),
	)
	twirpOpts := []any{twirp.WithServerHooks(hooks)}
	// copied element by element because the option slice must be widened to []any
	for _, o := range xtwirp.DefaultServerOptions() {
		twirpOpts = append(twirpOpts, o)
	}
	roomServer := livekit.NewRoomServiceServer(roomService, twirpOpts...)
	agentDispatchServer := livekit.NewAgentDispatchServiceServer(agentDispatchService, twirpOpts...)
	egressServer := livekit.NewEgressServer(egressService, twirpOpts...)
	ingressServer := livekit.NewIngressServer(ingressService, twirpOpts...)
	sipServer := livekit.NewSIPServer(sipService, twirpOpts...)

	// --- Routing ---------------------------------------------------------
	var mux *http.ServeMux
	if conf.Development {
		// the pprof handlers register themselves on DefaultServeMux, so
		// development mode serves from it to expose them
		mux = http.DefaultServeMux
		mux.HandleFunc("/debug/goroutine", s.debugGoroutines)
		mux.HandleFunc("/debug/rooms", s.debugInfo)
	} else {
		mux = http.NewServeMux()
	}

	xtwirp.RegisterServer(mux, roomServer)
	xtwirp.RegisterServer(mux, agentDispatchServer)
	xtwirp.RegisterServer(mux, egressServer)
	xtwirp.RegisterServer(mux, ingressServer)
	xtwirp.RegisterServer(mux, sipServer)
	rtcService.SetupRoutes(mux)
	whipService.SetupRoutes(mux)
	mux.Handle("/agent", agentService)
	mux.HandleFunc("/", s.defaultHandler)

	s.httpServer = &http.Server{
		Handler: configureMiddlewares(mux, chain...),
	}

	// --- Prometheus ------------------------------------------------------
	if conf.PrometheusPort > 0 {
		// the legacy top-level setting overrides prometheus.port when present
		logger.Warnw("prometheus_port is deprecated, please switch prometheus.port instead", nil)
		conf.Prometheus.Port = conf.PrometheusPort
	}

	if conf.Prometheus.Port > 0 {
		metrics := promhttp.Handler()
		// basic auth is applied only when both credentials are configured
		if user, pass := conf.Prometheus.Username, conf.Prometheus.Password; user != "" && pass != "" {
			guarded := negroni.New()
			guarded.Use(negroni.HandlerFunc(GenBasicAuthMiddleware(user, pass)))
			guarded.UseHandler(metrics)
			metrics = guarded
		}
		s.promServer = &http.Server{
			Handler: metrics,
		}
	}

	err = router.RemoveDeadNodes()
	return s, err
}
// Node returns the result of cloning the local node this server runs as.
func (s *LivekitServer) Node() *livekit.Node {
	snapshot := s.currentNode.Clone()
	return snapshot
}
// HTTPPort returns the configured HTTP port as an int.
func (s *LivekitServer) HTTPPort() int {
	port := s.config.Port
	return int(port)
}
// IsRunning reports whether the running flag is currently set, i.e. Start has
// begun serving and Stop has not yet cleared it.
func (s *LivekitServer) IsRunning() bool {
	active := s.running.Load()
	return active
}
// Start runs the server and blocks until Stop is called.
//
// It registers the node with the router, starts the router, the IO service
// and the signal server, binds one HTTP listener (plus one Prometheus listener
// when a Prometheus server is configured) per bind address, and serves on
// them. Once doneChan is closed it shuts the HTTP and Prometheus servers down
// (sharing a single five second timeout), closes the TURN server if present,
// stops the room manager, signal server and IO service, and finally closes
// closedChan so that Stop can return.
//
// An error is returned if the server is already running or if any start-up
// step fails. On a start-up failure every listener that was already bound is
// closed again so the ports are released. The node is unregistered from the
// router whenever Start returns after a successful registration.
func (s *LivekitServer) Start() error {
	if s.running.Load() {
		return errors.New("already running")
	}
	s.doneChan = make(chan struct{})

	if err := s.router.RegisterNode(); err != nil {
		return err
	}
	defer func() {
		if err := s.router.UnregisterNode(); err != nil {
			logger.Errorw("could not unregister node", err)
		}
	}()

	if err := s.router.Start(); err != nil {
		return err
	}

	if err := s.ioService.Start(); err != nil {
		return err
	}

	addresses := s.config.BindAddresses
	if addresses == nil {
		// an empty host binds on all interfaces
		addresses = []string{""}
	}

	// ensure we could listen
	listeners := make([]net.Listener, 0, len(addresses))
	promListeners := make([]net.Listener, 0, len(addresses))

	// Until the HTTP servers own the listeners, any early return must release
	// the sockets bound so far; otherwise the ports stay occupied.
	serving := false
	defer func() {
		if serving {
			return
		}
		for _, ln := range listeners {
			_ = ln.Close()
		}
		for _, ln := range promListeners {
			_ = ln.Close()
		}
	}()

	for _, addr := range addresses {
		ln, err := net.Listen("tcp", net.JoinHostPort(addr, strconv.Itoa(int(s.config.Port))))
		if err != nil {
			return err
		}
		listeners = append(listeners, ln)

		if s.promServer != nil {
			ln, err = net.Listen("tcp", net.JoinHostPort(addr, strconv.Itoa(int(s.config.Prometheus.Port))))
			if err != nil {
				return err
			}
			promListeners = append(promListeners, ln)
		}
	}

	values := []any{
		"portHttp", s.config.Port,
		"nodeID", s.currentNode.NodeID(),
		"nodeIP", s.currentNode.NodeIP(),
		"version", version.Version,
	}
	if s.config.BindAddresses != nil {
		values = append(values, "bindAddresses", s.config.BindAddresses)
	}
	if s.config.RTC.TCPPort != 0 {
		values = append(values, "rtc.portTCP", s.config.RTC.TCPPort)
	}
	if !s.config.RTC.ForceTCP && s.config.RTC.UDPPort.Valid() {
		values = append(values, "rtc.portUDP", s.config.RTC.UDPPort)
	} else {
		values = append(values,
			"rtc.portICERange", []uint32{s.config.RTC.ICEPortRangeStart, s.config.RTC.ICEPortRangeEnd},
		)
	}
	if s.config.Prometheus.Port != 0 {
		values = append(values, "portPrometheus", s.config.Prometheus.Port)
	}
	if s.config.Region != "" {
		values = append(values, "region", s.config.Region)
	}
	logger.Infow("starting LiveKit server", values...)
	if runtime.GOOS == "windows" {
		logger.Infow("Windows detected, capacity management is unavailable")
	}

	for _, promLn := range promListeners {
		go s.promServer.Serve(promLn)
	}

	if err := s.signalServer.Start(); err != nil {
		return err
	}

	// from here on the servers own the listeners and close them on shutdown
	serving = true

	httpGroup := &errgroup.Group{}
	for _, ln := range listeners {
		l := ln
		httpGroup.Go(func() error {
			return s.httpServer.Serve(l)
		})
	}
	go func() {
		// ErrServerClosed is the expected result of a graceful shutdown;
		// anything else means serving failed and the server must stop.
		if err := httpGroup.Wait(); !errors.Is(err, http.ErrServerClosed) {
			logger.Errorw("could not start server", err)
			s.Stop(true)
		}
	}()

	go s.backgroundWorker()

	// give time for Serve goroutine to start
	time.Sleep(100 * time.Millisecond)

	s.running.Store(true)

	<-s.doneChan

	// wait for shutdown
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()
	_ = s.httpServer.Shutdown(ctx)
	if s.promServer != nil {
		// also release the metrics listeners and their Serve goroutines
		_ = s.promServer.Shutdown(ctx)
	}

	if s.turnServer != nil {
		_ = s.turnServer.Close()
	}

	s.roomManager.Stop()
	s.signalServer.Stop()
	s.ioService.Stop()

	close(s.closedChan)
	return nil
}
// Stop drains the router and shuts the server down.
//
// Unless force is true it first waits for participants to leave, re-checking
// the room manager on every five second tick and logging while it waits. If
// the server is not running it returns after that wait. Otherwise it stops the
// router, signals Start through doneChan and blocks until Start has closed
// closedChan.
func (s *LivekitServer) Stop(force bool) {
	// stop taking new work, then give existing participants a chance to exit
	s.router.Drain()

	pollTicker := time.NewTicker(5 * time.Second)
	if !force {
		for s.roomManager.HasParticipants() {
			<-pollTicker.C
			logger.Infow("waiting for participants to exit")
		}
	}
	pollTicker.Stop()

	// only the caller that flips running from true to false performs shutdown
	wasRunning := s.running.Swap(false)
	if !wasRunning {
		return
	}

	s.router.Stop()
	close(s.doneChan)

	// block until Start has finished tearing everything down
	<-s.closedChan
}
// RoomManager returns the room manager this server was constructed with.
func (s *LivekitServer) RoomManager() *RoomManager {
	manager := s.roomManager
	return manager
}
// debugGoroutines writes the goroutine profile to the response at debug
// level 2. Any write error is deliberately ignored.
func (s *LivekitServer) debugGoroutines(w http.ResponseWriter, _ *http.Request) {
	goroutines := pprof.Lookup("goroutine")
	_ = goroutines.WriteTo(w, 2)
}
// debugInfo responds with a JSON array holding the DebugInfo of every room
// currently tracked by the room manager.
//
// If the collected data cannot be encoded, the response is a 500 carrying the
// error text: an encoding failure is a server-side fault, not a bad request.
func (s *LivekitServer) debugInfo(w http.ResponseWriter, _ *http.Request) {
	// hold the read lock only while collecting, not while encoding or writing
	s.roomManager.lock.RLock()
	info := make([]map[string]any, 0, len(s.roomManager.rooms))
	for _, room := range s.roomManager.rooms {
		info = append(info, room.DebugInfo())
	}
	s.roomManager.lock.RUnlock()

	b, err := json.Marshal(info)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		_, _ = w.Write([]byte(err.Error()))
		return
	}

	// declare the payload type explicitly instead of relying on content sniffing
	w.Header().Set("Content-Type", "application/json")
	_, _ = w.Write(b)
}
// defaultHandler is the catch-all route: the exact path "/" is answered by the
// health check, and every other path gets a 404.
func (s *LivekitServer) defaultHandler(w http.ResponseWriter, r *http.Request) {
	if r.URL.Path != "/" {
		http.NotFound(w, r)
		return
	}
	s.healthCheck(w, r)
}
// healthCheck reports whether the node's stats are fresh.
//
// It responds 200 "OK" when the node's stats were updated within the last four
// seconds. Otherwise it responds 406 with a "Not Ready" body that includes the
// last update time. A node without stats keeps the zero time and is therefore
// always reported as not ready.
func (s *LivekitServer) healthCheck(w http.ResponseWriter, _ *http.Request) {
	// Clone the node once so the nil check and the read below look at the
	// same snapshot, and so only one copy is made per request.
	node := s.Node()

	var updatedAt time.Time
	if node.Stats != nil {
		updatedAt = time.Unix(node.Stats.UpdatedAt, 0)
	}

	if time.Since(updatedAt) > 4*time.Second {
		w.WriteHeader(http.StatusNotAcceptable)
		_, _ = fmt.Fprintf(w, "Not Ready\nNode Updated At %s", updatedAt)
		return
	}

	w.WriteHeader(http.StatusOK)
	_, _ = w.Write([]byte("OK"))
}
// backgroundWorker runs the periodic per-node tasks: on every one second tick
// it asks the room manager to close idle rooms. It returns once doneChan is
// signalled, stopping its ticker on the way out.
func (s *LivekitServer) backgroundWorker() {
	idleSweep := time.NewTicker(time.Second)
	defer idleSweep.Stop()

	for {
		select {
		case <-idleSweep.C:
			s.roomManager.CloseIdleRooms()
		case <-s.doneChan:
			return
		}
	}
}
// configureMiddlewares builds a negroni stack that runs the given middlewares
// in the order supplied and then hands the request to handler.
func configureMiddlewares(handler http.Handler, middlewares ...negroni.Handler) *negroni.Negroni {
	stack := negroni.New()
	for i := range middlewares {
		stack.Use(middlewares[i])
	}
	stack.UseHandler(handler)
	return stack
}