From 702e562f9ff760cf786f8bb6eb792c9533c74f26 Mon Sep 17 00:00:00 2001 From: Sean DuBois Date: Tue, 14 Nov 2023 14:24:54 -0500 Subject: [PATCH] Add SIP Support (#2240) --- go.mod | 6 +- go.sum | 12 +-- pkg/config/config.go | 4 + pkg/service/errors.go | 4 + pkg/service/interfaces.go | 18 ++++ pkg/service/ioservice.go | 29 ++++++ pkg/service/redisstore.go | 130 ++++++++++++++++++++++++ pkg/service/server.go | 3 + pkg/service/sip.go | 207 ++++++++++++++++++++++++++++++++++++++ pkg/service/wire.go | 17 ++++ pkg/service/wire_gen.go | 24 ++++- 11 files changed, 443 insertions(+), 11 deletions(-) create mode 100644 pkg/service/sip.go diff --git a/go.mod b/go.mod index 6b590d2e9..79543f407 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e - github.com/livekit/protocol v1.9.1-0.20231107185101-e230ee2d840e + github.com/livekit/protocol v1.9.1 github.com/livekit/psrpc v0.5.1 github.com/mackerelio/go-osstat v0.2.4 github.com/magefile/mage v1.15.0 @@ -60,14 +60,14 @@ require ( github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/eapache/channels v1.1.0 // indirect github.com/eapache/queue v1.1.0 // indirect - github.com/go-jose/go-jose/v3 v3.0.0 // indirect + github.com/go-jose/go-jose/v3 v3.0.1 // indirect github.com/go-logr/logr v1.3.0 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/google/go-cmp v0.5.9 // indirect github.com/google/subcommands v1.2.0 // indirect github.com/google/uuid v1.3.1 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect - github.com/hashicorp/go-retryablehttp v0.7.4 // indirect + github.com/hashicorp/go-retryablehttp v0.7.5 // indirect github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/josharian/native v1.1.0 // indirect github.com/klauspost/compress v1.17.2 // indirect diff --git a/go.sum b/go.sum index 1d5a2db31..3715a2edf 100644 --- a/go.sum +++ b/go.sum @@ -40,8 +40,8 @@ github.com/gammazero/deque v0.2.1 h1:qSdsbG6pgp6nL7A0+K/B7s12mcCY/5l5SIUpMOl+dC0 github.com/gammazero/deque v0.2.1/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU= github.com/gammazero/workerpool v1.1.3 h1:WixN4xzukFoN0XSeXF6puqEqFTl2mECI9S6W44HWy9Q= github.com/gammazero/workerpool v1.1.3/go.mod h1:wPjyBLDbyKnUn2XwwyD3EEwo9dHutia9/fwNmSHWACc= -github.com/go-jose/go-jose/v3 v3.0.0 h1:s6rrhirfEP/CGIoc6p+PZAeogN2SxKav6Wp7+dyMWVo= -github.com/go-jose/go-jose/v3 v3.0.0/go.mod h1:RNkWWRld676jZEYoV3+XK8L2ZnNSvIsxFMht0mSX+u8= +github.com/go-jose/go-jose/v3 v3.0.1 h1:pWmKFVtt+Jl0vBZTIpz/eAKwsm6LkIxDVVbFHKkchhA= +github.com/go-jose/go-jose/v3 v3.0.1/go.mod h1:RNkWWRld676jZEYoV3+XK8L2ZnNSvIsxFMht0mSX+u8= github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY= github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= @@ -82,8 +82,8 @@ github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9n github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48= github.com/hashicorp/go-hclog v0.9.2 h1:CG6TE5H9/JXsFWJCfoIVpKFIkFe6ysEuHirp4DxCsHI= github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ= -github.com/hashicorp/go-retryablehttp v0.7.4 h1:ZQgVdpTdAL7WpMIwLzCfbalOcSUdkDZnpUv3/+BxzFA= -github.com/hashicorp/go-retryablehttp v0.7.4/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8= +github.com/hashicorp/go-retryablehttp v0.7.5 h1:bJj+Pj19UZMIweq/iie+1u5YCdGrnxCT9yvm0e+Nd5M= +github.com/hashicorp/go-retryablehttp v0.7.5/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8= github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek= github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= @@ -125,8 +125,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e h1:yNeIo7MSMUWgoLu7LkNKnBYnJBFPFH9Wq4S6h1kS44M= github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY= -github.com/livekit/protocol v1.9.1-0.20231107185101-e230ee2d840e h1:YShBpEjkEBY7yil2gjMWlkVkxs3OI58LIIYsBdb8aBU= -github.com/livekit/protocol v1.9.1-0.20231107185101-e230ee2d840e/go.mod h1:l2WjlZWErS6vBlQaQyCGwWLt1aOx10XfQTsmvLjJWFQ= +github.com/livekit/protocol v1.9.1 h1:RTycWwtGUoYLb+7WYosJ3IP8f0e8j4lLxC/osy/dQi0= +github.com/livekit/protocol v1.9.1/go.mod h1:kqDGmx+WZ6WMZ5V/T9lF8DgnuThAjetwjHq1nd7moSE= github.com/livekit/psrpc v0.5.1 h1:ihN5uKIvbU69UsFS4HdYmou5GuK0Dt4hix4eOmRS7o8= github.com/livekit/psrpc v0.5.1/go.mod h1:cQjxg1oCxYHhxxv6KJH1gSvdtCHQoRZCHgPdm5N8v2g= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= diff --git a/pkg/config/config.go b/pkg/config/config.go index a87caf55b..a0c882f14 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -66,6 +66,7 @@ type Config struct { Room RoomConfig `yaml:"room,omitempty"` TURN TURNConfig `yaml:"turn,omitempty"` Ingress IngressConfig `yaml:"ingress,omitempty"` + SIP SIPConfig `yaml:"sip,omitempty"` WebHook WebHookConfig `yaml:"webhook,omitempty"` NodeSelector NodeSelectorConfig `yaml:"node_selector,omitempty"` KeyFile string `yaml:"key_file,omitempty"` @@ -294,6 +295,9 @@ type IngressConfig struct { WHIPBaseURL string `yaml:"whip_base_url,omitempty"` } +type SIPConfig struct { +} + // not exposed to YAML type APIConfig struct { // amount of time to wait for API to execute, default 2s diff --git a/pkg/service/errors.go b/pkg/service/errors.go index 7c27d0dac..9a77f5fde 100644 --- a/pkg/service/errors.go +++ b/pkg/service/errors.go @@ -34,4 +34,8 @@ var ( ErrRemoteUnmuteNoteEnabled = psrpc.NewErrorf(psrpc.FailedPrecondition, "remote unmute not enabled") ErrTrackNotFound = psrpc.NewErrorf(psrpc.NotFound, "track is not found") ErrWebHookMissingAPIKey = psrpc.NewErrorf(psrpc.InvalidArgument, "api_key is required to use webhooks") + ErrSIPNotConnected = psrpc.NewErrorf(psrpc.Internal, "sip not connected (redis required)") + ErrSIPTrunkNotFound = psrpc.NewErrorf(psrpc.NotFound, "requested sip trunk does not exist") + ErrSIPDispatchRuleNotFound = psrpc.NewErrorf(psrpc.NotFound, "requested sip dispatch rule does not exist") + ErrSIPParticipantNotFound = psrpc.NewErrorf(psrpc.NotFound, "requested sip participant does not exist") ) diff --git a/pkg/service/interfaces.go b/pkg/service/interfaces.go index 6c32d7c5c..5dd0cb20b 100644 --- a/pkg/service/interfaces.go +++ b/pkg/service/interfaces.go @@ -76,3 +76,21 @@ type RoomAllocator interface { CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, bool, error) ValidateCreateRoom(ctx context.Context, roomName livekit.RoomName) error } + +//counterfeiter:generate . SIPStore +type SIPStore interface { + StoreSIPTrunk(ctx context.Context, info *livekit.SIPTrunkInfo) error + LoadSIPTrunk(ctx context.Context, sipTrunkID string) (*livekit.SIPTrunkInfo, error) + ListSIPTrunk(ctx context.Context) ([]*livekit.SIPTrunkInfo, error) + DeleteSIPTrunk(ctx context.Context, info *livekit.SIPTrunkInfo) error + + StoreSIPDispatchRule(ctx context.Context, info *livekit.SIPDispatchRuleInfo) error + LoadSIPDispatchRule(ctx context.Context, sipDispatchRuleID string) (*livekit.SIPDispatchRuleInfo, error) + ListSIPDispatchRule(ctx context.Context) ([]*livekit.SIPDispatchRuleInfo, error) + DeleteSIPDispatchRule(ctx context.Context, info *livekit.SIPDispatchRuleInfo) error + + StoreSIPParticipant(ctx context.Context, info *livekit.SIPParticipantInfo) error + LoadSIPParticipant(ctx context.Context, sipParticipantID string) (*livekit.SIPParticipantInfo, error) + ListSIPParticipant(ctx context.Context) ([]*livekit.SIPParticipantInfo, error) + DeleteSIPParticipant(ctx context.Context, info *livekit.SIPParticipantInfo) error +} diff --git a/pkg/service/ioservice.go b/pkg/service/ioservice.go index b957e8b15..2d9070cca 100644 --- a/pkg/service/ioservice.go +++ b/pkg/service/ioservice.go @@ -17,6 +17,7 @@ package service import ( "context" "errors" + "fmt" "google.golang.org/protobuf/types/known/emptypb" @@ -32,6 +33,7 @@ type IOInfoService struct { es EgressStore is IngressStore + ss SIPStore telemetry telemetry.TelemetryService shutdown chan struct{} @@ -41,11 +43,13 @@ func NewIOInfoService( bus psrpc.MessageBus, es EgressStore, is IngressStore, + ss SIPStore, ts telemetry.TelemetryService, ) (*IOInfoService, error) { s := &IOInfoService{ es: es, is: is, + ss: ss, telemetry: ts, shutdown: make(chan struct{}), } @@ -211,6 +215,31 @@ func (s *IOInfoService) UpdateIngressState(ctx context.Context, req *rpc.UpdateI return &emptypb.Empty{}, nil } +func (s *IOInfoService) EvaluateSIPDispatchRules(ctx context.Context, req *rpc.EvaluateSIPDispatchRulesRequest) (*rpc.EvaluateSIPDispatchRulesResponse, error) { + dispatchRules, err := s.ss.ListSIPDispatchRule(ctx) + if err != nil { + return nil, err + } + + if len(dispatchRules) == 0 { + return nil, fmt.Errorf("No SIP Dispatch Rule Found") + } + + directDispatchRule := dispatchRules[0].Rule.GetDispatchRuleDirect() + if directDispatchRule == nil { + return nil, fmt.Errorf("No SIP Direct Dispatch Rule Found") + } + + return &rpc.EvaluateSIPDispatchRulesResponse{ + RoomName: directDispatchRule.RoomName, + ParticipantIdentity: req.CallingNumber, + }, nil +} + +func (s *IOInfoService) GetSIPTrunkAuthentication(ctx context.Context, req *rpc.GetSIPTrunkAuthenticationRequest) (*rpc.GetSIPTrunkAuthenticationResponse, error) { + return nil, nil +} + func (s *IOInfoService) Stop() { close(s.shutdown) diff --git a/pkg/service/redisstore.go b/pkg/service/redisstore.go index cc8c2e146..dfb4c3626 100644 --- a/pkg/service/redisstore.go +++ b/pkg/service/redisstore.go @@ -51,6 +51,10 @@ const ( IngressStatePrefix = "{ingress}_state:" RoomIngressPrefix = "room_{ingress}:" + SIPTrunkKey = "sip_trunk" + SIPDispatchRuleKey = "sip_dispatch_rule" + SIPParticipantKey = "sip_participant" + // RoomParticipantsPrefix is hash of participant_name => ParticipantInfo RoomParticipantsPrefix = "room_participants:" @@ -812,3 +816,129 @@ func (s *RedisStore) DeleteIngress(_ context.Context, info *livekit.IngressInfo) return nil } + +func (s *RedisStore) loadOne(ctx context.Context, key, id string, info proto.Message, notFoundErr error) error { + data, err := s.rc.HGet(s.ctx, key, id).Result() + switch err { + case nil: + return proto.Unmarshal([]byte(data), info) + case redis.Nil: + return notFoundErr + default: + return err + } +} + +func (s *RedisStore) loadMany(ctx context.Context, key string, onResult func() proto.Message) error { + data, err := s.rc.HGetAll(s.ctx, key).Result() + if err != nil { + if err == redis.Nil { + return nil + } + return err + } + + for _, d := range data { + if err = proto.Unmarshal([]byte(d), onResult()); err != nil { + return err + } + } + + return nil +} + +func (s *RedisStore) StoreSIPTrunk(ctx context.Context, info *livekit.SIPTrunkInfo) error { + data, err := proto.Marshal(info) + if err != nil { + return err + } + + return s.rc.HSet(s.ctx, SIPTrunkKey, info.SipTrunkId, data).Err() +} + +func (s *RedisStore) LoadSIPTrunk(ctx context.Context, sipTrunkId string) (*livekit.SIPTrunkInfo, error) { + info := &livekit.SIPTrunkInfo{} + if err := s.loadOne(ctx, SIPTrunkKey, sipTrunkId, info, ErrSIPTrunkNotFound); err != nil { + return nil, err + } + + return info, nil +} + +func (s *RedisStore) DeleteSIPTrunk(ctx context.Context, info *livekit.SIPTrunkInfo) error { + return s.rc.HDel(s.ctx, SIPTrunkKey, info.SipTrunkId).Err() +} + +func (s *RedisStore) ListSIPTrunk(ctx context.Context) (infos []*livekit.SIPTrunkInfo, err error) { + err = s.loadMany(ctx, SIPTrunkKey, func() proto.Message { + infos = append(infos, &livekit.SIPTrunkInfo{}) + return infos[len(infos)-1] + }) + + return infos, err +} + +func (s *RedisStore) StoreSIPDispatchRule(ctx context.Context, info *livekit.SIPDispatchRuleInfo) error { + data, err := proto.Marshal(info) + if err != nil { + return err + } + + return s.rc.HSet(s.ctx, SIPDispatchRuleKey, info.SipDispatchRuleId, data).Err() +} + +func (s *RedisStore) LoadSIPDispatchRule(ctx context.Context, sipDispatchRuleId string) (*livekit.SIPDispatchRuleInfo, error) { + info := &livekit.SIPDispatchRuleInfo{} + if err := s.loadOne(ctx, SIPDispatchRuleKey, sipDispatchRuleId, info, ErrSIPDispatchRuleNotFound); err != nil { + return nil, err + } + + return info, nil +} + +func (s *RedisStore) DeleteSIPDispatchRule(ctx context.Context, info *livekit.SIPDispatchRuleInfo) error { + return s.rc.HDel(s.ctx, SIPDispatchRuleKey, info.SipDispatchRuleId).Err() +} + +func (s *RedisStore) ListSIPDispatchRule(ctx context.Context) (infos []*livekit.SIPDispatchRuleInfo, err error) { + err = s.loadMany(ctx, SIPDispatchRuleKey, func() proto.Message { + infos = append(infos, &livekit.SIPDispatchRuleInfo{}) + return infos[len(infos)-1] + }) + + return infos, err +} + +func (s *RedisStore) StoreSIPParticipant(ctx context.Context, info *livekit.SIPParticipantInfo) error { + data, err := proto.Marshal(info) + if err != nil { + return err + } + + return s.rc.HSet(s.ctx, SIPParticipantKey, info.SipParticipantId, data).Err() +} +func (s *RedisStore) LoadSIPParticipant(ctx context.Context, sipParticipantId string) (*livekit.SIPParticipantInfo, error) { + info := &livekit.SIPParticipantInfo{} + if err := s.loadOne(ctx, SIPParticipantKey, sipParticipantId, info, ErrSIPParticipantNotFound); err != nil { + return nil, err + } + + return info, nil +} + +func (s *RedisStore) DeleteSIPParticipant(ctx context.Context, info *livekit.SIPParticipantInfo) error { + return s.rc.HDel(s.ctx, SIPParticipantKey, info.SipParticipantId).Err() +} + +func (s *RedisStore) ListSIPParticipant(ctx context.Context) (infos []*livekit.SIPParticipantInfo, err error) { + err = s.loadMany(ctx, SIPParticipantKey, func() proto.Message { + infos = append(infos, &livekit.SIPParticipantInfo{}) + return infos[len(infos)-1] + }) + + return infos, err +} + +func (s *RedisStore) SendSIPParticipantDTMF(ctx context.Context, info *livekit.SendSIPParticipantDTMFRequest) (*livekit.SIPParticipantDTMFInfo, error) { + return nil, fmt.Errorf("TODO") +} diff --git a/pkg/service/server.go b/pkg/service/server.go index f7ade50d2..f66d848b5 100644 --- a/pkg/service/server.go +++ b/pkg/service/server.go @@ -65,6 +65,7 @@ func NewLivekitServer(conf *config.Config, roomService livekit.RoomService, egressService *EgressService, ingressService *IngressService, + sipService *SIPService, ioService *IOInfoService, rtcService *RTCService, agentService *AgentService, @@ -116,6 +117,7 @@ func NewLivekitServer(conf *config.Config, ), )) ingressServer := livekit.NewIngressServer(ingressService, twirpLoggingHook) + sipServer := livekit.NewSIPServer(sipService, twirpLoggingHook) mux := http.NewServeMux() if conf.Development { @@ -127,6 +129,7 @@ func NewLivekitServer(conf *config.Config, mux.Handle(roomServer.PathPrefix(), roomServer) mux.Handle(egressServer.PathPrefix(), egressServer) mux.Handle(ingressServer.PathPrefix(), ingressServer) + mux.Handle(sipServer.PathPrefix(), sipServer) mux.Handle("/rtc", rtcService) mux.Handle("/agent", agentService) mux.HandleFunc("/rtc/validate", rtcService.Validate) diff --git a/pkg/service/sip.go b/pkg/service/sip.go new file mode 100644 index 000000000..4a7fe411a --- /dev/null +++ b/pkg/service/sip.go @@ -0,0 +1,207 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package service + +import ( + "context" + "fmt" + + "github.com/livekit/livekit-server/pkg/config" + "github.com/livekit/livekit-server/pkg/telemetry" + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/rpc" + "github.com/livekit/protocol/utils" + "github.com/livekit/psrpc" +) + +type SIPService struct { + conf *config.SIPConfig + nodeID livekit.NodeID + bus psrpc.MessageBus + psrpcClient rpc.SIPClient + store SIPStore + roomService livekit.RoomService +} + +func NewSIPService( + conf *config.SIPConfig, + nodeID livekit.NodeID, + bus psrpc.MessageBus, + psrpcClient rpc.SIPClient, + store SIPStore, + rs livekit.RoomService, + ts telemetry.TelemetryService, +) *SIPService { + return &SIPService{ + conf: conf, + nodeID: nodeID, + bus: bus, + psrpcClient: psrpcClient, + store: store, + roomService: rs, + } +} + +func (s *SIPService) CreateSIPTrunk(ctx context.Context, req *livekit.CreateSIPTrunkRequest) (*livekit.SIPTrunkInfo, error) { + if s.store == nil { + return nil, ErrSIPNotConnected + } + + info := &livekit.SIPTrunkInfo{ + SipTrunkId: utils.NewGuid(utils.SIPTrunkPrefix), + InboundAddresses: req.InboundAddresses, + OutboundAddress: req.OutboundAddress, + OutboundNumber: req.OutboundNumber, + InboundNumbersRegex: req.InboundNumbersRegex, + Username: req.Username, + Password: req.Password, + } + + if err := s.store.StoreSIPTrunk(ctx, info); err != nil { + return nil, err + } + return info, nil +} + +func (s *SIPService) ListSIPTrunk(ctx context.Context, req *livekit.ListSIPTrunkRequest) (*livekit.ListSIPTrunkResponse, error) { + if s.store == nil { + return nil, ErrSIPNotConnected + } + + trunks, err := s.store.ListSIPTrunk(ctx) + if err != nil { + return nil, err + } + + return &livekit.ListSIPTrunkResponse{Items: trunks}, nil +} + +func (s *SIPService) DeleteSIPTrunk(ctx context.Context, req *livekit.DeleteSIPTrunkRequest) (*livekit.SIPTrunkInfo, error) { + if s.store == nil { + return nil, ErrSIPNotConnected + } + + info, err := s.store.LoadSIPTrunk(ctx, req.SipTrunkId) + if err != nil { + return nil, err + } + + if err = s.store.DeleteSIPTrunk(ctx, info); err != nil { + return nil, err + } + + return info, nil +} + +func (s *SIPService) CreateSIPDispatchRule(ctx context.Context, req *livekit.CreateSIPDispatchRuleRequest) (*livekit.SIPDispatchRuleInfo, error) { + if s.store == nil { + return nil, ErrSIPNotConnected + } + + info := &livekit.SIPDispatchRuleInfo{ + SipDispatchRuleId: utils.NewGuid(utils.SIPDispatchRulePrefix), + Rule: req.Rule, + TrunkIds: req.TrunkIds, + HidePhoneNumber: req.HidePhoneNumber, + } + + if err := s.store.StoreSIPDispatchRule(ctx, info); err != nil { + return nil, err + } + return info, nil +} + +func (s *SIPService) ListSIPDispatchRule(ctx context.Context, req *livekit.ListSIPDispatchRuleRequest) (*livekit.ListSIPDispatchRuleResponse, error) { + if s.store == nil { + return nil, ErrSIPNotConnected + } + + rules, err := s.store.ListSIPDispatchRule(ctx) + if err != nil { + return nil, err + } + + return &livekit.ListSIPDispatchRuleResponse{Items: rules}, nil +} + +func (s *SIPService) DeleteSIPDispatchRule(ctx context.Context, req *livekit.DeleteSIPDispatchRuleRequest) (*livekit.SIPDispatchRuleInfo, error) { + if s.store == nil { + return nil, ErrSIPNotConnected + } + + info, err := s.store.LoadSIPDispatchRule(ctx, req.SipDispatchRuleId) + if err != nil { + return nil, err + } + + if err = s.store.DeleteSIPDispatchRule(ctx, info); err != nil { + return nil, err + } + + return info, nil +} + +func (s *SIPService) CreateSIPParticipant(ctx context.Context, req *livekit.CreateSIPParticipantRequest) (*livekit.SIPParticipantInfo, error) { + if s.store == nil { + return nil, ErrSIPNotConnected + } + + info := &livekit.SIPParticipantInfo{ + SipParticipantId: utils.NewGuid(utils.SIPParticipantPrefix), + } + + if err := s.store.StoreSIPParticipant(ctx, info); err != nil { + return nil, err + } + return info, nil +} + +func (s *SIPService) ListSIPParticipant(ctx context.Context, req *livekit.ListSIPParticipantRequest) (*livekit.ListSIPParticipantResponse, error) { + if s.store == nil { + return nil, ErrSIPNotConnected + } + + participants, err := s.store.ListSIPParticipant(ctx) + if err != nil { + return nil, err + } + + return &livekit.ListSIPParticipantResponse{Items: participants}, nil +} + +func (s *SIPService) DeleteSIPParticipant(ctx context.Context, req *livekit.DeleteSIPParticipantRequest) (*livekit.SIPParticipantInfo, error) { + if s.store == nil { + return nil, ErrSIPNotConnected + } + + info, err := s.store.LoadSIPParticipant(ctx, req.SipParticipantId) + if err != nil { + return nil, err + } + + if err = s.store.DeleteSIPParticipant(ctx, info); err != nil { + return nil, err + } + + return info, nil +} + +func (s *SIPService) SendSIPParticipantDTMF(ctx context.Context, req *livekit.SendSIPParticipantDTMFRequest) (*livekit.SIPParticipantDTMFInfo, error) { + if s.store == nil { + return nil, ErrSIPNotConnected + } + + return nil, fmt.Errorf("TODO") +} diff --git a/pkg/service/wire.go b/pkg/service/wire.go index afca2631e..93e3784b0 100644 --- a/pkg/service/wire.go +++ b/pkg/service/wire.go @@ -70,6 +70,10 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live getIngressStore, getIngressConfig, NewIngressService, + rpc.NewSIPClient, + getSIPStore, + getSIPConfig, + NewSIPService, NewRoomAllocator, NewRoomService, NewRTCService, @@ -195,6 +199,19 @@ func getIngressConfig(conf *config.Config) *config.IngressConfig { return &conf.Ingress } +func getSIPStore(s ObjectStore) SIPStore { + switch store := s.(type) { + case *RedisStore: + return store + default: + return nil + } +} + +func getSIPConfig(conf *config.Config) *config.SIPConfig { + return &conf.SIP +} + func createClientConfiguration() clientconfiguration.ClientConfigurationManager { return clientconfiguration.NewStaticClientConfigurationManager(clientconfiguration.StaticConfigurations) } diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 3a6470b88..0f0eb6e87 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -66,6 +66,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live } egressStore := getEgressStore(objectStore) ingressStore := getIngressStore(objectStore) + sipStore := getSIPStore(objectStore) keyProvider, err := createKeyProvider(conf) if err != nil { return nil, err @@ -76,7 +77,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live } analyticsService := telemetry.NewAnalyticsService(conf, currentNode) telemetryService := telemetry.NewTelemetryService(queuedNotifier, analyticsService) - ioInfoService, err := NewIOInfoService(messageBus, egressStore, ingressStore, telemetryService) + ioInfoService, err := NewIOInfoService(messageBus, egressStore, ingressStore, sipStore, telemetryService) if err != nil { return nil, err } @@ -102,6 +103,12 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live return nil, err } ingressService := NewIngressService(ingressConfig, nodeID, messageBus, ingressClient, ingressStore, roomService, telemetryService) + sipConfig := getSIPConfig(conf) + sipClient, err := rpc.NewSIPClient(messageBus) + if err != nil { + return nil, err + } + sipService := NewSIPService(sipConfig, nodeID, messageBus, sipClient, sipStore, roomService, telemetryService) rtcService := NewRTCService(conf, roomAllocator, objectStore, router, currentNode, agentClient, telemetryService) agentService, err := NewAgentService(messageBus) if err != nil { @@ -123,7 +130,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live if err != nil { return nil, err } - livekitServer, err := NewLivekitServer(conf, roomService, egressService, ingressService, ioInfoService, rtcService, agentService, keyProvider, router, roomManager, signalServer, server, currentNode) + livekitServer, err := NewLivekitServer(conf, roomService, egressService, ingressService, sipService, ioInfoService, rtcService, agentService, keyProvider, router, roomManager, signalServer, server, currentNode) if err != nil { return nil, err } @@ -237,6 +244,19 @@ func getIngressConfig(conf *config.Config) *config.IngressConfig { return &conf.Ingress } +func getSIPStore(s ObjectStore) SIPStore { + switch store := s.(type) { + case *RedisStore: + return store + default: + return nil + } +} + +func getSIPConfig(conf *config.Config) *config.SIPConfig { + return &conf.SIP +} + func createClientConfiguration() clientconfiguration.ClientConfigurationManager { return clientconfiguration.NewStaticClientConfigurationManager(clientconfiguration.StaticConfigurations) }