mirror of
https://github.com/livekit/livekit.git
synced 2026-03-30 22:05:39 +00:00
This allows listing, adding and deleting agent dispatches on an existing room. Requests go to a new AgentDispatchService, which sends them over RPC to the rtc.Room via the RoomManager. The rtc.Room then does agent job management using RPCs to the agent service.
404 lines
10 KiB
Go
404 lines
10 KiB
Go
// Copyright 2023 LiveKit, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package service
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"net"
|
|
"net/http"
|
|
_ "net/http/pprof"
|
|
"runtime"
|
|
"runtime/pprof"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/pion/turn/v2"
|
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
|
"github.com/rs/cors"
|
|
"github.com/twitchtv/twirp"
|
|
"github.com/urfave/negroni/v3"
|
|
"go.uber.org/atomic"
|
|
"golang.org/x/sync/errgroup"
|
|
|
|
"github.com/livekit/livekit-server/pkg/config"
|
|
"github.com/livekit/livekit-server/pkg/routing"
|
|
"github.com/livekit/livekit-server/version"
|
|
"github.com/livekit/protocol/auth"
|
|
"github.com/livekit/protocol/livekit"
|
|
"github.com/livekit/protocol/logger"
|
|
)
|
|
|
|
// LivekitServer is the top-level server object. It owns the HTTP and
// prometheus listeners and aggregates the services that are started and
// stopped together over the server's lifecycle.
type LivekitServer struct {
	config       *config.Config
	ioService    *IOInfoService
	rtcService   *RTCService
	agentService *AgentService
	httpServer   *http.Server
	// promServer is nil unless a prometheus port is configured.
	promServer   *http.Server
	router       routing.Router
	roomManager  *RoomManager
	signalServer *SignalServer
	// turnServer may be nil; when set, it is closed during shutdown.
	turnServer  *turn.Server
	currentNode routing.LocalNode
	// running flips true once Start has brought all services up, and back
	// to false in Stop (via Swap) to guarantee shutdown runs only once.
	running atomic.Bool
	// doneChan is created per Start call; closing it (in Stop) unblocks
	// Start and the background worker.
	doneChan chan struct{}
	// closedChan is closed at the very end of Start's teardown so Stop can
	// wait for a fully completed shutdown.
	closedChan chan struct{}
}
|
|
|
|
func NewLivekitServer(conf *config.Config,
|
|
roomService livekit.RoomService,
|
|
agentDispatchService *AgentDispatchService,
|
|
egressService *EgressService,
|
|
ingressService *IngressService,
|
|
sipService *SIPService,
|
|
ioService *IOInfoService,
|
|
rtcService *RTCService,
|
|
agentService *AgentService,
|
|
keyProvider auth.KeyProvider,
|
|
router routing.Router,
|
|
roomManager *RoomManager,
|
|
signalServer *SignalServer,
|
|
turnServer *turn.Server,
|
|
currentNode routing.LocalNode,
|
|
) (s *LivekitServer, err error) {
|
|
s = &LivekitServer{
|
|
config: conf,
|
|
ioService: ioService,
|
|
rtcService: rtcService,
|
|
agentService: agentService,
|
|
router: router,
|
|
roomManager: roomManager,
|
|
signalServer: signalServer,
|
|
// turn server starts automatically
|
|
turnServer: turnServer,
|
|
currentNode: currentNode,
|
|
closedChan: make(chan struct{}),
|
|
}
|
|
|
|
middlewares := []negroni.Handler{
|
|
// always first
|
|
negroni.NewRecovery(),
|
|
// CORS is allowed, we rely on token authentication to prevent improper use
|
|
cors.New(cors.Options{
|
|
AllowOriginFunc: func(origin string) bool {
|
|
return true
|
|
},
|
|
AllowedHeaders: []string{"*"},
|
|
// allow preflight to be cached for a day
|
|
MaxAge: 86400,
|
|
}),
|
|
}
|
|
if keyProvider != nil {
|
|
middlewares = append(middlewares, NewAPIKeyAuthMiddleware(keyProvider))
|
|
}
|
|
|
|
twirpLoggingHook := TwirpLogger()
|
|
twirpRequestStatusHook := TwirpRequestStatusReporter()
|
|
roomServer := livekit.NewRoomServiceServer(roomService, twirpLoggingHook)
|
|
agentDispatchServer := livekit.NewAgentDispatchServiceServer(agentDispatchService, twirpLoggingHook)
|
|
egressServer := livekit.NewEgressServer(egressService, twirp.WithServerHooks(
|
|
twirp.ChainHooks(
|
|
twirpLoggingHook,
|
|
twirpRequestStatusHook,
|
|
),
|
|
))
|
|
ingressServer := livekit.NewIngressServer(ingressService, twirpLoggingHook)
|
|
sipServer := livekit.NewSIPServer(sipService, twirpLoggingHook)
|
|
|
|
mux := http.NewServeMux()
|
|
if conf.Development {
|
|
// pprof handlers are registered onto DefaultServeMux
|
|
mux = http.DefaultServeMux
|
|
mux.HandleFunc("/debug/goroutine", s.debugGoroutines)
|
|
mux.HandleFunc("/debug/rooms", s.debugInfo)
|
|
}
|
|
|
|
mux.Handle(roomServer.PathPrefix(), roomServer)
|
|
mux.Handle(agentDispatchServer.PathPrefix(), agentDispatchServer)
|
|
mux.Handle(egressServer.PathPrefix(), egressServer)
|
|
mux.Handle(ingressServer.PathPrefix(), ingressServer)
|
|
mux.Handle(sipServer.PathPrefix(), sipServer)
|
|
mux.Handle("/rtc", rtcService)
|
|
mux.Handle("/agent", agentService)
|
|
mux.HandleFunc("/rtc/validate", rtcService.Validate)
|
|
mux.HandleFunc("/", s.defaultHandler)
|
|
|
|
s.httpServer = &http.Server{
|
|
Handler: configureMiddlewares(mux, middlewares...),
|
|
}
|
|
|
|
if conf.PrometheusPort > 0 {
|
|
logger.Warnw("prometheus_port is deprecated, please switch prometheus.port instead", nil)
|
|
conf.Prometheus.Port = conf.PrometheusPort
|
|
}
|
|
|
|
if conf.Prometheus.Port > 0 {
|
|
promHandler := promhttp.Handler()
|
|
if conf.Prometheus.Username != "" && conf.Prometheus.Password != "" {
|
|
protectedHandler := negroni.New()
|
|
protectedHandler.Use(negroni.HandlerFunc(GenBasicAuthMiddleware(conf.Prometheus.Username, conf.Prometheus.Password)))
|
|
protectedHandler.UseHandler(promHandler)
|
|
promHandler = protectedHandler
|
|
}
|
|
s.promServer = &http.Server{
|
|
Handler: promHandler,
|
|
}
|
|
}
|
|
|
|
// clean up old rooms on startup
|
|
if err = roomManager.CleanupRooms(); err != nil {
|
|
return
|
|
}
|
|
if err = router.RemoveDeadNodes(); err != nil {
|
|
return
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// Node returns this server's node descriptor.
// NOTE(review): routing.LocalNode is returned as *livekit.Node here, so the
// two types are presumably identical or aliased — confirm in the routing package.
func (s *LivekitServer) Node() *livekit.Node {
	return s.currentNode
}
|
|
|
|
// HTTPPort returns the configured port the main HTTP server binds to.
func (s *LivekitServer) HTTPPort() int {
	return int(s.config.Port)
}
|
|
|
|
// IsRunning reports whether Start has completed bring-up and the server has
// not yet been stopped.
func (s *LivekitServer) IsRunning() bool {
	return s.running.Load()
}
|
|
|
|
func (s *LivekitServer) Start() error {
|
|
if s.running.Load() {
|
|
return errors.New("already running")
|
|
}
|
|
s.doneChan = make(chan struct{})
|
|
|
|
if err := s.router.RegisterNode(); err != nil {
|
|
return err
|
|
}
|
|
defer func() {
|
|
if err := s.router.UnregisterNode(); err != nil {
|
|
logger.Errorw("could not unregister node", err)
|
|
}
|
|
}()
|
|
|
|
if err := s.router.Start(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := s.ioService.Start(); err != nil {
|
|
return err
|
|
}
|
|
|
|
addresses := s.config.BindAddresses
|
|
if addresses == nil {
|
|
addresses = []string{""}
|
|
}
|
|
|
|
// ensure we could listen
|
|
listeners := make([]net.Listener, 0)
|
|
promListeners := make([]net.Listener, 0)
|
|
for _, addr := range addresses {
|
|
ln, err := net.Listen("tcp", net.JoinHostPort(addr, strconv.Itoa(int(s.config.Port))))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
listeners = append(listeners, ln)
|
|
|
|
if s.promServer != nil {
|
|
ln, err = net.Listen("tcp", net.JoinHostPort(addr, strconv.Itoa(int(s.config.Prometheus.Port))))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
promListeners = append(promListeners, ln)
|
|
}
|
|
}
|
|
|
|
values := []interface{}{
|
|
"portHttp", s.config.Port,
|
|
"nodeID", s.currentNode.Id,
|
|
"nodeIP", s.currentNode.Ip,
|
|
"version", version.Version,
|
|
}
|
|
if s.config.BindAddresses != nil {
|
|
values = append(values, "bindAddresses", s.config.BindAddresses)
|
|
}
|
|
if s.config.RTC.TCPPort != 0 {
|
|
values = append(values, "rtc.portTCP", s.config.RTC.TCPPort)
|
|
}
|
|
if !s.config.RTC.ForceTCP && s.config.RTC.UDPPort.Valid() {
|
|
values = append(values, "rtc.portUDP", s.config.RTC.UDPPort)
|
|
} else {
|
|
values = append(values,
|
|
"rtc.portICERange", []uint32{s.config.RTC.ICEPortRangeStart, s.config.RTC.ICEPortRangeEnd},
|
|
)
|
|
}
|
|
if s.config.Prometheus.Port != 0 {
|
|
values = append(values, "portPrometheus", s.config.Prometheus.Port)
|
|
}
|
|
if s.config.Region != "" {
|
|
values = append(values, "region", s.config.Region)
|
|
}
|
|
logger.Infow("starting LiveKit server", values...)
|
|
if runtime.GOOS == "windows" {
|
|
logger.Infow("Windows detected, capacity management is unavailable")
|
|
}
|
|
|
|
for _, promLn := range promListeners {
|
|
go s.promServer.Serve(promLn)
|
|
}
|
|
|
|
if err := s.signalServer.Start(); err != nil {
|
|
return err
|
|
}
|
|
|
|
httpGroup := &errgroup.Group{}
|
|
for _, ln := range listeners {
|
|
l := ln
|
|
httpGroup.Go(func() error {
|
|
return s.httpServer.Serve(l)
|
|
})
|
|
}
|
|
go func() {
|
|
if err := httpGroup.Wait(); err != http.ErrServerClosed {
|
|
logger.Errorw("could not start server", err)
|
|
s.Stop(true)
|
|
}
|
|
}()
|
|
|
|
go s.backgroundWorker()
|
|
|
|
// give time for Serve goroutine to start
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
s.running.Store(true)
|
|
|
|
<-s.doneChan
|
|
|
|
// wait for shutdown
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
|
|
defer cancel()
|
|
_ = s.httpServer.Shutdown(ctx)
|
|
|
|
if s.turnServer != nil {
|
|
_ = s.turnServer.Close()
|
|
}
|
|
|
|
s.roomManager.Stop()
|
|
s.signalServer.Stop()
|
|
s.ioService.Stop()
|
|
|
|
close(s.closedChan)
|
|
return nil
|
|
}
|
|
|
|
// Stop initiates shutdown. It drains the node from the router, and unless
// force is set, polls every 5 seconds until all participants have left.
// The running.Swap(false) guard makes the actual teardown run at most once
// even if Stop is called concurrently or repeatedly. Closing doneChan
// unblocks Start, which performs the service teardown and finally closes
// closedChan; Stop blocks until that happens.
func (s *LivekitServer) Stop(force bool) {
	// wait for all participants to exit
	s.router.Drain()
	partTicker := time.NewTicker(5 * time.Second)
	waitingForParticipants := !force && s.roomManager.HasParticipants()
	for waitingForParticipants {
		// first log/re-check happens only after the initial 5s tick
		<-partTicker.C
		logger.Infow("waiting for participants to exit")
		waitingForParticipants = s.roomManager.HasParticipants()
	}
	partTicker.Stop()

	// only the caller that flips running from true to false proceeds
	if !s.running.Swap(false) {
		return
	}

	s.router.Stop()
	close(s.doneChan)

	// wait for fully closed
	<-s.closedChan
}
|
|
|
|
// RoomManager exposes the server's room manager.
func (s *LivekitServer) RoomManager() *RoomManager {
	return s.roomManager
}
|
|
|
|
// debugGoroutines dumps full stack traces of all goroutines (pprof debug
// level 2) to the response. Registered only in development mode.
func (s *LivekitServer) debugGoroutines(w http.ResponseWriter, _ *http.Request) {
	_ = pprof.Lookup("goroutine").WriteTo(w, 2)
}
|
|
|
|
func (s *LivekitServer) debugInfo(w http.ResponseWriter, _ *http.Request) {
|
|
s.roomManager.lock.RLock()
|
|
info := make([]map[string]interface{}, 0, len(s.roomManager.rooms))
|
|
for _, room := range s.roomManager.rooms {
|
|
info = append(info, room.DebugInfo())
|
|
}
|
|
s.roomManager.lock.RUnlock()
|
|
|
|
b, err := json.Marshal(info)
|
|
if err != nil {
|
|
w.WriteHeader(400)
|
|
_, _ = w.Write([]byte(err.Error()))
|
|
} else {
|
|
_, _ = w.Write(b)
|
|
}
|
|
}
|
|
|
|
func (s *LivekitServer) defaultHandler(w http.ResponseWriter, r *http.Request) {
|
|
if r.URL.Path == "/" {
|
|
s.healthCheck(w, r)
|
|
} else {
|
|
http.NotFound(w, r)
|
|
}
|
|
}
|
|
|
|
func (s *LivekitServer) healthCheck(w http.ResponseWriter, _ *http.Request) {
|
|
var updatedAt time.Time
|
|
if s.Node().Stats != nil {
|
|
updatedAt = time.Unix(s.Node().Stats.UpdatedAt, 0)
|
|
}
|
|
if time.Since(updatedAt) > 4*time.Second {
|
|
w.WriteHeader(http.StatusNotAcceptable)
|
|
_, _ = w.Write([]byte(fmt.Sprintf("Not Ready\nNode Updated At %s", updatedAt)))
|
|
return
|
|
}
|
|
|
|
w.WriteHeader(http.StatusOK)
|
|
_, _ = w.Write([]byte("OK"))
|
|
}
|
|
|
|
// worker to perform periodic tasks per node
|
|
func (s *LivekitServer) backgroundWorker() {
|
|
roomTicker := time.NewTicker(1 * time.Second)
|
|
defer roomTicker.Stop()
|
|
for {
|
|
select {
|
|
case <-s.doneChan:
|
|
return
|
|
case <-roomTicker.C:
|
|
s.roomManager.CloseIdleRooms()
|
|
}
|
|
}
|
|
}
|
|
|
|
func configureMiddlewares(handler http.Handler, middlewares ...negroni.Handler) *negroni.Negroni {
|
|
n := negroni.New()
|
|
for _, m := range middlewares {
|
|
n.Use(m)
|
|
}
|
|
n.UseHandler(handler)
|
|
return n
|
|
}
|