Implement SIP iterators. (#3332)

This commit is contained in:
Denys Smirnov
2025-02-20 13:13:21 +02:00
committed by GitHub
parent 363353d6e5
commit 60a09cb4be
12 changed files with 836 additions and 253 deletions

4
go.mod
View File

@@ -7,6 +7,7 @@ toolchain go1.23.6
require (
github.com/bep/debounce v1.2.1
github.com/d5/tengo/v2 v2.17.0
github.com/dennwc/iters v1.0.1
github.com/dustin/go-humanize v1.0.1
github.com/elliotchance/orderedmap/v2 v2.7.0
github.com/florianl/go-tc v0.4.4
@@ -22,7 +23,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20241220010243-a2bdee945564
github.com/livekit/protocol v1.33.1-0.20250218083317-1a019aab5b83
github.com/livekit/protocol v1.33.1-0.20250218100713-bfe8101478c7
github.com/livekit/psrpc v0.6.1-0.20250205181828-a0beed2e4126
github.com/mackerelio/go-osstat v0.2.5
github.com/magefile/mage v1.15.0
@@ -77,7 +78,6 @@ require (
github.com/containerd/continuity v0.4.3 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.5 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dennwc/iters v1.0.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/docker/cli v26.1.4+incompatible // indirect
github.com/docker/docker v27.1.1+incompatible // indirect

4
go.sum
View File

@@ -170,8 +170,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20241220010243-a2bdee945564 h1:GX7KF/V9ExmcfT/2Bdia8aROjkxrgx7WpyH7w9MB4J4=
github.com/livekit/mediatransportutil v0.0.0-20241220010243-a2bdee945564/go.mod h1:36s+wwmU3O40IAhE+MjBWP3W71QRiEE9SfooSBvtBqY=
github.com/livekit/protocol v1.33.1-0.20250218083317-1a019aab5b83 h1:1HFZ41AaFE+disN7Md9g0MQNpnw9Y2p3QKbjYNtjQjA=
github.com/livekit/protocol v1.33.1-0.20250218083317-1a019aab5b83/go.mod h1:yXuQ7ucrLj91nbxL6/AHgtxdha1DGzLj1LkgvnT90So=
github.com/livekit/protocol v1.33.1-0.20250218100713-bfe8101478c7 h1:c40dw41dbIm9Wdeezv8fuYfQicbFotYji3f7CrVa7lU=
github.com/livekit/protocol v1.33.1-0.20250218100713-bfe8101478c7/go.mod h1:yXuQ7ucrLj91nbxL6/AHgtxdha1DGzLj1LkgvnT90So=
github.com/livekit/psrpc v0.6.1-0.20250205181828-a0beed2e4126 h1:fzuYpAQbCid7ySPpQWWePfQOWUrs8x6dJ0T3Wl07n+Y=
github.com/livekit/psrpc v0.6.1-0.20250205181828-a0beed2e4126/go.mod h1:X5WtEZ7OnEs72Fi5/J+i0on3964F1aynQpCalcgMqRo=
github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o=

View File

@@ -87,15 +87,15 @@ type SIPStore interface {
LoadSIPTrunk(ctx context.Context, sipTrunkID string) (*livekit.SIPTrunkInfo, error)
LoadSIPInboundTrunk(ctx context.Context, sipTrunkID string) (*livekit.SIPInboundTrunkInfo, error)
LoadSIPOutboundTrunk(ctx context.Context, sipTrunkID string) (*livekit.SIPOutboundTrunkInfo, error)
ListSIPTrunk(ctx context.Context) ([]*livekit.SIPTrunkInfo, error)
ListSIPInboundTrunk(ctx context.Context) ([]*livekit.SIPInboundTrunkInfo, error)
ListSIPOutboundTrunk(ctx context.Context) ([]*livekit.SIPOutboundTrunkInfo, error)
ListSIPTrunk(ctx context.Context, opts *livekit.ListSIPTrunkRequest) (*livekit.ListSIPTrunkResponse, error)
ListSIPInboundTrunk(ctx context.Context, opts *livekit.ListSIPInboundTrunkRequest) (*livekit.ListSIPInboundTrunkResponse, error)
ListSIPOutboundTrunk(ctx context.Context, opts *livekit.ListSIPOutboundTrunkRequest) (*livekit.ListSIPOutboundTrunkResponse, error)
DeleteSIPTrunk(ctx context.Context, sipTrunkID string) error
StoreSIPDispatchRule(ctx context.Context, info *livekit.SIPDispatchRuleInfo) error
LoadSIPDispatchRule(ctx context.Context, sipDispatchRuleID string) (*livekit.SIPDispatchRuleInfo, error)
ListSIPDispatchRule(ctx context.Context) ([]*livekit.SIPDispatchRuleInfo, error)
DeleteSIPDispatchRule(ctx context.Context, info *livekit.SIPDispatchRuleInfo) error
ListSIPDispatchRule(ctx context.Context, opts *livekit.ListSIPDispatchRuleRequest) (*livekit.ListSIPDispatchRuleResponse, error)
DeleteSIPDispatchRule(ctx context.Context, sipDispatchRuleID string) error
}
//counterfeiter:generate . AgentStore

View File

@@ -19,6 +19,7 @@ import (
"errors"
"net/netip"
"github.com/dennwc/iters"
"github.com/twitchtv/twirp"
"github.com/livekit/protocol/livekit"
@@ -30,23 +31,54 @@ import (
// matchSIPTrunk finds a SIP Trunk definition matching the request.
// Returns nil if no rules matched or an error if there are conflicting definitions.
func (s *IOInfoService) matchSIPTrunk(ctx context.Context, trunkID, calling, called string, srcIP netip.Addr) (*livekit.SIPInboundTrunkInfo, error) {
trunks, err := s.ss.ListSIPInboundTrunk(ctx)
if err != nil {
return nil, err
if s.ss == nil {
return nil, ErrSIPNotConnected
}
return sip.MatchTrunk(trunks, srcIP, calling, called)
if trunkID != "" {
// This is a best-effort optimization. Fallthrough to listing trunks if it doesn't work.
if tr, err := s.ss.LoadSIPInboundTrunk(ctx, trunkID); err == nil {
tr, err = sip.MatchTrunkIter(iters.Slice([]*livekit.SIPInboundTrunkInfo{tr}), srcIP, calling, called)
if err == nil {
return tr, nil
}
}
}
it := s.SelectSIPInboundTrunk(ctx, called)
return sip.MatchTrunkIter(it, srcIP, calling, called)
}
func (s *IOInfoService) SelectSIPInboundTrunk(ctx context.Context, called string) iters.Iter[*livekit.SIPInboundTrunkInfo] {
it := livekit.ListPageIter(s.ss.ListSIPInboundTrunk, &livekit.ListSIPInboundTrunkRequest{
Numbers: []string{called},
})
return iters.PagesAsIter(ctx, it)
}
// matchSIPDispatchRule finds the best dispatch rule matching the request parameters. Returns an error if no rule matched.
// Trunk parameter can be nil, in which case only wildcard dispatch rules will be effective (ones without Trunk IDs).
func (s *IOInfoService) matchSIPDispatchRule(ctx context.Context, trunk *livekit.SIPInboundTrunkInfo, req *rpc.EvaluateSIPDispatchRulesRequest) (*livekit.SIPDispatchRuleInfo, error) {
if s.ss == nil {
return nil, ErrSIPNotConnected
}
var trunkID string
if trunk != nil {
trunkID = trunk.SipTrunkId
}
// Trunk can still be nil here in case none matched or were defined.
// This is still fine, but only in case we'll match exactly one wildcard dispatch rule.
rules, err := s.ss.ListSIPDispatchRule(ctx)
if err != nil {
return nil, err
it := s.SelectSIPDispatchRule(ctx, trunkID)
return sip.MatchDispatchRuleIter(trunk, it, req)
}
func (s *IOInfoService) SelectSIPDispatchRule(ctx context.Context, trunkID string) iters.Iter[*livekit.SIPDispatchRuleInfo] {
var trunkIDs []string
if trunkID != "" {
trunkIDs = []string{trunkID}
}
return sip.MatchDispatchRule(trunk, rules, req)
it := livekit.ListPageIter(s.ss.ListSIPDispatchRule, &livekit.ListSIPDispatchRuleRequest{
TrunkIds: trunkIDs,
})
return iters.PagesAsIter(ctx, it)
}
func (s *IOInfoService) EvaluateSIPDispatchRules(ctx context.Context, req *rpc.EvaluateSIPDispatchRulesRequest) (*rpc.EvaluateSIPDispatchRulesResponse, error) {

View File

@@ -0,0 +1,122 @@
// Copyright 2024 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package service_test
import (
"context"
"github.com/dennwc/iters"
"github.com/livekit/livekit-server/pkg/service"
"github.com/livekit/psrpc"
"slices"
"testing"
"github.com/livekit/protocol/livekit"
"github.com/stretchr/testify/require"
)
func ioStoreDocker(t testing.TB) (*service.IOInfoService, *service.RedisStore) {
r := redisClientDocker(t)
bus := psrpc.NewRedisMessageBus(r)
rs := service.NewRedisStore(r)
io, err := service.NewIOInfoService(bus, rs, rs, rs, nil)
require.NoError(t, err)
return io, rs
}
func TestSIPTrunkSelect(t *testing.T) {
ctx := context.Background()
s, rs := ioStoreDocker(t)
for _, tr := range []*livekit.SIPInboundTrunkInfo{
{SipTrunkId: "any", Numbers: nil},
{SipTrunkId: "B", Numbers: []string{"B1", "B2"}},
{SipTrunkId: "BC", Numbers: []string{"B1", "C1"}},
} {
err := rs.StoreSIPInboundTrunk(ctx, tr)
require.NoError(t, err)
}
for _, tr := range []*livekit.SIPTrunkInfo{
{SipTrunkId: "old-any", OutboundNumber: ""},
{SipTrunkId: "old-A", OutboundNumber: "A"},
} {
err := rs.StoreSIPTrunk(ctx, tr)
require.NoError(t, err)
}
for _, c := range []struct {
number string
exp []string
}{
{"A", []string{"old-A", "old-any", "any"}},
{"B1", []string{"B", "BC", "old-any", "any"}},
{"B2", []string{"B", "old-any", "any"}},
{"C1", []string{"BC", "old-any", "any"}},
{"wrong", []string{"old-any", "any"}},
} {
t.Run(c.number, func(t *testing.T) {
it := s.SelectSIPInboundTrunk(ctx, c.number)
defer it.Close()
list, err := iters.All(it)
require.NoError(t, err)
var ids []string
for _, v := range list {
ids = append(ids, v.SipTrunkId)
}
slices.Sort(c.exp)
slices.Sort(ids)
require.Equal(t, c.exp, ids)
})
}
}
func TestSIPRuleSelect(t *testing.T) {
ctx := context.Background()
s, rs := ioStoreDocker(t)
for _, r := range []*livekit.SIPDispatchRuleInfo{
{SipDispatchRuleId: "any", TrunkIds: nil},
{SipDispatchRuleId: "B", TrunkIds: []string{"B1", "B2"}},
{SipDispatchRuleId: "BC", TrunkIds: []string{"B1", "C1"}},
} {
err := rs.StoreSIPDispatchRule(ctx, r)
require.NoError(t, err)
}
for _, c := range []struct {
trunk string
exp []string
}{
{"A", []string{"any"}},
{"B1", []string{"B", "BC", "any"}},
{"B2", []string{"B", "any"}},
{"C1", []string{"BC", "any"}},
{"wrong", []string{"any"}},
} {
t.Run(c.trunk, func(t *testing.T) {
it := s.SelectSIPDispatchRule(ctx, c.trunk)
defer it.Close()
list, err := iters.All(it)
require.NoError(t, err)
var ids []string
for _, v := range list {
ids = append(ids, v.SipDispatchRuleId)
}
slices.Sort(c.exp)
slices.Sort(ids)
require.Equal(t, c.exp, ids)
})
}
}

View File

@@ -17,6 +17,7 @@ package service
import (
"context"
"fmt"
"slices"
"strconv"
"strings"
"time"
@@ -858,7 +859,7 @@ func (s *RedisStore) DeleteAgentDispatch(_ context.Context, dispatch *livekit.Ag
func (s *RedisStore) ListAgentDispatches(_ context.Context, roomName livekit.RoomName) ([]*livekit.AgentDispatch, error) {
key := AgentDispatchPrefix + string(roomName)
dispatches, err := redisLoadMany[livekit.AgentDispatch](s.ctx, s, key)
dispatches, err := redisLoadAll[livekit.AgentDispatch](s.ctx, s, key)
if err != nil {
return nil, err
}
@@ -869,7 +870,7 @@ func (s *RedisStore) ListAgentDispatches(_ context.Context, roomName livekit.Roo
}
key = AgentJobPrefix + string(roomName)
jobs, err := redisLoadMany[livekit.Job](s.ctx, s, key)
jobs, err := redisLoadAll[livekit.Job](s.ctx, s, key)
if err != nil {
return nil, err
}
@@ -936,10 +937,12 @@ func redisStoreOne(ctx context.Context, s *RedisStore, key, id string, p proto.M
return s.rc.HSet(s.ctx, key, id, data).Err()
}
func redisLoadOne[T any, P interface {
type protoMsg[T any] interface {
*T
proto.Message
}](ctx context.Context, s *RedisStore, key, id string, notFoundErr error) (P, error) {
}
func redisLoadOne[T any, P protoMsg[T]](ctx context.Context, s *RedisStore, key, id string, notFoundErr error) (P, error) {
data, err := s.rc.HGet(s.ctx, key, id).Result()
if err == redis.Nil {
return nil, notFoundErr
@@ -954,11 +957,8 @@ func redisLoadOne[T any, P interface {
return p, err
}
func redisLoadMany[T any, P interface {
*T
proto.Message
}](ctx context.Context, s *RedisStore, key string) ([]P, error) {
data, err := s.rc.HGetAll(s.ctx, key).Result()
func redisLoadAll[T any, P protoMsg[T]](ctx context.Context, s *RedisStore, key string) ([]P, error) {
data, err := s.rc.HVals(s.ctx, key).Result()
if err == redis.Nil {
return nil, nil
} else if err != nil {
@@ -976,3 +976,103 @@ func redisLoadMany[T any, P interface {
return list, nil
}
func redisLoadBatch[T any, P protoMsg[T]](ctx context.Context, s *RedisStore, key string, ids []string, keepEmpty bool) ([]P, error) {
data, err := s.rc.HMGet(s.ctx, key, ids...).Result()
if err == redis.Nil {
if keepEmpty {
return make([]P, len(ids)), nil
}
return nil, nil
} else if err != nil {
return nil, err
}
if !keepEmpty {
list := make([]P, 0, len(data))
for _, v := range data {
if d, ok := v.(string); ok {
var p P = new(T)
if err = proto.Unmarshal([]byte(d), p); err != nil {
return list, err
}
list = append(list, p)
}
}
return list, nil
}
// Keep zero values where ID was not found.
list := make([]P, len(ids))
for i := range ids {
if d, ok := data[i].(string); ok {
var p P = new(T)
if err = proto.Unmarshal([]byte(d), p); err != nil {
return list, err
}
list[i] = p
}
}
return list, nil
}
func redisIDs(ctx context.Context, s *RedisStore, key string) ([]string, error) {
list, err := s.rc.HKeys(s.ctx, key).Result()
if err == redis.Nil {
return nil, nil
} else if err != nil {
return nil, err
}
slices.Sort(list)
return list, nil
}
type protoEntity[T any] interface {
protoMsg[T]
ID() string
}
func redisIterPage[T any, P protoEntity[T]](ctx context.Context, s *RedisStore, key string, page *livekit.Pagination) ([]P, error) {
if page == nil {
return redisLoadAll[T, P](ctx, s, key)
}
ids, err := redisIDs(ctx, s, key)
if err != nil {
return nil, err
}
if len(ids) == 0 {
return nil, nil
}
if page.AfterId != "" {
i, ok := slices.BinarySearch(ids, page.AfterId)
if ok {
i++
}
ids = ids[i:]
if len(ids) == 0 {
return nil, nil
}
}
limit := 1000
if page.Limit > 0 {
limit = int(page.Limit)
}
if len(ids) > limit {
ids = ids[:limit]
}
return redisLoadBatch[T, P](ctx, s, key, ids, false)
}
func sortProtos[T any, P protoEntity[T]](arr []P) {
slices.SortFunc(arr, func(a, b P) int {
return strings.Compare(a.ID(), b.ID())
})
}
func sortPage[T any, P protoEntity[T]](items []P, page *livekit.Pagination) []P {
sortProtos(items)
if page != nil {
if limit := int(page.Limit); limit > 0 && len(items) > limit {
items = items[:limit]
}
}
return items
}

View File

@@ -114,68 +114,108 @@ func (s *RedisStore) DeleteSIPTrunk(ctx context.Context, id string) error {
return err
}
func (s *RedisStore) listSIPLegacyTrunk(ctx context.Context) ([]*livekit.SIPTrunkInfo, error) {
return redisLoadMany[livekit.SIPTrunkInfo](ctx, s, SIPTrunkKey)
func (s *RedisStore) listSIPLegacyTrunk(ctx context.Context, page *livekit.Pagination) ([]*livekit.SIPTrunkInfo, error) {
return redisIterPage[livekit.SIPTrunkInfo](ctx, s, SIPTrunkKey, page)
}
func (s *RedisStore) listSIPInboundTrunk(ctx context.Context) ([]*livekit.SIPInboundTrunkInfo, error) {
return redisLoadMany[livekit.SIPInboundTrunkInfo](ctx, s, SIPInboundTrunkKey)
func (s *RedisStore) listSIPInboundTrunk(ctx context.Context, page *livekit.Pagination) ([]*livekit.SIPInboundTrunkInfo, error) {
return redisIterPage[livekit.SIPInboundTrunkInfo](ctx, s, SIPInboundTrunkKey, page)
}
func (s *RedisStore) listSIPOutboundTrunk(ctx context.Context) ([]*livekit.SIPOutboundTrunkInfo, error) {
return redisLoadMany[livekit.SIPOutboundTrunkInfo](ctx, s, SIPOutboundTrunkKey)
func (s *RedisStore) listSIPOutboundTrunk(ctx context.Context, page *livekit.Pagination) ([]*livekit.SIPOutboundTrunkInfo, error) {
return redisIterPage[livekit.SIPOutboundTrunkInfo](ctx, s, SIPOutboundTrunkKey, page)
}
func (s *RedisStore) ListSIPTrunk(ctx context.Context) ([]*livekit.SIPTrunkInfo, error) {
infos, err := s.listSIPLegacyTrunk(ctx)
func (s *RedisStore) listSIPDispatchRule(ctx context.Context, page *livekit.Pagination) ([]*livekit.SIPDispatchRuleInfo, error) {
return redisIterPage[livekit.SIPDispatchRuleInfo](ctx, s, SIPDispatchRuleKey, page)
}
func (s *RedisStore) ListSIPTrunk(ctx context.Context, req *livekit.ListSIPTrunkRequest) (*livekit.ListSIPTrunkResponse, error) {
var items []*livekit.SIPTrunkInfo
old, err := s.listSIPLegacyTrunk(ctx, req.Page)
if err != nil {
return nil, err
}
in, err := s.listSIPInboundTrunk(ctx)
for _, t := range old {
v := t
if req.Filter(v) && req.Page.Filter(v) {
items = append(items, v)
}
}
in, err := s.listSIPInboundTrunk(ctx, req.Page)
if err != nil {
return infos, err
return nil, err
}
for _, t := range in {
infos = append(infos, t.AsTrunkInfo())
v := t.AsTrunkInfo()
if req.Filter(v) && req.Page.Filter(v) {
items = append(items, v)
}
}
out, err := s.listSIPOutboundTrunk(ctx)
out, err := s.listSIPOutboundTrunk(ctx, req.Page)
if err != nil {
return infos, err
return nil, err
}
for _, t := range out {
infos = append(infos, t.AsTrunkInfo())
v := t.AsTrunkInfo()
if req.Filter(v) && req.Page.Filter(v) {
items = append(items, v)
}
}
return infos, nil
items = sortPage(items, req.Page)
return &livekit.ListSIPTrunkResponse{Items: items}, nil
}
func (s *RedisStore) ListSIPInboundTrunk(ctx context.Context) (infos []*livekit.SIPInboundTrunkInfo, err error) {
in, err := s.listSIPInboundTrunk(ctx)
func (s *RedisStore) ListSIPInboundTrunk(ctx context.Context, req *livekit.ListSIPInboundTrunkRequest) (*livekit.ListSIPInboundTrunkResponse, error) {
var items []*livekit.SIPInboundTrunkInfo
in, err := s.listSIPInboundTrunk(ctx, req.Page)
if err != nil {
return in, err
return nil, err
}
old, err := s.listSIPLegacyTrunk(ctx)
for _, t := range in {
v := t
if req.Filter(v) && req.Page.Filter(v) {
items = append(items, v)
}
}
old, err := s.listSIPLegacyTrunk(ctx, req.Page)
if err != nil {
return nil, err
}
for _, t := range old {
in = append(in, t.AsInbound())
v := t.AsInbound()
if req.Filter(v) && req.Page.Filter(v) {
items = append(items, v)
}
}
return in, nil
items = sortPage(items, req.Page)
return &livekit.ListSIPInboundTrunkResponse{Items: items}, nil
}
func (s *RedisStore) ListSIPOutboundTrunk(ctx context.Context) (infos []*livekit.SIPOutboundTrunkInfo, err error) {
out, err := s.listSIPOutboundTrunk(ctx)
func (s *RedisStore) ListSIPOutboundTrunk(ctx context.Context, req *livekit.ListSIPOutboundTrunkRequest) (*livekit.ListSIPOutboundTrunkResponse, error) {
var items []*livekit.SIPOutboundTrunkInfo
out, err := s.listSIPOutboundTrunk(ctx, req.Page)
if err != nil {
return out, err
return nil, err
}
old, err := s.listSIPLegacyTrunk(ctx)
for _, t := range out {
v := t
if req.Filter(v) && req.Page.Filter(v) {
items = append(items, v)
}
}
old, err := s.listSIPLegacyTrunk(ctx, req.Page)
if err != nil {
return nil, err
}
for _, t := range old {
out = append(out, t.AsOutbound())
v := t.AsOutbound()
if req.Filter(v) && req.Page.Filter(v) {
items = append(items, v)
}
}
return out, nil
items = sortPage(items, req.Page)
return &livekit.ListSIPOutboundTrunkResponse{Items: items}, nil
}
func (s *RedisStore) StoreSIPDispatchRule(ctx context.Context, info *livekit.SIPDispatchRuleInfo) error {
@@ -186,10 +226,22 @@ func (s *RedisStore) LoadSIPDispatchRule(ctx context.Context, sipDispatchRuleId
return redisLoadOne[livekit.SIPDispatchRuleInfo](ctx, s, SIPDispatchRuleKey, sipDispatchRuleId, ErrSIPDispatchRuleNotFound)
}
func (s *RedisStore) DeleteSIPDispatchRule(ctx context.Context, info *livekit.SIPDispatchRuleInfo) error {
return s.rc.HDel(s.ctx, SIPDispatchRuleKey, info.SipDispatchRuleId).Err()
func (s *RedisStore) DeleteSIPDispatchRule(ctx context.Context, sipDispatchRuleId string) error {
return s.rc.HDel(s.ctx, SIPDispatchRuleKey, sipDispatchRuleId).Err()
}
func (s *RedisStore) ListSIPDispatchRule(ctx context.Context) (infos []*livekit.SIPDispatchRuleInfo, err error) {
return redisLoadMany[livekit.SIPDispatchRuleInfo](ctx, s, SIPDispatchRuleKey)
func (s *RedisStore) ListSIPDispatchRule(ctx context.Context, req *livekit.ListSIPDispatchRuleRequest) (*livekit.ListSIPDispatchRuleResponse, error) {
var items []*livekit.SIPDispatchRuleInfo
out, err := s.listSIPDispatchRule(ctx, req.Page)
if err != nil {
return nil, err
}
for _, t := range out {
v := t
if req.Filter(v) && req.Page.Filter(v) {
items = append(items, v)
}
}
items = sortPage(items, req.Page)
return &livekit.ListSIPDispatchRuleResponse{Items: items}, nil
}

View File

@@ -16,10 +16,12 @@ package service_test
import (
"context"
"fmt"
"slices"
"strings"
"testing"
"github.com/dennwc/iters"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/utils"
"github.com/livekit/protocol/utils/guid"
@@ -31,14 +33,14 @@ import (
func TestSIPStoreDispatch(t *testing.T) {
ctx := context.Background()
rs := redisStore(t)
rs := redisStoreDocker(t)
id := guid.New(utils.SIPDispatchRulePrefix)
// No dispatch rules initially.
list, err := rs.ListSIPDispatchRule(ctx)
list, err := rs.ListSIPDispatchRule(ctx, &livekit.ListSIPDispatchRuleRequest{})
require.NoError(t, err)
require.Empty(t, list)
require.Empty(t, list.Items)
// Loading non-existent dispatch should return proper not found error.
got, err := rs.LoadSIPDispatchRule(ctx, id)
@@ -69,21 +71,21 @@ func TestSIPStoreDispatch(t *testing.T) {
require.True(t, proto.Equal(rule, got))
// Listing
list, err = rs.ListSIPDispatchRule(ctx)
list, err = rs.ListSIPDispatchRule(ctx, &livekit.ListSIPDispatchRuleRequest{})
require.NoError(t, err)
require.Len(t, list, 1)
require.True(t, proto.Equal(rule, list[0]))
require.Len(t, list.Items, 1)
require.True(t, proto.Equal(rule, list.Items[0]))
// Deletion. Should not return error if not exists.
err = rs.DeleteSIPDispatchRule(ctx, &livekit.SIPDispatchRuleInfo{SipDispatchRuleId: id})
err = rs.DeleteSIPDispatchRule(ctx, id)
require.NoError(t, err)
err = rs.DeleteSIPDispatchRule(ctx, &livekit.SIPDispatchRuleInfo{SipDispatchRuleId: id})
err = rs.DeleteSIPDispatchRule(ctx, id)
require.NoError(t, err)
// Check that it's deleted.
list, err = rs.ListSIPDispatchRule(ctx)
list, err = rs.ListSIPDispatchRule(ctx, &livekit.ListSIPDispatchRuleRequest{})
require.NoError(t, err)
require.Empty(t, list)
require.Empty(t, list.Items)
got, err = rs.LoadSIPDispatchRule(ctx, id)
require.Equal(t, service.ErrSIPDispatchRuleNotFound, err)
@@ -92,7 +94,7 @@ func TestSIPStoreDispatch(t *testing.T) {
func TestSIPStoreTrunk(t *testing.T) {
ctx := context.Background()
rs := redisStore(t)
rs := redisStoreDocker(t)
oldID := guid.New(utils.SIPTrunkPrefix)
inID := guid.New(utils.SIPTrunkPrefix)
@@ -100,25 +102,25 @@ func TestSIPStoreTrunk(t *testing.T) {
// No trunks initially. Check legacy, inbound, outbound.
// Loading non-existent trunk should return proper not found error.
oldList, err := rs.ListSIPTrunk(ctx)
oldList, err := rs.ListSIPTrunk(ctx, &livekit.ListSIPTrunkRequest{})
require.NoError(t, err)
require.Empty(t, oldList)
require.Empty(t, oldList.Items)
old, err := rs.LoadSIPTrunk(ctx, oldID)
require.Equal(t, service.ErrSIPTrunkNotFound, err)
require.Nil(t, old)
inList, err := rs.ListSIPInboundTrunk(ctx)
inList, err := rs.ListSIPInboundTrunk(ctx, &livekit.ListSIPInboundTrunkRequest{})
require.NoError(t, err)
require.Empty(t, inList)
require.Empty(t, inList.Items)
in, err := rs.LoadSIPInboundTrunk(ctx, oldID)
require.Equal(t, service.ErrSIPTrunkNotFound, err)
require.Nil(t, in)
outList, err := rs.ListSIPOutboundTrunk(ctx)
outList, err := rs.ListSIPOutboundTrunk(ctx, &livekit.ListSIPOutboundTrunkRequest{})
require.NoError(t, err)
require.Empty(t, outList)
require.Empty(t, outList.Items)
out, err := rs.LoadSIPOutboundTrunk(ctx, oldID)
require.Equal(t, service.ErrSIPTrunkNotFound, err)
@@ -187,33 +189,33 @@ func TestSIPStoreTrunk(t *testing.T) {
require.True(t, proto.Equal(oldT.AsOutbound(), outT2))
// Listing (always shows legacy + new)
listOld, err := rs.ListSIPTrunk(ctx)
listOld, err := rs.ListSIPTrunk(ctx, &livekit.ListSIPTrunkRequest{})
require.NoError(t, err)
require.Len(t, listOld, 3)
slices.SortFunc(listOld, func(a, b *livekit.SIPTrunkInfo) int {
require.Len(t, listOld.Items, 3)
slices.SortFunc(listOld.Items, func(a, b *livekit.SIPTrunkInfo) int {
return strings.Compare(a.Name, b.Name)
})
require.True(t, proto.Equal(inT.AsTrunkInfo(), listOld[0]))
require.True(t, proto.Equal(oldT, listOld[1]))
require.True(t, proto.Equal(outT.AsTrunkInfo(), listOld[2]))
require.True(t, proto.Equal(inT.AsTrunkInfo(), listOld.Items[0]))
require.True(t, proto.Equal(oldT, listOld.Items[1]))
require.True(t, proto.Equal(outT.AsTrunkInfo(), listOld.Items[2]))
listIn, err := rs.ListSIPInboundTrunk(ctx)
listIn, err := rs.ListSIPInboundTrunk(ctx, &livekit.ListSIPInboundTrunkRequest{})
require.NoError(t, err)
require.Len(t, listIn, 2)
slices.SortFunc(listIn, func(a, b *livekit.SIPInboundTrunkInfo) int {
require.Len(t, listIn.Items, 2)
slices.SortFunc(listIn.Items, func(a, b *livekit.SIPInboundTrunkInfo) int {
return strings.Compare(a.Name, b.Name)
})
require.True(t, proto.Equal(inT, listIn[0]))
require.True(t, proto.Equal(oldT.AsInbound(), listIn[1]))
require.True(t, proto.Equal(inT, listIn.Items[0]))
require.True(t, proto.Equal(oldT.AsInbound(), listIn.Items[1]))
listOut, err := rs.ListSIPOutboundTrunk(ctx)
listOut, err := rs.ListSIPOutboundTrunk(ctx, &livekit.ListSIPOutboundTrunkRequest{})
require.NoError(t, err)
require.Len(t, listOut, 2)
slices.SortFunc(listOut, func(a, b *livekit.SIPOutboundTrunkInfo) int {
require.Len(t, listOut.Items, 2)
slices.SortFunc(listOut.Items, func(a, b *livekit.SIPOutboundTrunkInfo) int {
return strings.Compare(a.Name, b.Name)
})
require.True(t, proto.Equal(oldT.AsOutbound(), listOut[0]))
require.True(t, proto.Equal(outT, listOut[1]))
require.True(t, proto.Equal(oldT.AsOutbound(), listOut.Items[0]))
require.True(t, proto.Equal(outT, listOut.Items[1]))
// Deletion. Should not return error if not exists.
err = rs.DeleteSIPTrunk(ctx, oldID)
@@ -237,17 +239,17 @@ func TestSIPStoreTrunk(t *testing.T) {
require.NoError(t, err)
// Check everything is deleted.
oldList, err = rs.ListSIPTrunk(ctx)
oldList, err = rs.ListSIPTrunk(ctx, &livekit.ListSIPTrunkRequest{})
require.NoError(t, err)
require.Empty(t, oldList)
require.Empty(t, oldList.Items)
inList, err = rs.ListSIPInboundTrunk(ctx)
inList, err = rs.ListSIPInboundTrunk(ctx, &livekit.ListSIPInboundTrunkRequest{})
require.NoError(t, err)
require.Empty(t, inList)
require.Empty(t, inList.Items)
outList, err = rs.ListSIPOutboundTrunk(ctx)
outList, err = rs.ListSIPOutboundTrunk(ctx, &livekit.ListSIPOutboundTrunkRequest{})
require.NoError(t, err)
require.Empty(t, outList)
require.Empty(t, outList.Items)
old, err = rs.LoadSIPTrunk(ctx, oldID)
require.Equal(t, service.ErrSIPTrunkNotFound, err)
@@ -261,3 +263,95 @@ func TestSIPStoreTrunk(t *testing.T) {
require.Equal(t, service.ErrSIPTrunkNotFound, err)
require.Nil(t, out)
}
func TestSIPTrunkList(t *testing.T) {
s := redisStoreDocker(t)
testIter(t, func(ctx context.Context, id string) error {
if strings.HasSuffix(id, "0") {
return s.StoreSIPTrunk(ctx, &livekit.SIPTrunkInfo{
SipTrunkId: id,
OutboundNumber: id,
})
}
return s.StoreSIPInboundTrunk(ctx, &livekit.SIPInboundTrunkInfo{
SipTrunkId: id,
Numbers: []string{id},
})
}, func(ctx context.Context, page *livekit.Pagination, ids []string) iters.PageIter[*livekit.SIPInboundTrunkInfo] {
return livekit.ListPageIter(s.ListSIPInboundTrunk, &livekit.ListSIPInboundTrunkRequest{
TrunkIds: ids, Page: page,
})
})
}
func TestSIPRuleList(t *testing.T) {
s := redisStoreDocker(t)
testIter(t, func(ctx context.Context, id string) error {
return s.StoreSIPDispatchRule(ctx, &livekit.SIPDispatchRuleInfo{
SipDispatchRuleId: id,
TrunkIds: []string{id},
})
}, func(ctx context.Context, page *livekit.Pagination, ids []string) iters.PageIter[*livekit.SIPDispatchRuleInfo] {
return livekit.ListPageIter(s.ListSIPDispatchRule, &livekit.ListSIPDispatchRuleRequest{
DispatchRuleIds: ids, Page: page,
})
})
}
type listItem interface {
ID() string
}
func allIDs[T listItem](t testing.TB, it iters.PageIter[T]) []string {
defer it.Close()
got, err := iters.AllPages(context.Background(), iters.MapPage(it, func(ctx context.Context, v T) (string, error) {
return v.ID(), nil
}))
require.NoError(t, err)
return got
}
func testIter[T listItem](
t *testing.T,
create func(ctx context.Context, id string) error,
list func(ctx context.Context, page *livekit.Pagination, ids []string) iters.PageIter[T],
) {
ctx := context.Background()
var all []string
for i := 0; i < 250; i++ {
id := fmt.Sprintf("%05d", i)
all = append(all, id)
err := create(ctx, id)
require.NoError(t, err)
}
// List everything with pagination disabled (legacy)
it := list(ctx, nil, nil)
got := allIDs(t, it)
require.Equal(t, all, got)
// List with pagination enabled
it = list(ctx, &livekit.Pagination{Limit: 10}, nil)
got = allIDs(t, it)
require.Equal(t, all, got)
// List with pagination enabled, custom ID
it = list(ctx, &livekit.Pagination{Limit: 10, AfterId: all[55]}, nil)
got = allIDs(t, it)
require.Equal(t, all[56:], got)
// List fixed IDs
it = list(ctx, &livekit.Pagination{Limit: 10, AfterId: all[5]}, []string{
all[10],
all[3],
"invalid",
all[8],
})
got = allIDs(t, it)
require.Equal(t, []string{
all[8],
all[10],
}, got)
}

View File

@@ -32,6 +32,10 @@ import (
"github.com/livekit/livekit-server/pkg/service"
)
func redisStoreDocker(t testing.TB) *service.RedisStore {
return service.NewRedisStore(redisClientDocker(t))
}
func redisStore(t testing.TB) *service.RedisStore {
return service.NewRedisStore(redisClient(t))
}

View File

@@ -10,11 +10,11 @@ import (
)
type FakeSIPStore struct {
DeleteSIPDispatchRuleStub func(context.Context, *livekit.SIPDispatchRuleInfo) error
DeleteSIPDispatchRuleStub func(context.Context, string) error
deleteSIPDispatchRuleMutex sync.RWMutex
deleteSIPDispatchRuleArgsForCall []struct {
arg1 context.Context
arg2 *livekit.SIPDispatchRuleInfo
arg2 string
}
deleteSIPDispatchRuleReturns struct {
result1 error
@@ -34,56 +34,60 @@ type FakeSIPStore struct {
deleteSIPTrunkReturnsOnCall map[int]struct {
result1 error
}
ListSIPDispatchRuleStub func(context.Context) ([]*livekit.SIPDispatchRuleInfo, error)
ListSIPDispatchRuleStub func(context.Context, *livekit.ListSIPDispatchRuleRequest) (*livekit.ListSIPDispatchRuleResponse, error)
listSIPDispatchRuleMutex sync.RWMutex
listSIPDispatchRuleArgsForCall []struct {
arg1 context.Context
arg2 *livekit.ListSIPDispatchRuleRequest
}
listSIPDispatchRuleReturns struct {
result1 []*livekit.SIPDispatchRuleInfo
result1 *livekit.ListSIPDispatchRuleResponse
result2 error
}
listSIPDispatchRuleReturnsOnCall map[int]struct {
result1 []*livekit.SIPDispatchRuleInfo
result1 *livekit.ListSIPDispatchRuleResponse
result2 error
}
ListSIPInboundTrunkStub func(context.Context) ([]*livekit.SIPInboundTrunkInfo, error)
ListSIPInboundTrunkStub func(context.Context, *livekit.ListSIPInboundTrunkRequest) (*livekit.ListSIPInboundTrunkResponse, error)
listSIPInboundTrunkMutex sync.RWMutex
listSIPInboundTrunkArgsForCall []struct {
arg1 context.Context
arg2 *livekit.ListSIPInboundTrunkRequest
}
listSIPInboundTrunkReturns struct {
result1 []*livekit.SIPInboundTrunkInfo
result1 *livekit.ListSIPInboundTrunkResponse
result2 error
}
listSIPInboundTrunkReturnsOnCall map[int]struct {
result1 []*livekit.SIPInboundTrunkInfo
result1 *livekit.ListSIPInboundTrunkResponse
result2 error
}
ListSIPOutboundTrunkStub func(context.Context) ([]*livekit.SIPOutboundTrunkInfo, error)
ListSIPOutboundTrunkStub func(context.Context, *livekit.ListSIPOutboundTrunkRequest) (*livekit.ListSIPOutboundTrunkResponse, error)
listSIPOutboundTrunkMutex sync.RWMutex
listSIPOutboundTrunkArgsForCall []struct {
arg1 context.Context
arg2 *livekit.ListSIPOutboundTrunkRequest
}
listSIPOutboundTrunkReturns struct {
result1 []*livekit.SIPOutboundTrunkInfo
result1 *livekit.ListSIPOutboundTrunkResponse
result2 error
}
listSIPOutboundTrunkReturnsOnCall map[int]struct {
result1 []*livekit.SIPOutboundTrunkInfo
result1 *livekit.ListSIPOutboundTrunkResponse
result2 error
}
ListSIPTrunkStub func(context.Context) ([]*livekit.SIPTrunkInfo, error)
ListSIPTrunkStub func(context.Context, *livekit.ListSIPTrunkRequest) (*livekit.ListSIPTrunkResponse, error)
listSIPTrunkMutex sync.RWMutex
listSIPTrunkArgsForCall []struct {
arg1 context.Context
arg2 *livekit.ListSIPTrunkRequest
}
listSIPTrunkReturns struct {
result1 []*livekit.SIPTrunkInfo
result1 *livekit.ListSIPTrunkResponse
result2 error
}
listSIPTrunkReturnsOnCall map[int]struct {
result1 []*livekit.SIPTrunkInfo
result1 *livekit.ListSIPTrunkResponse
result2 error
}
LoadSIPDispatchRuleStub func(context.Context, string) (*livekit.SIPDispatchRuleInfo, error)
@@ -142,6 +146,34 @@ type FakeSIPStore struct {
result1 *livekit.SIPTrunkInfo
result2 error
}
SelectSIPDispatchRuleStub func(context.Context, string) ([]*livekit.SIPDispatchRuleInfo, error)
selectSIPDispatchRuleMutex sync.RWMutex
selectSIPDispatchRuleArgsForCall []struct {
arg1 context.Context
arg2 string
}
selectSIPDispatchRuleReturns struct {
result1 []*livekit.SIPDispatchRuleInfo
result2 error
}
selectSIPDispatchRuleReturnsOnCall map[int]struct {
result1 []*livekit.SIPDispatchRuleInfo
result2 error
}
SelectSIPInboundTrunkStub func(context.Context, string) ([]*livekit.SIPInboundTrunkInfo, error)
selectSIPInboundTrunkMutex sync.RWMutex
selectSIPInboundTrunkArgsForCall []struct {
arg1 context.Context
arg2 string
}
selectSIPInboundTrunkReturns struct {
result1 []*livekit.SIPInboundTrunkInfo
result2 error
}
selectSIPInboundTrunkReturnsOnCall map[int]struct {
result1 []*livekit.SIPInboundTrunkInfo
result2 error
}
StoreSIPDispatchRuleStub func(context.Context, *livekit.SIPDispatchRuleInfo) error
storeSIPDispatchRuleMutex sync.RWMutex
storeSIPDispatchRuleArgsForCall []struct {
@@ -194,12 +226,12 @@ type FakeSIPStore struct {
invocationsMutex sync.RWMutex
}
func (fake *FakeSIPStore) DeleteSIPDispatchRule(arg1 context.Context, arg2 *livekit.SIPDispatchRuleInfo) error {
func (fake *FakeSIPStore) DeleteSIPDispatchRule(arg1 context.Context, arg2 string) error {
fake.deleteSIPDispatchRuleMutex.Lock()
ret, specificReturn := fake.deleteSIPDispatchRuleReturnsOnCall[len(fake.deleteSIPDispatchRuleArgsForCall)]
fake.deleteSIPDispatchRuleArgsForCall = append(fake.deleteSIPDispatchRuleArgsForCall, struct {
arg1 context.Context
arg2 *livekit.SIPDispatchRuleInfo
arg2 string
}{arg1, arg2})
stub := fake.DeleteSIPDispatchRuleStub
fakeReturns := fake.deleteSIPDispatchRuleReturns
@@ -220,13 +252,13 @@ func (fake *FakeSIPStore) DeleteSIPDispatchRuleCallCount() int {
return len(fake.deleteSIPDispatchRuleArgsForCall)
}
func (fake *FakeSIPStore) DeleteSIPDispatchRuleCalls(stub func(context.Context, *livekit.SIPDispatchRuleInfo) error) {
func (fake *FakeSIPStore) DeleteSIPDispatchRuleCalls(stub func(context.Context, string) error) {
fake.deleteSIPDispatchRuleMutex.Lock()
defer fake.deleteSIPDispatchRuleMutex.Unlock()
fake.DeleteSIPDispatchRuleStub = stub
}
func (fake *FakeSIPStore) DeleteSIPDispatchRuleArgsForCall(i int) (context.Context, *livekit.SIPDispatchRuleInfo) {
func (fake *FakeSIPStore) DeleteSIPDispatchRuleArgsForCall(i int) (context.Context, string) {
fake.deleteSIPDispatchRuleMutex.RLock()
defer fake.deleteSIPDispatchRuleMutex.RUnlock()
argsForCall := fake.deleteSIPDispatchRuleArgsForCall[i]
@@ -318,18 +350,19 @@ func (fake *FakeSIPStore) DeleteSIPTrunkReturnsOnCall(i int, result1 error) {
}{result1}
}
func (fake *FakeSIPStore) ListSIPDispatchRule(arg1 context.Context) ([]*livekit.SIPDispatchRuleInfo, error) {
func (fake *FakeSIPStore) ListSIPDispatchRule(arg1 context.Context, arg2 *livekit.ListSIPDispatchRuleRequest) (*livekit.ListSIPDispatchRuleResponse, error) {
fake.listSIPDispatchRuleMutex.Lock()
ret, specificReturn := fake.listSIPDispatchRuleReturnsOnCall[len(fake.listSIPDispatchRuleArgsForCall)]
fake.listSIPDispatchRuleArgsForCall = append(fake.listSIPDispatchRuleArgsForCall, struct {
arg1 context.Context
}{arg1})
arg2 *livekit.ListSIPDispatchRuleRequest
}{arg1, arg2})
stub := fake.ListSIPDispatchRuleStub
fakeReturns := fake.listSIPDispatchRuleReturns
fake.recordInvocation("ListSIPDispatchRule", []interface{}{arg1})
fake.recordInvocation("ListSIPDispatchRule", []interface{}{arg1, arg2})
fake.listSIPDispatchRuleMutex.Unlock()
if stub != nil {
return stub(arg1)
return stub(arg1, arg2)
}
if specificReturn {
return ret.result1, ret.result2
@@ -343,57 +376,58 @@ func (fake *FakeSIPStore) ListSIPDispatchRuleCallCount() int {
return len(fake.listSIPDispatchRuleArgsForCall)
}
func (fake *FakeSIPStore) ListSIPDispatchRuleCalls(stub func(context.Context) ([]*livekit.SIPDispatchRuleInfo, error)) {
func (fake *FakeSIPStore) ListSIPDispatchRuleCalls(stub func(context.Context, *livekit.ListSIPDispatchRuleRequest) (*livekit.ListSIPDispatchRuleResponse, error)) {
fake.listSIPDispatchRuleMutex.Lock()
defer fake.listSIPDispatchRuleMutex.Unlock()
fake.ListSIPDispatchRuleStub = stub
}
func (fake *FakeSIPStore) ListSIPDispatchRuleArgsForCall(i int) context.Context {
func (fake *FakeSIPStore) ListSIPDispatchRuleArgsForCall(i int) (context.Context, *livekit.ListSIPDispatchRuleRequest) {
fake.listSIPDispatchRuleMutex.RLock()
defer fake.listSIPDispatchRuleMutex.RUnlock()
argsForCall := fake.listSIPDispatchRuleArgsForCall[i]
return argsForCall.arg1
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeSIPStore) ListSIPDispatchRuleReturns(result1 []*livekit.SIPDispatchRuleInfo, result2 error) {
func (fake *FakeSIPStore) ListSIPDispatchRuleReturns(result1 *livekit.ListSIPDispatchRuleResponse, result2 error) {
fake.listSIPDispatchRuleMutex.Lock()
defer fake.listSIPDispatchRuleMutex.Unlock()
fake.ListSIPDispatchRuleStub = nil
fake.listSIPDispatchRuleReturns = struct {
result1 []*livekit.SIPDispatchRuleInfo
result1 *livekit.ListSIPDispatchRuleResponse
result2 error
}{result1, result2}
}
func (fake *FakeSIPStore) ListSIPDispatchRuleReturnsOnCall(i int, result1 []*livekit.SIPDispatchRuleInfo, result2 error) {
func (fake *FakeSIPStore) ListSIPDispatchRuleReturnsOnCall(i int, result1 *livekit.ListSIPDispatchRuleResponse, result2 error) {
fake.listSIPDispatchRuleMutex.Lock()
defer fake.listSIPDispatchRuleMutex.Unlock()
fake.ListSIPDispatchRuleStub = nil
if fake.listSIPDispatchRuleReturnsOnCall == nil {
fake.listSIPDispatchRuleReturnsOnCall = make(map[int]struct {
result1 []*livekit.SIPDispatchRuleInfo
result1 *livekit.ListSIPDispatchRuleResponse
result2 error
})
}
fake.listSIPDispatchRuleReturnsOnCall[i] = struct {
result1 []*livekit.SIPDispatchRuleInfo
result1 *livekit.ListSIPDispatchRuleResponse
result2 error
}{result1, result2}
}
func (fake *FakeSIPStore) ListSIPInboundTrunk(arg1 context.Context) ([]*livekit.SIPInboundTrunkInfo, error) {
func (fake *FakeSIPStore) ListSIPInboundTrunk(arg1 context.Context, arg2 *livekit.ListSIPInboundTrunkRequest) (*livekit.ListSIPInboundTrunkResponse, error) {
fake.listSIPInboundTrunkMutex.Lock()
ret, specificReturn := fake.listSIPInboundTrunkReturnsOnCall[len(fake.listSIPInboundTrunkArgsForCall)]
fake.listSIPInboundTrunkArgsForCall = append(fake.listSIPInboundTrunkArgsForCall, struct {
arg1 context.Context
}{arg1})
arg2 *livekit.ListSIPInboundTrunkRequest
}{arg1, arg2})
stub := fake.ListSIPInboundTrunkStub
fakeReturns := fake.listSIPInboundTrunkReturns
fake.recordInvocation("ListSIPInboundTrunk", []interface{}{arg1})
fake.recordInvocation("ListSIPInboundTrunk", []interface{}{arg1, arg2})
fake.listSIPInboundTrunkMutex.Unlock()
if stub != nil {
return stub(arg1)
return stub(arg1, arg2)
}
if specificReturn {
return ret.result1, ret.result2
@@ -407,57 +441,58 @@ func (fake *FakeSIPStore) ListSIPInboundTrunkCallCount() int {
return len(fake.listSIPInboundTrunkArgsForCall)
}
func (fake *FakeSIPStore) ListSIPInboundTrunkCalls(stub func(context.Context) ([]*livekit.SIPInboundTrunkInfo, error)) {
func (fake *FakeSIPStore) ListSIPInboundTrunkCalls(stub func(context.Context, *livekit.ListSIPInboundTrunkRequest) (*livekit.ListSIPInboundTrunkResponse, error)) {
fake.listSIPInboundTrunkMutex.Lock()
defer fake.listSIPInboundTrunkMutex.Unlock()
fake.ListSIPInboundTrunkStub = stub
}
func (fake *FakeSIPStore) ListSIPInboundTrunkArgsForCall(i int) context.Context {
func (fake *FakeSIPStore) ListSIPInboundTrunkArgsForCall(i int) (context.Context, *livekit.ListSIPInboundTrunkRequest) {
fake.listSIPInboundTrunkMutex.RLock()
defer fake.listSIPInboundTrunkMutex.RUnlock()
argsForCall := fake.listSIPInboundTrunkArgsForCall[i]
return argsForCall.arg1
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeSIPStore) ListSIPInboundTrunkReturns(result1 []*livekit.SIPInboundTrunkInfo, result2 error) {
func (fake *FakeSIPStore) ListSIPInboundTrunkReturns(result1 *livekit.ListSIPInboundTrunkResponse, result2 error) {
fake.listSIPInboundTrunkMutex.Lock()
defer fake.listSIPInboundTrunkMutex.Unlock()
fake.ListSIPInboundTrunkStub = nil
fake.listSIPInboundTrunkReturns = struct {
result1 []*livekit.SIPInboundTrunkInfo
result1 *livekit.ListSIPInboundTrunkResponse
result2 error
}{result1, result2}
}
func (fake *FakeSIPStore) ListSIPInboundTrunkReturnsOnCall(i int, result1 []*livekit.SIPInboundTrunkInfo, result2 error) {
func (fake *FakeSIPStore) ListSIPInboundTrunkReturnsOnCall(i int, result1 *livekit.ListSIPInboundTrunkResponse, result2 error) {
fake.listSIPInboundTrunkMutex.Lock()
defer fake.listSIPInboundTrunkMutex.Unlock()
fake.ListSIPInboundTrunkStub = nil
if fake.listSIPInboundTrunkReturnsOnCall == nil {
fake.listSIPInboundTrunkReturnsOnCall = make(map[int]struct {
result1 []*livekit.SIPInboundTrunkInfo
result1 *livekit.ListSIPInboundTrunkResponse
result2 error
})
}
fake.listSIPInboundTrunkReturnsOnCall[i] = struct {
result1 []*livekit.SIPInboundTrunkInfo
result1 *livekit.ListSIPInboundTrunkResponse
result2 error
}{result1, result2}
}
func (fake *FakeSIPStore) ListSIPOutboundTrunk(arg1 context.Context) ([]*livekit.SIPOutboundTrunkInfo, error) {
func (fake *FakeSIPStore) ListSIPOutboundTrunk(arg1 context.Context, arg2 *livekit.ListSIPOutboundTrunkRequest) (*livekit.ListSIPOutboundTrunkResponse, error) {
fake.listSIPOutboundTrunkMutex.Lock()
ret, specificReturn := fake.listSIPOutboundTrunkReturnsOnCall[len(fake.listSIPOutboundTrunkArgsForCall)]
fake.listSIPOutboundTrunkArgsForCall = append(fake.listSIPOutboundTrunkArgsForCall, struct {
arg1 context.Context
}{arg1})
arg2 *livekit.ListSIPOutboundTrunkRequest
}{arg1, arg2})
stub := fake.ListSIPOutboundTrunkStub
fakeReturns := fake.listSIPOutboundTrunkReturns
fake.recordInvocation("ListSIPOutboundTrunk", []interface{}{arg1})
fake.recordInvocation("ListSIPOutboundTrunk", []interface{}{arg1, arg2})
fake.listSIPOutboundTrunkMutex.Unlock()
if stub != nil {
return stub(arg1)
return stub(arg1, arg2)
}
if specificReturn {
return ret.result1, ret.result2
@@ -471,57 +506,58 @@ func (fake *FakeSIPStore) ListSIPOutboundTrunkCallCount() int {
return len(fake.listSIPOutboundTrunkArgsForCall)
}
func (fake *FakeSIPStore) ListSIPOutboundTrunkCalls(stub func(context.Context) ([]*livekit.SIPOutboundTrunkInfo, error)) {
func (fake *FakeSIPStore) ListSIPOutboundTrunkCalls(stub func(context.Context, *livekit.ListSIPOutboundTrunkRequest) (*livekit.ListSIPOutboundTrunkResponse, error)) {
fake.listSIPOutboundTrunkMutex.Lock()
defer fake.listSIPOutboundTrunkMutex.Unlock()
fake.ListSIPOutboundTrunkStub = stub
}
func (fake *FakeSIPStore) ListSIPOutboundTrunkArgsForCall(i int) context.Context {
func (fake *FakeSIPStore) ListSIPOutboundTrunkArgsForCall(i int) (context.Context, *livekit.ListSIPOutboundTrunkRequest) {
fake.listSIPOutboundTrunkMutex.RLock()
defer fake.listSIPOutboundTrunkMutex.RUnlock()
argsForCall := fake.listSIPOutboundTrunkArgsForCall[i]
return argsForCall.arg1
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeSIPStore) ListSIPOutboundTrunkReturns(result1 []*livekit.SIPOutboundTrunkInfo, result2 error) {
func (fake *FakeSIPStore) ListSIPOutboundTrunkReturns(result1 *livekit.ListSIPOutboundTrunkResponse, result2 error) {
fake.listSIPOutboundTrunkMutex.Lock()
defer fake.listSIPOutboundTrunkMutex.Unlock()
fake.ListSIPOutboundTrunkStub = nil
fake.listSIPOutboundTrunkReturns = struct {
result1 []*livekit.SIPOutboundTrunkInfo
result1 *livekit.ListSIPOutboundTrunkResponse
result2 error
}{result1, result2}
}
func (fake *FakeSIPStore) ListSIPOutboundTrunkReturnsOnCall(i int, result1 []*livekit.SIPOutboundTrunkInfo, result2 error) {
func (fake *FakeSIPStore) ListSIPOutboundTrunkReturnsOnCall(i int, result1 *livekit.ListSIPOutboundTrunkResponse, result2 error) {
fake.listSIPOutboundTrunkMutex.Lock()
defer fake.listSIPOutboundTrunkMutex.Unlock()
fake.ListSIPOutboundTrunkStub = nil
if fake.listSIPOutboundTrunkReturnsOnCall == nil {
fake.listSIPOutboundTrunkReturnsOnCall = make(map[int]struct {
result1 []*livekit.SIPOutboundTrunkInfo
result1 *livekit.ListSIPOutboundTrunkResponse
result2 error
})
}
fake.listSIPOutboundTrunkReturnsOnCall[i] = struct {
result1 []*livekit.SIPOutboundTrunkInfo
result1 *livekit.ListSIPOutboundTrunkResponse
result2 error
}{result1, result2}
}
func (fake *FakeSIPStore) ListSIPTrunk(arg1 context.Context) ([]*livekit.SIPTrunkInfo, error) {
func (fake *FakeSIPStore) ListSIPTrunk(arg1 context.Context, arg2 *livekit.ListSIPTrunkRequest) (*livekit.ListSIPTrunkResponse, error) {
fake.listSIPTrunkMutex.Lock()
ret, specificReturn := fake.listSIPTrunkReturnsOnCall[len(fake.listSIPTrunkArgsForCall)]
fake.listSIPTrunkArgsForCall = append(fake.listSIPTrunkArgsForCall, struct {
arg1 context.Context
}{arg1})
arg2 *livekit.ListSIPTrunkRequest
}{arg1, arg2})
stub := fake.ListSIPTrunkStub
fakeReturns := fake.listSIPTrunkReturns
fake.recordInvocation("ListSIPTrunk", []interface{}{arg1})
fake.recordInvocation("ListSIPTrunk", []interface{}{arg1, arg2})
fake.listSIPTrunkMutex.Unlock()
if stub != nil {
return stub(arg1)
return stub(arg1, arg2)
}
if specificReturn {
return ret.result1, ret.result2
@@ -535,41 +571,41 @@ func (fake *FakeSIPStore) ListSIPTrunkCallCount() int {
return len(fake.listSIPTrunkArgsForCall)
}
func (fake *FakeSIPStore) ListSIPTrunkCalls(stub func(context.Context) ([]*livekit.SIPTrunkInfo, error)) {
func (fake *FakeSIPStore) ListSIPTrunkCalls(stub func(context.Context, *livekit.ListSIPTrunkRequest) (*livekit.ListSIPTrunkResponse, error)) {
fake.listSIPTrunkMutex.Lock()
defer fake.listSIPTrunkMutex.Unlock()
fake.ListSIPTrunkStub = stub
}
func (fake *FakeSIPStore) ListSIPTrunkArgsForCall(i int) context.Context {
func (fake *FakeSIPStore) ListSIPTrunkArgsForCall(i int) (context.Context, *livekit.ListSIPTrunkRequest) {
fake.listSIPTrunkMutex.RLock()
defer fake.listSIPTrunkMutex.RUnlock()
argsForCall := fake.listSIPTrunkArgsForCall[i]
return argsForCall.arg1
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeSIPStore) ListSIPTrunkReturns(result1 []*livekit.SIPTrunkInfo, result2 error) {
func (fake *FakeSIPStore) ListSIPTrunkReturns(result1 *livekit.ListSIPTrunkResponse, result2 error) {
fake.listSIPTrunkMutex.Lock()
defer fake.listSIPTrunkMutex.Unlock()
fake.ListSIPTrunkStub = nil
fake.listSIPTrunkReturns = struct {
result1 []*livekit.SIPTrunkInfo
result1 *livekit.ListSIPTrunkResponse
result2 error
}{result1, result2}
}
func (fake *FakeSIPStore) ListSIPTrunkReturnsOnCall(i int, result1 []*livekit.SIPTrunkInfo, result2 error) {
func (fake *FakeSIPStore) ListSIPTrunkReturnsOnCall(i int, result1 *livekit.ListSIPTrunkResponse, result2 error) {
fake.listSIPTrunkMutex.Lock()
defer fake.listSIPTrunkMutex.Unlock()
fake.ListSIPTrunkStub = nil
if fake.listSIPTrunkReturnsOnCall == nil {
fake.listSIPTrunkReturnsOnCall = make(map[int]struct {
result1 []*livekit.SIPTrunkInfo
result1 *livekit.ListSIPTrunkResponse
result2 error
})
}
fake.listSIPTrunkReturnsOnCall[i] = struct {
result1 []*livekit.SIPTrunkInfo
result1 *livekit.ListSIPTrunkResponse
result2 error
}{result1, result2}
}
@@ -834,6 +870,136 @@ func (fake *FakeSIPStore) LoadSIPTrunkReturnsOnCall(i int, result1 *livekit.SIPT
}{result1, result2}
}
func (fake *FakeSIPStore) SelectSIPDispatchRule(arg1 context.Context, arg2 string) ([]*livekit.SIPDispatchRuleInfo, error) {
fake.selectSIPDispatchRuleMutex.Lock()
ret, specificReturn := fake.selectSIPDispatchRuleReturnsOnCall[len(fake.selectSIPDispatchRuleArgsForCall)]
fake.selectSIPDispatchRuleArgsForCall = append(fake.selectSIPDispatchRuleArgsForCall, struct {
arg1 context.Context
arg2 string
}{arg1, arg2})
stub := fake.SelectSIPDispatchRuleStub
fakeReturns := fake.selectSIPDispatchRuleReturns
fake.recordInvocation("SelectSIPDispatchRule", []interface{}{arg1, arg2})
fake.selectSIPDispatchRuleMutex.Unlock()
if stub != nil {
return stub(arg1, arg2)
}
if specificReturn {
return ret.result1, ret.result2
}
return fakeReturns.result1, fakeReturns.result2
}
func (fake *FakeSIPStore) SelectSIPDispatchRuleCallCount() int {
fake.selectSIPDispatchRuleMutex.RLock()
defer fake.selectSIPDispatchRuleMutex.RUnlock()
return len(fake.selectSIPDispatchRuleArgsForCall)
}
func (fake *FakeSIPStore) SelectSIPDispatchRuleCalls(stub func(context.Context, string) ([]*livekit.SIPDispatchRuleInfo, error)) {
fake.selectSIPDispatchRuleMutex.Lock()
defer fake.selectSIPDispatchRuleMutex.Unlock()
fake.SelectSIPDispatchRuleStub = stub
}
func (fake *FakeSIPStore) SelectSIPDispatchRuleArgsForCall(i int) (context.Context, string) {
fake.selectSIPDispatchRuleMutex.RLock()
defer fake.selectSIPDispatchRuleMutex.RUnlock()
argsForCall := fake.selectSIPDispatchRuleArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeSIPStore) SelectSIPDispatchRuleReturns(result1 []*livekit.SIPDispatchRuleInfo, result2 error) {
fake.selectSIPDispatchRuleMutex.Lock()
defer fake.selectSIPDispatchRuleMutex.Unlock()
fake.SelectSIPDispatchRuleStub = nil
fake.selectSIPDispatchRuleReturns = struct {
result1 []*livekit.SIPDispatchRuleInfo
result2 error
}{result1, result2}
}
func (fake *FakeSIPStore) SelectSIPDispatchRuleReturnsOnCall(i int, result1 []*livekit.SIPDispatchRuleInfo, result2 error) {
fake.selectSIPDispatchRuleMutex.Lock()
defer fake.selectSIPDispatchRuleMutex.Unlock()
fake.SelectSIPDispatchRuleStub = nil
if fake.selectSIPDispatchRuleReturnsOnCall == nil {
fake.selectSIPDispatchRuleReturnsOnCall = make(map[int]struct {
result1 []*livekit.SIPDispatchRuleInfo
result2 error
})
}
fake.selectSIPDispatchRuleReturnsOnCall[i] = struct {
result1 []*livekit.SIPDispatchRuleInfo
result2 error
}{result1, result2}
}
func (fake *FakeSIPStore) SelectSIPInboundTrunk(arg1 context.Context, arg2 string) ([]*livekit.SIPInboundTrunkInfo, error) {
fake.selectSIPInboundTrunkMutex.Lock()
ret, specificReturn := fake.selectSIPInboundTrunkReturnsOnCall[len(fake.selectSIPInboundTrunkArgsForCall)]
fake.selectSIPInboundTrunkArgsForCall = append(fake.selectSIPInboundTrunkArgsForCall, struct {
arg1 context.Context
arg2 string
}{arg1, arg2})
stub := fake.SelectSIPInboundTrunkStub
fakeReturns := fake.selectSIPInboundTrunkReturns
fake.recordInvocation("SelectSIPInboundTrunk", []interface{}{arg1, arg2})
fake.selectSIPInboundTrunkMutex.Unlock()
if stub != nil {
return stub(arg1, arg2)
}
if specificReturn {
return ret.result1, ret.result2
}
return fakeReturns.result1, fakeReturns.result2
}
func (fake *FakeSIPStore) SelectSIPInboundTrunkCallCount() int {
fake.selectSIPInboundTrunkMutex.RLock()
defer fake.selectSIPInboundTrunkMutex.RUnlock()
return len(fake.selectSIPInboundTrunkArgsForCall)
}
func (fake *FakeSIPStore) SelectSIPInboundTrunkCalls(stub func(context.Context, string) ([]*livekit.SIPInboundTrunkInfo, error)) {
fake.selectSIPInboundTrunkMutex.Lock()
defer fake.selectSIPInboundTrunkMutex.Unlock()
fake.SelectSIPInboundTrunkStub = stub
}
func (fake *FakeSIPStore) SelectSIPInboundTrunkArgsForCall(i int) (context.Context, string) {
fake.selectSIPInboundTrunkMutex.RLock()
defer fake.selectSIPInboundTrunkMutex.RUnlock()
argsForCall := fake.selectSIPInboundTrunkArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeSIPStore) SelectSIPInboundTrunkReturns(result1 []*livekit.SIPInboundTrunkInfo, result2 error) {
fake.selectSIPInboundTrunkMutex.Lock()
defer fake.selectSIPInboundTrunkMutex.Unlock()
fake.SelectSIPInboundTrunkStub = nil
fake.selectSIPInboundTrunkReturns = struct {
result1 []*livekit.SIPInboundTrunkInfo
result2 error
}{result1, result2}
}
func (fake *FakeSIPStore) SelectSIPInboundTrunkReturnsOnCall(i int, result1 []*livekit.SIPInboundTrunkInfo, result2 error) {
fake.selectSIPInboundTrunkMutex.Lock()
defer fake.selectSIPInboundTrunkMutex.Unlock()
fake.SelectSIPInboundTrunkStub = nil
if fake.selectSIPInboundTrunkReturnsOnCall == nil {
fake.selectSIPInboundTrunkReturnsOnCall = make(map[int]struct {
result1 []*livekit.SIPInboundTrunkInfo
result2 error
})
}
fake.selectSIPInboundTrunkReturnsOnCall[i] = struct {
result1 []*livekit.SIPInboundTrunkInfo
result2 error
}{result1, result2}
}
func (fake *FakeSIPStore) StoreSIPDispatchRule(arg1 context.Context, arg2 *livekit.SIPDispatchRuleInfo) error {
fake.storeSIPDispatchRuleMutex.Lock()
ret, specificReturn := fake.storeSIPDispatchRuleReturnsOnCall[len(fake.storeSIPDispatchRuleArgsForCall)]
@@ -1105,6 +1271,10 @@ func (fake *FakeSIPStore) Invocations() map[string][][]interface{} {
defer fake.loadSIPOutboundTrunkMutex.RUnlock()
fake.loadSIPTrunkMutex.RLock()
defer fake.loadSIPTrunkMutex.RUnlock()
fake.selectSIPDispatchRuleMutex.RLock()
defer fake.selectSIPDispatchRuleMutex.RUnlock()
fake.selectSIPInboundTrunkMutex.RLock()
defer fake.selectSIPInboundTrunkMutex.RUnlock()
fake.storeSIPDispatchRuleMutex.RLock()
defer fake.storeSIPDispatchRuleMutex.RUnlock()
fake.storeSIPInboundTrunkMutex.RLock()

View File

@@ -16,9 +16,9 @@ package service
import (
"context"
"errors"
"time"
"github.com/dennwc/iters"
"github.com/twitchtv/twirp"
"google.golang.org/protobuf/types/known/emptypb"
@@ -88,12 +88,12 @@ func (s *SIPService) CreateSIPTrunk(ctx context.Context, req *livekit.CreateSIPT
}
// Validate all trunks including the new one first.
list, err := s.store.ListSIPInboundTrunk(ctx)
it, err := ListSIPInboundTrunk(ctx, s.store, &livekit.ListSIPInboundTrunkRequest{}, info.AsInbound())
if err != nil {
return nil, err
}
list = append(list, info.AsInbound())
if err = sip.ValidateTrunks(list); err != nil {
defer it.Close()
if err = sip.ValidateTrunksIter(it); err != nil {
return nil, err
}
@@ -125,12 +125,14 @@ func (s *SIPService) CreateSIPInboundTrunk(ctx context.Context, req *livekit.Cre
// Keep ID empty still, so that validation can print "<new>" instead of a non-existent ID in the error.
// Validate all trunks including the new one first.
list, err := s.store.ListSIPInboundTrunk(ctx)
it, err := ListSIPInboundTrunk(ctx, s.store, &livekit.ListSIPInboundTrunkRequest{
Numbers: req.GetTrunk().GetNumbers(),
}, info)
if err != nil {
return nil, err
}
list = append(list, info)
if err = sip.ValidateTrunks(list); err != nil {
defer it.Close()
if err = sip.ValidateTrunksIter(it); err != nil {
return nil, err
}
@@ -215,13 +217,38 @@ func (s *SIPService) ListSIPTrunk(ctx context.Context, req *livekit.ListSIPTrunk
if s.store == nil {
return nil, ErrSIPNotConnected
}
it := livekit.ListPageIter(s.store.ListSIPTrunk, req)
defer it.Close()
trunks, err := s.store.ListSIPTrunk(ctx)
items, err := iters.AllPages(ctx, it)
if err != nil {
return nil, err
}
return &livekit.ListSIPTrunkResponse{Items: items}, nil
}
return &livekit.ListSIPTrunkResponse{Items: trunks}, nil
func ListSIPInboundTrunk(ctx context.Context, s SIPStore, req *livekit.ListSIPInboundTrunkRequest, add ...*livekit.SIPInboundTrunkInfo) (iters.Iter[*livekit.SIPInboundTrunkInfo], error) {
if s == nil {
return nil, ErrSIPNotConnected
}
pages := livekit.ListPageIter(s.ListSIPInboundTrunk, req)
it := iters.PagesAsIter(ctx, pages)
if len(add) != 0 {
it = iters.MultiIter(true, it, iters.Slice(add))
}
return it, nil
}
func ListSIPOutboundTrunk(ctx context.Context, s SIPStore, req *livekit.ListSIPOutboundTrunkRequest, add ...*livekit.SIPOutboundTrunkInfo) (iters.Iter[*livekit.SIPOutboundTrunkInfo], error) {
if s == nil {
return nil, ErrSIPNotConnected
}
pages := livekit.ListPageIter(s.ListSIPOutboundTrunk, req)
it := iters.PagesAsIter(ctx, pages)
if len(add) != 0 {
it = iters.MultiIter(true, it, iters.Slice(add))
}
return it, nil
}
func (s *SIPService) ListSIPInboundTrunk(ctx context.Context, req *livekit.ListSIPInboundTrunkRequest) (*livekit.ListSIPInboundTrunkResponse, error) {
@@ -231,29 +258,17 @@ func (s *SIPService) ListSIPInboundTrunk(ctx context.Context, req *livekit.ListS
if s.store == nil {
return nil, ErrSIPNotConnected
}
var trunks []*livekit.SIPInboundTrunkInfo
if len(req.TrunkIds) != 0 {
trunks = make([]*livekit.SIPInboundTrunkInfo, len(req.TrunkIds))
for i, id := range req.TrunkIds {
t, err := s.store.LoadSIPInboundTrunk(ctx, id)
if errors.Is(err, ErrSIPTrunkNotFound) {
continue // keep nil in slice
} else if err != nil {
return nil, err
}
trunks[i] = t
}
} else {
var err error
trunks, err = s.store.ListSIPInboundTrunk(ctx)
if err != nil {
return nil, err
}
it, err := ListSIPInboundTrunk(ctx, s.store, req)
if err != nil {
return nil, err
}
trunks = req.FilterSlice(trunks)
defer it.Close()
return &livekit.ListSIPInboundTrunkResponse{Items: trunks}, nil
items, err := iters.All(it)
if err != nil {
return nil, err
}
return &livekit.ListSIPInboundTrunkResponse{Items: items}, nil
}
func (s *SIPService) ListSIPOutboundTrunk(ctx context.Context, req *livekit.ListSIPOutboundTrunkRequest) (*livekit.ListSIPOutboundTrunkResponse, error) {
@@ -263,29 +278,17 @@ func (s *SIPService) ListSIPOutboundTrunk(ctx context.Context, req *livekit.List
if s.store == nil {
return nil, ErrSIPNotConnected
}
var trunks []*livekit.SIPOutboundTrunkInfo
if len(req.TrunkIds) != 0 {
trunks = make([]*livekit.SIPOutboundTrunkInfo, len(req.TrunkIds))
for i, id := range req.TrunkIds {
t, err := s.store.LoadSIPOutboundTrunk(ctx, id)
if errors.Is(err, ErrSIPTrunkNotFound) {
continue // keep nil in slice
} else if err != nil {
return nil, err
}
trunks[i] = t
}
} else {
var err error
trunks, err = s.store.ListSIPOutboundTrunk(ctx)
if err != nil {
return nil, err
}
it, err := ListSIPOutboundTrunk(ctx, s.store, req)
if err != nil {
return nil, err
}
trunks = req.FilterSlice(trunks)
defer it.Close()
return &livekit.ListSIPOutboundTrunkResponse{Items: trunks}, nil
items, err := iters.All(it)
if err != nil {
return nil, err
}
return &livekit.ListSIPOutboundTrunkResponse{Items: items}, nil
}
func (s *SIPService) DeleteSIPTrunk(ctx context.Context, req *livekit.DeleteSIPTrunkRequest) (*livekit.SIPTrunkInfo, error) {
@@ -335,12 +338,14 @@ func (s *SIPService) CreateSIPDispatchRule(ctx context.Context, req *livekit.Cre
}
// Validate all rules including the new one first.
list, err := s.store.ListSIPDispatchRule(ctx)
it, err := ListSIPDispatchRule(ctx, s.store, &livekit.ListSIPDispatchRuleRequest{
TrunkIds: req.TrunkIds,
}, info)
if err != nil {
return nil, err
}
list = append(list, info)
if err = sip.ValidateDispatchRules(list); err != nil {
defer it.Close()
if _, err = sip.ValidateDispatchRulesIter(it); err != nil {
return nil, err
}
@@ -352,6 +357,18 @@ func (s *SIPService) CreateSIPDispatchRule(ctx context.Context, req *livekit.Cre
return info, nil
}
func ListSIPDispatchRule(ctx context.Context, s SIPStore, req *livekit.ListSIPDispatchRuleRequest, add ...*livekit.SIPDispatchRuleInfo) (iters.Iter[*livekit.SIPDispatchRuleInfo], error) {
if s == nil {
return nil, ErrSIPNotConnected
}
pages := livekit.ListPageIter(s.ListSIPDispatchRule, req)
it := iters.PagesAsIter(ctx, pages)
if len(add) != 0 {
it = iters.MultiIter(true, it, iters.Slice(add))
}
return it, nil
}
func (s *SIPService) ListSIPDispatchRule(ctx context.Context, req *livekit.ListSIPDispatchRuleRequest) (*livekit.ListSIPDispatchRuleResponse, error) {
if err := EnsureSIPAdminPermission(ctx); err != nil {
return nil, twirpAuthError(err)
@@ -359,29 +376,17 @@ func (s *SIPService) ListSIPDispatchRule(ctx context.Context, req *livekit.ListS
if s.store == nil {
return nil, ErrSIPNotConnected
}
var rules []*livekit.SIPDispatchRuleInfo
if len(req.DispatchRuleIds) != 0 {
rules = make([]*livekit.SIPDispatchRuleInfo, len(req.DispatchRuleIds))
for i, id := range req.DispatchRuleIds {
r, err := s.store.LoadSIPDispatchRule(ctx, id)
if errors.Is(err, ErrSIPDispatchRuleNotFound) {
continue // keep nil in slice
} else if err != nil {
return nil, err
}
rules[i] = r
}
} else {
var err error
rules, err = s.store.ListSIPDispatchRule(ctx)
if err != nil {
return nil, err
}
it, err := ListSIPDispatchRule(ctx, s.store, req)
if err != nil {
return nil, err
}
rules = req.FilterSlice(rules)
defer it.Close()
return &livekit.ListSIPDispatchRuleResponse{Items: rules}, nil
items, err := iters.All(it)
if err != nil {
return nil, err
}
return &livekit.ListSIPDispatchRuleResponse{Items: items}, nil
}
func (s *SIPService) DeleteSIPDispatchRule(ctx context.Context, req *livekit.DeleteSIPDispatchRuleRequest) (*livekit.SIPDispatchRuleInfo, error) {
@@ -400,7 +405,7 @@ func (s *SIPService) DeleteSIPDispatchRule(ctx context.Context, req *livekit.Del
return nil, err
}
if err = s.store.DeleteSIPDispatchRule(ctx, info); err != nil {
if err = s.store.DeleteSIPDispatchRule(ctx, info.SipDispatchRuleId); err != nil {
return nil, err
}

View File

@@ -25,6 +25,24 @@ import (
"github.com/livekit/livekit-server/pkg/service"
)
func redisClientDocker(t testing.TB) *redis.Client {
addr := runRedis(t)
cli := redis.NewClient(&redis.Options{
Addr: addr,
})
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := cli.Ping(ctx).Err(); err != nil {
_ = cli.Close()
t.Fatal(err)
}
t.Cleanup(func() {
_ = cli.Close()
})
return cli
}
func redisClient(t testing.TB) *redis.Client {
cli := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
@@ -42,21 +60,7 @@ func redisClient(t testing.TB) *redis.Client {
t.Logf("local redis not available: %v", err)
t.Logf("starting redis in docker")
addr := runRedis(t)
cli = redis.NewClient(&redis.Options{
Addr: addr,
})
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err = cli.Ping(ctx).Err(); err != nil {
_ = cli.Close()
t.Fatal(err)
}
t.Cleanup(func() {
_ = cli.Close()
})
return cli
return redisClientDocker(t)
}
func TestIsValidDomain(t *testing.T) {