From 60a09cb4bee17ce8d720ff5381798ea24b800850 Mon Sep 17 00:00:00 2001 From: Denys Smirnov Date: Thu, 20 Feb 2025 13:13:21 +0200 Subject: [PATCH] Implement SIP iterators. (#3332) --- go.mod | 4 +- go.sum | 4 +- pkg/service/interfaces.go | 10 +- pkg/service/ioservice_sip.go | 48 +++- pkg/service/ioservice_sip_test.go | 122 +++++++++ pkg/service/redisstore.go | 118 ++++++++- pkg/service/redisstore_sip.go | 114 +++++--- pkg/service/redisstore_sip_test.go | 172 +++++++++--- pkg/service/redisstore_test.go | 4 + pkg/service/servicefakes/fake_sipstore.go | 302 +++++++++++++++++----- pkg/service/sip.go | 157 +++++------ pkg/service/utils_test.go | 34 +-- 12 files changed, 836 insertions(+), 253 deletions(-) create mode 100644 pkg/service/ioservice_sip_test.go diff --git a/go.mod b/go.mod index 40ebb02f6..1649c8258 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ toolchain go1.23.6 require ( github.com/bep/debounce v1.2.1 github.com/d5/tengo/v2 v2.17.0 + github.com/dennwc/iters v1.0.1 github.com/dustin/go-humanize v1.0.1 github.com/elliotchance/orderedmap/v2 v2.7.0 github.com/florianl/go-tc v0.4.4 @@ -22,7 +23,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20241220010243-a2bdee945564 - github.com/livekit/protocol v1.33.1-0.20250218083317-1a019aab5b83 + github.com/livekit/protocol v1.33.1-0.20250218100713-bfe8101478c7 github.com/livekit/psrpc v0.6.1-0.20250205181828-a0beed2e4126 github.com/mackerelio/go-osstat v0.2.5 github.com/magefile/mage v1.15.0 @@ -77,7 +78,6 @@ require ( github.com/containerd/continuity v0.4.3 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.5 // indirect github.com/davecgh/go-spew v1.1.1 // indirect - github.com/dennwc/iters v1.0.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/docker/cli v26.1.4+incompatible // indirect github.com/docker/docker v27.1.1+incompatible // indirect diff --git a/go.sum b/go.sum index 55cff33b0..df97a7e09 100644 --- a/go.sum +++ b/go.sum @@ -170,8 +170,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20241220010243-a2bdee945564 h1:GX7KF/V9ExmcfT/2Bdia8aROjkxrgx7WpyH7w9MB4J4= github.com/livekit/mediatransportutil v0.0.0-20241220010243-a2bdee945564/go.mod h1:36s+wwmU3O40IAhE+MjBWP3W71QRiEE9SfooSBvtBqY= -github.com/livekit/protocol v1.33.1-0.20250218083317-1a019aab5b83 h1:1HFZ41AaFE+disN7Md9g0MQNpnw9Y2p3QKbjYNtjQjA= -github.com/livekit/protocol v1.33.1-0.20250218083317-1a019aab5b83/go.mod h1:yXuQ7ucrLj91nbxL6/AHgtxdha1DGzLj1LkgvnT90So= +github.com/livekit/protocol v1.33.1-0.20250218100713-bfe8101478c7 h1:c40dw41dbIm9Wdeezv8fuYfQicbFotYji3f7CrVa7lU= +github.com/livekit/protocol v1.33.1-0.20250218100713-bfe8101478c7/go.mod h1:yXuQ7ucrLj91nbxL6/AHgtxdha1DGzLj1LkgvnT90So= github.com/livekit/psrpc v0.6.1-0.20250205181828-a0beed2e4126 h1:fzuYpAQbCid7ySPpQWWePfQOWUrs8x6dJ0T3Wl07n+Y= github.com/livekit/psrpc v0.6.1-0.20250205181828-a0beed2e4126/go.mod h1:X5WtEZ7OnEs72Fi5/J+i0on3964F1aynQpCalcgMqRo= github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o= diff --git a/pkg/service/interfaces.go b/pkg/service/interfaces.go index 890820c56..e2a6978c5 100644 --- a/pkg/service/interfaces.go +++ b/pkg/service/interfaces.go @@ -87,15 +87,15 @@ type SIPStore interface { LoadSIPTrunk(ctx context.Context, sipTrunkID string) (*livekit.SIPTrunkInfo, error) LoadSIPInboundTrunk(ctx context.Context, sipTrunkID string) (*livekit.SIPInboundTrunkInfo, error) LoadSIPOutboundTrunk(ctx context.Context, sipTrunkID string) (*livekit.SIPOutboundTrunkInfo, error) - ListSIPTrunk(ctx context.Context) ([]*livekit.SIPTrunkInfo, error) - ListSIPInboundTrunk(ctx context.Context) ([]*livekit.SIPInboundTrunkInfo, error) - ListSIPOutboundTrunk(ctx context.Context) ([]*livekit.SIPOutboundTrunkInfo, error) + ListSIPTrunk(ctx context.Context, opts *livekit.ListSIPTrunkRequest) (*livekit.ListSIPTrunkResponse, error) + ListSIPInboundTrunk(ctx context.Context, opts *livekit.ListSIPInboundTrunkRequest) (*livekit.ListSIPInboundTrunkResponse, error) + ListSIPOutboundTrunk(ctx context.Context, opts *livekit.ListSIPOutboundTrunkRequest) (*livekit.ListSIPOutboundTrunkResponse, error) DeleteSIPTrunk(ctx context.Context, sipTrunkID string) error StoreSIPDispatchRule(ctx context.Context, info *livekit.SIPDispatchRuleInfo) error LoadSIPDispatchRule(ctx context.Context, sipDispatchRuleID string) (*livekit.SIPDispatchRuleInfo, error) - ListSIPDispatchRule(ctx context.Context) ([]*livekit.SIPDispatchRuleInfo, error) - DeleteSIPDispatchRule(ctx context.Context, info *livekit.SIPDispatchRuleInfo) error + ListSIPDispatchRule(ctx context.Context, opts *livekit.ListSIPDispatchRuleRequest) (*livekit.ListSIPDispatchRuleResponse, error) + DeleteSIPDispatchRule(ctx context.Context, sipDispatchRuleID string) error } //counterfeiter:generate . AgentStore diff --git a/pkg/service/ioservice_sip.go b/pkg/service/ioservice_sip.go index 8c4204828..cadb69aec 100644 --- a/pkg/service/ioservice_sip.go +++ b/pkg/service/ioservice_sip.go @@ -19,6 +19,7 @@ import ( "errors" "net/netip" + "github.com/dennwc/iters" "github.com/twitchtv/twirp" "github.com/livekit/protocol/livekit" @@ -30,23 +31,54 @@ import ( // matchSIPTrunk finds a SIP Trunk definition matching the request. // Returns nil if no rules matched or an error if there are conflicting definitions. func (s *IOInfoService) matchSIPTrunk(ctx context.Context, trunkID, calling, called string, srcIP netip.Addr) (*livekit.SIPInboundTrunkInfo, error) { - trunks, err := s.ss.ListSIPInboundTrunk(ctx) - if err != nil { - return nil, err + if s.ss == nil { + return nil, ErrSIPNotConnected } - return sip.MatchTrunk(trunks, srcIP, calling, called) + if trunkID != "" { + // This is a best-effort optimization. Fallthrough to listing trunks if it doesn't work. + if tr, err := s.ss.LoadSIPInboundTrunk(ctx, trunkID); err == nil { + tr, err = sip.MatchTrunkIter(iters.Slice([]*livekit.SIPInboundTrunkInfo{tr}), srcIP, calling, called) + if err == nil { + return tr, nil + } + } + } + it := s.SelectSIPInboundTrunk(ctx, called) + return sip.MatchTrunkIter(it, srcIP, calling, called) +} + +func (s *IOInfoService) SelectSIPInboundTrunk(ctx context.Context, called string) iters.Iter[*livekit.SIPInboundTrunkInfo] { + it := livekit.ListPageIter(s.ss.ListSIPInboundTrunk, &livekit.ListSIPInboundTrunkRequest{ + Numbers: []string{called}, + }) + return iters.PagesAsIter(ctx, it) } // matchSIPDispatchRule finds the best dispatch rule matching the request parameters. Returns an error if no rule matched. // Trunk parameter can be nil, in which case only wildcard dispatch rules will be effective (ones without Trunk IDs). func (s *IOInfoService) matchSIPDispatchRule(ctx context.Context, trunk *livekit.SIPInboundTrunkInfo, req *rpc.EvaluateSIPDispatchRulesRequest) (*livekit.SIPDispatchRuleInfo, error) { + if s.ss == nil { + return nil, ErrSIPNotConnected + } + var trunkID string + if trunk != nil { + trunkID = trunk.SipTrunkId + } // Trunk can still be nil here in case none matched or were defined. // This is still fine, but only in case we'll match exactly one wildcard dispatch rule. - rules, err := s.ss.ListSIPDispatchRule(ctx) - if err != nil { - return nil, err + it := s.SelectSIPDispatchRule(ctx, trunkID) + return sip.MatchDispatchRuleIter(trunk, it, req) +} + +func (s *IOInfoService) SelectSIPDispatchRule(ctx context.Context, trunkID string) iters.Iter[*livekit.SIPDispatchRuleInfo] { + var trunkIDs []string + if trunkID != "" { + trunkIDs = []string{trunkID} } - return sip.MatchDispatchRule(trunk, rules, req) + it := livekit.ListPageIter(s.ss.ListSIPDispatchRule, &livekit.ListSIPDispatchRuleRequest{ + TrunkIds: trunkIDs, + }) + return iters.PagesAsIter(ctx, it) } func (s *IOInfoService) EvaluateSIPDispatchRules(ctx context.Context, req *rpc.EvaluateSIPDispatchRulesRequest) (*rpc.EvaluateSIPDispatchRulesResponse, error) { diff --git a/pkg/service/ioservice_sip_test.go b/pkg/service/ioservice_sip_test.go new file mode 100644 index 000000000..2a8b75288 --- /dev/null +++ b/pkg/service/ioservice_sip_test.go @@ -0,0 +1,122 @@ +// Copyright 2024 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package service_test + +import ( + "context" + "github.com/dennwc/iters" + "github.com/livekit/livekit-server/pkg/service" + "github.com/livekit/psrpc" + "slices" + "testing" + + "github.com/livekit/protocol/livekit" + "github.com/stretchr/testify/require" +) + +func ioStoreDocker(t testing.TB) (*service.IOInfoService, *service.RedisStore) { + r := redisClientDocker(t) + bus := psrpc.NewRedisMessageBus(r) + rs := service.NewRedisStore(r) + io, err := service.NewIOInfoService(bus, rs, rs, rs, nil) + require.NoError(t, err) + return io, rs +} + +func TestSIPTrunkSelect(t *testing.T) { + ctx := context.Background() + s, rs := ioStoreDocker(t) + + for _, tr := range []*livekit.SIPInboundTrunkInfo{ + {SipTrunkId: "any", Numbers: nil}, + {SipTrunkId: "B", Numbers: []string{"B1", "B2"}}, + {SipTrunkId: "BC", Numbers: []string{"B1", "C1"}}, + } { + err := rs.StoreSIPInboundTrunk(ctx, tr) + require.NoError(t, err) + } + + for _, tr := range []*livekit.SIPTrunkInfo{ + {SipTrunkId: "old-any", OutboundNumber: ""}, + {SipTrunkId: "old-A", OutboundNumber: "A"}, + } { + err := rs.StoreSIPTrunk(ctx, tr) + require.NoError(t, err) + } + + for _, c := range []struct { + number string + exp []string + }{ + {"A", []string{"old-A", "old-any", "any"}}, + {"B1", []string{"B", "BC", "old-any", "any"}}, + {"B2", []string{"B", "old-any", "any"}}, + {"C1", []string{"BC", "old-any", "any"}}, + {"wrong", []string{"old-any", "any"}}, + } { + t.Run(c.number, func(t *testing.T) { + it := s.SelectSIPInboundTrunk(ctx, c.number) + defer it.Close() + list, err := iters.All(it) + require.NoError(t, err) + var ids []string + for _, v := range list { + ids = append(ids, v.SipTrunkId) + } + slices.Sort(c.exp) + slices.Sort(ids) + require.Equal(t, c.exp, ids) + }) + } +} + +func TestSIPRuleSelect(t *testing.T) { + ctx := context.Background() + s, rs := ioStoreDocker(t) + + for _, r := range []*livekit.SIPDispatchRuleInfo{ + {SipDispatchRuleId: "any", TrunkIds: nil}, + {SipDispatchRuleId: "B", TrunkIds: []string{"B1", "B2"}}, + {SipDispatchRuleId: "BC", TrunkIds: []string{"B1", "C1"}}, + } { + err := rs.StoreSIPDispatchRule(ctx, r) + require.NoError(t, err) + } + + for _, c := range []struct { + trunk string + exp []string + }{ + {"A", []string{"any"}}, + {"B1", []string{"B", "BC", "any"}}, + {"B2", []string{"B", "any"}}, + {"C1", []string{"BC", "any"}}, + {"wrong", []string{"any"}}, + } { + t.Run(c.trunk, func(t *testing.T) { + it := s.SelectSIPDispatchRule(ctx, c.trunk) + defer it.Close() + list, err := iters.All(it) + require.NoError(t, err) + var ids []string + for _, v := range list { + ids = append(ids, v.SipDispatchRuleId) + } + slices.Sort(c.exp) + slices.Sort(ids) + require.Equal(t, c.exp, ids) + }) + } +} diff --git a/pkg/service/redisstore.go b/pkg/service/redisstore.go index eddf82789..45576d37f 100644 --- a/pkg/service/redisstore.go +++ b/pkg/service/redisstore.go @@ -17,6 +17,7 @@ package service import ( "context" "fmt" + "slices" "strconv" "strings" "time" @@ -858,7 +859,7 @@ func (s *RedisStore) DeleteAgentDispatch(_ context.Context, dispatch *livekit.Ag func (s *RedisStore) ListAgentDispatches(_ context.Context, roomName livekit.RoomName) ([]*livekit.AgentDispatch, error) { key := AgentDispatchPrefix + string(roomName) - dispatches, err := redisLoadMany[livekit.AgentDispatch](s.ctx, s, key) + dispatches, err := redisLoadAll[livekit.AgentDispatch](s.ctx, s, key) if err != nil { return nil, err } @@ -869,7 +870,7 @@ func (s *RedisStore) ListAgentDispatches(_ context.Context, roomName livekit.Roo } key = AgentJobPrefix + string(roomName) - jobs, err := redisLoadMany[livekit.Job](s.ctx, s, key) + jobs, err := redisLoadAll[livekit.Job](s.ctx, s, key) if err != nil { return nil, err } @@ -936,10 +937,12 @@ func redisStoreOne(ctx context.Context, s *RedisStore, key, id string, p proto.M return s.rc.HSet(s.ctx, key, id, data).Err() } -func redisLoadOne[T any, P interface { +type protoMsg[T any] interface { *T proto.Message -}](ctx context.Context, s *RedisStore, key, id string, notFoundErr error) (P, error) { +} + +func redisLoadOne[T any, P protoMsg[T]](ctx context.Context, s *RedisStore, key, id string, notFoundErr error) (P, error) { data, err := s.rc.HGet(s.ctx, key, id).Result() if err == redis.Nil { return nil, notFoundErr @@ -954,11 +957,8 @@ func redisLoadOne[T any, P interface { return p, err } -func redisLoadMany[T any, P interface { - *T - proto.Message -}](ctx context.Context, s *RedisStore, key string) ([]P, error) { - data, err := s.rc.HGetAll(s.ctx, key).Result() +func redisLoadAll[T any, P protoMsg[T]](ctx context.Context, s *RedisStore, key string) ([]P, error) { + data, err := s.rc.HVals(s.ctx, key).Result() if err == redis.Nil { return nil, nil } else if err != nil { @@ -976,3 +976,103 @@ func redisLoadMany[T any, P interface { return list, nil } + +func redisLoadBatch[T any, P protoMsg[T]](ctx context.Context, s *RedisStore, key string, ids []string, keepEmpty bool) ([]P, error) { + data, err := s.rc.HMGet(s.ctx, key, ids...).Result() + if err == redis.Nil { + if keepEmpty { + return make([]P, len(ids)), nil + } + return nil, nil + } else if err != nil { + return nil, err + } + if !keepEmpty { + list := make([]P, 0, len(data)) + for _, v := range data { + if d, ok := v.(string); ok { + var p P = new(T) + if err = proto.Unmarshal([]byte(d), p); err != nil { + return list, err + } + list = append(list, p) + } + } + return list, nil + } + // Keep zero values where ID was not found. + list := make([]P, len(ids)) + for i := range ids { + if d, ok := data[i].(string); ok { + var p P = new(T) + if err = proto.Unmarshal([]byte(d), p); err != nil { + return list, err + } + list[i] = p + } + } + return list, nil +} + +func redisIDs(ctx context.Context, s *RedisStore, key string) ([]string, error) { + list, err := s.rc.HKeys(s.ctx, key).Result() + if err == redis.Nil { + return nil, nil + } else if err != nil { + return nil, err + } + slices.Sort(list) + return list, nil +} + +type protoEntity[T any] interface { + protoMsg[T] + ID() string +} + +func redisIterPage[T any, P protoEntity[T]](ctx context.Context, s *RedisStore, key string, page *livekit.Pagination) ([]P, error) { + if page == nil { + return redisLoadAll[T, P](ctx, s, key) + } + ids, err := redisIDs(ctx, s, key) + if err != nil { + return nil, err + } + if len(ids) == 0 { + return nil, nil + } + if page.AfterId != "" { + i, ok := slices.BinarySearch(ids, page.AfterId) + if ok { + i++ + } + ids = ids[i:] + if len(ids) == 0 { + return nil, nil + } + } + limit := 1000 + if page.Limit > 0 { + limit = int(page.Limit) + } + if len(ids) > limit { + ids = ids[:limit] + } + return redisLoadBatch[T, P](ctx, s, key, ids, false) +} + +func sortProtos[T any, P protoEntity[T]](arr []P) { + slices.SortFunc(arr, func(a, b P) int { + return strings.Compare(a.ID(), b.ID()) + }) +} + +func sortPage[T any, P protoEntity[T]](items []P, page *livekit.Pagination) []P { + sortProtos(items) + if page != nil { + if limit := int(page.Limit); limit > 0 && len(items) > limit { + items = items[:limit] + } + } + return items +} diff --git a/pkg/service/redisstore_sip.go b/pkg/service/redisstore_sip.go index f8c2e0917..e6dc10899 100644 --- a/pkg/service/redisstore_sip.go +++ b/pkg/service/redisstore_sip.go @@ -114,68 +114,108 @@ func (s *RedisStore) DeleteSIPTrunk(ctx context.Context, id string) error { return err } -func (s *RedisStore) listSIPLegacyTrunk(ctx context.Context) ([]*livekit.SIPTrunkInfo, error) { - return redisLoadMany[livekit.SIPTrunkInfo](ctx, s, SIPTrunkKey) +func (s *RedisStore) listSIPLegacyTrunk(ctx context.Context, page *livekit.Pagination) ([]*livekit.SIPTrunkInfo, error) { + return redisIterPage[livekit.SIPTrunkInfo](ctx, s, SIPTrunkKey, page) } -func (s *RedisStore) listSIPInboundTrunk(ctx context.Context) ([]*livekit.SIPInboundTrunkInfo, error) { - return redisLoadMany[livekit.SIPInboundTrunkInfo](ctx, s, SIPInboundTrunkKey) +func (s *RedisStore) listSIPInboundTrunk(ctx context.Context, page *livekit.Pagination) ([]*livekit.SIPInboundTrunkInfo, error) { + return redisIterPage[livekit.SIPInboundTrunkInfo](ctx, s, SIPInboundTrunkKey, page) } -func (s *RedisStore) listSIPOutboundTrunk(ctx context.Context) ([]*livekit.SIPOutboundTrunkInfo, error) { - return redisLoadMany[livekit.SIPOutboundTrunkInfo](ctx, s, SIPOutboundTrunkKey) +func (s *RedisStore) listSIPOutboundTrunk(ctx context.Context, page *livekit.Pagination) ([]*livekit.SIPOutboundTrunkInfo, error) { + return redisIterPage[livekit.SIPOutboundTrunkInfo](ctx, s, SIPOutboundTrunkKey, page) } -func (s *RedisStore) ListSIPTrunk(ctx context.Context) ([]*livekit.SIPTrunkInfo, error) { - infos, err := s.listSIPLegacyTrunk(ctx) +func (s *RedisStore) listSIPDispatchRule(ctx context.Context, page *livekit.Pagination) ([]*livekit.SIPDispatchRuleInfo, error) { + return redisIterPage[livekit.SIPDispatchRuleInfo](ctx, s, SIPDispatchRuleKey, page) +} + +func (s *RedisStore) ListSIPTrunk(ctx context.Context, req *livekit.ListSIPTrunkRequest) (*livekit.ListSIPTrunkResponse, error) { + var items []*livekit.SIPTrunkInfo + old, err := s.listSIPLegacyTrunk(ctx, req.Page) if err != nil { return nil, err } - in, err := s.listSIPInboundTrunk(ctx) + for _, t := range old { + v := t + if req.Filter(v) && req.Page.Filter(v) { + items = append(items, v) + } + } + in, err := s.listSIPInboundTrunk(ctx, req.Page) if err != nil { - return infos, err + return nil, err } for _, t := range in { - infos = append(infos, t.AsTrunkInfo()) + v := t.AsTrunkInfo() + if req.Filter(v) && req.Page.Filter(v) { + items = append(items, v) + } } - out, err := s.listSIPOutboundTrunk(ctx) + out, err := s.listSIPOutboundTrunk(ctx, req.Page) if err != nil { - return infos, err + return nil, err } for _, t := range out { - infos = append(infos, t.AsTrunkInfo()) + v := t.AsTrunkInfo() + if req.Filter(v) && req.Page.Filter(v) { + items = append(items, v) + } } - return infos, nil + items = sortPage(items, req.Page) + return &livekit.ListSIPTrunkResponse{Items: items}, nil } -func (s *RedisStore) ListSIPInboundTrunk(ctx context.Context) (infos []*livekit.SIPInboundTrunkInfo, err error) { - in, err := s.listSIPInboundTrunk(ctx) +func (s *RedisStore) ListSIPInboundTrunk(ctx context.Context, req *livekit.ListSIPInboundTrunkRequest) (*livekit.ListSIPInboundTrunkResponse, error) { + var items []*livekit.SIPInboundTrunkInfo + in, err := s.listSIPInboundTrunk(ctx, req.Page) if err != nil { - return in, err + return nil, err } - old, err := s.listSIPLegacyTrunk(ctx) + for _, t := range in { + v := t + if req.Filter(v) && req.Page.Filter(v) { + items = append(items, v) + } + } + old, err := s.listSIPLegacyTrunk(ctx, req.Page) if err != nil { return nil, err } for _, t := range old { - in = append(in, t.AsInbound()) + v := t.AsInbound() + if req.Filter(v) && req.Page.Filter(v) { + items = append(items, v) + } } - return in, nil + items = sortPage(items, req.Page) + return &livekit.ListSIPInboundTrunkResponse{Items: items}, nil } -func (s *RedisStore) ListSIPOutboundTrunk(ctx context.Context) (infos []*livekit.SIPOutboundTrunkInfo, err error) { - out, err := s.listSIPOutboundTrunk(ctx) +func (s *RedisStore) ListSIPOutboundTrunk(ctx context.Context, req *livekit.ListSIPOutboundTrunkRequest) (*livekit.ListSIPOutboundTrunkResponse, error) { + var items []*livekit.SIPOutboundTrunkInfo + out, err := s.listSIPOutboundTrunk(ctx, req.Page) if err != nil { - return out, err + return nil, err } - old, err := s.listSIPLegacyTrunk(ctx) + for _, t := range out { + v := t + if req.Filter(v) && req.Page.Filter(v) { + items = append(items, v) + } + } + old, err := s.listSIPLegacyTrunk(ctx, req.Page) if err != nil { return nil, err } for _, t := range old { - out = append(out, t.AsOutbound()) + v := t.AsOutbound() + if req.Filter(v) && req.Page.Filter(v) { + items = append(items, v) + } } - return out, nil + items = sortPage(items, req.Page) + return &livekit.ListSIPOutboundTrunkResponse{Items: items}, nil } func (s *RedisStore) StoreSIPDispatchRule(ctx context.Context, info *livekit.SIPDispatchRuleInfo) error { @@ -186,10 +226,22 @@ func (s *RedisStore) LoadSIPDispatchRule(ctx context.Context, sipDispatchRuleId return redisLoadOne[livekit.SIPDispatchRuleInfo](ctx, s, SIPDispatchRuleKey, sipDispatchRuleId, ErrSIPDispatchRuleNotFound) } -func (s *RedisStore) DeleteSIPDispatchRule(ctx context.Context, info *livekit.SIPDispatchRuleInfo) error { - return s.rc.HDel(s.ctx, SIPDispatchRuleKey, info.SipDispatchRuleId).Err() +func (s *RedisStore) DeleteSIPDispatchRule(ctx context.Context, sipDispatchRuleId string) error { + return s.rc.HDel(s.ctx, SIPDispatchRuleKey, sipDispatchRuleId).Err() } -func (s *RedisStore) ListSIPDispatchRule(ctx context.Context) (infos []*livekit.SIPDispatchRuleInfo, err error) { - return redisLoadMany[livekit.SIPDispatchRuleInfo](ctx, s, SIPDispatchRuleKey) +func (s *RedisStore) ListSIPDispatchRule(ctx context.Context, req *livekit.ListSIPDispatchRuleRequest) (*livekit.ListSIPDispatchRuleResponse, error) { + var items []*livekit.SIPDispatchRuleInfo + out, err := s.listSIPDispatchRule(ctx, req.Page) + if err != nil { + return nil, err + } + for _, t := range out { + v := t + if req.Filter(v) && req.Page.Filter(v) { + items = append(items, v) + } + } + items = sortPage(items, req.Page) + return &livekit.ListSIPDispatchRuleResponse{Items: items}, nil } diff --git a/pkg/service/redisstore_sip_test.go b/pkg/service/redisstore_sip_test.go index 970240f79..bfa029d4a 100644 --- a/pkg/service/redisstore_sip_test.go +++ b/pkg/service/redisstore_sip_test.go @@ -16,10 +16,12 @@ package service_test import ( "context" + "fmt" "slices" "strings" "testing" + "github.com/dennwc/iters" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/utils" "github.com/livekit/protocol/utils/guid" @@ -31,14 +33,14 @@ import ( func TestSIPStoreDispatch(t *testing.T) { ctx := context.Background() - rs := redisStore(t) + rs := redisStoreDocker(t) id := guid.New(utils.SIPDispatchRulePrefix) // No dispatch rules initially. - list, err := rs.ListSIPDispatchRule(ctx) + list, err := rs.ListSIPDispatchRule(ctx, &livekit.ListSIPDispatchRuleRequest{}) require.NoError(t, err) - require.Empty(t, list) + require.Empty(t, list.Items) // Loading non-existent dispatch should return proper not found error. got, err := rs.LoadSIPDispatchRule(ctx, id) @@ -69,21 +71,21 @@ func TestSIPStoreDispatch(t *testing.T) { require.True(t, proto.Equal(rule, got)) // Listing - list, err = rs.ListSIPDispatchRule(ctx) + list, err = rs.ListSIPDispatchRule(ctx, &livekit.ListSIPDispatchRuleRequest{}) require.NoError(t, err) - require.Len(t, list, 1) - require.True(t, proto.Equal(rule, list[0])) + require.Len(t, list.Items, 1) + require.True(t, proto.Equal(rule, list.Items[0])) // Deletion. Should not return error if not exists. - err = rs.DeleteSIPDispatchRule(ctx, &livekit.SIPDispatchRuleInfo{SipDispatchRuleId: id}) + err = rs.DeleteSIPDispatchRule(ctx, id) require.NoError(t, err) - err = rs.DeleteSIPDispatchRule(ctx, &livekit.SIPDispatchRuleInfo{SipDispatchRuleId: id}) + err = rs.DeleteSIPDispatchRule(ctx, id) require.NoError(t, err) // Check that it's deleted. - list, err = rs.ListSIPDispatchRule(ctx) + list, err = rs.ListSIPDispatchRule(ctx, &livekit.ListSIPDispatchRuleRequest{}) require.NoError(t, err) - require.Empty(t, list) + require.Empty(t, list.Items) got, err = rs.LoadSIPDispatchRule(ctx, id) require.Equal(t, service.ErrSIPDispatchRuleNotFound, err) @@ -92,7 +94,7 @@ func TestSIPStoreDispatch(t *testing.T) { func TestSIPStoreTrunk(t *testing.T) { ctx := context.Background() - rs := redisStore(t) + rs := redisStoreDocker(t) oldID := guid.New(utils.SIPTrunkPrefix) inID := guid.New(utils.SIPTrunkPrefix) @@ -100,25 +102,25 @@ func TestSIPStoreTrunk(t *testing.T) { // No trunks initially. Check legacy, inbound, outbound. // Loading non-existent trunk should return proper not found error. - oldList, err := rs.ListSIPTrunk(ctx) + oldList, err := rs.ListSIPTrunk(ctx, &livekit.ListSIPTrunkRequest{}) require.NoError(t, err) - require.Empty(t, oldList) + require.Empty(t, oldList.Items) old, err := rs.LoadSIPTrunk(ctx, oldID) require.Equal(t, service.ErrSIPTrunkNotFound, err) require.Nil(t, old) - inList, err := rs.ListSIPInboundTrunk(ctx) + inList, err := rs.ListSIPInboundTrunk(ctx, &livekit.ListSIPInboundTrunkRequest{}) require.NoError(t, err) - require.Empty(t, inList) + require.Empty(t, inList.Items) in, err := rs.LoadSIPInboundTrunk(ctx, oldID) require.Equal(t, service.ErrSIPTrunkNotFound, err) require.Nil(t, in) - outList, err := rs.ListSIPOutboundTrunk(ctx) + outList, err := rs.ListSIPOutboundTrunk(ctx, &livekit.ListSIPOutboundTrunkRequest{}) require.NoError(t, err) - require.Empty(t, outList) + require.Empty(t, outList.Items) out, err := rs.LoadSIPOutboundTrunk(ctx, oldID) require.Equal(t, service.ErrSIPTrunkNotFound, err) @@ -187,33 +189,33 @@ func TestSIPStoreTrunk(t *testing.T) { require.True(t, proto.Equal(oldT.AsOutbound(), outT2)) // Listing (always shows legacy + new) - listOld, err := rs.ListSIPTrunk(ctx) + listOld, err := rs.ListSIPTrunk(ctx, &livekit.ListSIPTrunkRequest{}) require.NoError(t, err) - require.Len(t, listOld, 3) - slices.SortFunc(listOld, func(a, b *livekit.SIPTrunkInfo) int { + require.Len(t, listOld.Items, 3) + slices.SortFunc(listOld.Items, func(a, b *livekit.SIPTrunkInfo) int { return strings.Compare(a.Name, b.Name) }) - require.True(t, proto.Equal(inT.AsTrunkInfo(), listOld[0])) - require.True(t, proto.Equal(oldT, listOld[1])) - require.True(t, proto.Equal(outT.AsTrunkInfo(), listOld[2])) + require.True(t, proto.Equal(inT.AsTrunkInfo(), listOld.Items[0])) + require.True(t, proto.Equal(oldT, listOld.Items[1])) + require.True(t, proto.Equal(outT.AsTrunkInfo(), listOld.Items[2])) - listIn, err := rs.ListSIPInboundTrunk(ctx) + listIn, err := rs.ListSIPInboundTrunk(ctx, &livekit.ListSIPInboundTrunkRequest{}) require.NoError(t, err) - require.Len(t, listIn, 2) - slices.SortFunc(listIn, func(a, b *livekit.SIPInboundTrunkInfo) int { + require.Len(t, listIn.Items, 2) + slices.SortFunc(listIn.Items, func(a, b *livekit.SIPInboundTrunkInfo) int { return strings.Compare(a.Name, b.Name) }) - require.True(t, proto.Equal(inT, listIn[0])) - require.True(t, proto.Equal(oldT.AsInbound(), listIn[1])) + require.True(t, proto.Equal(inT, listIn.Items[0])) + require.True(t, proto.Equal(oldT.AsInbound(), listIn.Items[1])) - listOut, err := rs.ListSIPOutboundTrunk(ctx) + listOut, err := rs.ListSIPOutboundTrunk(ctx, &livekit.ListSIPOutboundTrunkRequest{}) require.NoError(t, err) - require.Len(t, listOut, 2) - slices.SortFunc(listOut, func(a, b *livekit.SIPOutboundTrunkInfo) int { + require.Len(t, listOut.Items, 2) + slices.SortFunc(listOut.Items, func(a, b *livekit.SIPOutboundTrunkInfo) int { return strings.Compare(a.Name, b.Name) }) - require.True(t, proto.Equal(oldT.AsOutbound(), listOut[0])) - require.True(t, proto.Equal(outT, listOut[1])) + require.True(t, proto.Equal(oldT.AsOutbound(), listOut.Items[0])) + require.True(t, proto.Equal(outT, listOut.Items[1])) // Deletion. Should not return error if not exists. err = rs.DeleteSIPTrunk(ctx, oldID) @@ -237,17 +239,17 @@ func TestSIPStoreTrunk(t *testing.T) { require.NoError(t, err) // Check everything is deleted. - oldList, err = rs.ListSIPTrunk(ctx) + oldList, err = rs.ListSIPTrunk(ctx, &livekit.ListSIPTrunkRequest{}) require.NoError(t, err) - require.Empty(t, oldList) + require.Empty(t, oldList.Items) - inList, err = rs.ListSIPInboundTrunk(ctx) + inList, err = rs.ListSIPInboundTrunk(ctx, &livekit.ListSIPInboundTrunkRequest{}) require.NoError(t, err) - require.Empty(t, inList) + require.Empty(t, inList.Items) - outList, err = rs.ListSIPOutboundTrunk(ctx) + outList, err = rs.ListSIPOutboundTrunk(ctx, &livekit.ListSIPOutboundTrunkRequest{}) require.NoError(t, err) - require.Empty(t, outList) + require.Empty(t, outList.Items) old, err = rs.LoadSIPTrunk(ctx, oldID) require.Equal(t, service.ErrSIPTrunkNotFound, err) @@ -261,3 +263,95 @@ func TestSIPStoreTrunk(t *testing.T) { require.Equal(t, service.ErrSIPTrunkNotFound, err) require.Nil(t, out) } + +func TestSIPTrunkList(t *testing.T) { + s := redisStoreDocker(t) + + testIter(t, func(ctx context.Context, id string) error { + if strings.HasSuffix(id, "0") { + return s.StoreSIPTrunk(ctx, &livekit.SIPTrunkInfo{ + SipTrunkId: id, + OutboundNumber: id, + }) + } + return s.StoreSIPInboundTrunk(ctx, &livekit.SIPInboundTrunkInfo{ + SipTrunkId: id, + Numbers: []string{id}, + }) + }, func(ctx context.Context, page *livekit.Pagination, ids []string) iters.PageIter[*livekit.SIPInboundTrunkInfo] { + return livekit.ListPageIter(s.ListSIPInboundTrunk, &livekit.ListSIPInboundTrunkRequest{ + TrunkIds: ids, Page: page, + }) + }) +} + +func TestSIPRuleList(t *testing.T) { + s := redisStoreDocker(t) + + testIter(t, func(ctx context.Context, id string) error { + return s.StoreSIPDispatchRule(ctx, &livekit.SIPDispatchRuleInfo{ + SipDispatchRuleId: id, + TrunkIds: []string{id}, + }) + }, func(ctx context.Context, page *livekit.Pagination, ids []string) iters.PageIter[*livekit.SIPDispatchRuleInfo] { + return livekit.ListPageIter(s.ListSIPDispatchRule, &livekit.ListSIPDispatchRuleRequest{ + DispatchRuleIds: ids, Page: page, + }) + }) +} + +type listItem interface { + ID() string +} + +func allIDs[T listItem](t testing.TB, it iters.PageIter[T]) []string { + defer it.Close() + got, err := iters.AllPages(context.Background(), iters.MapPage(it, func(ctx context.Context, v T) (string, error) { + return v.ID(), nil + })) + require.NoError(t, err) + return got +} + +func testIter[T listItem]( + t *testing.T, + create func(ctx context.Context, id string) error, + list func(ctx context.Context, page *livekit.Pagination, ids []string) iters.PageIter[T], +) { + ctx := context.Background() + var all []string + for i := 0; i < 250; i++ { + id := fmt.Sprintf("%05d", i) + all = append(all, id) + err := create(ctx, id) + require.NoError(t, err) + } + + // List everything with pagination disabled (legacy) + it := list(ctx, nil, nil) + got := allIDs(t, it) + require.Equal(t, all, got) + + // List with pagination enabled + it = list(ctx, &livekit.Pagination{Limit: 10}, nil) + got = allIDs(t, it) + require.Equal(t, all, got) + + // List with pagination enabled, custom ID + it = list(ctx, &livekit.Pagination{Limit: 10, AfterId: all[55]}, nil) + got = allIDs(t, it) + require.Equal(t, all[56:], got) + + // List fixed IDs + it = list(ctx, &livekit.Pagination{Limit: 10, AfterId: all[5]}, []string{ + all[10], + all[3], + "invalid", + all[8], + }) + got = allIDs(t, it) + require.Equal(t, []string{ + all[8], + all[10], + }, got) +} diff --git a/pkg/service/redisstore_test.go b/pkg/service/redisstore_test.go index d584c4102..e84a6c41f 100644 --- a/pkg/service/redisstore_test.go +++ b/pkg/service/redisstore_test.go @@ -32,6 +32,10 @@ import ( "github.com/livekit/livekit-server/pkg/service" ) +func redisStoreDocker(t testing.TB) *service.RedisStore { + return service.NewRedisStore(redisClientDocker(t)) +} + func redisStore(t testing.TB) *service.RedisStore { return service.NewRedisStore(redisClient(t)) } diff --git a/pkg/service/servicefakes/fake_sipstore.go b/pkg/service/servicefakes/fake_sipstore.go index a7da774ec..055018fe0 100644 --- a/pkg/service/servicefakes/fake_sipstore.go +++ b/pkg/service/servicefakes/fake_sipstore.go @@ -10,11 +10,11 @@ import ( ) type FakeSIPStore struct { - DeleteSIPDispatchRuleStub func(context.Context, *livekit.SIPDispatchRuleInfo) error + DeleteSIPDispatchRuleStub func(context.Context, string) error deleteSIPDispatchRuleMutex sync.RWMutex deleteSIPDispatchRuleArgsForCall []struct { arg1 context.Context - arg2 *livekit.SIPDispatchRuleInfo + arg2 string } deleteSIPDispatchRuleReturns struct { result1 error @@ -34,56 +34,60 @@ type FakeSIPStore struct { deleteSIPTrunkReturnsOnCall map[int]struct { result1 error } - ListSIPDispatchRuleStub func(context.Context) ([]*livekit.SIPDispatchRuleInfo, error) + ListSIPDispatchRuleStub func(context.Context, *livekit.ListSIPDispatchRuleRequest) (*livekit.ListSIPDispatchRuleResponse, error) listSIPDispatchRuleMutex sync.RWMutex listSIPDispatchRuleArgsForCall []struct { arg1 context.Context + arg2 *livekit.ListSIPDispatchRuleRequest } listSIPDispatchRuleReturns struct { - result1 []*livekit.SIPDispatchRuleInfo + result1 *livekit.ListSIPDispatchRuleResponse result2 error } listSIPDispatchRuleReturnsOnCall map[int]struct { - result1 []*livekit.SIPDispatchRuleInfo + result1 *livekit.ListSIPDispatchRuleResponse result2 error } - ListSIPInboundTrunkStub func(context.Context) ([]*livekit.SIPInboundTrunkInfo, error) + ListSIPInboundTrunkStub func(context.Context, *livekit.ListSIPInboundTrunkRequest) (*livekit.ListSIPInboundTrunkResponse, error) listSIPInboundTrunkMutex sync.RWMutex listSIPInboundTrunkArgsForCall []struct { arg1 context.Context + arg2 *livekit.ListSIPInboundTrunkRequest } listSIPInboundTrunkReturns struct { - result1 []*livekit.SIPInboundTrunkInfo + result1 *livekit.ListSIPInboundTrunkResponse result2 error } listSIPInboundTrunkReturnsOnCall map[int]struct { - result1 []*livekit.SIPInboundTrunkInfo + result1 *livekit.ListSIPInboundTrunkResponse result2 error } - ListSIPOutboundTrunkStub func(context.Context) ([]*livekit.SIPOutboundTrunkInfo, error) + ListSIPOutboundTrunkStub func(context.Context, *livekit.ListSIPOutboundTrunkRequest) (*livekit.ListSIPOutboundTrunkResponse, error) listSIPOutboundTrunkMutex sync.RWMutex listSIPOutboundTrunkArgsForCall []struct { arg1 context.Context + arg2 *livekit.ListSIPOutboundTrunkRequest } listSIPOutboundTrunkReturns struct { - result1 []*livekit.SIPOutboundTrunkInfo + result1 *livekit.ListSIPOutboundTrunkResponse result2 error } listSIPOutboundTrunkReturnsOnCall map[int]struct { - result1 []*livekit.SIPOutboundTrunkInfo + result1 *livekit.ListSIPOutboundTrunkResponse result2 error } - ListSIPTrunkStub func(context.Context) ([]*livekit.SIPTrunkInfo, error) + ListSIPTrunkStub func(context.Context, *livekit.ListSIPTrunkRequest) (*livekit.ListSIPTrunkResponse, error) listSIPTrunkMutex sync.RWMutex listSIPTrunkArgsForCall []struct { arg1 context.Context + arg2 *livekit.ListSIPTrunkRequest } listSIPTrunkReturns struct { - result1 []*livekit.SIPTrunkInfo + result1 *livekit.ListSIPTrunkResponse result2 error } listSIPTrunkReturnsOnCall map[int]struct { - result1 []*livekit.SIPTrunkInfo + result1 *livekit.ListSIPTrunkResponse result2 error } LoadSIPDispatchRuleStub func(context.Context, string) (*livekit.SIPDispatchRuleInfo, error) @@ -142,6 +146,34 @@ type FakeSIPStore struct { result1 *livekit.SIPTrunkInfo result2 error } + SelectSIPDispatchRuleStub func(context.Context, string) ([]*livekit.SIPDispatchRuleInfo, error) + selectSIPDispatchRuleMutex sync.RWMutex + selectSIPDispatchRuleArgsForCall []struct { + arg1 context.Context + arg2 string + } + selectSIPDispatchRuleReturns struct { + result1 []*livekit.SIPDispatchRuleInfo + result2 error + } + selectSIPDispatchRuleReturnsOnCall map[int]struct { + result1 []*livekit.SIPDispatchRuleInfo + result2 error + } + SelectSIPInboundTrunkStub func(context.Context, string) ([]*livekit.SIPInboundTrunkInfo, error) + selectSIPInboundTrunkMutex sync.RWMutex + selectSIPInboundTrunkArgsForCall []struct { + arg1 context.Context + arg2 string + } + selectSIPInboundTrunkReturns struct { + result1 []*livekit.SIPInboundTrunkInfo + result2 error + } + selectSIPInboundTrunkReturnsOnCall map[int]struct { + result1 []*livekit.SIPInboundTrunkInfo + result2 error + } StoreSIPDispatchRuleStub func(context.Context, *livekit.SIPDispatchRuleInfo) error storeSIPDispatchRuleMutex sync.RWMutex storeSIPDispatchRuleArgsForCall []struct { @@ -194,12 +226,12 @@ type FakeSIPStore struct { invocationsMutex sync.RWMutex } -func (fake *FakeSIPStore) DeleteSIPDispatchRule(arg1 context.Context, arg2 *livekit.SIPDispatchRuleInfo) error { +func (fake *FakeSIPStore) DeleteSIPDispatchRule(arg1 context.Context, arg2 string) error { fake.deleteSIPDispatchRuleMutex.Lock() ret, specificReturn := fake.deleteSIPDispatchRuleReturnsOnCall[len(fake.deleteSIPDispatchRuleArgsForCall)] fake.deleteSIPDispatchRuleArgsForCall = append(fake.deleteSIPDispatchRuleArgsForCall, struct { arg1 context.Context - arg2 *livekit.SIPDispatchRuleInfo + arg2 string }{arg1, arg2}) stub := fake.DeleteSIPDispatchRuleStub fakeReturns := fake.deleteSIPDispatchRuleReturns @@ -220,13 +252,13 @@ func (fake *FakeSIPStore) DeleteSIPDispatchRuleCallCount() int { return len(fake.deleteSIPDispatchRuleArgsForCall) } -func (fake *FakeSIPStore) DeleteSIPDispatchRuleCalls(stub func(context.Context, *livekit.SIPDispatchRuleInfo) error) { +func (fake *FakeSIPStore) DeleteSIPDispatchRuleCalls(stub func(context.Context, string) error) { fake.deleteSIPDispatchRuleMutex.Lock() defer fake.deleteSIPDispatchRuleMutex.Unlock() fake.DeleteSIPDispatchRuleStub = stub } -func (fake *FakeSIPStore) DeleteSIPDispatchRuleArgsForCall(i int) (context.Context, *livekit.SIPDispatchRuleInfo) { +func (fake *FakeSIPStore) DeleteSIPDispatchRuleArgsForCall(i int) (context.Context, string) { fake.deleteSIPDispatchRuleMutex.RLock() defer fake.deleteSIPDispatchRuleMutex.RUnlock() argsForCall := fake.deleteSIPDispatchRuleArgsForCall[i] @@ -318,18 +350,19 @@ func (fake *FakeSIPStore) DeleteSIPTrunkReturnsOnCall(i int, result1 error) { }{result1} } -func (fake *FakeSIPStore) ListSIPDispatchRule(arg1 context.Context) ([]*livekit.SIPDispatchRuleInfo, error) { +func (fake *FakeSIPStore) ListSIPDispatchRule(arg1 context.Context, arg2 *livekit.ListSIPDispatchRuleRequest) (*livekit.ListSIPDispatchRuleResponse, error) { fake.listSIPDispatchRuleMutex.Lock() ret, specificReturn := fake.listSIPDispatchRuleReturnsOnCall[len(fake.listSIPDispatchRuleArgsForCall)] fake.listSIPDispatchRuleArgsForCall = append(fake.listSIPDispatchRuleArgsForCall, struct { arg1 context.Context - }{arg1}) + arg2 *livekit.ListSIPDispatchRuleRequest + }{arg1, arg2}) stub := fake.ListSIPDispatchRuleStub fakeReturns := fake.listSIPDispatchRuleReturns - fake.recordInvocation("ListSIPDispatchRule", []interface{}{arg1}) + fake.recordInvocation("ListSIPDispatchRule", []interface{}{arg1, arg2}) fake.listSIPDispatchRuleMutex.Unlock() if stub != nil { - return stub(arg1) + return stub(arg1, arg2) } if specificReturn { return ret.result1, ret.result2 @@ -343,57 +376,58 @@ func (fake *FakeSIPStore) ListSIPDispatchRuleCallCount() int { return len(fake.listSIPDispatchRuleArgsForCall) } -func (fake *FakeSIPStore) ListSIPDispatchRuleCalls(stub func(context.Context) ([]*livekit.SIPDispatchRuleInfo, error)) { +func (fake *FakeSIPStore) ListSIPDispatchRuleCalls(stub func(context.Context, *livekit.ListSIPDispatchRuleRequest) (*livekit.ListSIPDispatchRuleResponse, error)) { fake.listSIPDispatchRuleMutex.Lock() defer fake.listSIPDispatchRuleMutex.Unlock() fake.ListSIPDispatchRuleStub = stub } -func (fake *FakeSIPStore) ListSIPDispatchRuleArgsForCall(i int) context.Context { +func (fake *FakeSIPStore) ListSIPDispatchRuleArgsForCall(i int) (context.Context, *livekit.ListSIPDispatchRuleRequest) { fake.listSIPDispatchRuleMutex.RLock() defer fake.listSIPDispatchRuleMutex.RUnlock() argsForCall := fake.listSIPDispatchRuleArgsForCall[i] - return argsForCall.arg1 + return argsForCall.arg1, argsForCall.arg2 } -func (fake *FakeSIPStore) ListSIPDispatchRuleReturns(result1 []*livekit.SIPDispatchRuleInfo, result2 error) { +func (fake *FakeSIPStore) ListSIPDispatchRuleReturns(result1 *livekit.ListSIPDispatchRuleResponse, result2 error) { fake.listSIPDispatchRuleMutex.Lock() defer fake.listSIPDispatchRuleMutex.Unlock() fake.ListSIPDispatchRuleStub = nil fake.listSIPDispatchRuleReturns = struct { - result1 []*livekit.SIPDispatchRuleInfo + result1 *livekit.ListSIPDispatchRuleResponse result2 error }{result1, result2} } -func (fake *FakeSIPStore) ListSIPDispatchRuleReturnsOnCall(i int, result1 []*livekit.SIPDispatchRuleInfo, result2 error) { +func (fake *FakeSIPStore) ListSIPDispatchRuleReturnsOnCall(i int, result1 *livekit.ListSIPDispatchRuleResponse, result2 error) { fake.listSIPDispatchRuleMutex.Lock() defer fake.listSIPDispatchRuleMutex.Unlock() fake.ListSIPDispatchRuleStub = nil if fake.listSIPDispatchRuleReturnsOnCall == nil { fake.listSIPDispatchRuleReturnsOnCall = make(map[int]struct { - result1 []*livekit.SIPDispatchRuleInfo + result1 *livekit.ListSIPDispatchRuleResponse result2 error }) } fake.listSIPDispatchRuleReturnsOnCall[i] = struct { - result1 []*livekit.SIPDispatchRuleInfo + result1 *livekit.ListSIPDispatchRuleResponse result2 error }{result1, result2} } -func (fake *FakeSIPStore) ListSIPInboundTrunk(arg1 context.Context) ([]*livekit.SIPInboundTrunkInfo, error) { +func (fake *FakeSIPStore) ListSIPInboundTrunk(arg1 context.Context, arg2 *livekit.ListSIPInboundTrunkRequest) (*livekit.ListSIPInboundTrunkResponse, error) { fake.listSIPInboundTrunkMutex.Lock() ret, specificReturn := fake.listSIPInboundTrunkReturnsOnCall[len(fake.listSIPInboundTrunkArgsForCall)] fake.listSIPInboundTrunkArgsForCall = append(fake.listSIPInboundTrunkArgsForCall, struct { arg1 context.Context - }{arg1}) + arg2 *livekit.ListSIPInboundTrunkRequest + }{arg1, arg2}) stub := fake.ListSIPInboundTrunkStub fakeReturns := fake.listSIPInboundTrunkReturns - fake.recordInvocation("ListSIPInboundTrunk", []interface{}{arg1}) + fake.recordInvocation("ListSIPInboundTrunk", []interface{}{arg1, arg2}) fake.listSIPInboundTrunkMutex.Unlock() if stub != nil { - return stub(arg1) + return stub(arg1, arg2) } if specificReturn { return ret.result1, ret.result2 @@ -407,57 +441,58 @@ func (fake *FakeSIPStore) ListSIPInboundTrunkCallCount() int { return len(fake.listSIPInboundTrunkArgsForCall) } -func (fake *FakeSIPStore) ListSIPInboundTrunkCalls(stub func(context.Context) ([]*livekit.SIPInboundTrunkInfo, error)) { +func (fake *FakeSIPStore) ListSIPInboundTrunkCalls(stub func(context.Context, *livekit.ListSIPInboundTrunkRequest) (*livekit.ListSIPInboundTrunkResponse, error)) { fake.listSIPInboundTrunkMutex.Lock() defer fake.listSIPInboundTrunkMutex.Unlock() fake.ListSIPInboundTrunkStub = stub } -func (fake *FakeSIPStore) ListSIPInboundTrunkArgsForCall(i int) context.Context { +func (fake *FakeSIPStore) ListSIPInboundTrunkArgsForCall(i int) (context.Context, *livekit.ListSIPInboundTrunkRequest) { fake.listSIPInboundTrunkMutex.RLock() defer fake.listSIPInboundTrunkMutex.RUnlock() argsForCall := fake.listSIPInboundTrunkArgsForCall[i] - return argsForCall.arg1 + return argsForCall.arg1, argsForCall.arg2 } -func (fake *FakeSIPStore) ListSIPInboundTrunkReturns(result1 []*livekit.SIPInboundTrunkInfo, result2 error) { +func (fake *FakeSIPStore) ListSIPInboundTrunkReturns(result1 *livekit.ListSIPInboundTrunkResponse, result2 error) { fake.listSIPInboundTrunkMutex.Lock() defer fake.listSIPInboundTrunkMutex.Unlock() fake.ListSIPInboundTrunkStub = nil fake.listSIPInboundTrunkReturns = struct { - result1 []*livekit.SIPInboundTrunkInfo + result1 *livekit.ListSIPInboundTrunkResponse result2 error }{result1, result2} } -func (fake *FakeSIPStore) ListSIPInboundTrunkReturnsOnCall(i int, result1 []*livekit.SIPInboundTrunkInfo, result2 error) { +func (fake *FakeSIPStore) ListSIPInboundTrunkReturnsOnCall(i int, result1 *livekit.ListSIPInboundTrunkResponse, result2 error) { fake.listSIPInboundTrunkMutex.Lock() defer fake.listSIPInboundTrunkMutex.Unlock() fake.ListSIPInboundTrunkStub = nil if fake.listSIPInboundTrunkReturnsOnCall == nil { fake.listSIPInboundTrunkReturnsOnCall = make(map[int]struct { - result1 []*livekit.SIPInboundTrunkInfo + result1 *livekit.ListSIPInboundTrunkResponse result2 error }) } fake.listSIPInboundTrunkReturnsOnCall[i] = struct { - result1 []*livekit.SIPInboundTrunkInfo + result1 *livekit.ListSIPInboundTrunkResponse result2 error }{result1, result2} } -func (fake *FakeSIPStore) ListSIPOutboundTrunk(arg1 context.Context) ([]*livekit.SIPOutboundTrunkInfo, error) { +func (fake *FakeSIPStore) ListSIPOutboundTrunk(arg1 context.Context, arg2 *livekit.ListSIPOutboundTrunkRequest) (*livekit.ListSIPOutboundTrunkResponse, error) { fake.listSIPOutboundTrunkMutex.Lock() ret, specificReturn := fake.listSIPOutboundTrunkReturnsOnCall[len(fake.listSIPOutboundTrunkArgsForCall)] fake.listSIPOutboundTrunkArgsForCall = append(fake.listSIPOutboundTrunkArgsForCall, struct { arg1 context.Context - }{arg1}) + arg2 *livekit.ListSIPOutboundTrunkRequest + }{arg1, arg2}) stub := fake.ListSIPOutboundTrunkStub fakeReturns := fake.listSIPOutboundTrunkReturns - fake.recordInvocation("ListSIPOutboundTrunk", []interface{}{arg1}) + fake.recordInvocation("ListSIPOutboundTrunk", []interface{}{arg1, arg2}) fake.listSIPOutboundTrunkMutex.Unlock() if stub != nil { - return stub(arg1) + return stub(arg1, arg2) } if specificReturn { return ret.result1, ret.result2 @@ -471,57 +506,58 @@ func (fake *FakeSIPStore) ListSIPOutboundTrunkCallCount() int { return len(fake.listSIPOutboundTrunkArgsForCall) } -func (fake *FakeSIPStore) ListSIPOutboundTrunkCalls(stub func(context.Context) ([]*livekit.SIPOutboundTrunkInfo, error)) { +func (fake *FakeSIPStore) ListSIPOutboundTrunkCalls(stub func(context.Context, *livekit.ListSIPOutboundTrunkRequest) (*livekit.ListSIPOutboundTrunkResponse, error)) { fake.listSIPOutboundTrunkMutex.Lock() defer fake.listSIPOutboundTrunkMutex.Unlock() fake.ListSIPOutboundTrunkStub = stub } -func (fake *FakeSIPStore) ListSIPOutboundTrunkArgsForCall(i int) context.Context { +func (fake *FakeSIPStore) ListSIPOutboundTrunkArgsForCall(i int) (context.Context, *livekit.ListSIPOutboundTrunkRequest) { fake.listSIPOutboundTrunkMutex.RLock() defer fake.listSIPOutboundTrunkMutex.RUnlock() argsForCall := fake.listSIPOutboundTrunkArgsForCall[i] - return argsForCall.arg1 + return argsForCall.arg1, argsForCall.arg2 } -func (fake *FakeSIPStore) ListSIPOutboundTrunkReturns(result1 []*livekit.SIPOutboundTrunkInfo, result2 error) { +func (fake *FakeSIPStore) ListSIPOutboundTrunkReturns(result1 *livekit.ListSIPOutboundTrunkResponse, result2 error) { fake.listSIPOutboundTrunkMutex.Lock() defer fake.listSIPOutboundTrunkMutex.Unlock() fake.ListSIPOutboundTrunkStub = nil fake.listSIPOutboundTrunkReturns = struct { - result1 []*livekit.SIPOutboundTrunkInfo + result1 *livekit.ListSIPOutboundTrunkResponse result2 error }{result1, result2} } -func (fake *FakeSIPStore) ListSIPOutboundTrunkReturnsOnCall(i int, result1 []*livekit.SIPOutboundTrunkInfo, result2 error) { +func (fake *FakeSIPStore) ListSIPOutboundTrunkReturnsOnCall(i int, result1 *livekit.ListSIPOutboundTrunkResponse, result2 error) { fake.listSIPOutboundTrunkMutex.Lock() defer fake.listSIPOutboundTrunkMutex.Unlock() fake.ListSIPOutboundTrunkStub = nil if fake.listSIPOutboundTrunkReturnsOnCall == nil { fake.listSIPOutboundTrunkReturnsOnCall = make(map[int]struct { - result1 []*livekit.SIPOutboundTrunkInfo + result1 *livekit.ListSIPOutboundTrunkResponse result2 error }) } fake.listSIPOutboundTrunkReturnsOnCall[i] = struct { - result1 []*livekit.SIPOutboundTrunkInfo + result1 *livekit.ListSIPOutboundTrunkResponse result2 error }{result1, result2} } -func (fake *FakeSIPStore) ListSIPTrunk(arg1 context.Context) ([]*livekit.SIPTrunkInfo, error) { +func (fake *FakeSIPStore) ListSIPTrunk(arg1 context.Context, arg2 *livekit.ListSIPTrunkRequest) (*livekit.ListSIPTrunkResponse, error) { fake.listSIPTrunkMutex.Lock() ret, specificReturn := fake.listSIPTrunkReturnsOnCall[len(fake.listSIPTrunkArgsForCall)] fake.listSIPTrunkArgsForCall = append(fake.listSIPTrunkArgsForCall, struct { arg1 context.Context - }{arg1}) + arg2 *livekit.ListSIPTrunkRequest + }{arg1, arg2}) stub := fake.ListSIPTrunkStub fakeReturns := fake.listSIPTrunkReturns - fake.recordInvocation("ListSIPTrunk", []interface{}{arg1}) + fake.recordInvocation("ListSIPTrunk", []interface{}{arg1, arg2}) fake.listSIPTrunkMutex.Unlock() if stub != nil { - return stub(arg1) + return stub(arg1, arg2) } if specificReturn { return ret.result1, ret.result2 @@ -535,41 +571,41 @@ func (fake *FakeSIPStore) ListSIPTrunkCallCount() int { return len(fake.listSIPTrunkArgsForCall) } -func (fake *FakeSIPStore) ListSIPTrunkCalls(stub func(context.Context) ([]*livekit.SIPTrunkInfo, error)) { +func (fake *FakeSIPStore) ListSIPTrunkCalls(stub func(context.Context, *livekit.ListSIPTrunkRequest) (*livekit.ListSIPTrunkResponse, error)) { fake.listSIPTrunkMutex.Lock() defer fake.listSIPTrunkMutex.Unlock() fake.ListSIPTrunkStub = stub } -func (fake *FakeSIPStore) ListSIPTrunkArgsForCall(i int) context.Context { +func (fake *FakeSIPStore) ListSIPTrunkArgsForCall(i int) (context.Context, *livekit.ListSIPTrunkRequest) { fake.listSIPTrunkMutex.RLock() defer fake.listSIPTrunkMutex.RUnlock() argsForCall := fake.listSIPTrunkArgsForCall[i] - return argsForCall.arg1 + return argsForCall.arg1, argsForCall.arg2 } -func (fake *FakeSIPStore) ListSIPTrunkReturns(result1 []*livekit.SIPTrunkInfo, result2 error) { +func (fake *FakeSIPStore) ListSIPTrunkReturns(result1 *livekit.ListSIPTrunkResponse, result2 error) { fake.listSIPTrunkMutex.Lock() defer fake.listSIPTrunkMutex.Unlock() fake.ListSIPTrunkStub = nil fake.listSIPTrunkReturns = struct { - result1 []*livekit.SIPTrunkInfo + result1 *livekit.ListSIPTrunkResponse result2 error }{result1, result2} } -func (fake *FakeSIPStore) ListSIPTrunkReturnsOnCall(i int, result1 []*livekit.SIPTrunkInfo, result2 error) { +func (fake *FakeSIPStore) ListSIPTrunkReturnsOnCall(i int, result1 *livekit.ListSIPTrunkResponse, result2 error) { fake.listSIPTrunkMutex.Lock() defer fake.listSIPTrunkMutex.Unlock() fake.ListSIPTrunkStub = nil if fake.listSIPTrunkReturnsOnCall == nil { fake.listSIPTrunkReturnsOnCall = make(map[int]struct { - result1 []*livekit.SIPTrunkInfo + result1 *livekit.ListSIPTrunkResponse result2 error }) } fake.listSIPTrunkReturnsOnCall[i] = struct { - result1 []*livekit.SIPTrunkInfo + result1 *livekit.ListSIPTrunkResponse result2 error }{result1, result2} } @@ -834,6 +870,136 @@ func (fake *FakeSIPStore) LoadSIPTrunkReturnsOnCall(i int, result1 *livekit.SIPT }{result1, result2} } +func (fake *FakeSIPStore) SelectSIPDispatchRule(arg1 context.Context, arg2 string) ([]*livekit.SIPDispatchRuleInfo, error) { + fake.selectSIPDispatchRuleMutex.Lock() + ret, specificReturn := fake.selectSIPDispatchRuleReturnsOnCall[len(fake.selectSIPDispatchRuleArgsForCall)] + fake.selectSIPDispatchRuleArgsForCall = append(fake.selectSIPDispatchRuleArgsForCall, struct { + arg1 context.Context + arg2 string + }{arg1, arg2}) + stub := fake.SelectSIPDispatchRuleStub + fakeReturns := fake.selectSIPDispatchRuleReturns + fake.recordInvocation("SelectSIPDispatchRule", []interface{}{arg1, arg2}) + fake.selectSIPDispatchRuleMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeSIPStore) SelectSIPDispatchRuleCallCount() int { + fake.selectSIPDispatchRuleMutex.RLock() + defer fake.selectSIPDispatchRuleMutex.RUnlock() + return len(fake.selectSIPDispatchRuleArgsForCall) +} + +func (fake *FakeSIPStore) SelectSIPDispatchRuleCalls(stub func(context.Context, string) ([]*livekit.SIPDispatchRuleInfo, error)) { + fake.selectSIPDispatchRuleMutex.Lock() + defer fake.selectSIPDispatchRuleMutex.Unlock() + fake.SelectSIPDispatchRuleStub = stub +} + +func (fake *FakeSIPStore) SelectSIPDispatchRuleArgsForCall(i int) (context.Context, string) { + fake.selectSIPDispatchRuleMutex.RLock() + defer fake.selectSIPDispatchRuleMutex.RUnlock() + argsForCall := fake.selectSIPDispatchRuleArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeSIPStore) SelectSIPDispatchRuleReturns(result1 []*livekit.SIPDispatchRuleInfo, result2 error) { + fake.selectSIPDispatchRuleMutex.Lock() + defer fake.selectSIPDispatchRuleMutex.Unlock() + fake.SelectSIPDispatchRuleStub = nil + fake.selectSIPDispatchRuleReturns = struct { + result1 []*livekit.SIPDispatchRuleInfo + result2 error + }{result1, result2} +} + +func (fake *FakeSIPStore) SelectSIPDispatchRuleReturnsOnCall(i int, result1 []*livekit.SIPDispatchRuleInfo, result2 error) { + fake.selectSIPDispatchRuleMutex.Lock() + defer fake.selectSIPDispatchRuleMutex.Unlock() + fake.SelectSIPDispatchRuleStub = nil + if fake.selectSIPDispatchRuleReturnsOnCall == nil { + fake.selectSIPDispatchRuleReturnsOnCall = make(map[int]struct { + result1 []*livekit.SIPDispatchRuleInfo + result2 error + }) + } + fake.selectSIPDispatchRuleReturnsOnCall[i] = struct { + result1 []*livekit.SIPDispatchRuleInfo + result2 error + }{result1, result2} +} + +func (fake *FakeSIPStore) SelectSIPInboundTrunk(arg1 context.Context, arg2 string) ([]*livekit.SIPInboundTrunkInfo, error) { + fake.selectSIPInboundTrunkMutex.Lock() + ret, specificReturn := fake.selectSIPInboundTrunkReturnsOnCall[len(fake.selectSIPInboundTrunkArgsForCall)] + fake.selectSIPInboundTrunkArgsForCall = append(fake.selectSIPInboundTrunkArgsForCall, struct { + arg1 context.Context + arg2 string + }{arg1, arg2}) + stub := fake.SelectSIPInboundTrunkStub + fakeReturns := fake.selectSIPInboundTrunkReturns + fake.recordInvocation("SelectSIPInboundTrunk", []interface{}{arg1, arg2}) + fake.selectSIPInboundTrunkMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeSIPStore) SelectSIPInboundTrunkCallCount() int { + fake.selectSIPInboundTrunkMutex.RLock() + defer fake.selectSIPInboundTrunkMutex.RUnlock() + return len(fake.selectSIPInboundTrunkArgsForCall) +} + +func (fake *FakeSIPStore) SelectSIPInboundTrunkCalls(stub func(context.Context, string) ([]*livekit.SIPInboundTrunkInfo, error)) { + fake.selectSIPInboundTrunkMutex.Lock() + defer fake.selectSIPInboundTrunkMutex.Unlock() + fake.SelectSIPInboundTrunkStub = stub +} + +func (fake *FakeSIPStore) SelectSIPInboundTrunkArgsForCall(i int) (context.Context, string) { + fake.selectSIPInboundTrunkMutex.RLock() + defer fake.selectSIPInboundTrunkMutex.RUnlock() + argsForCall := fake.selectSIPInboundTrunkArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeSIPStore) SelectSIPInboundTrunkReturns(result1 []*livekit.SIPInboundTrunkInfo, result2 error) { + fake.selectSIPInboundTrunkMutex.Lock() + defer fake.selectSIPInboundTrunkMutex.Unlock() + fake.SelectSIPInboundTrunkStub = nil + fake.selectSIPInboundTrunkReturns = struct { + result1 []*livekit.SIPInboundTrunkInfo + result2 error + }{result1, result2} +} + +func (fake *FakeSIPStore) SelectSIPInboundTrunkReturnsOnCall(i int, result1 []*livekit.SIPInboundTrunkInfo, result2 error) { + fake.selectSIPInboundTrunkMutex.Lock() + defer fake.selectSIPInboundTrunkMutex.Unlock() + fake.SelectSIPInboundTrunkStub = nil + if fake.selectSIPInboundTrunkReturnsOnCall == nil { + fake.selectSIPInboundTrunkReturnsOnCall = make(map[int]struct { + result1 []*livekit.SIPInboundTrunkInfo + result2 error + }) + } + fake.selectSIPInboundTrunkReturnsOnCall[i] = struct { + result1 []*livekit.SIPInboundTrunkInfo + result2 error + }{result1, result2} +} + func (fake *FakeSIPStore) StoreSIPDispatchRule(arg1 context.Context, arg2 *livekit.SIPDispatchRuleInfo) error { fake.storeSIPDispatchRuleMutex.Lock() ret, specificReturn := fake.storeSIPDispatchRuleReturnsOnCall[len(fake.storeSIPDispatchRuleArgsForCall)] @@ -1105,6 +1271,10 @@ func (fake *FakeSIPStore) Invocations() map[string][][]interface{} { defer fake.loadSIPOutboundTrunkMutex.RUnlock() fake.loadSIPTrunkMutex.RLock() defer fake.loadSIPTrunkMutex.RUnlock() + fake.selectSIPDispatchRuleMutex.RLock() + defer fake.selectSIPDispatchRuleMutex.RUnlock() + fake.selectSIPInboundTrunkMutex.RLock() + defer fake.selectSIPInboundTrunkMutex.RUnlock() fake.storeSIPDispatchRuleMutex.RLock() defer fake.storeSIPDispatchRuleMutex.RUnlock() fake.storeSIPInboundTrunkMutex.RLock() diff --git a/pkg/service/sip.go b/pkg/service/sip.go index 604f4e702..64f571c35 100644 --- a/pkg/service/sip.go +++ b/pkg/service/sip.go @@ -16,9 +16,9 @@ package service import ( "context" - "errors" "time" + "github.com/dennwc/iters" "github.com/twitchtv/twirp" "google.golang.org/protobuf/types/known/emptypb" @@ -88,12 +88,12 @@ func (s *SIPService) CreateSIPTrunk(ctx context.Context, req *livekit.CreateSIPT } // Validate all trunks including the new one first. - list, err := s.store.ListSIPInboundTrunk(ctx) + it, err := ListSIPInboundTrunk(ctx, s.store, &livekit.ListSIPInboundTrunkRequest{}, info.AsInbound()) if err != nil { return nil, err } - list = append(list, info.AsInbound()) - if err = sip.ValidateTrunks(list); err != nil { + defer it.Close() + if err = sip.ValidateTrunksIter(it); err != nil { return nil, err } @@ -125,12 +125,14 @@ func (s *SIPService) CreateSIPInboundTrunk(ctx context.Context, req *livekit.Cre // Keep ID empty still, so that validation can print "" instead of a non-existent ID in the error. // Validate all trunks including the new one first. - list, err := s.store.ListSIPInboundTrunk(ctx) + it, err := ListSIPInboundTrunk(ctx, s.store, &livekit.ListSIPInboundTrunkRequest{ + Numbers: req.GetTrunk().GetNumbers(), + }, info) if err != nil { return nil, err } - list = append(list, info) - if err = sip.ValidateTrunks(list); err != nil { + defer it.Close() + if err = sip.ValidateTrunksIter(it); err != nil { return nil, err } @@ -215,13 +217,38 @@ func (s *SIPService) ListSIPTrunk(ctx context.Context, req *livekit.ListSIPTrunk if s.store == nil { return nil, ErrSIPNotConnected } + it := livekit.ListPageIter(s.store.ListSIPTrunk, req) + defer it.Close() - trunks, err := s.store.ListSIPTrunk(ctx) + items, err := iters.AllPages(ctx, it) if err != nil { return nil, err } + return &livekit.ListSIPTrunkResponse{Items: items}, nil +} - return &livekit.ListSIPTrunkResponse{Items: trunks}, nil +func ListSIPInboundTrunk(ctx context.Context, s SIPStore, req *livekit.ListSIPInboundTrunkRequest, add ...*livekit.SIPInboundTrunkInfo) (iters.Iter[*livekit.SIPInboundTrunkInfo], error) { + if s == nil { + return nil, ErrSIPNotConnected + } + pages := livekit.ListPageIter(s.ListSIPInboundTrunk, req) + it := iters.PagesAsIter(ctx, pages) + if len(add) != 0 { + it = iters.MultiIter(true, it, iters.Slice(add)) + } + return it, nil +} + +func ListSIPOutboundTrunk(ctx context.Context, s SIPStore, req *livekit.ListSIPOutboundTrunkRequest, add ...*livekit.SIPOutboundTrunkInfo) (iters.Iter[*livekit.SIPOutboundTrunkInfo], error) { + if s == nil { + return nil, ErrSIPNotConnected + } + pages := livekit.ListPageIter(s.ListSIPOutboundTrunk, req) + it := iters.PagesAsIter(ctx, pages) + if len(add) != 0 { + it = iters.MultiIter(true, it, iters.Slice(add)) + } + return it, nil } func (s *SIPService) ListSIPInboundTrunk(ctx context.Context, req *livekit.ListSIPInboundTrunkRequest) (*livekit.ListSIPInboundTrunkResponse, error) { @@ -231,29 +258,17 @@ func (s *SIPService) ListSIPInboundTrunk(ctx context.Context, req *livekit.ListS if s.store == nil { return nil, ErrSIPNotConnected } - - var trunks []*livekit.SIPInboundTrunkInfo - if len(req.TrunkIds) != 0 { - trunks = make([]*livekit.SIPInboundTrunkInfo, len(req.TrunkIds)) - for i, id := range req.TrunkIds { - t, err := s.store.LoadSIPInboundTrunk(ctx, id) - if errors.Is(err, ErrSIPTrunkNotFound) { - continue // keep nil in slice - } else if err != nil { - return nil, err - } - trunks[i] = t - } - } else { - var err error - trunks, err = s.store.ListSIPInboundTrunk(ctx) - if err != nil { - return nil, err - } + it, err := ListSIPInboundTrunk(ctx, s.store, req) + if err != nil { + return nil, err } - trunks = req.FilterSlice(trunks) + defer it.Close() - return &livekit.ListSIPInboundTrunkResponse{Items: trunks}, nil + items, err := iters.All(it) + if err != nil { + return nil, err + } + return &livekit.ListSIPInboundTrunkResponse{Items: items}, nil } func (s *SIPService) ListSIPOutboundTrunk(ctx context.Context, req *livekit.ListSIPOutboundTrunkRequest) (*livekit.ListSIPOutboundTrunkResponse, error) { @@ -263,29 +278,17 @@ func (s *SIPService) ListSIPOutboundTrunk(ctx context.Context, req *livekit.List if s.store == nil { return nil, ErrSIPNotConnected } - - var trunks []*livekit.SIPOutboundTrunkInfo - if len(req.TrunkIds) != 0 { - trunks = make([]*livekit.SIPOutboundTrunkInfo, len(req.TrunkIds)) - for i, id := range req.TrunkIds { - t, err := s.store.LoadSIPOutboundTrunk(ctx, id) - if errors.Is(err, ErrSIPTrunkNotFound) { - continue // keep nil in slice - } else if err != nil { - return nil, err - } - trunks[i] = t - } - } else { - var err error - trunks, err = s.store.ListSIPOutboundTrunk(ctx) - if err != nil { - return nil, err - } + it, err := ListSIPOutboundTrunk(ctx, s.store, req) + if err != nil { + return nil, err } - trunks = req.FilterSlice(trunks) + defer it.Close() - return &livekit.ListSIPOutboundTrunkResponse{Items: trunks}, nil + items, err := iters.All(it) + if err != nil { + return nil, err + } + return &livekit.ListSIPOutboundTrunkResponse{Items: items}, nil } func (s *SIPService) DeleteSIPTrunk(ctx context.Context, req *livekit.DeleteSIPTrunkRequest) (*livekit.SIPTrunkInfo, error) { @@ -335,12 +338,14 @@ func (s *SIPService) CreateSIPDispatchRule(ctx context.Context, req *livekit.Cre } // Validate all rules including the new one first. - list, err := s.store.ListSIPDispatchRule(ctx) + it, err := ListSIPDispatchRule(ctx, s.store, &livekit.ListSIPDispatchRuleRequest{ + TrunkIds: req.TrunkIds, + }, info) if err != nil { return nil, err } - list = append(list, info) - if err = sip.ValidateDispatchRules(list); err != nil { + defer it.Close() + if _, err = sip.ValidateDispatchRulesIter(it); err != nil { return nil, err } @@ -352,6 +357,18 @@ func (s *SIPService) CreateSIPDispatchRule(ctx context.Context, req *livekit.Cre return info, nil } +func ListSIPDispatchRule(ctx context.Context, s SIPStore, req *livekit.ListSIPDispatchRuleRequest, add ...*livekit.SIPDispatchRuleInfo) (iters.Iter[*livekit.SIPDispatchRuleInfo], error) { + if s == nil { + return nil, ErrSIPNotConnected + } + pages := livekit.ListPageIter(s.ListSIPDispatchRule, req) + it := iters.PagesAsIter(ctx, pages) + if len(add) != 0 { + it = iters.MultiIter(true, it, iters.Slice(add)) + } + return it, nil +} + func (s *SIPService) ListSIPDispatchRule(ctx context.Context, req *livekit.ListSIPDispatchRuleRequest) (*livekit.ListSIPDispatchRuleResponse, error) { if err := EnsureSIPAdminPermission(ctx); err != nil { return nil, twirpAuthError(err) @@ -359,29 +376,17 @@ func (s *SIPService) ListSIPDispatchRule(ctx context.Context, req *livekit.ListS if s.store == nil { return nil, ErrSIPNotConnected } - - var rules []*livekit.SIPDispatchRuleInfo - if len(req.DispatchRuleIds) != 0 { - rules = make([]*livekit.SIPDispatchRuleInfo, len(req.DispatchRuleIds)) - for i, id := range req.DispatchRuleIds { - r, err := s.store.LoadSIPDispatchRule(ctx, id) - if errors.Is(err, ErrSIPDispatchRuleNotFound) { - continue // keep nil in slice - } else if err != nil { - return nil, err - } - rules[i] = r - } - } else { - var err error - rules, err = s.store.ListSIPDispatchRule(ctx) - if err != nil { - return nil, err - } + it, err := ListSIPDispatchRule(ctx, s.store, req) + if err != nil { + return nil, err } - rules = req.FilterSlice(rules) + defer it.Close() - return &livekit.ListSIPDispatchRuleResponse{Items: rules}, nil + items, err := iters.All(it) + if err != nil { + return nil, err + } + return &livekit.ListSIPDispatchRuleResponse{Items: items}, nil } func (s *SIPService) DeleteSIPDispatchRule(ctx context.Context, req *livekit.DeleteSIPDispatchRuleRequest) (*livekit.SIPDispatchRuleInfo, error) { @@ -400,7 +405,7 @@ func (s *SIPService) DeleteSIPDispatchRule(ctx context.Context, req *livekit.Del return nil, err } - if err = s.store.DeleteSIPDispatchRule(ctx, info); err != nil { + if err = s.store.DeleteSIPDispatchRule(ctx, info.SipDispatchRuleId); err != nil { return nil, err } diff --git a/pkg/service/utils_test.go b/pkg/service/utils_test.go index 62d2975b8..5d8cdcaf0 100644 --- a/pkg/service/utils_test.go +++ b/pkg/service/utils_test.go @@ -25,6 +25,24 @@ import ( "github.com/livekit/livekit-server/pkg/service" ) +func redisClientDocker(t testing.TB) *redis.Client { + addr := runRedis(t) + cli := redis.NewClient(&redis.Options{ + Addr: addr, + }) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := cli.Ping(ctx).Err(); err != nil { + _ = cli.Close() + t.Fatal(err) + } + t.Cleanup(func() { + _ = cli.Close() + }) + return cli +} + func redisClient(t testing.TB) *redis.Client { cli := redis.NewClient(&redis.Options{ Addr: "localhost:6379", @@ -42,21 +60,7 @@ func redisClient(t testing.TB) *redis.Client { t.Logf("local redis not available: %v", err) t.Logf("starting redis in docker") - addr := runRedis(t) - cli = redis.NewClient(&redis.Options{ - Addr: addr, - }) - ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - if err = cli.Ping(ctx).Err(); err != nil { - _ = cli.Close() - t.Fatal(err) - } - t.Cleanup(func() { - _ = cli.Close() - }) - return cli + return redisClientDocker(t) } func TestIsValidDomain(t *testing.T) {