Add SIP Support (#2240)

This commit is contained in:
Sean DuBois
2023-11-14 14:24:54 -05:00
committed by GitHub
parent 5ec692bb03
commit 702e562f9f
11 changed files with 443 additions and 11 deletions

6
go.mod
View File

@@ -18,7 +18,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e
github.com/livekit/protocol v1.9.1-0.20231107185101-e230ee2d840e
github.com/livekit/protocol v1.9.1
github.com/livekit/psrpc v0.5.1
github.com/mackerelio/go-osstat v0.2.4
github.com/magefile/mage v1.15.0
@@ -60,14 +60,14 @@ require (
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/eapache/channels v1.1.0 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/go-jose/go-jose/v3 v3.0.0 // indirect
github.com/go-jose/go-jose/v3 v3.0.1 // indirect
github.com/go-logr/logr v1.3.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/subcommands v1.2.0 // indirect
github.com/google/uuid v1.3.1 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-retryablehttp v0.7.4 // indirect
github.com/hashicorp/go-retryablehttp v0.7.5 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/josharian/native v1.1.0 // indirect
github.com/klauspost/compress v1.17.2 // indirect

12
go.sum
View File

@@ -40,8 +40,8 @@ github.com/gammazero/deque v0.2.1 h1:qSdsbG6pgp6nL7A0+K/B7s12mcCY/5l5SIUpMOl+dC0
github.com/gammazero/deque v0.2.1/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU=
github.com/gammazero/workerpool v1.1.3 h1:WixN4xzukFoN0XSeXF6puqEqFTl2mECI9S6W44HWy9Q=
github.com/gammazero/workerpool v1.1.3/go.mod h1:wPjyBLDbyKnUn2XwwyD3EEwo9dHutia9/fwNmSHWACc=
github.com/go-jose/go-jose/v3 v3.0.0 h1:s6rrhirfEP/CGIoc6p+PZAeogN2SxKav6Wp7+dyMWVo=
github.com/go-jose/go-jose/v3 v3.0.0/go.mod h1:RNkWWRld676jZEYoV3+XK8L2ZnNSvIsxFMht0mSX+u8=
github.com/go-jose/go-jose/v3 v3.0.1 h1:pWmKFVtt+Jl0vBZTIpz/eAKwsm6LkIxDVVbFHKkchhA=
github.com/go-jose/go-jose/v3 v3.0.1/go.mod h1:RNkWWRld676jZEYoV3+XK8L2ZnNSvIsxFMht0mSX+u8=
github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY=
github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
@@ -82,8 +82,8 @@ github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9n
github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48=
github.com/hashicorp/go-hclog v0.9.2 h1:CG6TE5H9/JXsFWJCfoIVpKFIkFe6ysEuHirp4DxCsHI=
github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ=
github.com/hashicorp/go-retryablehttp v0.7.4 h1:ZQgVdpTdAL7WpMIwLzCfbalOcSUdkDZnpUv3/+BxzFA=
github.com/hashicorp/go-retryablehttp v0.7.4/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8=
github.com/hashicorp/go-retryablehttp v0.7.5 h1:bJj+Pj19UZMIweq/iie+1u5YCdGrnxCT9yvm0e+Nd5M=
github.com/hashicorp/go-retryablehttp v0.7.5/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8=
github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek=
github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
@@ -125,8 +125,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e h1:yNeIo7MSMUWgoLu7LkNKnBYnJBFPFH9Wq4S6h1kS44M=
github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY=
github.com/livekit/protocol v1.9.1-0.20231107185101-e230ee2d840e h1:YShBpEjkEBY7yil2gjMWlkVkxs3OI58LIIYsBdb8aBU=
github.com/livekit/protocol v1.9.1-0.20231107185101-e230ee2d840e/go.mod h1:l2WjlZWErS6vBlQaQyCGwWLt1aOx10XfQTsmvLjJWFQ=
github.com/livekit/protocol v1.9.1 h1:RTycWwtGUoYLb+7WYosJ3IP8f0e8j4lLxC/osy/dQi0=
github.com/livekit/protocol v1.9.1/go.mod h1:kqDGmx+WZ6WMZ5V/T9lF8DgnuThAjetwjHq1nd7moSE=
github.com/livekit/psrpc v0.5.1 h1:ihN5uKIvbU69UsFS4HdYmou5GuK0Dt4hix4eOmRS7o8=
github.com/livekit/psrpc v0.5.1/go.mod h1:cQjxg1oCxYHhxxv6KJH1gSvdtCHQoRZCHgPdm5N8v2g=
github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs=

View File

@@ -66,6 +66,7 @@ type Config struct {
Room RoomConfig `yaml:"room,omitempty"`
TURN TURNConfig `yaml:"turn,omitempty"`
Ingress IngressConfig `yaml:"ingress,omitempty"`
SIP SIPConfig `yaml:"sip,omitempty"`
WebHook WebHookConfig `yaml:"webhook,omitempty"`
NodeSelector NodeSelectorConfig `yaml:"node_selector,omitempty"`
KeyFile string `yaml:"key_file,omitempty"`
@@ -294,6 +295,9 @@ type IngressConfig struct {
WHIPBaseURL string `yaml:"whip_base_url,omitempty"`
}
type SIPConfig struct {
}
// not exposed to YAML
type APIConfig struct {
// amount of time to wait for API to execute, default 2s

View File

@@ -34,4 +34,8 @@ var (
ErrRemoteUnmuteNoteEnabled = psrpc.NewErrorf(psrpc.FailedPrecondition, "remote unmute not enabled")
ErrTrackNotFound = psrpc.NewErrorf(psrpc.NotFound, "track is not found")
ErrWebHookMissingAPIKey = psrpc.NewErrorf(psrpc.InvalidArgument, "api_key is required to use webhooks")
ErrSIPNotConnected = psrpc.NewErrorf(psrpc.Internal, "sip not connected (redis required)")
ErrSIPTrunkNotFound = psrpc.NewErrorf(psrpc.NotFound, "requested sip trunk does not exist")
ErrSIPDispatchRuleNotFound = psrpc.NewErrorf(psrpc.NotFound, "requested sip dispatch rule does not exist")
ErrSIPParticipantNotFound = psrpc.NewErrorf(psrpc.NotFound, "requested sip participant does not exist")
)

View File

@@ -76,3 +76,21 @@ type RoomAllocator interface {
CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, bool, error)
ValidateCreateRoom(ctx context.Context, roomName livekit.RoomName) error
}
//counterfeiter:generate . SIPStore
type SIPStore interface {
StoreSIPTrunk(ctx context.Context, info *livekit.SIPTrunkInfo) error
LoadSIPTrunk(ctx context.Context, sipTrunkID string) (*livekit.SIPTrunkInfo, error)
ListSIPTrunk(ctx context.Context) ([]*livekit.SIPTrunkInfo, error)
DeleteSIPTrunk(ctx context.Context, info *livekit.SIPTrunkInfo) error
StoreSIPDispatchRule(ctx context.Context, info *livekit.SIPDispatchRuleInfo) error
LoadSIPDispatchRule(ctx context.Context, sipDispatchRuleID string) (*livekit.SIPDispatchRuleInfo, error)
ListSIPDispatchRule(ctx context.Context) ([]*livekit.SIPDispatchRuleInfo, error)
DeleteSIPDispatchRule(ctx context.Context, info *livekit.SIPDispatchRuleInfo) error
StoreSIPParticipant(ctx context.Context, info *livekit.SIPParticipantInfo) error
LoadSIPParticipant(ctx context.Context, sipParticipantID string) (*livekit.SIPParticipantInfo, error)
ListSIPParticipant(ctx context.Context) ([]*livekit.SIPParticipantInfo, error)
DeleteSIPParticipant(ctx context.Context, info *livekit.SIPParticipantInfo) error
}

View File

@@ -17,6 +17,7 @@ package service
import (
"context"
"errors"
"fmt"
"google.golang.org/protobuf/types/known/emptypb"
@@ -32,6 +33,7 @@ type IOInfoService struct {
es EgressStore
is IngressStore
ss SIPStore
telemetry telemetry.TelemetryService
shutdown chan struct{}
@@ -41,11 +43,13 @@ func NewIOInfoService(
bus psrpc.MessageBus,
es EgressStore,
is IngressStore,
ss SIPStore,
ts telemetry.TelemetryService,
) (*IOInfoService, error) {
s := &IOInfoService{
es: es,
is: is,
ss: ss,
telemetry: ts,
shutdown: make(chan struct{}),
}
@@ -211,6 +215,31 @@ func (s *IOInfoService) UpdateIngressState(ctx context.Context, req *rpc.UpdateI
return &emptypb.Empty{}, nil
}
func (s *IOInfoService) EvaluateSIPDispatchRules(ctx context.Context, req *rpc.EvaluateSIPDispatchRulesRequest) (*rpc.EvaluateSIPDispatchRulesResponse, error) {
dispatchRules, err := s.ss.ListSIPDispatchRule(ctx)
if err != nil {
return nil, err
}
if len(dispatchRules) == 0 {
return nil, fmt.Errorf("No SIP Dispatch Rule Found")
}
directDispatchRule := dispatchRules[0].Rule.GetDispatchRuleDirect()
if directDispatchRule == nil {
return nil, fmt.Errorf("No SIP Direct Dispatch Rule Found")
}
return &rpc.EvaluateSIPDispatchRulesResponse{
RoomName: directDispatchRule.RoomName,
ParticipantIdentity: req.CallingNumber,
}, nil
}
func (s *IOInfoService) GetSIPTrunkAuthentication(ctx context.Context, req *rpc.GetSIPTrunkAuthenticationRequest) (*rpc.GetSIPTrunkAuthenticationResponse, error) {
return nil, nil
}
func (s *IOInfoService) Stop() {
close(s.shutdown)

View File

@@ -51,6 +51,10 @@ const (
IngressStatePrefix = "{ingress}_state:"
RoomIngressPrefix = "room_{ingress}:"
SIPTrunkKey = "sip_trunk"
SIPDispatchRuleKey = "sip_dispatch_rule"
SIPParticipantKey = "sip_participant"
// RoomParticipantsPrefix is hash of participant_name => ParticipantInfo
RoomParticipantsPrefix = "room_participants:"
@@ -812,3 +816,129 @@ func (s *RedisStore) DeleteIngress(_ context.Context, info *livekit.IngressInfo)
return nil
}
func (s *RedisStore) loadOne(ctx context.Context, key, id string, info proto.Message, notFoundErr error) error {
data, err := s.rc.HGet(s.ctx, key, id).Result()
switch err {
case nil:
return proto.Unmarshal([]byte(data), info)
case redis.Nil:
return notFoundErr
default:
return err
}
}
func (s *RedisStore) loadMany(ctx context.Context, key string, onResult func() proto.Message) error {
data, err := s.rc.HGetAll(s.ctx, key).Result()
if err != nil {
if err == redis.Nil {
return nil
}
return err
}
for _, d := range data {
if err = proto.Unmarshal([]byte(d), onResult()); err != nil {
return err
}
}
return nil
}
func (s *RedisStore) StoreSIPTrunk(ctx context.Context, info *livekit.SIPTrunkInfo) error {
data, err := proto.Marshal(info)
if err != nil {
return err
}
return s.rc.HSet(s.ctx, SIPTrunkKey, info.SipTrunkId, data).Err()
}
func (s *RedisStore) LoadSIPTrunk(ctx context.Context, sipTrunkId string) (*livekit.SIPTrunkInfo, error) {
info := &livekit.SIPTrunkInfo{}
if err := s.loadOne(ctx, SIPTrunkKey, sipTrunkId, info, ErrSIPTrunkNotFound); err != nil {
return nil, err
}
return info, nil
}
func (s *RedisStore) DeleteSIPTrunk(ctx context.Context, info *livekit.SIPTrunkInfo) error {
return s.rc.HDel(s.ctx, SIPTrunkKey, info.SipTrunkId).Err()
}
func (s *RedisStore) ListSIPTrunk(ctx context.Context) (infos []*livekit.SIPTrunkInfo, err error) {
err = s.loadMany(ctx, SIPTrunkKey, func() proto.Message {
infos = append(infos, &livekit.SIPTrunkInfo{})
return infos[len(infos)-1]
})
return infos, err
}
func (s *RedisStore) StoreSIPDispatchRule(ctx context.Context, info *livekit.SIPDispatchRuleInfo) error {
data, err := proto.Marshal(info)
if err != nil {
return err
}
return s.rc.HSet(s.ctx, SIPDispatchRuleKey, info.SipDispatchRuleId, data).Err()
}
func (s *RedisStore) LoadSIPDispatchRule(ctx context.Context, sipDispatchRuleId string) (*livekit.SIPDispatchRuleInfo, error) {
info := &livekit.SIPDispatchRuleInfo{}
if err := s.loadOne(ctx, SIPDispatchRuleKey, sipDispatchRuleId, info, ErrSIPDispatchRuleNotFound); err != nil {
return nil, err
}
return info, nil
}
func (s *RedisStore) DeleteSIPDispatchRule(ctx context.Context, info *livekit.SIPDispatchRuleInfo) error {
return s.rc.HDel(s.ctx, SIPDispatchRuleKey, info.SipDispatchRuleId).Err()
}
func (s *RedisStore) ListSIPDispatchRule(ctx context.Context) (infos []*livekit.SIPDispatchRuleInfo, err error) {
err = s.loadMany(ctx, SIPDispatchRuleKey, func() proto.Message {
infos = append(infos, &livekit.SIPDispatchRuleInfo{})
return infos[len(infos)-1]
})
return infos, err
}
func (s *RedisStore) StoreSIPParticipant(ctx context.Context, info *livekit.SIPParticipantInfo) error {
data, err := proto.Marshal(info)
if err != nil {
return err
}
return s.rc.HSet(s.ctx, SIPParticipantKey, info.SipParticipantId, data).Err()
}
func (s *RedisStore) LoadSIPParticipant(ctx context.Context, sipParticipantId string) (*livekit.SIPParticipantInfo, error) {
info := &livekit.SIPParticipantInfo{}
if err := s.loadOne(ctx, SIPParticipantKey, sipParticipantId, info, ErrSIPParticipantNotFound); err != nil {
return nil, err
}
return info, nil
}
func (s *RedisStore) DeleteSIPParticipant(ctx context.Context, info *livekit.SIPParticipantInfo) error {
return s.rc.HDel(s.ctx, SIPParticipantKey, info.SipParticipantId).Err()
}
func (s *RedisStore) ListSIPParticipant(ctx context.Context) (infos []*livekit.SIPParticipantInfo, err error) {
err = s.loadMany(ctx, SIPParticipantKey, func() proto.Message {
infos = append(infos, &livekit.SIPParticipantInfo{})
return infos[len(infos)-1]
})
return infos, err
}
func (s *RedisStore) SendSIPParticipantDTMF(ctx context.Context, info *livekit.SendSIPParticipantDTMFRequest) (*livekit.SIPParticipantDTMFInfo, error) {
return nil, fmt.Errorf("TODO")
}

View File

@@ -65,6 +65,7 @@ func NewLivekitServer(conf *config.Config,
roomService livekit.RoomService,
egressService *EgressService,
ingressService *IngressService,
sipService *SIPService,
ioService *IOInfoService,
rtcService *RTCService,
agentService *AgentService,
@@ -116,6 +117,7 @@ func NewLivekitServer(conf *config.Config,
),
))
ingressServer := livekit.NewIngressServer(ingressService, twirpLoggingHook)
sipServer := livekit.NewSIPServer(sipService, twirpLoggingHook)
mux := http.NewServeMux()
if conf.Development {
@@ -127,6 +129,7 @@ func NewLivekitServer(conf *config.Config,
mux.Handle(roomServer.PathPrefix(), roomServer)
mux.Handle(egressServer.PathPrefix(), egressServer)
mux.Handle(ingressServer.PathPrefix(), ingressServer)
mux.Handle(sipServer.PathPrefix(), sipServer)
mux.Handle("/rtc", rtcService)
mux.Handle("/agent", agentService)
mux.HandleFunc("/rtc/validate", rtcService.Validate)

207
pkg/service/sip.go Normal file
View File

@@ -0,0 +1,207 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package service
import (
"context"
"fmt"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/rpc"
"github.com/livekit/protocol/utils"
"github.com/livekit/psrpc"
)
type SIPService struct {
conf *config.SIPConfig
nodeID livekit.NodeID
bus psrpc.MessageBus
psrpcClient rpc.SIPClient
store SIPStore
roomService livekit.RoomService
}
func NewSIPService(
conf *config.SIPConfig,
nodeID livekit.NodeID,
bus psrpc.MessageBus,
psrpcClient rpc.SIPClient,
store SIPStore,
rs livekit.RoomService,
ts telemetry.TelemetryService,
) *SIPService {
return &SIPService{
conf: conf,
nodeID: nodeID,
bus: bus,
psrpcClient: psrpcClient,
store: store,
roomService: rs,
}
}
func (s *SIPService) CreateSIPTrunk(ctx context.Context, req *livekit.CreateSIPTrunkRequest) (*livekit.SIPTrunkInfo, error) {
if s.store == nil {
return nil, ErrSIPNotConnected
}
info := &livekit.SIPTrunkInfo{
SipTrunkId: utils.NewGuid(utils.SIPTrunkPrefix),
InboundAddresses: req.InboundAddresses,
OutboundAddress: req.OutboundAddress,
OutboundNumber: req.OutboundNumber,
InboundNumbersRegex: req.InboundNumbersRegex,
Username: req.Username,
Password: req.Password,
}
if err := s.store.StoreSIPTrunk(ctx, info); err != nil {
return nil, err
}
return info, nil
}
func (s *SIPService) ListSIPTrunk(ctx context.Context, req *livekit.ListSIPTrunkRequest) (*livekit.ListSIPTrunkResponse, error) {
if s.store == nil {
return nil, ErrSIPNotConnected
}
trunks, err := s.store.ListSIPTrunk(ctx)
if err != nil {
return nil, err
}
return &livekit.ListSIPTrunkResponse{Items: trunks}, nil
}
func (s *SIPService) DeleteSIPTrunk(ctx context.Context, req *livekit.DeleteSIPTrunkRequest) (*livekit.SIPTrunkInfo, error) {
if s.store == nil {
return nil, ErrSIPNotConnected
}
info, err := s.store.LoadSIPTrunk(ctx, req.SipTrunkId)
if err != nil {
return nil, err
}
if err = s.store.DeleteSIPTrunk(ctx, info); err != nil {
return nil, err
}
return info, nil
}
func (s *SIPService) CreateSIPDispatchRule(ctx context.Context, req *livekit.CreateSIPDispatchRuleRequest) (*livekit.SIPDispatchRuleInfo, error) {
if s.store == nil {
return nil, ErrSIPNotConnected
}
info := &livekit.SIPDispatchRuleInfo{
SipDispatchRuleId: utils.NewGuid(utils.SIPDispatchRulePrefix),
Rule: req.Rule,
TrunkIds: req.TrunkIds,
HidePhoneNumber: req.HidePhoneNumber,
}
if err := s.store.StoreSIPDispatchRule(ctx, info); err != nil {
return nil, err
}
return info, nil
}
func (s *SIPService) ListSIPDispatchRule(ctx context.Context, req *livekit.ListSIPDispatchRuleRequest) (*livekit.ListSIPDispatchRuleResponse, error) {
if s.store == nil {
return nil, ErrSIPNotConnected
}
rules, err := s.store.ListSIPDispatchRule(ctx)
if err != nil {
return nil, err
}
return &livekit.ListSIPDispatchRuleResponse{Items: rules}, nil
}
func (s *SIPService) DeleteSIPDispatchRule(ctx context.Context, req *livekit.DeleteSIPDispatchRuleRequest) (*livekit.SIPDispatchRuleInfo, error) {
if s.store == nil {
return nil, ErrSIPNotConnected
}
info, err := s.store.LoadSIPDispatchRule(ctx, req.SipDispatchRuleId)
if err != nil {
return nil, err
}
if err = s.store.DeleteSIPDispatchRule(ctx, info); err != nil {
return nil, err
}
return info, nil
}
func (s *SIPService) CreateSIPParticipant(ctx context.Context, req *livekit.CreateSIPParticipantRequest) (*livekit.SIPParticipantInfo, error) {
if s.store == nil {
return nil, ErrSIPNotConnected
}
info := &livekit.SIPParticipantInfo{
SipParticipantId: utils.NewGuid(utils.SIPParticipantPrefix),
}
if err := s.store.StoreSIPParticipant(ctx, info); err != nil {
return nil, err
}
return info, nil
}
func (s *SIPService) ListSIPParticipant(ctx context.Context, req *livekit.ListSIPParticipantRequest) (*livekit.ListSIPParticipantResponse, error) {
if s.store == nil {
return nil, ErrSIPNotConnected
}
participants, err := s.store.ListSIPParticipant(ctx)
if err != nil {
return nil, err
}
return &livekit.ListSIPParticipantResponse{Items: participants}, nil
}
func (s *SIPService) DeleteSIPParticipant(ctx context.Context, req *livekit.DeleteSIPParticipantRequest) (*livekit.SIPParticipantInfo, error) {
if s.store == nil {
return nil, ErrSIPNotConnected
}
info, err := s.store.LoadSIPParticipant(ctx, req.SipParticipantId)
if err != nil {
return nil, err
}
if err = s.store.DeleteSIPParticipant(ctx, info); err != nil {
return nil, err
}
return info, nil
}
func (s *SIPService) SendSIPParticipantDTMF(ctx context.Context, req *livekit.SendSIPParticipantDTMFRequest) (*livekit.SIPParticipantDTMFInfo, error) {
if s.store == nil {
return nil, ErrSIPNotConnected
}
return nil, fmt.Errorf("TODO")
}

View File

@@ -70,6 +70,10 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
getIngressStore,
getIngressConfig,
NewIngressService,
rpc.NewSIPClient,
getSIPStore,
getSIPConfig,
NewSIPService,
NewRoomAllocator,
NewRoomService,
NewRTCService,
@@ -195,6 +199,19 @@ func getIngressConfig(conf *config.Config) *config.IngressConfig {
return &conf.Ingress
}
func getSIPStore(s ObjectStore) SIPStore {
switch store := s.(type) {
case *RedisStore:
return store
default:
return nil
}
}
func getSIPConfig(conf *config.Config) *config.SIPConfig {
return &conf.SIP
}
func createClientConfiguration() clientconfiguration.ClientConfigurationManager {
return clientconfiguration.NewStaticClientConfigurationManager(clientconfiguration.StaticConfigurations)
}

View File

@@ -66,6 +66,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
}
egressStore := getEgressStore(objectStore)
ingressStore := getIngressStore(objectStore)
sipStore := getSIPStore(objectStore)
keyProvider, err := createKeyProvider(conf)
if err != nil {
return nil, err
@@ -76,7 +77,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
}
analyticsService := telemetry.NewAnalyticsService(conf, currentNode)
telemetryService := telemetry.NewTelemetryService(queuedNotifier, analyticsService)
ioInfoService, err := NewIOInfoService(messageBus, egressStore, ingressStore, telemetryService)
ioInfoService, err := NewIOInfoService(messageBus, egressStore, ingressStore, sipStore, telemetryService)
if err != nil {
return nil, err
}
@@ -102,6 +103,12 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
return nil, err
}
ingressService := NewIngressService(ingressConfig, nodeID, messageBus, ingressClient, ingressStore, roomService, telemetryService)
sipConfig := getSIPConfig(conf)
sipClient, err := rpc.NewSIPClient(messageBus)
if err != nil {
return nil, err
}
sipService := NewSIPService(sipConfig, nodeID, messageBus, sipClient, sipStore, roomService, telemetryService)
rtcService := NewRTCService(conf, roomAllocator, objectStore, router, currentNode, agentClient, telemetryService)
agentService, err := NewAgentService(messageBus)
if err != nil {
@@ -123,7 +130,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
if err != nil {
return nil, err
}
livekitServer, err := NewLivekitServer(conf, roomService, egressService, ingressService, ioInfoService, rtcService, agentService, keyProvider, router, roomManager, signalServer, server, currentNode)
livekitServer, err := NewLivekitServer(conf, roomService, egressService, ingressService, sipService, ioInfoService, rtcService, agentService, keyProvider, router, roomManager, signalServer, server, currentNode)
if err != nil {
return nil, err
}
@@ -237,6 +244,19 @@ func getIngressConfig(conf *config.Config) *config.IngressConfig {
return &conf.Ingress
}
func getSIPStore(s ObjectStore) SIPStore {
switch store := s.(type) {
case *RedisStore:
return store
default:
return nil
}
}
func getSIPConfig(conf *config.Config) *config.SIPConfig {
return &conf.SIP
}
func createClientConfiguration() clientconfiguration.ClientConfigurationManager {
return clientconfiguration.NewStaticClientConfigurationManager(clientconfiguration.StaticConfigurations)
}