mirror of
https://github.com/livekit/livekit.git
synced 2026-03-30 17:45:40 +00:00
410 lines
10 KiB
Go
410 lines
10 KiB
Go
// Copyright 2023 LiveKit, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package service
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"net"
|
|
"net/http"
|
|
_ "net/http/pprof"
|
|
"runtime"
|
|
"runtime/pprof"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/pion/turn/v4"
|
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
|
"github.com/rs/cors"
|
|
"github.com/twitchtv/twirp"
|
|
"github.com/urfave/negroni/v3"
|
|
"go.uber.org/atomic"
|
|
"golang.org/x/sync/errgroup"
|
|
|
|
"github.com/livekit/protocol/auth"
|
|
"github.com/livekit/protocol/livekit"
|
|
"github.com/livekit/protocol/logger"
|
|
"github.com/livekit/protocol/utils/xtwirp"
|
|
|
|
"github.com/livekit/livekit-server/pkg/config"
|
|
"github.com/livekit/livekit-server/pkg/routing"
|
|
"github.com/livekit/livekit-server/version"
|
|
)
|
|
|
|
type LivekitServer struct {
|
|
config *config.Config
|
|
ioService *IOInfoService
|
|
rtcService *RTCService
|
|
whipService *WHIPService
|
|
agentService *AgentService
|
|
httpServer *http.Server
|
|
promServer *http.Server
|
|
router routing.Router
|
|
roomManager *RoomManager
|
|
signalServer *SignalServer
|
|
turnServer *turn.Server
|
|
currentNode routing.LocalNode
|
|
running atomic.Bool
|
|
doneChan chan struct{}
|
|
closedChan chan struct{}
|
|
}
|
|
|
|
func NewLivekitServer(conf *config.Config,
|
|
roomService livekit.RoomService,
|
|
agentDispatchService *AgentDispatchService,
|
|
egressService *EgressService,
|
|
ingressService *IngressService,
|
|
sipService *SIPService,
|
|
ioService *IOInfoService,
|
|
rtcService *RTCService,
|
|
whipService *WHIPService,
|
|
agentService *AgentService,
|
|
keyProvider auth.KeyProvider,
|
|
router routing.Router,
|
|
roomManager *RoomManager,
|
|
signalServer *SignalServer,
|
|
turnServer *turn.Server,
|
|
currentNode routing.LocalNode,
|
|
) (s *LivekitServer, err error) {
|
|
s = &LivekitServer{
|
|
config: conf,
|
|
ioService: ioService,
|
|
rtcService: rtcService,
|
|
whipService: whipService,
|
|
agentService: agentService,
|
|
router: router,
|
|
roomManager: roomManager,
|
|
signalServer: signalServer,
|
|
// turn server starts automatically
|
|
turnServer: turnServer,
|
|
currentNode: currentNode,
|
|
closedChan: make(chan struct{}),
|
|
}
|
|
|
|
middlewares := []negroni.Handler{
|
|
// always first
|
|
negroni.NewRecovery(),
|
|
// CORS is allowed, we rely on token authentication to prevent improper use
|
|
cors.New(cors.Options{
|
|
AllowOriginFunc: func(origin string) bool {
|
|
return true
|
|
},
|
|
AllowedMethods: []string{"OPTIONS", "HEAD", "GET", "POST", "PATCH", "DELETE"},
|
|
AllowedHeaders: []string{"*"},
|
|
ExposedHeaders: []string{"*"},
|
|
// allow preflight to be cached for a day
|
|
MaxAge: 86400,
|
|
}),
|
|
negroni.HandlerFunc(RemoveDoubleSlashes),
|
|
}
|
|
if keyProvider != nil {
|
|
middlewares = append(middlewares, NewAPIKeyAuthMiddleware(keyProvider))
|
|
}
|
|
|
|
serverOptions := []any{
|
|
twirp.WithServerHooks(twirp.ChainHooks(
|
|
TwirpLogger(),
|
|
TwirpRequestStatusReporter(),
|
|
)),
|
|
}
|
|
for _, opt := range xtwirp.DefaultServerOptions() {
|
|
serverOptions = append(serverOptions, opt)
|
|
}
|
|
roomServer := livekit.NewRoomServiceServer(roomService, serverOptions...)
|
|
agentDispatchServer := livekit.NewAgentDispatchServiceServer(agentDispatchService, serverOptions...)
|
|
egressServer := livekit.NewEgressServer(egressService, serverOptions...)
|
|
ingressServer := livekit.NewIngressServer(ingressService, serverOptions...)
|
|
sipServer := livekit.NewSIPServer(sipService, serverOptions...)
|
|
|
|
mux := http.NewServeMux()
|
|
if conf.Development {
|
|
// pprof handlers are registered onto DefaultServeMux
|
|
mux = http.DefaultServeMux
|
|
mux.HandleFunc("/debug/goroutine", s.debugGoroutines)
|
|
mux.HandleFunc("/debug/rooms", s.debugInfo)
|
|
}
|
|
|
|
xtwirp.RegisterServer(mux, roomServer)
|
|
xtwirp.RegisterServer(mux, agentDispatchServer)
|
|
xtwirp.RegisterServer(mux, egressServer)
|
|
xtwirp.RegisterServer(mux, ingressServer)
|
|
xtwirp.RegisterServer(mux, sipServer)
|
|
rtcService.SetupRoutes(mux)
|
|
whipService.SetupRoutes(mux)
|
|
mux.Handle("/agent", agentService)
|
|
mux.HandleFunc("/", s.defaultHandler)
|
|
|
|
s.httpServer = &http.Server{
|
|
Handler: configureMiddlewares(mux, middlewares...),
|
|
}
|
|
|
|
if conf.PrometheusPort > 0 {
|
|
logger.Warnw("prometheus_port is deprecated, please switch prometheus.port instead", nil)
|
|
conf.Prometheus.Port = conf.PrometheusPort
|
|
}
|
|
|
|
if conf.Prometheus.Port > 0 {
|
|
promHandler := promhttp.Handler()
|
|
if conf.Prometheus.Username != "" && conf.Prometheus.Password != "" {
|
|
protectedHandler := negroni.New()
|
|
protectedHandler.Use(negroni.HandlerFunc(GenBasicAuthMiddleware(conf.Prometheus.Username, conf.Prometheus.Password)))
|
|
protectedHandler.UseHandler(promHandler)
|
|
promHandler = protectedHandler
|
|
}
|
|
s.promServer = &http.Server{
|
|
Handler: promHandler,
|
|
}
|
|
}
|
|
|
|
if err = router.RemoveDeadNodes(); err != nil {
|
|
return
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (s *LivekitServer) Node() *livekit.Node {
|
|
return s.currentNode.Clone()
|
|
}
|
|
|
|
func (s *LivekitServer) HTTPPort() int {
|
|
return int(s.config.Port)
|
|
}
|
|
|
|
func (s *LivekitServer) IsRunning() bool {
|
|
return s.running.Load()
|
|
}
|
|
|
|
func (s *LivekitServer) Start() error {
|
|
if s.running.Load() {
|
|
return errors.New("already running")
|
|
}
|
|
s.doneChan = make(chan struct{})
|
|
|
|
if err := s.router.RegisterNode(); err != nil {
|
|
return err
|
|
}
|
|
defer func() {
|
|
if err := s.router.UnregisterNode(); err != nil {
|
|
logger.Errorw("could not unregister node", err)
|
|
}
|
|
}()
|
|
|
|
if err := s.router.Start(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := s.ioService.Start(); err != nil {
|
|
return err
|
|
}
|
|
|
|
addresses := s.config.BindAddresses
|
|
if addresses == nil {
|
|
addresses = []string{""}
|
|
}
|
|
|
|
// ensure we could listen
|
|
listeners := make([]net.Listener, 0)
|
|
promListeners := make([]net.Listener, 0)
|
|
for _, addr := range addresses {
|
|
ln, err := net.Listen("tcp", net.JoinHostPort(addr, strconv.Itoa(int(s.config.Port))))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
listeners = append(listeners, ln)
|
|
|
|
if s.promServer != nil {
|
|
ln, err = net.Listen("tcp", net.JoinHostPort(addr, strconv.Itoa(int(s.config.Prometheus.Port))))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
promListeners = append(promListeners, ln)
|
|
}
|
|
}
|
|
|
|
values := []any{
|
|
"portHttp", s.config.Port,
|
|
"nodeID", s.currentNode.NodeID(),
|
|
"nodeIP", s.currentNode.NodeIP(),
|
|
"version", version.Version,
|
|
}
|
|
if s.config.BindAddresses != nil {
|
|
values = append(values, "bindAddresses", s.config.BindAddresses)
|
|
}
|
|
if s.config.RTC.TCPPort != 0 {
|
|
values = append(values, "rtc.portTCP", s.config.RTC.TCPPort)
|
|
}
|
|
if !s.config.RTC.ForceTCP && s.config.RTC.UDPPort.Valid() {
|
|
values = append(values, "rtc.portUDP", s.config.RTC.UDPPort)
|
|
} else {
|
|
values = append(values,
|
|
"rtc.portICERange", []uint32{s.config.RTC.ICEPortRangeStart, s.config.RTC.ICEPortRangeEnd},
|
|
)
|
|
}
|
|
if s.config.Prometheus.Port != 0 {
|
|
values = append(values, "portPrometheus", s.config.Prometheus.Port)
|
|
}
|
|
if s.config.Region != "" {
|
|
values = append(values, "region", s.config.Region)
|
|
}
|
|
logger.Infow("starting LiveKit server", values...)
|
|
if runtime.GOOS == "windows" {
|
|
logger.Infow("Windows detected, capacity management is unavailable")
|
|
}
|
|
|
|
for _, promLn := range promListeners {
|
|
go s.promServer.Serve(promLn)
|
|
}
|
|
|
|
if err := s.signalServer.Start(); err != nil {
|
|
return err
|
|
}
|
|
|
|
httpGroup := &errgroup.Group{}
|
|
for _, ln := range listeners {
|
|
l := ln
|
|
httpGroup.Go(func() error {
|
|
return s.httpServer.Serve(l)
|
|
})
|
|
}
|
|
go func() {
|
|
if err := httpGroup.Wait(); err != http.ErrServerClosed {
|
|
logger.Errorw("could not start server", err)
|
|
s.Stop(true)
|
|
}
|
|
}()
|
|
|
|
go s.backgroundWorker()
|
|
|
|
// give time for Serve goroutine to start
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
s.running.Store(true)
|
|
|
|
<-s.doneChan
|
|
|
|
// wait for shutdown
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
|
|
defer cancel()
|
|
_ = s.httpServer.Shutdown(ctx)
|
|
|
|
if s.turnServer != nil {
|
|
_ = s.turnServer.Close()
|
|
}
|
|
|
|
s.roomManager.Stop()
|
|
s.signalServer.Stop()
|
|
s.ioService.Stop()
|
|
|
|
close(s.closedChan)
|
|
return nil
|
|
}
|
|
|
|
func (s *LivekitServer) Stop(force bool) {
|
|
// wait for all participants to exit
|
|
s.router.Drain()
|
|
partTicker := time.NewTicker(5 * time.Second)
|
|
waitingForParticipants := !force && s.roomManager.HasParticipants()
|
|
for waitingForParticipants {
|
|
<-partTicker.C
|
|
logger.Infow("waiting for participants to exit")
|
|
waitingForParticipants = s.roomManager.HasParticipants()
|
|
}
|
|
partTicker.Stop()
|
|
|
|
if !s.running.Swap(false) {
|
|
return
|
|
}
|
|
|
|
s.router.Stop()
|
|
close(s.doneChan)
|
|
|
|
// wait for fully closed
|
|
<-s.closedChan
|
|
}
|
|
|
|
func (s *LivekitServer) RoomManager() *RoomManager {
|
|
return s.roomManager
|
|
}
|
|
|
|
func (s *LivekitServer) debugGoroutines(w http.ResponseWriter, _ *http.Request) {
|
|
_ = pprof.Lookup("goroutine").WriteTo(w, 2)
|
|
}
|
|
|
|
func (s *LivekitServer) debugInfo(w http.ResponseWriter, _ *http.Request) {
|
|
s.roomManager.lock.RLock()
|
|
info := make([]map[string]any, 0, len(s.roomManager.rooms))
|
|
for _, room := range s.roomManager.rooms {
|
|
info = append(info, room.DebugInfo())
|
|
}
|
|
s.roomManager.lock.RUnlock()
|
|
|
|
b, err := json.Marshal(info)
|
|
if err != nil {
|
|
w.WriteHeader(400)
|
|
_, _ = w.Write([]byte(err.Error()))
|
|
} else {
|
|
_, _ = w.Write(b)
|
|
}
|
|
}
|
|
|
|
func (s *LivekitServer) defaultHandler(w http.ResponseWriter, r *http.Request) {
|
|
if r.URL.Path == "/" {
|
|
s.healthCheck(w, r)
|
|
} else {
|
|
http.NotFound(w, r)
|
|
}
|
|
}
|
|
|
|
func (s *LivekitServer) healthCheck(w http.ResponseWriter, _ *http.Request) {
|
|
var updatedAt time.Time
|
|
if s.Node().Stats != nil {
|
|
updatedAt = time.Unix(s.Node().Stats.UpdatedAt, 0)
|
|
}
|
|
if time.Since(updatedAt) > 4*time.Second {
|
|
w.WriteHeader(http.StatusNotAcceptable)
|
|
_, _ = w.Write([]byte(fmt.Sprintf("Not Ready\nNode Updated At %s", updatedAt)))
|
|
return
|
|
}
|
|
|
|
w.WriteHeader(http.StatusOK)
|
|
_, _ = w.Write([]byte("OK"))
|
|
}
|
|
|
|
// worker to perform periodic tasks per node
|
|
func (s *LivekitServer) backgroundWorker() {
|
|
roomTicker := time.NewTicker(1 * time.Second)
|
|
defer roomTicker.Stop()
|
|
for {
|
|
select {
|
|
case <-s.doneChan:
|
|
return
|
|
case <-roomTicker.C:
|
|
s.roomManager.CloseIdleRooms()
|
|
}
|
|
}
|
|
}
|
|
|
|
func configureMiddlewares(handler http.Handler, middlewares ...negroni.Handler) *negroni.Negroni {
|
|
n := negroni.New()
|
|
for _, m := range middlewares {
|
|
n.Use(m)
|
|
}
|
|
n.UseHandler(handler)
|
|
return n
|
|
}
|