Merge remote-tracking branch 'origin/master' into raja_1833

This commit is contained in:
boks1971
2023-11-22 11:48:47 +05:30
31 changed files with 1520 additions and 223 deletions
+4
View File
@@ -66,6 +66,7 @@ type Config struct {
Room RoomConfig `yaml:"room,omitempty"`
TURN TURNConfig `yaml:"turn,omitempty"`
Ingress IngressConfig `yaml:"ingress,omitempty"`
SIP SIPConfig `yaml:"sip,omitempty"`
WebHook WebHookConfig `yaml:"webhook,omitempty"`
NodeSelector NodeSelectorConfig `yaml:"node_selector,omitempty"`
KeyFile string `yaml:"key_file,omitempty"`
@@ -294,6 +295,9 @@ type IngressConfig struct {
WHIPBaseURL string `yaml:"whip_base_url,omitempty"`
}
type SIPConfig struct {
}
// not exposed to YAML
type APIConfig struct {
// amount of time to wait for API to execute, default 2s
+1 -1
View File
@@ -786,7 +786,7 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea
p.TransportManager.Close()
}()
p.dataChannelStats.Report()
p.dataChannelStats.Stop()
return nil
}
+8 -8
View File
@@ -83,7 +83,7 @@ func TestJoinedState(t *testing.T) {
func TestRoomJoin(t *testing.T) {
t.Run("joining returns existing participant data", func(t *testing.T) {
rm := newRoomWithParticipants(t, testRoomOpts{num: numParticipants})
pNew := newMockParticipant("new", types.CurrentProtocol, false, false)
pNew := NewMockParticipant("new", types.CurrentProtocol, false, false)
_ = rm.Join(pNew, nil, nil, iceServersForRoom)
@@ -98,7 +98,7 @@ func TestRoomJoin(t *testing.T) {
t.Run("subscribe to existing channels upon join", func(t *testing.T) {
numExisting := 3
rm := newRoomWithParticipants(t, testRoomOpts{num: numExisting})
p := newMockParticipant("new", types.CurrentProtocol, false, false)
p := NewMockParticipant("new", types.CurrentProtocol, false, false)
err := rm.Join(p, nil, &ParticipantOptions{AutoSubscribe: true}, iceServersForRoom)
require.NoError(t, err)
@@ -154,7 +154,7 @@ func TestRoomJoin(t *testing.T) {
rm.lock.Lock()
rm.protoRoom.MaxParticipants = 1
rm.lock.Unlock()
p := newMockParticipant("second", types.ProtocolVersion(0), false, false)
p := NewMockParticipant("second", types.ProtocolVersion(0), false, false)
err := rm.Join(p, nil, nil, iceServersForRoom)
require.Equal(t, ErrMaxParticipantsExceeded, err)
@@ -414,7 +414,7 @@ func TestNewTrack(t *testing.T) {
pub := participants[2].(*typesfakes.FakeLocalParticipant)
// pub adds track
track := newMockTrack(livekit.TrackType_VIDEO, "webcam")
track := NewMockTrack(livekit.TrackType_VIDEO, "webcam")
trackCB := pub.OnTrackPublishedArgsForCall(0)
require.NotNil(t, trackCB)
trackCB(pub, track)
@@ -653,7 +653,7 @@ func TestHiddenParticipants(t *testing.T) {
rm := newRoomWithParticipants(t, testRoomOpts{num: 2, numHidden: 1})
defer rm.Close()
pNew := newMockParticipant("new", types.CurrentProtocol, false, false)
pNew := NewMockParticipant("new", types.CurrentProtocol, false, false)
rm.Join(pNew, nil, nil, iceServersForRoom)
// expect new participant to get a JoinReply
@@ -667,7 +667,7 @@ func TestHiddenParticipants(t *testing.T) {
t.Run("hidden participant subscribes to tracks", func(t *testing.T) {
rm := newRoomWithParticipants(t, testRoomOpts{num: 2})
hidden := newMockParticipant("hidden", types.CurrentProtocol, true, false)
hidden := NewMockParticipant("hidden", types.CurrentProtocol, true, false)
err := rm.Join(hidden, nil, &ParticipantOptions{AutoSubscribe: true}, iceServersForRoom)
require.NoError(t, err)
@@ -689,7 +689,7 @@ func TestRoomUpdate(t *testing.T) {
p1 := rm.GetParticipants()[0].(*typesfakes.FakeLocalParticipant)
require.Equal(t, 0, p1.SendRoomUpdateCallCount())
p2 := newMockParticipant("p2", types.CurrentProtocol, false, false)
p2 := NewMockParticipant("p2", types.CurrentProtocol, false, false)
require.NoError(t, rm.Join(p2, nil, nil, iceServersForRoom))
// p1 should have received an update
@@ -743,7 +743,7 @@ func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *Room {
)
for i := 0; i < opts.num+opts.numHidden; i++ {
identity := livekit.ParticipantIdentity(fmt.Sprintf("p%d", i))
participant := newMockParticipant(identity, opts.protocol, i >= opts.num, true)
participant := NewMockParticipant(identity, opts.protocol, i >= opts.num, true)
err := rm.Join(participant, nil, &ParticipantOptions{AutoSubscribe: true}, iceServersForRoom)
require.NoError(t, err)
participant.StateReturns(livekit.ParticipantInfo_ACTIVE)
@@ -16,6 +16,7 @@ package rtc
import (
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
"github.com/livekit/livekit-server/pkg/rtc/types"
@@ -27,7 +28,7 @@ func init() {
prometheus.Init("test", livekit.NodeType_SERVER, "test")
}
func newMockParticipant(identity livekit.ParticipantIdentity, protocol types.ProtocolVersion, hidden bool, publisher bool) *typesfakes.FakeLocalParticipant {
func NewMockParticipant(identity livekit.ParticipantIdentity, protocol types.ProtocolVersion, hidden bool, publisher bool) *typesfakes.FakeLocalParticipant {
p := &typesfakes.FakeLocalParticipant{}
sid := utils.NewGuid(utils.ParticipantPrefix)
p.IDReturns(livekit.ParticipantID(sid))
@@ -44,6 +45,12 @@ func newMockParticipant(identity livekit.ParticipantIdentity, protocol types.Pro
State: livekit.ParticipantInfo_JOINED,
IsPublisher: publisher,
})
p.ToProtoWithVersionReturns(&livekit.ParticipantInfo{
Sid: sid,
Identity: string(identity),
State: livekit.ParticipantInfo_JOINED,
IsPublisher: publisher,
}, utils.TimedVersion{})
p.SetMetadataCalls(func(m string) {
var f func(participant types.LocalParticipant)
@@ -71,11 +78,12 @@ func newMockParticipant(identity livekit.ParticipantIdentity, protocol types.Pro
p.AddTrackCalls(func(req *livekit.AddTrackRequest) {
updateTrack()
})
p.GetLoggerReturns(logger.GetLogger())
return p
}
func newMockTrack(kind livekit.TrackType, name string) *typesfakes.FakeMediaTrack {
func NewMockTrack(kind livekit.TrackType, name string) *typesfakes.FakeMediaTrack {
t := &typesfakes.FakeMediaTrack{}
t.IDReturns(livekit.TrackID(utils.NewGuid(utils.TrackPrefix)))
t.KindReturns(kind)
+11 -9
View File
@@ -657,12 +657,10 @@ func (t *PCTransport) onPeerConnectionStateChange(state webrtc.PeerConnectionSta
}
t.maybeNotifyFullyEstablished()
t.logICECandidates()
}
case webrtc.PeerConnectionStateFailed:
t.params.Logger.Infow("peer connection failed")
t.clearConnTimer()
t.logICECandidates()
t.handleConnectionFailed(false)
}
}
@@ -1606,13 +1604,17 @@ func (t *PCTransport) handleRemoteICECandidate(e *event) error {
}
func (t *PCTransport) handleLogICECandidates(e *event) error {
t.params.Logger.Infow(
"ice candidates",
"lc", t.allowedLocalCandidates.Get(),
"rc", t.allowedRemoteCandidates.Get(),
"lc (filtered)", t.filteredLocalCandidates.Get(),
"rc (filtered)", t.filteredRemoteCandidates.Get(),
)
lc := t.allowedLocalCandidates.Get()
rc := t.allowedRemoteCandidates.Get()
if len(lc) != 0 || len(rc) != 0 {
t.params.Logger.Infow(
"ice candidates",
"lc", lc,
"rc", rc,
"lc (filtered)", t.filteredLocalCandidates.Get(),
"rc (filtered)", t.filteredRemoteCandidates.Get(),
)
}
return nil
}
+25 -4
View File
@@ -198,12 +198,27 @@ func (s *AgentHandler) HandleConnection(conn *websocket.Conn) {
}
func (s *AgentHandler) handleRegister(worker *worker, msg *livekit.RegisterWorkerRequest) {
if err := s.doHandleRegister(worker, msg); err != nil {
logger.Errorw("failed to register worker", err, "workerID", msg.WorkerId, "jobType", msg.Type)
worker.conn.Close()
}
}
func (s *AgentHandler) doHandleRegister(worker *worker, msg *livekit.RegisterWorkerRequest) error {
if msg.WorkerId == "" {
return errors.New("invalid worker id")
}
s.mu.Lock()
defer s.mu.Unlock()
if worker.id != "" {
s.mu.Unlock()
return errors.New("worker already registered")
}
switch msg.Type {
case livekit.JobType_JT_ROOM:
worker.id = msg.WorkerId
worker.jobType = msg.Type
delete(s.unregistered, worker.conn)
s.roomWorkers[worker.id] = worker
@@ -218,6 +233,7 @@ func (s *AgentHandler) handleRegister(worker *worker, msg *livekit.RegisterWorke
case livekit.JobType_JT_PUBLISHER:
worker.id = msg.WorkerId
worker.jobType = msg.Type
delete(s.unregistered, worker.conn)
s.publisherWorkers[worker.id] = worker
@@ -229,7 +245,11 @@ func (s *AgentHandler) handleRegister(worker *worker, msg *livekit.RegisterWorke
s.publisherRegistered = true
}
}
default:
s.mu.Unlock()
return errors.New("invalid job type")
}
s.mu.Unlock()
_, err := worker.sigConn.WriteServerMessage(&livekit.ServerMessage{
Message: &livekit.ServerMessage_Register{
@@ -242,6 +262,8 @@ func (s *AgentHandler) handleRegister(worker *worker, msg *livekit.RegisterWorke
if err != nil {
logger.Errorw("failed to write server message", err)
}
return nil
}
func (s *AgentHandler) handleAvailability(w *worker, msg *livekit.AvailabilityResponse) {
@@ -366,8 +388,7 @@ func (s *AgentHandler) JobRequest(ctx context.Context, job *livekit.Job) (*empty
Availability: &livekit.AvailabilityRequest{Job: job},
}})
if err != nil {
logger.Errorw("failed to send availability request", err)
return nil, err
logger.Errorw("failed to send availability request", err, "workerID", selected.id)
}
select {
@@ -379,7 +400,7 @@ func (s *AgentHandler) JobRequest(ctx context.Context, job *livekit.Job) (*empty
Assignment: &livekit.JobAssignment{Job: job},
}})
if err != nil {
logger.Errorw("failed to assign job", err)
logger.Errorw("failed to assign job", err, "workerID", selected.id)
} else {
selected.mu.Lock()
selected.activeJobs++
+4
View File
@@ -34,4 +34,8 @@ var (
ErrRemoteUnmuteNoteEnabled = psrpc.NewErrorf(psrpc.FailedPrecondition, "remote unmute not enabled")
ErrTrackNotFound = psrpc.NewErrorf(psrpc.NotFound, "track is not found")
ErrWebHookMissingAPIKey = psrpc.NewErrorf(psrpc.InvalidArgument, "api_key is required to use webhooks")
ErrSIPNotConnected = psrpc.NewErrorf(psrpc.Internal, "sip not connected (redis required)")
ErrSIPTrunkNotFound = psrpc.NewErrorf(psrpc.NotFound, "requested sip trunk does not exist")
ErrSIPDispatchRuleNotFound = psrpc.NewErrorf(psrpc.NotFound, "requested sip dispatch rule does not exist")
ErrSIPParticipantNotFound = psrpc.NewErrorf(psrpc.NotFound, "requested sip participant does not exist")
)
+18
View File
@@ -76,3 +76,21 @@ type RoomAllocator interface {
CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, bool, error)
ValidateCreateRoom(ctx context.Context, roomName livekit.RoomName) error
}
//counterfeiter:generate . SIPStore
type SIPStore interface {
StoreSIPTrunk(ctx context.Context, info *livekit.SIPTrunkInfo) error
LoadSIPTrunk(ctx context.Context, sipTrunkID string) (*livekit.SIPTrunkInfo, error)
ListSIPTrunk(ctx context.Context) ([]*livekit.SIPTrunkInfo, error)
DeleteSIPTrunk(ctx context.Context, info *livekit.SIPTrunkInfo) error
StoreSIPDispatchRule(ctx context.Context, info *livekit.SIPDispatchRuleInfo) error
LoadSIPDispatchRule(ctx context.Context, sipDispatchRuleID string) (*livekit.SIPDispatchRuleInfo, error)
ListSIPDispatchRule(ctx context.Context) ([]*livekit.SIPDispatchRuleInfo, error)
DeleteSIPDispatchRule(ctx context.Context, info *livekit.SIPDispatchRuleInfo) error
StoreSIPParticipant(ctx context.Context, info *livekit.SIPParticipantInfo) error
LoadSIPParticipant(ctx context.Context, sipParticipantID string) (*livekit.SIPParticipantInfo, error)
ListSIPParticipant(ctx context.Context) ([]*livekit.SIPParticipantInfo, error)
DeleteSIPParticipant(ctx context.Context, info *livekit.SIPParticipantInfo) error
}
+5 -1
View File
@@ -20,11 +20,12 @@ import (
"google.golang.org/protobuf/types/known/emptypb"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
"github.com/livekit/psrpc"
"github.com/livekit/livekit-server/pkg/telemetry"
)
type IOInfoService struct {
@@ -32,6 +33,7 @@ type IOInfoService struct {
es EgressStore
is IngressStore
ss SIPStore
telemetry telemetry.TelemetryService
shutdown chan struct{}
@@ -41,11 +43,13 @@ func NewIOInfoService(
bus psrpc.MessageBus,
es EgressStore,
is IngressStore,
ss SIPStore,
ts telemetry.TelemetryService,
) (*IOInfoService, error) {
s := &IOInfoService{
es: es,
is: is,
ss: ss,
telemetry: ts,
shutdown: make(chan struct{}),
}
+328
View File
@@ -0,0 +1,328 @@
package service
import (
"context"
"fmt"
"math"
"regexp"
"sort"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
)
// sipRulePriority returns sorting priority for dispatch rules. Lower value means higher priority.
func sipRulePriority(info *livekit.SIPDispatchRuleInfo) int32 {
// In all these cases, prefer pin-protected rules.
// Thus, the order will be the following:
// - 0: Direct or Pin (both pin-protected)
// - 1: Individual (pin-protected)
// - 100: Direct (open)
// - 101: Individual (open)
const (
last = math.MaxInt32
)
// TODO: Maybe allow setting specific priorities for dispatch rules?
switch rule := info.GetRule().GetRule().(type) {
default:
return last
case *livekit.SIPDispatchRule_DispatchRuleDirect:
if rule.DispatchRuleDirect.GetPin() != "" {
return 0
}
return 100
case *livekit.SIPDispatchRule_DispatchRuleIndividual:
if rule.DispatchRuleIndividual.GetPin() != "" {
return 1
}
return 101
}
}
// sipSortRules predictably sorts dispatch rules by priority (first one is highest).
func sipSortRules(rules []*livekit.SIPDispatchRuleInfo) {
sort.Slice(rules, func(i, j int) bool {
p1, p2 := sipRulePriority(rules[i]), sipRulePriority(rules[j])
if p1 < p2 {
return true
} else if p1 > p2 {
return false
}
// For predictable sorting order.
room1, _, _ := sipGetPinAndRoom(rules[i])
room2, _, _ := sipGetPinAndRoom(rules[j])
return room1 < room2
})
}
// sipSelectDispatch takes a list of dispatch rules, and takes the decision which one should be selected.
// It returns an error if there are conflicting rules. Returns nil if no rules match.
func sipSelectDispatch(rules []*livekit.SIPDispatchRuleInfo, req *rpc.EvaluateSIPDispatchRulesRequest) (*livekit.SIPDispatchRuleInfo, error) {
if len(rules) == 0 {
return nil, nil
}
// Sorting will do the selection for us. We already filtered out irrelevant ones in matchSIPDispatchRule.
sipSortRules(rules)
byPin := make(map[string]*livekit.SIPDispatchRuleInfo)
var (
pinRule *livekit.SIPDispatchRuleInfo
openRule *livekit.SIPDispatchRuleInfo
)
openCnt := 0
for _, r := range rules {
_, pin, err := sipGetPinAndRoom(r)
if err != nil {
return nil, err
}
if pin == "" {
openRule = r // last one
openCnt++
} else if r2 := byPin[pin]; r2 != nil {
return nil, fmt.Errorf("Conflicting SIP Dispatch Rules: Same PIN for %q and %q",
r.SipDispatchRuleId, r2.SipDispatchRuleId)
} else {
byPin[pin] = r
// Pick the first one with a Pin. If Pin was provided in the request, we already filtered the right rules.
// If not, this rule will just be used to send RequestPin=true flag.
if pinRule == nil {
pinRule = r
}
}
}
if req.GetPin() != "" {
// If it's still nil that's fine. We will report "no rules matched" later.
return pinRule, nil
}
if pinRule != nil {
return pinRule, nil
}
if openCnt > 1 {
return nil, fmt.Errorf("Conflicting SIP Dispatch Rules: Matched %d open rules for %q", openCnt, req.CallingNumber)
}
return openRule, nil
}
// sipGetPinAndRoom returns a room name/prefix and the pin for a dispatch rule. Just a convenience wrapper.
func sipGetPinAndRoom(info *livekit.SIPDispatchRuleInfo) (room, pin string, err error) {
// TODO: Could probably add methods on SIPDispatchRuleInfo struct instead.
switch rule := info.GetRule().GetRule().(type) {
default:
return "", "", fmt.Errorf("Unsupported SIP Dispatch Rule: %T", rule)
case *livekit.SIPDispatchRule_DispatchRuleDirect:
pin = rule.DispatchRuleDirect.GetPin()
room = rule.DispatchRuleDirect.GetRoomName()
case *livekit.SIPDispatchRule_DispatchRuleIndividual:
pin = rule.DispatchRuleIndividual.GetPin()
room = rule.DispatchRuleIndividual.GetRoomPrefix()
}
return room, pin, nil
}
// sipMatchTrunk finds a SIP Trunk definition matching the request.
// Returns nil if no rules matched or an error if there are conflicting definitions.
func sipMatchTrunk(trunks []*livekit.SIPTrunkInfo, calling, called string) (*livekit.SIPTrunkInfo, error) {
var (
selectedTrunk *livekit.SIPTrunkInfo
defaultTrunk *livekit.SIPTrunkInfo
defaultTrunkCnt int // to error in case there are multiple ones
)
for _, tr := range trunks {
// Do not consider it if regexp doesn't match.
matches := len(tr.InboundNumbersRegex) == 0
for _, reStr := range tr.InboundNumbersRegex {
// TODO: we should cache it
re, err := regexp.Compile(reStr)
if err != nil {
logger.Errorw("cannot parse SIP trunk regexp", err, "trunkID", tr.SipTrunkId)
continue
}
if re.MatchString(calling) {
matches = true
break
}
}
if !matches {
continue
}
if tr.OutboundNumber == "" {
// Default/wildcard trunk.
defaultTrunk = tr
defaultTrunkCnt++
} else if tr.OutboundNumber == called {
// Trunk specific to the number.
if selectedTrunk != nil {
return nil, fmt.Errorf("Multiple SIP Trunks matched for %q", called)
}
selectedTrunk = tr
// Keep searching! We want to know if there are any conflicting Trunk definitions.
}
}
if selectedTrunk != nil {
return selectedTrunk, nil
}
if defaultTrunkCnt > 1 {
return nil, fmt.Errorf("Multiple default SIP Trunks matched for %q", called)
}
// Could still be nil here.
return defaultTrunk, nil
}
// sipMatchDispatchRule finds the best dispatch rule matching the request parameters. Returns an error if no rule matched.
// Trunk parameter can be nil, in which case only wildcard dispatch rules will be effective (ones without Trunk IDs).
func sipMatchDispatchRule(trunk *livekit.SIPTrunkInfo, rules []*livekit.SIPDispatchRuleInfo, req *rpc.EvaluateSIPDispatchRulesRequest) (*livekit.SIPDispatchRuleInfo, error) {
// Trunk can still be nil here in case none matched or were defined.
// This is still fine, but only in case we'll match exactly one wildcard dispatch rule.
if len(rules) == 0 {
return nil, fmt.Errorf("No SIP Dispatch Rules defined")
}
// We split the matched dispatch rules into two sets: specific and default (aka wildcard).
// First, attempt to match any of the specific rules, where we did match the Trunk ID.
// If nothing matches there - fallback to default/wildcard rules, where no Trunk IDs were mentioned.
var (
specificRules []*livekit.SIPDispatchRuleInfo
defaultRules []*livekit.SIPDispatchRuleInfo
)
noPin := req.NoPin
sentPin := req.GetPin()
for _, info := range rules {
_, rulePin, err := sipGetPinAndRoom(info)
if err != nil {
logger.Errorw("Invalid SIP Dispatch Rule", err, "dispatchRuleID", info.SipDispatchRuleId)
continue
}
// Filter heavily on the Pin, so that only relevant rules remain.
if noPin {
if rulePin != "" {
// Skip pin-protected rules if no pin mode requested.
continue
}
} else if sentPin != "" {
if rulePin == "" {
// Pin already sent, skip non-pin-protected rules.
continue
}
if sentPin != rulePin {
// Pin doesn't match. Don't return an error here, just wait for other rule to match (or none at all).
// Note that we will NOT match non-pin-protected rules, thus it will not fallback to open rules.
continue
}
}
if len(info.TrunkIds) == 0 {
// Default/wildcard dispatch rule.
defaultRules = append(defaultRules, info)
continue
}
// Specific dispatch rules. Require a Trunk associated with the number.
if trunk == nil {
continue
}
matches := false
for _, id := range info.TrunkIds {
if id == trunk.SipTrunkId {
matches = true
break
}
}
if !matches {
continue
}
specificRules = append(specificRules, info)
}
best, err := sipSelectDispatch(specificRules, req)
if err != nil {
return nil, err
} else if best != nil {
return best, nil
}
best, err = sipSelectDispatch(defaultRules, req)
if err != nil {
return nil, err
} else if best != nil {
return best, nil
}
if trunk == nil {
return nil, fmt.Errorf("No SIP Trunk or Dispatch Rules matched for %q", req.CalledNumber)
}
return nil, fmt.Errorf("No SIP Dispatch Rules matched for %q", req.CalledNumber)
}
// matchSIPTrunk finds a SIP Trunk definition matching the request.
// Returns nil if no rules matched or an error if there are conflicting definitions.
func (s *IOInfoService) matchSIPTrunk(ctx context.Context, calling, called string) (*livekit.SIPTrunkInfo, error) {
trunks, err := s.ss.ListSIPTrunk(ctx)
if err != nil {
return nil, err
}
return sipMatchTrunk(trunks, calling, called)
}
// matchSIPDispatchRule finds the best dispatch rule matching the request parameters. Returns an error if no rule matched.
// Trunk parameter can be nil, in which case only wildcard dispatch rules will be effective (ones without Trunk IDs).
func (s *IOInfoService) matchSIPDispatchRule(ctx context.Context, trunk *livekit.SIPTrunkInfo, req *rpc.EvaluateSIPDispatchRulesRequest) (*livekit.SIPDispatchRuleInfo, error) {
// Trunk can still be nil here in case none matched or were defined.
// This is still fine, but only in case we'll match exactly one wildcard dispatch rule.
rules, err := s.ss.ListSIPDispatchRule(ctx)
if err != nil {
return nil, err
}
return sipMatchDispatchRule(trunk, rules, req)
}
func (s *IOInfoService) EvaluateSIPDispatchRules(ctx context.Context, req *rpc.EvaluateSIPDispatchRulesRequest) (*rpc.EvaluateSIPDispatchRulesResponse, error) {
trunk, err := s.matchSIPTrunk(ctx, req.CallingNumber, req.CalledNumber)
if err != nil {
return nil, err
}
best, err := s.matchSIPDispatchRule(ctx, trunk, req)
if err != nil {
return nil, err
}
sentPin := req.GetPin()
from := req.CallingNumber
if best.HidePhoneNumber {
// TODO: Decide on the phone masking format.
// Maybe keep regional code, but mask all but 4 last digits?
from = from[len(from)-4:]
}
fromName := "Phone " + from
room, rulePin, err := sipGetPinAndRoom(best)
if err != nil {
return nil, err
}
if rulePin != "" {
if sentPin == "" {
return &rpc.EvaluateSIPDispatchRulesResponse{
RequestPin: true,
}, nil
}
if rulePin != sentPin {
// This should never happen in practice, because matchSIPDispatchRule should remove rules with the wrong pin.
return nil, fmt.Errorf("Incorrect PIN for SIP room")
}
} else {
// Pin was sent, but room doesn't require one. Assume user accidentally pressed phone button.
}
switch rule := best.GetRule().GetRule().(type) {
case *livekit.SIPDispatchRule_DispatchRuleIndividual:
// TODO: Decide on the suffix. Do we need to escape specific characters?
room = rule.DispatchRuleIndividual.GetRoomPrefix() + from
}
return &rpc.EvaluateSIPDispatchRulesResponse{
RoomName: room,
ParticipantIdentity: fromName,
}, nil
}
func (s *IOInfoService) GetSIPTrunkAuthentication(ctx context.Context, req *rpc.GetSIPTrunkAuthenticationRequest) (*rpc.GetSIPTrunkAuthenticationResponse, error) {
trunk, err := s.matchSIPTrunk(ctx, req.From, req.To)
if err != nil {
return nil, err
}
return &rpc.GetSIPTrunkAuthenticationResponse{
Username: trunk.Username,
Password: trunk.Password,
}, nil
}
+390
View File
@@ -0,0 +1,390 @@
package service
import (
"fmt"
"testing"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/rpc"
"github.com/stretchr/testify/require"
)
const (
sipNumber1 = "1111 1111"
sipNumber2 = "2222 2222"
sipNumber3 = "3333 3333"
sipTrunkID1 = "aaa"
sipTrunkID2 = "bbb"
)
func TestSIPMatchTrunk(t *testing.T) {
cases := []struct {
name string
trunks []*livekit.SIPTrunkInfo
exp int
expErr bool
}{
{
name: "empty",
trunks: nil,
exp: -1, // no error; nil result
},
{
name: "one wildcard",
trunks: []*livekit.SIPTrunkInfo{
{SipTrunkId: "aaa"},
},
exp: 0,
},
{
name: "matching",
trunks: []*livekit.SIPTrunkInfo{
{SipTrunkId: "aaa", OutboundNumber: sipNumber2},
},
exp: 0,
},
{
name: "matching regexp",
trunks: []*livekit.SIPTrunkInfo{
{SipTrunkId: "aaa", OutboundNumber: sipNumber2, InboundNumbersRegex: []string{`^\d+ \d+$`}},
},
exp: 0,
},
{
name: "not matching",
trunks: []*livekit.SIPTrunkInfo{
{SipTrunkId: "aaa", OutboundNumber: sipNumber3},
},
exp: -1,
},
{
name: "not matching regexp",
trunks: []*livekit.SIPTrunkInfo{
{SipTrunkId: "aaa", OutboundNumber: sipNumber2, InboundNumbersRegex: []string{`^\d+$`}},
},
exp: -1,
},
{
name: "one match",
trunks: []*livekit.SIPTrunkInfo{
{SipTrunkId: "aaa", OutboundNumber: sipNumber3},
{SipTrunkId: "bbb", OutboundNumber: sipNumber2},
},
exp: 1,
},
{
name: "many matches",
trunks: []*livekit.SIPTrunkInfo{
{SipTrunkId: "aaa", OutboundNumber: sipNumber3},
{SipTrunkId: "bbb", OutboundNumber: sipNumber2},
{SipTrunkId: "ccc", OutboundNumber: sipNumber2},
},
expErr: true,
},
{
name: "many matches default",
trunks: []*livekit.SIPTrunkInfo{
{SipTrunkId: "aaa", OutboundNumber: sipNumber3},
{SipTrunkId: "bbb"},
{SipTrunkId: "ccc", OutboundNumber: sipNumber2},
{SipTrunkId: "ddd"},
},
exp: 2,
},
{
name: "regexp",
trunks: []*livekit.SIPTrunkInfo{
{SipTrunkId: "aaa", OutboundNumber: sipNumber3},
{SipTrunkId: "bbb", OutboundNumber: sipNumber2},
{SipTrunkId: "ccc", OutboundNumber: sipNumber2, InboundNumbersRegex: []string{`^\d+$`}},
},
exp: 1,
},
{
name: "multiple defaults",
trunks: []*livekit.SIPTrunkInfo{
{SipTrunkId: "aaa", OutboundNumber: sipNumber3},
{SipTrunkId: "bbb"},
{SipTrunkId: "ccc"},
},
expErr: true,
},
}
for _, c := range cases {
c := c
t.Run(c.name, func(t *testing.T) {
got, err := sipMatchTrunk(c.trunks, sipNumber1, sipNumber2)
if c.expErr {
require.Error(t, err)
require.Nil(t, got)
t.Log(err)
} else {
var exp *livekit.SIPTrunkInfo
if c.exp >= 0 {
exp = c.trunks[c.exp]
}
require.NoError(t, err)
require.Equal(t, exp, got)
}
})
}
}
func newSIPTrunkDispatch() *livekit.SIPTrunkInfo {
return &livekit.SIPTrunkInfo{
SipTrunkId: sipTrunkID1,
OutboundNumber: sipNumber2,
}
}
func newSIPReqDispatch(pin string, noPin bool) *rpc.EvaluateSIPDispatchRulesRequest {
return &rpc.EvaluateSIPDispatchRulesRequest{
CallingNumber: sipNumber1,
CalledNumber: sipNumber2,
Pin: pin,
//NoPin: noPin, // TODO
}
}
func newDirectDispatch(room, pin string) *livekit.SIPDispatchRule {
return &livekit.SIPDispatchRule{
Rule: &livekit.SIPDispatchRule_DispatchRuleDirect{
DispatchRuleDirect: &livekit.SIPDispatchRuleDirect{
RoomName: room, Pin: pin,
},
},
}
}
func newIndividualDispatch(roomPref, pin string) *livekit.SIPDispatchRule {
return &livekit.SIPDispatchRule{
Rule: &livekit.SIPDispatchRule_DispatchRuleIndividual{
DispatchRuleIndividual: &livekit.SIPDispatchRuleIndividual{
RoomPrefix: roomPref, Pin: pin,
},
},
}
}
func TestSIPMatchDispatchRule(t *testing.T) {
cases := []struct {
name string
trunk *livekit.SIPTrunkInfo
rules []*livekit.SIPDispatchRuleInfo
reqPin string
noPin bool
exp int
expErr bool
}{
// These cases just validate that no rules produce an error.
{
name: "empty",
trunk: nil,
rules: nil,
expErr: true,
},
{
name: "only trunk",
trunk: newSIPTrunkDispatch(),
rules: nil,
expErr: true,
},
// Default rules should work even if no trunk is defined.
{
name: "one rule/no trunk",
trunk: nil,
rules: []*livekit.SIPDispatchRuleInfo{
{TrunkIds: nil, Rule: newDirectDispatch("sip", "")},
},
exp: 0,
},
// Default rule should work with a trunk too.
{
name: "one rule/default trunk",
trunk: newSIPTrunkDispatch(),
rules: []*livekit.SIPDispatchRuleInfo{
{TrunkIds: nil, Rule: newDirectDispatch("sip", "")},
},
exp: 0,
},
// Rule matching the trunk should be selected.
{
name: "one rule/specific trunk",
trunk: newSIPTrunkDispatch(),
rules: []*livekit.SIPDispatchRuleInfo{
{TrunkIds: []string{sipTrunkID1, sipTrunkID2}, Rule: newDirectDispatch("sip", "")},
},
exp: 0,
},
// Rule NOT matching the trunk should NOT be selected.
{
name: "one rule/wrong trunk",
trunk: newSIPTrunkDispatch(),
rules: []*livekit.SIPDispatchRuleInfo{
{TrunkIds: []string{"zzz"}, Rule: newDirectDispatch("sip", "")},
},
expErr: true,
},
// Direct rule with a pin should be selected, even if no pin is provided.
{
name: "direct pin/correct",
trunk: newSIPTrunkDispatch(),
rules: []*livekit.SIPDispatchRuleInfo{
{TrunkIds: []string{sipTrunkID1}, Rule: newDirectDispatch("sip", "123")},
{TrunkIds: []string{sipTrunkID2}, Rule: newDirectDispatch("sip", "456")},
},
reqPin: "123",
exp: 0,
},
// Direct rule with a pin should reject wrong pin.
{
name: "direct pin/wrong",
trunk: newSIPTrunkDispatch(),
rules: []*livekit.SIPDispatchRuleInfo{
{TrunkIds: []string{sipTrunkID1}, Rule: newDirectDispatch("sip", "123")},
{TrunkIds: []string{sipTrunkID2}, Rule: newDirectDispatch("sip", "456")},
},
reqPin: "zzz",
expErr: true,
},
// Multiple direct rules with the same pin should result in an error.
{
name: "direct pin/conflict",
trunk: newSIPTrunkDispatch(),
rules: []*livekit.SIPDispatchRuleInfo{
{TrunkIds: []string{sipTrunkID1}, Rule: newDirectDispatch("sip1", "123")},
{TrunkIds: []string{sipTrunkID1, sipTrunkID2}, Rule: newDirectDispatch("sip2", "123")},
},
reqPin: "123",
expErr: true,
},
// Multiple direct rules with the same pin on different trunks are ok.
{
name: "direct pin/no conflict on different trunk",
trunk: newSIPTrunkDispatch(),
rules: []*livekit.SIPDispatchRuleInfo{
{TrunkIds: []string{sipTrunkID1}, Rule: newDirectDispatch("sip1", "123")},
{TrunkIds: []string{sipTrunkID2}, Rule: newDirectDispatch("sip2", "123")},
},
reqPin: "123",
exp: 0,
},
// Specific direct rules should take priority over default direct rules.
{
name: "direct pin/default and specific",
trunk: newSIPTrunkDispatch(),
rules: []*livekit.SIPDispatchRuleInfo{
{TrunkIds: nil, Rule: newDirectDispatch("sip1", "123")},
{TrunkIds: []string{sipTrunkID1}, Rule: newDirectDispatch("sip2", "123")},
},
reqPin: "123",
exp: 1,
},
// Specific direct rules should take priority over default direct rules. No pin.
{
name: "direct/default and specific",
trunk: newSIPTrunkDispatch(),
rules: []*livekit.SIPDispatchRuleInfo{
{TrunkIds: nil, Rule: newDirectDispatch("sip1", "")},
{TrunkIds: []string{sipTrunkID1}, Rule: newDirectDispatch("sip2", "")},
},
exp: 1,
},
// Specific direct rules should take priority over default direct rules. One with pin, other without.
{
name: "direct/default and specific/mixed 1",
trunk: newSIPTrunkDispatch(),
rules: []*livekit.SIPDispatchRuleInfo{
{TrunkIds: nil, Rule: newDirectDispatch("sip1", "123")},
{TrunkIds: []string{sipTrunkID1}, Rule: newDirectDispatch("sip2", "")},
},
exp: 1,
},
{
name: "direct/default and specific/mixed 2",
trunk: newSIPTrunkDispatch(),
rules: []*livekit.SIPDispatchRuleInfo{
{TrunkIds: nil, Rule: newDirectDispatch("sip1", "")},
{TrunkIds: []string{sipTrunkID1}, Rule: newDirectDispatch("sip2", "123")},
},
exp: 1,
},
// Multiple default direct rules are not allowed.
{
name: "direct/multiple defaults",
trunk: newSIPTrunkDispatch(),
rules: []*livekit.SIPDispatchRuleInfo{
{TrunkIds: nil, Rule: newDirectDispatch("sip1", "")},
{TrunkIds: nil, Rule: newDirectDispatch("sip2", "")},
},
expErr: true,
},
// Cannot use both direct and individual rules with the same pin setup.
{
name: "direct vs individual/private",
trunk: newSIPTrunkDispatch(),
rules: []*livekit.SIPDispatchRuleInfo{
{TrunkIds: nil, Rule: newIndividualDispatch("pref_", "123")},
{TrunkIds: nil, Rule: newDirectDispatch("sip", "123")},
},
expErr: true,
},
{
name: "direct vs individual/open",
trunk: newSIPTrunkDispatch(),
rules: []*livekit.SIPDispatchRuleInfo{
{TrunkIds: nil, Rule: newIndividualDispatch("pref_", "")},
{TrunkIds: nil, Rule: newDirectDispatch("sip", "")},
},
expErr: true,
},
// Direct rules take priority over individual rules.
{
name: "direct vs individual/priority",
trunk: newSIPTrunkDispatch(),
rules: []*livekit.SIPDispatchRuleInfo{
{TrunkIds: nil, Rule: newIndividualDispatch("pref_", "123")},
{TrunkIds: nil, Rule: newDirectDispatch("sip", "456")},
},
reqPin: "456",
exp: 1,
},
}
for _, c := range cases {
c := c
t.Run(c.name, func(t *testing.T) {
pins := []string{c.reqPin}
if !c.expErr && c.reqPin != "" {
// Should match the same rule, even if no pin is set (so that it can be requested).
pins = append(pins, "")
}
for i, r := range c.rules {
if r.SipDispatchRuleId == "" {
r.SipDispatchRuleId = fmt.Sprintf("rule_%d", i)
}
}
for _, pin := range pins {
pin := pin
name := pin
if name == "" {
name = "no pin"
}
t.Run(name, func(t *testing.T) {
got, err := sipMatchDispatchRule(c.trunk, c.rules, newSIPReqDispatch(pin, c.noPin))
if c.expErr {
require.Error(t, err)
require.Nil(t, got)
t.Log(err)
} else {
var exp *livekit.SIPDispatchRuleInfo
if c.exp >= 0 {
exp = c.rules[c.exp]
}
require.NoError(t, err)
require.Equal(t, exp, got)
}
})
}
})
}
}
+130
View File
@@ -51,6 +51,10 @@ const (
IngressStatePrefix = "{ingress}_state:"
RoomIngressPrefix = "room_{ingress}:"
SIPTrunkKey = "sip_trunk"
SIPDispatchRuleKey = "sip_dispatch_rule"
SIPParticipantKey = "sip_participant"
// RoomParticipantsPrefix is hash of participant_name => ParticipantInfo
RoomParticipantsPrefix = "room_participants:"
@@ -812,3 +816,129 @@ func (s *RedisStore) DeleteIngress(_ context.Context, info *livekit.IngressInfo)
return nil
}
func (s *RedisStore) loadOne(ctx context.Context, key, id string, info proto.Message, notFoundErr error) error {
data, err := s.rc.HGet(s.ctx, key, id).Result()
switch err {
case nil:
return proto.Unmarshal([]byte(data), info)
case redis.Nil:
return notFoundErr
default:
return err
}
}
func (s *RedisStore) loadMany(ctx context.Context, key string, onResult func() proto.Message) error {
data, err := s.rc.HGetAll(s.ctx, key).Result()
if err != nil {
if err == redis.Nil {
return nil
}
return err
}
for _, d := range data {
if err = proto.Unmarshal([]byte(d), onResult()); err != nil {
return err
}
}
return nil
}
func (s *RedisStore) StoreSIPTrunk(ctx context.Context, info *livekit.SIPTrunkInfo) error {
data, err := proto.Marshal(info)
if err != nil {
return err
}
return s.rc.HSet(s.ctx, SIPTrunkKey, info.SipTrunkId, data).Err()
}
func (s *RedisStore) LoadSIPTrunk(ctx context.Context, sipTrunkId string) (*livekit.SIPTrunkInfo, error) {
info := &livekit.SIPTrunkInfo{}
if err := s.loadOne(ctx, SIPTrunkKey, sipTrunkId, info, ErrSIPTrunkNotFound); err != nil {
return nil, err
}
return info, nil
}
func (s *RedisStore) DeleteSIPTrunk(ctx context.Context, info *livekit.SIPTrunkInfo) error {
return s.rc.HDel(s.ctx, SIPTrunkKey, info.SipTrunkId).Err()
}
func (s *RedisStore) ListSIPTrunk(ctx context.Context) (infos []*livekit.SIPTrunkInfo, err error) {
err = s.loadMany(ctx, SIPTrunkKey, func() proto.Message {
infos = append(infos, &livekit.SIPTrunkInfo{})
return infos[len(infos)-1]
})
return infos, err
}
func (s *RedisStore) StoreSIPDispatchRule(ctx context.Context, info *livekit.SIPDispatchRuleInfo) error {
data, err := proto.Marshal(info)
if err != nil {
return err
}
return s.rc.HSet(s.ctx, SIPDispatchRuleKey, info.SipDispatchRuleId, data).Err()
}
func (s *RedisStore) LoadSIPDispatchRule(ctx context.Context, sipDispatchRuleId string) (*livekit.SIPDispatchRuleInfo, error) {
info := &livekit.SIPDispatchRuleInfo{}
if err := s.loadOne(ctx, SIPDispatchRuleKey, sipDispatchRuleId, info, ErrSIPDispatchRuleNotFound); err != nil {
return nil, err
}
return info, nil
}
func (s *RedisStore) DeleteSIPDispatchRule(ctx context.Context, info *livekit.SIPDispatchRuleInfo) error {
return s.rc.HDel(s.ctx, SIPDispatchRuleKey, info.SipDispatchRuleId).Err()
}
func (s *RedisStore) ListSIPDispatchRule(ctx context.Context) (infos []*livekit.SIPDispatchRuleInfo, err error) {
err = s.loadMany(ctx, SIPDispatchRuleKey, func() proto.Message {
infos = append(infos, &livekit.SIPDispatchRuleInfo{})
return infos[len(infos)-1]
})
return infos, err
}
func (s *RedisStore) StoreSIPParticipant(ctx context.Context, info *livekit.SIPParticipantInfo) error {
data, err := proto.Marshal(info)
if err != nil {
return err
}
return s.rc.HSet(s.ctx, SIPParticipantKey, info.SipParticipantId, data).Err()
}
func (s *RedisStore) LoadSIPParticipant(ctx context.Context, sipParticipantId string) (*livekit.SIPParticipantInfo, error) {
info := &livekit.SIPParticipantInfo{}
if err := s.loadOne(ctx, SIPParticipantKey, sipParticipantId, info, ErrSIPParticipantNotFound); err != nil {
return nil, err
}
return info, nil
}
func (s *RedisStore) DeleteSIPParticipant(ctx context.Context, info *livekit.SIPParticipantInfo) error {
return s.rc.HDel(s.ctx, SIPParticipantKey, info.SipParticipantId).Err()
}
func (s *RedisStore) ListSIPParticipant(ctx context.Context) (infos []*livekit.SIPParticipantInfo, err error) {
err = s.loadMany(ctx, SIPParticipantKey, func() proto.Message {
infos = append(infos, &livekit.SIPParticipantInfo{})
return infos[len(infos)-1]
})
return infos, err
}
func (s *RedisStore) SendSIPParticipantDTMF(ctx context.Context, info *livekit.SendSIPParticipantDTMFRequest) (*livekit.SIPParticipantDTMFInfo, error) {
return nil, fmt.Errorf("TODO")
}
+1 -1
View File
@@ -269,7 +269,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
close(done)
if signalStats != nil {
signalStats.Report()
signalStats.Stop()
}
}()
+3
View File
@@ -65,6 +65,7 @@ func NewLivekitServer(conf *config.Config,
roomService livekit.RoomService,
egressService *EgressService,
ingressService *IngressService,
sipService *SIPService,
ioService *IOInfoService,
rtcService *RTCService,
agentService *AgentService,
@@ -116,6 +117,7 @@ func NewLivekitServer(conf *config.Config,
),
))
ingressServer := livekit.NewIngressServer(ingressService, twirpLoggingHook)
sipServer := livekit.NewSIPServer(sipService, twirpLoggingHook)
mux := http.NewServeMux()
if conf.Development {
@@ -127,6 +129,7 @@ func NewLivekitServer(conf *config.Config,
mux.Handle(roomServer.PathPrefix(), roomServer)
mux.Handle(egressServer.PathPrefix(), egressServer)
mux.Handle(ingressServer.PathPrefix(), ingressServer)
mux.Handle(sipServer.PathPrefix(), sipServer)
mux.Handle("/rtc", rtcService)
mux.Handle("/agent", agentService)
mux.HandleFunc("/rtc/validate", rtcService.Validate)
+207
View File
@@ -0,0 +1,207 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package service
import (
"context"
"fmt"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/rpc"
"github.com/livekit/protocol/utils"
"github.com/livekit/psrpc"
)
type SIPService struct {
conf *config.SIPConfig
nodeID livekit.NodeID
bus psrpc.MessageBus
psrpcClient rpc.SIPClient
store SIPStore
roomService livekit.RoomService
}
func NewSIPService(
conf *config.SIPConfig,
nodeID livekit.NodeID,
bus psrpc.MessageBus,
psrpcClient rpc.SIPClient,
store SIPStore,
rs livekit.RoomService,
ts telemetry.TelemetryService,
) *SIPService {
return &SIPService{
conf: conf,
nodeID: nodeID,
bus: bus,
psrpcClient: psrpcClient,
store: store,
roomService: rs,
}
}
func (s *SIPService) CreateSIPTrunk(ctx context.Context, req *livekit.CreateSIPTrunkRequest) (*livekit.SIPTrunkInfo, error) {
if s.store == nil {
return nil, ErrSIPNotConnected
}
info := &livekit.SIPTrunkInfo{
SipTrunkId: utils.NewGuid(utils.SIPTrunkPrefix),
InboundAddresses: req.InboundAddresses,
OutboundAddress: req.OutboundAddress,
OutboundNumber: req.OutboundNumber,
InboundNumbersRegex: req.InboundNumbersRegex,
Username: req.Username,
Password: req.Password,
}
if err := s.store.StoreSIPTrunk(ctx, info); err != nil {
return nil, err
}
return info, nil
}
func (s *SIPService) ListSIPTrunk(ctx context.Context, req *livekit.ListSIPTrunkRequest) (*livekit.ListSIPTrunkResponse, error) {
if s.store == nil {
return nil, ErrSIPNotConnected
}
trunks, err := s.store.ListSIPTrunk(ctx)
if err != nil {
return nil, err
}
return &livekit.ListSIPTrunkResponse{Items: trunks}, nil
}
func (s *SIPService) DeleteSIPTrunk(ctx context.Context, req *livekit.DeleteSIPTrunkRequest) (*livekit.SIPTrunkInfo, error) {
if s.store == nil {
return nil, ErrSIPNotConnected
}
info, err := s.store.LoadSIPTrunk(ctx, req.SipTrunkId)
if err != nil {
return nil, err
}
if err = s.store.DeleteSIPTrunk(ctx, info); err != nil {
return nil, err
}
return info, nil
}
func (s *SIPService) CreateSIPDispatchRule(ctx context.Context, req *livekit.CreateSIPDispatchRuleRequest) (*livekit.SIPDispatchRuleInfo, error) {
if s.store == nil {
return nil, ErrSIPNotConnected
}
info := &livekit.SIPDispatchRuleInfo{
SipDispatchRuleId: utils.NewGuid(utils.SIPDispatchRulePrefix),
Rule: req.Rule,
TrunkIds: req.TrunkIds,
HidePhoneNumber: req.HidePhoneNumber,
}
if err := s.store.StoreSIPDispatchRule(ctx, info); err != nil {
return nil, err
}
return info, nil
}
func (s *SIPService) ListSIPDispatchRule(ctx context.Context, req *livekit.ListSIPDispatchRuleRequest) (*livekit.ListSIPDispatchRuleResponse, error) {
if s.store == nil {
return nil, ErrSIPNotConnected
}
rules, err := s.store.ListSIPDispatchRule(ctx)
if err != nil {
return nil, err
}
return &livekit.ListSIPDispatchRuleResponse{Items: rules}, nil
}
func (s *SIPService) DeleteSIPDispatchRule(ctx context.Context, req *livekit.DeleteSIPDispatchRuleRequest) (*livekit.SIPDispatchRuleInfo, error) {
if s.store == nil {
return nil, ErrSIPNotConnected
}
info, err := s.store.LoadSIPDispatchRule(ctx, req.SipDispatchRuleId)
if err != nil {
return nil, err
}
if err = s.store.DeleteSIPDispatchRule(ctx, info); err != nil {
return nil, err
}
return info, nil
}
func (s *SIPService) CreateSIPParticipant(ctx context.Context, req *livekit.CreateSIPParticipantRequest) (*livekit.SIPParticipantInfo, error) {
if s.store == nil {
return nil, ErrSIPNotConnected
}
info := &livekit.SIPParticipantInfo{
SipParticipantId: utils.NewGuid(utils.SIPParticipantPrefix),
}
if err := s.store.StoreSIPParticipant(ctx, info); err != nil {
return nil, err
}
return info, nil
}
func (s *SIPService) ListSIPParticipant(ctx context.Context, req *livekit.ListSIPParticipantRequest) (*livekit.ListSIPParticipantResponse, error) {
if s.store == nil {
return nil, ErrSIPNotConnected
}
participants, err := s.store.ListSIPParticipant(ctx)
if err != nil {
return nil, err
}
return &livekit.ListSIPParticipantResponse{Items: participants}, nil
}
func (s *SIPService) DeleteSIPParticipant(ctx context.Context, req *livekit.DeleteSIPParticipantRequest) (*livekit.SIPParticipantInfo, error) {
if s.store == nil {
return nil, ErrSIPNotConnected
}
info, err := s.store.LoadSIPParticipant(ctx, req.SipParticipantId)
if err != nil {
return nil, err
}
if err = s.store.DeleteSIPParticipant(ctx, info); err != nil {
return nil, err
}
return info, nil
}
func (s *SIPService) SendSIPParticipantDTMF(ctx context.Context, req *livekit.SendSIPParticipantDTMFRequest) (*livekit.SIPParticipantDTMFInfo, error) {
if s.store == nil {
return nil, ErrSIPNotConnected
}
return nil, fmt.Errorf("TODO")
}
+17
View File
@@ -70,6 +70,10 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
getIngressStore,
getIngressConfig,
NewIngressService,
rpc.NewSIPClient,
getSIPStore,
getSIPConfig,
NewSIPService,
NewRoomAllocator,
NewRoomService,
NewRTCService,
@@ -195,6 +199,19 @@ func getIngressConfig(conf *config.Config) *config.IngressConfig {
return &conf.Ingress
}
func getSIPStore(s ObjectStore) SIPStore {
switch store := s.(type) {
case *RedisStore:
return store
default:
return nil
}
}
func getSIPConfig(conf *config.Config) *config.SIPConfig {
return &conf.SIP
}
func createClientConfiguration() clientconfiguration.ClientConfigurationManager {
return clientconfiguration.NewStaticClientConfigurationManager(clientconfiguration.StaticConfigurations)
}
+22 -2
View File
@@ -66,6 +66,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
}
egressStore := getEgressStore(objectStore)
ingressStore := getIngressStore(objectStore)
sipStore := getSIPStore(objectStore)
keyProvider, err := createKeyProvider(conf)
if err != nil {
return nil, err
@@ -76,7 +77,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
}
analyticsService := telemetry.NewAnalyticsService(conf, currentNode)
telemetryService := telemetry.NewTelemetryService(queuedNotifier, analyticsService)
ioInfoService, err := NewIOInfoService(messageBus, egressStore, ingressStore, telemetryService)
ioInfoService, err := NewIOInfoService(messageBus, egressStore, ingressStore, sipStore, telemetryService)
if err != nil {
return nil, err
}
@@ -102,6 +103,12 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
return nil, err
}
ingressService := NewIngressService(ingressConfig, nodeID, messageBus, ingressClient, ingressStore, roomService, telemetryService)
sipConfig := getSIPConfig(conf)
sipClient, err := rpc.NewSIPClient(messageBus)
if err != nil {
return nil, err
}
sipService := NewSIPService(sipConfig, nodeID, messageBus, sipClient, sipStore, roomService, telemetryService)
rtcService := NewRTCService(conf, roomAllocator, objectStore, router, currentNode, agentClient, telemetryService)
agentService, err := NewAgentService(messageBus)
if err != nil {
@@ -123,7 +130,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
if err != nil {
return nil, err
}
livekitServer, err := NewLivekitServer(conf, roomService, egressService, ingressService, ioInfoService, rtcService, agentService, keyProvider, router, roomManager, signalServer, server, currentNode)
livekitServer, err := NewLivekitServer(conf, roomService, egressService, ingressService, sipService, ioInfoService, rtcService, agentService, keyProvider, router, roomManager, signalServer, server, currentNode)
if err != nil {
return nil, err
}
@@ -237,6 +244,19 @@ func getIngressConfig(conf *config.Config) *config.IngressConfig {
return &conf.Ingress
}
func getSIPStore(s ObjectStore) SIPStore {
switch store := s.(type) {
case *RedisStore:
return store
default:
return nil
}
}
func getSIPConfig(conf *config.Config) *config.SIPConfig {
return &conf.SIP
}
func createClientConfiguration() clientconfiguration.ClientConfigurationManager {
return clientconfiguration.NewStaticClientConfigurationManager(clientconfiguration.StaticConfigurations)
}
+1 -1
View File
@@ -71,7 +71,7 @@ func (s *DataStats) ToProtoActive() *livekit.RTPStats {
return &livekit.RTPStats{
StartTime: timestamppb.New(time.Unix(s.windowStart/1e9, s.windowStart%1e9)),
EndTime: timestamppb.New(time.Now()),
EndTime: timestamppb.New(time.Unix(0, now)),
Duration: float64(duration / 1e9),
Bytes: uint64(s.windowBytes),
Bitrate: float64(s.windowBytes) * 8 / float64(duration) / 1e9,
+2 -2
View File
@@ -52,8 +52,8 @@ func NewDependencyDescriptorParser(ddExtID uint8, logger logger.Logger, onMaxLay
ddExtID: ddExtID,
logger: logger,
onMaxLayerChanged: onMaxLayerChanged,
seqWrapAround: utils.NewWrapAround[uint16, uint64](),
frameWrapAround: utils.NewWrapAround[uint16, uint64](),
seqWrapAround: utils.NewWrapAround[uint16, uint64](utils.WrapAroundParams{IsRestartAllowed: false}),
frameWrapAround: utils.NewWrapAround[uint16, uint64](utils.WrapAroundParams{IsRestartAllowed: false}),
frameChecker: NewFrameIntegrityChecker(180, 1024), // 2seconds for L3T3 30fps video
}
}
+6 -56
View File
@@ -60,8 +60,8 @@ type RTPStatsReceiver struct {
func NewRTPStatsReceiver(params RTPStatsParams) *RTPStatsReceiver {
return &RTPStatsReceiver{
rtpStatsBase: newRTPStatsBase(params),
sequenceNumber: utils.NewWrapAround[uint16, uint64](),
timestamp: utils.NewWrapAround[uint32, uint64](),
sequenceNumber: utils.NewWrapAround[uint16, uint64](utils.WrapAroundParams{IsRestartAllowed: false}),
timestamp: utils.NewWrapAround[uint32, uint64](utils.WrapAroundParams{IsRestartAllowed: false}),
history: protoutils.NewBitmap[uint64](cHistorySize),
}
}
@@ -123,36 +123,16 @@ func (r *RTPStatsReceiver) Update(
)
} else {
resSN = r.sequenceNumber.Update(sequenceNumber)
if resSN.IsUnhandled {
flowState.IsNotHandled = true
return
}
resTS = r.timestamp.Update(timestamp)
}
pktSize := uint64(hdrSize + payloadSize + paddingSize)
gapSN := int64(resSN.ExtendedVal - resSN.PreExtendedHighest)
if gapSN <= 0 { // duplicate OR out-of-order
// before start, don't restart
if resTS.IsRestart {
r.logger.Infow(
"rolling back timestamp restart",
"tsBefore", resTS.PreExtendedStart,
"tsAfter", r.timestamp.GetExtendedStart(),
"snBefore", resSN.PreExtendedStart,
"snAfter", r.sequenceNumber.GetExtendedStart(),
)
r.timestamp.RollbackRestart(resTS.PreExtendedStart)
}
if resSN.IsRestart {
r.logger.Infow(
"rolling back sequence number restart",
"snBefore", resSN.PreExtendedStart,
"snAfter", r.sequenceNumber.GetExtendedStart(),
"tsBefore", resTS.PreExtendedStart,
"tsAfter", r.timestamp.GetExtendedStart(),
)
r.sequenceNumber.RollbackRestart(resSN.PreExtendedStart)
flowState.IsNotHandled = true
return
}
if -gapSN >= cNumSequenceNumbers/2 {
r.logger.Warnw(
"large sequence number gap negative", nil,
@@ -179,36 +159,6 @@ func (r *RTPStatsReceiver) Update(
r.packetsOutOfOrder++
}
if resSN.IsRestart {
r.packetsLost += resSN.PreExtendedStart - resSN.ExtendedVal
extStartSN := r.sequenceNumber.GetExtendedStart()
for i := uint32(0); i < r.nextSnapshotID-cFirstSnapshotID; i++ {
s := &r.snapshots[i]
if s.extStartSN == resSN.PreExtendedStart {
s.extStartSN = extStartSN
}
}
r.logger.Infow(
"adjusting start sequence number",
"snBefore", resSN.PreExtendedStart,
"snAfter", resSN.ExtendedVal,
"tsBefore", resTS.PreExtendedStart,
"tsAfter", resTS.ExtendedVal,
)
}
if resTS.IsRestart {
r.logger.Infow(
"adjusting start timestamp",
"tsBefore", resTS.PreExtendedStart,
"tsAfter", resTS.ExtendedVal,
"snBefore", resSN.PreExtendedStart,
"snAfter", resSN.ExtendedVal,
)
}
if r.isInRange(resSN.ExtendedVal, resSN.PreExtendedHighest) {
if r.history.IsSet(resSN.ExtendedVal) {
r.bytesDuplicate += pktSize
+4 -1
View File
@@ -1033,7 +1033,10 @@ func (d *DownTrack) CloseWithFlush(flush bool) {
d.bindLock.Unlock()
d.connectionStats.Close()
d.rtpStats.Stop()
d.params.Logger.Infow("rtp stats", "direction", "downstream", "mime", d.mime, "ssrc", d.ssrc, "stats", d.rtpStats.ToString())
rtpStats := d.rtpStats.ToString()
if rtpStats != "" {
d.params.Logger.Infow("rtp stats", "direction", "downstream", "mime", d.mime, "ssrc", d.ssrc, "stats", rtpStats)
}
d.maxLayerNotifierChMu.Lock()
d.maxLayerNotifierChClosed = true
+24 -13
View File
@@ -26,7 +26,12 @@ type extendedNumber interface {
uint32 | uint64
}
type WrapAroundParams struct {
IsRestartAllowed bool
}
type WrapAround[T number, ET extendedNumber] struct {
params WrapAroundParams
fullRange ET
initialized bool
@@ -36,9 +41,10 @@ type WrapAround[T number, ET extendedNumber] struct {
extendedHighest ET
}
func NewWrapAround[T number, ET extendedNumber]() *WrapAround[T, ET] {
func NewWrapAround[T number, ET extendedNumber](params WrapAroundParams) *WrapAround[T, ET] {
var t T
return &WrapAround[T, ET]{
params: params,
fullRange: 1 << (unsafe.Sizeof(t) * 8),
}
}
@@ -52,6 +58,7 @@ func (w *WrapAround[T, ET]) Seed(from *WrapAround[T, ET]) {
}
type WrapAroundUpdateResult[ET extendedNumber] struct {
IsUnhandled bool // when set, other fields are invalid
IsRestart bool
PreExtendedStart ET // valid only if IsRestart = true
PreExtendedHighest ET
@@ -140,20 +147,24 @@ func (w *WrapAround[T, ET]) maybeAdjustStart(val T) (result WrapAroundUpdateResu
}
if val-w.start > T(w.fullRange>>1) {
// out-of-order with existing start => a new start
result.IsRestart = true
if val > w.start {
result.PreExtendedStart = w.fullRange + ET(w.start)
} else {
result.PreExtendedStart = ET(w.start)
}
if w.params.IsRestartAllowed {
// out-of-order with existing start => a new start
result.IsRestart = true
if val > w.start {
result.PreExtendedStart = w.fullRange + ET(w.start)
} else {
result.PreExtendedStart = ET(w.start)
}
if w.isWrapBack(val, w.highest) {
w.cycles = w.fullRange
w.updateExtendedHighest()
cycles = 0
if w.isWrapBack(val, w.highest) {
w.cycles = w.fullRange
w.updateExtendedHighest()
cycles = 0
}
w.start = val
} else {
result.IsUnhandled = true
}
w.start = val
} else {
if w.isWrapBack(val, w.highest) {
cycles -= w.fullRange
+139 -4
View File
@@ -21,7 +21,7 @@ import (
)
func TestWrapAroundUint16(t *testing.T) {
w := NewWrapAround[uint16, uint32]()
w := NewWrapAround[uint16, uint32](WrapAroundParams{IsRestartAllowed: true})
testCases := []struct {
name string
input uint16
@@ -194,8 +194,143 @@ func TestWrapAroundUint16(t *testing.T) {
}
}
func TestWrapAroundUint16NoRestart(t *testing.T) {
w := NewWrapAround[uint16, uint32](WrapAroundParams{IsRestartAllowed: false})
testCases := []struct {
name string
input uint16
updated WrapAroundUpdateResult[uint32]
start uint16
extendedStart uint32
highest uint16
extendedHighest uint32
}{
// initialize
{
name: "initialize",
input: 10,
updated: WrapAroundUpdateResult[uint32]{
IsRestart: false,
PreExtendedStart: 0,
PreExtendedHighest: 9,
ExtendedVal: 10,
},
start: 10,
extendedStart: 10,
highest: 10,
extendedHighest: 10,
},
// an older number without wrap around should not reset start point
{
name: "no reset start no wrap around",
input: 8,
updated: WrapAroundUpdateResult[uint32]{
IsUnhandled: true,
// the following fields are not valid when `IsUnhandled = true`, but code fills it in
// and they are filled in here for testing purposes
PreExtendedHighest: 10,
ExtendedVal: 8,
},
start: 10,
extendedStart: 10,
highest: 10,
extendedHighest: 10,
},
// an older number with wrap around should not reset start point
{
name: "no reset start wrap around",
input: (1 << 16) - 6,
updated: WrapAroundUpdateResult[uint32]{
IsUnhandled: true,
PreExtendedHighest: 10,
ExtendedVal: (1 << 16) - 6,
},
start: 10,
extendedStart: 10,
highest: 10,
extendedHighest: 10,
},
// yet another older number with wrap around should not reset start point
{
name: "no reset start again",
input: (1 << 16) - 12,
updated: WrapAroundUpdateResult[uint32]{
IsUnhandled: true,
PreExtendedHighest: 10,
ExtendedVal: (1 << 16) - 12,
},
start: 10,
extendedStart: 10,
highest: 10,
extendedHighest: 10,
},
// duplicate should return same as highest
{
name: "duplicate",
input: 10,
updated: WrapAroundUpdateResult[uint32]{
PreExtendedHighest: 10,
ExtendedVal: 10,
},
start: 10,
extendedStart: 10,
highest: 10,
extendedHighest: 10,
},
// a significant jump in order should move highest to that
{
name: "big in-order jump",
input: (1 << 15) - 10,
updated: WrapAroundUpdateResult[uint32]{
PreExtendedHighest: 10,
ExtendedVal: (1 << 15) - 10,
},
start: 10,
extendedStart: 10,
highest: (1 << 15) - 10,
extendedHighest: (1 << 15) - 10,
},
// in-order, should update highest
{
name: "in-order",
input: (1 << 15) + 13,
updated: WrapAroundUpdateResult[uint32]{
PreExtendedHighest: (1 << 15) - 10,
ExtendedVal: (1 << 15) + 13,
},
start: 10,
extendedStart: 10,
highest: (1 << 15) + 13,
extendedHighest: (1 << 15) + 13,
},
// now out-of-order should not reset start as half the range has been seen
{
name: "out-of-order after half range",
input: (1 << 15) - 11,
updated: WrapAroundUpdateResult[uint32]{
PreExtendedHighest: (1 << 15) + 13,
ExtendedVal: (1 << 15) - 11,
},
start: 10,
extendedStart: 10,
highest: (1 << 15) + 13,
extendedHighest: (1 << 15) + 13,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
require.Equal(t, tc.updated, w.Update(tc.input))
require.Equal(t, tc.start, w.GetStart())
require.Equal(t, tc.extendedStart, w.GetExtendedStart())
require.Equal(t, tc.highest, w.GetHighest())
require.Equal(t, tc.extendedHighest, w.GetExtendedHighest())
})
}
}
func TestWrapAroundUint16RollbackRestartAndResetHighest(t *testing.T) {
w := NewWrapAround[uint16, uint64]()
w := NewWrapAround[uint16, uint64](WrapAroundParams{IsRestartAllowed: true})
// initialize
w.Update(23)
@@ -268,7 +403,7 @@ func TestWrapAroundUint16RollbackRestartAndResetHighest(t *testing.T) {
}
func TestWrapAroundUint16WrapAroundRestartDuplicate(t *testing.T) {
w := NewWrapAround[uint16, uint64]()
w := NewWrapAround[uint16, uint64](WrapAroundParams{IsRestartAllowed: true})
// initialize
w.Update(65534)
@@ -314,7 +449,7 @@ func TestWrapAroundUint16WrapAroundRestartDuplicate(t *testing.T) {
}
func TestWrapAroundUint32(t *testing.T) {
w := NewWrapAround[uint32, uint64]()
w := NewWrapAround[uint32, uint64](WrapAroundParams{IsRestartAllowed: true})
testCases := []struct {
name string
input uint32
+24 -28
View File
@@ -20,12 +20,11 @@ import (
"go.uber.org/atomic"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/utils"
)
const statsReportInterval = 10 * time.Second
type BytesTrackType string
const (
@@ -35,11 +34,11 @@ const (
// stats for signal and data channel
type BytesTrackStats struct {
trackID livekit.TrackID
pID livekit.ParticipantID
send, recv atomic.Uint64
lastStatsReport atomic.Value // *time.Time
telemetry TelemetryService
trackID livekit.TrackID
pID livekit.ParticipantID
send, recv atomic.Uint64
telemetry TelemetryService
isStopped atomic.Bool
}
func NewBytesTrackStats(trackID livekit.TrackID, pID livekit.ParticipantID, telemetry TelemetryService) *BytesTrackStats {
@@ -48,8 +47,7 @@ func NewBytesTrackStats(trackID livekit.TrackID, pID livekit.ParticipantID, tele
pID: pID,
telemetry: telemetry,
}
now := time.Now()
s.lastStatsReport.Store(&now)
go s.reporter()
return s
}
@@ -59,29 +57,13 @@ func (s *BytesTrackStats) AddBytes(bytes uint64, isSend bool) {
} else {
s.recv.Add(bytes)
}
s.report(false)
}
func (s *BytesTrackStats) Report() {
s.report(true)
func (s *BytesTrackStats) Stop() {
s.isStopped.Store(true)
}
func (s *BytesTrackStats) report(force bool) {
now := time.Now()
if !force {
lr := s.lastStatsReport.Load().(*time.Time)
if time.Since(*lr) < statsReportInterval {
return
}
if !s.lastStatsReport.CompareAndSwap(lr, &now) {
return
}
} else {
s.lastStatsReport.Store(&now)
}
func (s *BytesTrackStats) report() {
if recv := s.recv.Swap(0); recv > 0 {
s.telemetry.TrackStats(StatsKeyForData(livekit.StreamType_UPSTREAM, s.pID, s.trackID), &livekit.AnalyticsStat{
Streams: []*livekit.AnalyticsStream{
@@ -99,6 +81,20 @@ func (s *BytesTrackStats) report(force bool) {
}
}
func (s *BytesTrackStats) reporter() {
ticker := time.NewTicker(config.TelemetryStatsUpdateInterval)
defer ticker.Stop()
for !s.isStopped.Load() {
<-ticker.C
s.report()
}
s.report()
}
// -----------------------------------------------------------------------
func BytesTrackIDForParticipantID(typ BytesTrackType, participantID livekit.ParticipantID) livekit.TrackID {
return livekit.TrackID(fmt.Sprintf("%s_%s%s", utils.TrackPrefix, string(typ), participantID))
}
+2 -2
View File
@@ -126,8 +126,6 @@ func (s *StatsWorker) ClosedAt() time.Time {
return s.closedAt
}
// -------------------------------------------------------------------------
func (s *StatsWorker) collectStats(
ts *timestamppb.Timestamp,
streamType livekit.StreamType,
@@ -151,6 +149,8 @@ func (s *StatsWorker) collectStats(
return stats
}
// -------------------------------------------------------------------------
// create a single stream and single video layer post aggregation
func coalesce(stats []*livekit.AnalyticsStat) *livekit.AnalyticsStat {
if len(stats) == 0 {