diff --git a/cmd/server/commands.go b/cmd/server/commands.go index 59dbe8a57..9a15fe54e 100644 --- a/cmd/server/commands.go +++ b/cmd/server/commands.go @@ -16,7 +16,7 @@ import ( "github.com/livekit/livekit-server/pkg/service" ) -func generateKeys(c *cli.Context) error { +func generateKeys(_ *cli.Context) error { apiKey := utils.NewGuid(utils.APIKeyPrefix) secret := utils.RandomSecret() fmt.Println("API Key: ", apiKey) diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go index 223b08e0e..cd06d2796 100644 --- a/pkg/logger/logger.go +++ b/pkg/logger/logger.go @@ -3,12 +3,13 @@ package serverlogger import ( "github.com/go-logr/logr" "github.com/go-logr/zapr" - "github.com/livekit/livekit-server/pkg/sfu" - "github.com/livekit/livekit-server/pkg/sfu/buffer" "github.com/livekit/protocol/logger" "github.com/pion/logging" "go.uber.org/zap" "go.uber.org/zap/zapcore" + + "github.com/livekit/livekit-server/pkg/sfu" + "github.com/livekit/livekit-server/pkg/sfu/buffer" ) var ( @@ -23,10 +24,6 @@ func LoggerFactory() logging.LoggerFactory { return defaultFactory } -func SetLoggerFactory(lf logging.LoggerFactory) { - defaultFactory = lf -} - // Note: only pass in logr.Logger with default depth func SetLogger(l logr.Logger) { logger.SetLogger(l, "livekit") diff --git a/pkg/routing/interfaces.go b/pkg/routing/interfaces.go index b7b0c83da..34354b8b0 100644 --- a/pkg/routing/interfaces.go +++ b/pkg/routing/interfaces.go @@ -4,11 +4,9 @@ import ( "context" "github.com/go-redis/redis/v8" - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "google.golang.org/protobuf/proto" - - "github.com/livekit/livekit-server/pkg/config" ) //go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate @@ -60,6 +58,12 @@ type Router interface { Start() error Drain() Stop() + + // OnNewParticipantRTC is called to start a new participant's RTC connection + OnNewParticipantRTC(callback NewParticipantCallback) + + // OnRTCMessage is called to execute actions on the RTC node + OnRTCMessage(callback RTCMessageCallback) } type MessageRouter interface { @@ -70,15 +74,9 @@ type MessageRouter interface { WriteParticipantRTC(ctx context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) error WriteRoomRTC(ctx context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) error WriteNodeRTC(ctx context.Context, nodeID string, msg *livekit.RTCNodeMessage) error - - // OnNewParticipantRTC is called to start a new participant's RTC connection - OnNewParticipantRTC(callback NewParticipantCallback) - - // OnRTCMessage is called to execute actions on the RTC node - OnRTCMessage(callback RTCMessageCallback) } -func CreateRouter(conf *config.Config, rc *redis.Client, node LocalNode) Router { +func CreateRouter(rc *redis.Client, node LocalNode) Router { if rc != nil { return NewRedisRouter(node, rc) } diff --git a/pkg/routing/localrouter.go b/pkg/routing/localrouter.go index 12d316c3a..1c8452c92 100644 --- a/pkg/routing/localrouter.go +++ b/pkg/routing/localrouter.go @@ -5,7 +5,7 @@ import ( "sync" "time" - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils" "google.golang.org/protobuf/proto" @@ -35,18 +35,18 @@ func NewLocalRouter(currentNode LocalNode) *LocalRouter { } } -func (r *LocalRouter) GetNodeForRoom(ctx context.Context, roomName string) (*livekit.Node, error) { +func (r *LocalRouter) GetNodeForRoom(_ context.Context, _ string) (*livekit.Node, error) { r.lock.Lock() defer r.lock.Unlock() node := proto.Clone((*livekit.Node)(r.currentNode)).(*livekit.Node) return node, nil } -func (r *LocalRouter) SetNodeForRoom(ctx context.Context, roomName, nodeId string) error { +func (r *LocalRouter) SetNodeForRoom(_ context.Context, _, _ string) error { return nil } -func (r *LocalRouter) ClearRoomState(ctx context.Context, roomName string) error { +func (r *LocalRouter) ClearRoomState(_ context.Context, _ string) error { // do nothing return nil } @@ -63,8 +63,8 @@ func (r *LocalRouter) RemoveDeadNodes() error { return nil } -func (r *LocalRouter) GetNode(nodeId string) (*livekit.Node, error) { - if nodeId == r.currentNode.Id { +func (r *LocalRouter) GetNode(nodeID string) (*livekit.Node, error) { + if nodeID == r.currentNode.Id { return r.currentNode, nil } return nil, ErrNotFound @@ -76,7 +76,7 @@ func (r *LocalRouter) ListNodes() ([]*livekit.Node, error) { }, nil } -func (r *LocalRouter) StartParticipantSignal(ctx context.Context, roomName string, pi ParticipantInit) (connectionId string, reqSink MessageSink, resSource MessageSource, err error) { +func (r *LocalRouter) StartParticipantSignal(ctx context.Context, roomName string, pi ParticipantInit) (connectionID string, reqSink MessageSink, resSource MessageSource, err error) { // treat it as a new participant connecting if r.onNewParticipant == nil { err = ErrHandlerNotDefined @@ -110,7 +110,7 @@ func (r *LocalRouter) StartParticipantSignal(ctx context.Context, roomName strin return pi.Identity, reqChan, resChan, nil } -func (r *LocalRouter) WriteParticipantRTC(ctx context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) error { +func (r *LocalRouter) WriteParticipantRTC(_ context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) error { if r.rtcMessageChan.IsClosed() { // create a new one r.rtcMessageChan = NewMessageChannel() @@ -124,7 +124,7 @@ func (r *LocalRouter) WriteRoomRTC(ctx context.Context, roomName, identity strin return r.WriteNodeRTC(ctx, r.currentNode.Id, msg) } -func (r *LocalRouter) WriteNodeRTC(ctx context.Context, nodeID string, msg *livekit.RTCNodeMessage) error { +func (r *LocalRouter) WriteNodeRTC(_ context.Context, _ string, msg *livekit.RTCNodeMessage) error { if r.rtcMessageChan.IsClosed() { // create a new one r.rtcMessageChan = NewMessageChannel() diff --git a/pkg/routing/messagechannel_test.go b/pkg/routing/messagechannel_test.go index c601b2c09..22d8c9f2e 100644 --- a/pkg/routing/messagechannel_test.go +++ b/pkg/routing/messagechannel_test.go @@ -4,7 +4,7 @@ import ( "sync" "testing" - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" "github.com/livekit/livekit-server/pkg/routing" ) @@ -25,12 +25,12 @@ func TestMessageChannel_WriteMessageClosed(t *testing.T) { go func() { defer wg.Done() for i := 0; i < 100; i++ { - m.WriteMessage(&livekit.RTCNodeMessage{}) + _ = m.WriteMessage(&livekit.RTCNodeMessage{}) } }() - m.WriteMessage(&livekit.RTCNodeMessage{}) + _ = m.WriteMessage(&livekit.RTCNodeMessage{}) m.Close() - m.WriteMessage(&livekit.RTCNodeMessage{}) + _ = m.WriteMessage(&livekit.RTCNodeMessage{}) wg.Wait() } diff --git a/pkg/routing/node.go b/pkg/routing/node.go index 844c1666e..e39f5ac8c 100644 --- a/pkg/routing/node.go +++ b/pkg/routing/node.go @@ -4,7 +4,7 @@ import ( "runtime" "time" - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/utils" "github.com/livekit/livekit-server/pkg/config" diff --git a/pkg/routing/redis.go b/pkg/routing/redis.go index 0367fdc8a..a1c36ab62 100644 --- a/pkg/routing/redis.go +++ b/pkg/routing/redis.go @@ -4,7 +4,7 @@ import ( "context" "github.com/go-redis/redis/v8" - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/utils" "google.golang.org/protobuf/proto" ) @@ -61,7 +61,7 @@ func publishRTCMessage(rc *redis.Client, nodeId string, participantKey string, m return err } - //logger.Debugw("publishing to rtc", "rtcChannel", rtcNodeChannel(nodeId), + // logger.Debugw("publishing to rtc", "rtcChannel", rtcNodeChannel(nodeId), // "message", rm.Message) return rc.Publish(redisCtx, rtcNodeChannel(nodeId), data).Err() } @@ -87,7 +87,7 @@ func publishSignalMessage(rc *redis.Client, nodeId string, connectionId string, return err } - //logger.Debugw("publishing to signal", "signalChannel", signalNodeChannel(nodeId), + // logger.Debugw("publishing to signal", "signalChannel", signalNodeChannel(nodeId), // "message", rm.Message) return rc.Publish(redisCtx, signalNodeChannel(nodeId), data).Err() } @@ -155,7 +155,7 @@ func (s *SignalNodeSink) Close() { if !s.isClosed.TrySet(true) { return } - publishSignalMessage(s.rc, s.nodeId, s.connectionId, &livekit.EndSession{}) + _ = publishSignalMessage(s.rc, s.nodeId, s.connectionId, &livekit.EndSession{}) if s.onClose != nil { s.onClose() } diff --git a/pkg/routing/redisrouter.go b/pkg/routing/redisrouter.go index 8be8258e4..ad601841f 100644 --- a/pkg/routing/redisrouter.go +++ b/pkg/routing/redisrouter.go @@ -5,7 +5,7 @@ import ( "time" "github.com/go-redis/redis/v8" - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils" "github.com/pkg/errors" @@ -75,30 +75,30 @@ func (r *RedisRouter) RemoveDeadNodes() error { return nil } -func (r *RedisRouter) GetNodeForRoom(ctx context.Context, roomName string) (*livekit.Node, error) { - nodeId, err := r.rc.HGet(r.ctx, NodeRoomKey, roomName).Result() +func (r *RedisRouter) GetNodeForRoom(_ context.Context, roomName string) (*livekit.Node, error) { + nodeID, err := r.rc.HGet(r.ctx, NodeRoomKey, roomName).Result() if err == redis.Nil { return nil, ErrNotFound } else if err != nil { return nil, errors.Wrap(err, "could not get node for room") } - return r.GetNode(nodeId) + return r.GetNode(nodeID) } -func (r *RedisRouter) SetNodeForRoom(ctx context.Context, roomName, nodeId string) error { - return r.rc.HSet(r.ctx, NodeRoomKey, roomName, nodeId).Err() +func (r *RedisRouter) SetNodeForRoom(_ context.Context, roomName, nodeID string) error { + return r.rc.HSet(r.ctx, NodeRoomKey, roomName, nodeID).Err() } -func (r *RedisRouter) ClearRoomState(ctx context.Context, roomName string) error { +func (r *RedisRouter) ClearRoomState(_ context.Context, roomName string) error { if err := r.rc.HDel(r.ctx, NodeRoomKey, roomName).Err(); err != nil { return errors.Wrap(err, "could not clear room state") } return nil } -func (r *RedisRouter) GetNode(nodeId string) (*livekit.Node, error) { - data, err := r.rc.HGet(r.ctx, NodesKey, nodeId).Result() +func (r *RedisRouter) GetNode(nodeID string) (*livekit.Node, error) { + data, err := r.rc.HGet(r.ctx, NodesKey, nodeID).Result() if err == redis.Nil { return nil, ErrNotFound } else if err != nil { @@ -128,7 +128,7 @@ func (r *RedisRouter) ListNodes() ([]*livekit.Node, error) { } // StartParticipantSignal signal connection sets up paths to the RTC node, and starts to route messages to that message queue -func (r *RedisRouter) StartParticipantSignal(ctx context.Context, roomName string, pi ParticipantInit) (connectionId string, reqSink MessageSink, resSource MessageSource, err error) { +func (r *RedisRouter) StartParticipantSignal(ctx context.Context, roomName string, pi ParticipantInit) (connectionID string, reqSink MessageSink, resSource MessageSource, err error) { // find the node where the room is hosted at rtcNode, err := r.GetNodeForRoom(ctx, roomName) if err != nil { @@ -136,11 +136,11 @@ func (r *RedisRouter) StartParticipantSignal(ctx context.Context, roomName strin } // create a new connection id - connectionId = utils.NewGuid("CO_") + connectionID = utils.NewGuid("CO_") pKey := participantKey(roomName, pi.Identity) // map signal & rtc nodes - if err = r.setParticipantSignalNode(connectionId, r.currentNode.Id); err != nil { + if err = r.setParticipantSignalNode(connectionID, r.currentNode.Id); err != nil { return } @@ -152,7 +152,7 @@ func (r *RedisRouter) StartParticipantSignal(ctx context.Context, roomName strin Identity: pi.Identity, Metadata: pi.Metadata, // connection id is to allow the RTC node to identify where to route the message back to - ConnectionId: connectionId, + ConnectionId: connectionID, Reconnect: pi.Reconnect, Permission: pi.Permission, AutoSubscribe: pi.AutoSubscribe, @@ -165,11 +165,11 @@ func (r *RedisRouter) StartParticipantSignal(ctx context.Context, roomName strin } // index by connectionId, since there may be multiple connections for the participant - resChan := r.getOrCreateMessageChannel(r.responseChannels, connectionId) - return connectionId, sink, resChan, nil + resChan := r.getOrCreateMessageChannel(r.responseChannels, connectionID) + return connectionID, sink, resChan, nil } -func (r *RedisRouter) WriteParticipantRTC(ctx context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) error { +func (r *RedisRouter) WriteParticipantRTC(_ context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) error { pkey := participantKey(roomName, identity) rtcNode, err := r.getParticipantRTCNode(pkey) if err != nil { @@ -190,7 +190,7 @@ func (r *RedisRouter) WriteRoomRTC(ctx context.Context, roomName, identity strin return r.WriteNodeRTC(ctx, node.Id, msg) } -func (r *RedisRouter) WriteNodeRTC(ctx context.Context, rtcNodeID string, msg *livekit.RTCNodeMessage) error { +func (r *RedisRouter) WriteNodeRTC(_ context.Context, rtcNodeID string, msg *livekit.RTCNodeMessage) error { rtcSink := NewRTCNodeSink(r.rc, rtcNodeID, msg.ParticipantKey) return r.writeRTCMessage(rtcSink, msg) } @@ -280,7 +280,9 @@ func (r *RedisRouter) Start() error { func (r *RedisRouter) Drain() { r.currentNode.State = livekit.NodeState_SHUTTING_DOWN - r.RegisterNode() + if err := r.RegisterNode(); err != nil { + logger.Errorw("failed to mark as draining", err, "nodeID", r.currentNode.Id) + } } func (r *RedisRouter) Stop() { diff --git a/pkg/routing/selector/interfaces.go b/pkg/routing/selector/interfaces.go index ee06e9e1d..6b97d51b5 100644 --- a/pkg/routing/selector/interfaces.go +++ b/pkg/routing/selector/interfaces.go @@ -3,7 +3,7 @@ package selector import ( "errors" - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" "github.com/livekit/livekit-server/pkg/config" ) diff --git a/pkg/routing/selector/random.go b/pkg/routing/selector/random.go index 179c1f7ed..5f5b02826 100644 --- a/pkg/routing/selector/random.go +++ b/pkg/routing/selector/random.go @@ -1,7 +1,7 @@ package selector import ( - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" "github.com/thoas/go-funk" ) diff --git a/pkg/routing/selector/regionaware.go b/pkg/routing/selector/regionaware.go index 9c148e90d..3ab3c6ca3 100644 --- a/pkg/routing/selector/regionaware.go +++ b/pkg/routing/selector/regionaware.go @@ -3,7 +3,7 @@ package selector import ( "math" - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" "github.com/thoas/go-funk" "github.com/livekit/livekit-server/pkg/config" diff --git a/pkg/routing/selector/regionaware_test.go b/pkg/routing/selector/regionaware_test.go index b176264a2..ea6055a2a 100644 --- a/pkg/routing/selector/regionaware_test.go +++ b/pkg/routing/selector/regionaware_test.go @@ -4,7 +4,7 @@ import ( "testing" "time" - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/utils" "github.com/stretchr/testify/require" diff --git a/pkg/routing/selector/sysload.go b/pkg/routing/selector/sysload.go index 40f1e27c6..91fcfc6d4 100644 --- a/pkg/routing/selector/sysload.go +++ b/pkg/routing/selector/sysload.go @@ -1,7 +1,7 @@ package selector import ( - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" "github.com/thoas/go-funk" ) diff --git a/pkg/routing/selector/sysload_test.go b/pkg/routing/selector/sysload_test.go index 916494b80..9ff2e7bd3 100644 --- a/pkg/routing/selector/sysload_test.go +++ b/pkg/routing/selector/sysload_test.go @@ -4,7 +4,7 @@ import ( "testing" "time" - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" "github.com/stretchr/testify/require" "github.com/livekit/livekit-server/pkg/routing/selector" @@ -31,22 +31,22 @@ var ( ) func TestSystemLoadSelector_SelectNode(t *testing.T) { - selector := selector.SystemLoadSelector{SysloadLimit: 1.0} + sel := selector.SystemLoadSelector{SysloadLimit: 1.0} - nodes := []*livekit.Node{} - _, err := selector.SelectNode(nodes) + var nodes []*livekit.Node + _, err := sel.SelectNode(nodes) require.Error(t, err, "should error no available nodes") // Select a node with high load when no nodes with low load are available nodes = []*livekit.Node{nodeLoadHigh} - if _, err := selector.SelectNode(nodes); err != nil { + if _, err := sel.SelectNode(nodes); err != nil { t.Error(err) } // Select a node with low load when available nodes = []*livekit.Node{nodeLoadLow, nodeLoadHigh} for i := 0; i < 5; i++ { - node, err := selector.SelectNode(nodes) + node, err := sel.SelectNode(nodes) if err != nil { t.Error(err) } diff --git a/pkg/routing/selector/utils.go b/pkg/routing/selector/utils.go index aab4963c2..cd0f4476c 100644 --- a/pkg/routing/selector/utils.go +++ b/pkg/routing/selector/utils.go @@ -3,9 +3,10 @@ package selector import ( "time" - "github.com/livekit/livekit-server/pkg/config" - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" "github.com/thoas/go-funk" + + "github.com/livekit/livekit-server/pkg/config" ) const AvailableSeconds = 5 diff --git a/pkg/routing/selector/utils_test.go b/pkg/routing/selector/utils_test.go index f93d006ab..305ab22fa 100644 --- a/pkg/routing/selector/utils_test.go +++ b/pkg/routing/selector/utils_test.go @@ -4,9 +4,10 @@ import ( "testing" "time" - "github.com/livekit/livekit-server/pkg/routing/selector" - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" "github.com/stretchr/testify/require" + + "github.com/livekit/livekit-server/pkg/routing/selector" ) func TestIsAvailable(t *testing.T) { diff --git a/pkg/rtc/audiolevel_test.go b/pkg/rtc/audiolevel_test.go index fefc9d40a..d9eb8cd09 100644 --- a/pkg/rtc/audiolevel_test.go +++ b/pkg/rtc/audiolevel_test.go @@ -3,8 +3,9 @@ package rtc_test import ( "testing" - "github.com/livekit/livekit-server/pkg/rtc" "github.com/stretchr/testify/require" + + "github.com/livekit/livekit-server/pkg/rtc" ) const ( diff --git a/pkg/rtc/helper_test.go b/pkg/rtc/helper_test.go index 8c5eaae9e..ca0b3b3d8 100644 --- a/pkg/rtc/helper_test.go +++ b/pkg/rtc/helper_test.go @@ -1,7 +1,7 @@ package rtc_test import ( - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/utils" "github.com/livekit/livekit-server/pkg/rtc/types" diff --git a/pkg/rtc/mediaengine.go b/pkg/rtc/mediaengine.go index 93d39650e..4f9aa0b6a 100644 --- a/pkg/rtc/mediaengine.go +++ b/pkg/rtc/mediaengine.go @@ -3,7 +3,7 @@ package rtc import ( "strings" - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" "github.com/pion/sdp/v3" "github.com/pion/webrtc/v3" ) diff --git a/pkg/rtc/mediaengine_test.go b/pkg/rtc/mediaengine_test.go index 6ae3b73ce..183f6f799 100644 --- a/pkg/rtc/mediaengine_test.go +++ b/pkg/rtc/mediaengine_test.go @@ -3,7 +3,7 @@ package rtc import ( "testing" - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" "github.com/pion/webrtc/v3" "github.com/stretchr/testify/require" ) diff --git a/pkg/rtc/mediatrack_test.go b/pkg/rtc/mediatrack_test.go index 46401ab10..20b9cdcb1 100644 --- a/pkg/rtc/mediatrack_test.go +++ b/pkg/rtc/mediatrack_test.go @@ -21,9 +21,8 @@ func TestTrackInfo(t *testing.T) { Muted: true, } - ti2 := ti mt := NewMediaTrack(&webrtc.TrackRemote{}, MediaTrackParams{ - TrackInfo: &ti2, + TrackInfo: &ti, }) outInfo := mt.ToProto() require.Equal(t, ti.Muted, outInfo.Muted) diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 7db5cb950..3640e4345 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -17,12 +17,11 @@ import ( "github.com/pkg/errors" "google.golang.org/protobuf/proto" - "github.com/livekit/livekit-server/pkg/sfu" - "github.com/livekit/livekit-server/pkg/sfu/twcc" - "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" "github.com/livekit/livekit-server/pkg/rtc/types" + "github.com/livekit/livekit-server/pkg/sfu" + "github.com/livekit/livekit-server/pkg/sfu/twcc" "github.com/livekit/livekit-server/pkg/telemetry" "github.com/livekit/livekit-server/pkg/telemetry/prometheus" "github.com/livekit/livekit-server/version" @@ -289,7 +288,7 @@ func (p *ParticipantImpl) OnClose(callback func(types.Participant)) { func (p *ParticipantImpl) HandleOffer(sdp webrtc.SessionDescription) (answer webrtc.SessionDescription, err error) { p.params.Logger.Debugw("answering pub offer", "state", p.State().String(), - //"sdp", sdp.SDP, + // "sdp", sdp.SDP, ) if err = p.publisher.SetRemoteDescription(sdp); err != nil { diff --git a/pkg/rtc/participant_internal_test.go b/pkg/rtc/participant_internal_test.go index 568f12f2e..3131c1415 100644 --- a/pkg/rtc/participant_internal_test.go +++ b/pkg/rtc/participant_internal_test.go @@ -4,7 +4,7 @@ import ( "testing" "time" - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" "github.com/pion/webrtc/v3" "github.com/stretchr/testify/require" @@ -92,8 +92,8 @@ func TestTrackPublishing(t *testing.T) { t.Run("sends back trackPublished event", func(t *testing.T) { p := newParticipantForTest("test") - //track := &typesfakes.FakePublishedTrack{} - //track.IDReturns("id") + // track := &typesfakes.FakePublishedTrack{} + // track.IDReturns("id") sink := p.params.Sink.(*routingfakes.FakeMessageSink) p.AddTrack(&livekit.AddTrackRequest{ Cid: "cid", @@ -115,8 +115,8 @@ func TestTrackPublishing(t *testing.T) { t.Run("should not allow adding of duplicate tracks", func(t *testing.T) { p := newParticipantForTest("test") - //track := &typesfakes.FakePublishedTrack{} - //track.IDReturns("id") + // track := &typesfakes.FakePublishedTrack{} + // track.IDReturns("id") sink := p.params.Sink.(*routingfakes.FakeMessageSink) p.AddTrack(&livekit.AddTrackRequest{ Cid: "cid", diff --git a/pkg/rtc/room_test.go b/pkg/rtc/room_test.go index b474e1d3b..4c016d1ca 100644 --- a/pkg/rtc/room_test.go +++ b/pkg/rtc/room_test.go @@ -5,7 +5,7 @@ import ( "testing" "time" - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" "github.com/stretchr/testify/require" "github.com/livekit/livekit-server/pkg/config" diff --git a/pkg/rtc/signalhandler.go b/pkg/rtc/signalhandler.go index a3bcc937f..bf76061b7 100644 --- a/pkg/rtc/signalhandler.go +++ b/pkg/rtc/signalhandler.go @@ -1,9 +1,10 @@ package rtc import ( - "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" + + "github.com/livekit/livekit-server/pkg/rtc/types" ) func HandleParticipantSignal(room types.Room, participant types.Participant, req *livekit.SignalRequest, pLogger logger.Logger) error { diff --git a/pkg/rtc/subscribedtrack.go b/pkg/rtc/subscribedtrack.go index e56b7fec9..763945ffe 100644 --- a/pkg/rtc/subscribedtrack.go +++ b/pkg/rtc/subscribedtrack.go @@ -5,11 +5,12 @@ import ( "time" "github.com/bep/debounce" - "github.com/livekit/livekit-server/pkg/rtc/types" - "github.com/livekit/livekit-server/pkg/sfu" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/utils" "github.com/pion/webrtc/v3" + + "github.com/livekit/livekit-server/pkg/rtc/types" + "github.com/livekit/livekit-server/pkg/sfu" ) const ( diff --git a/pkg/rtc/transport_test.go b/pkg/rtc/transport_test.go index 4ee73507e..e30c3b188 100644 --- a/pkg/rtc/transport_test.go +++ b/pkg/rtc/transport_test.go @@ -5,7 +5,7 @@ import ( "testing" "time" - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" "github.com/pion/webrtc/v3" "github.com/stretchr/testify/require" diff --git a/pkg/service/auth_test.go b/pkg/service/auth_test.go index cadce07a5..67f8d0686 100644 --- a/pkg/service/auth_test.go +++ b/pkg/service/auth_test.go @@ -5,10 +5,11 @@ import ( "net/http/httptest" "testing" - "github.com/livekit/livekit-server/pkg/service" "github.com/livekit/protocol/auth" "github.com/livekit/protocol/auth/authfakes" "github.com/stretchr/testify/require" + + "github.com/livekit/livekit-server/pkg/service" ) func TestAuthMiddleware(t *testing.T) { diff --git a/pkg/service/interfaces.go b/pkg/service/interfaces.go index ebb3df150..b4b3f99cc 100644 --- a/pkg/service/interfaces.go +++ b/pkg/service/interfaces.go @@ -4,7 +4,7 @@ import ( "context" "time" - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" ) //go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate diff --git a/pkg/service/localroomstore.go b/pkg/service/localroomstore.go index fd35e379e..b7c96b20e 100644 --- a/pkg/service/localroomstore.go +++ b/pkg/service/localroomstore.go @@ -5,7 +5,7 @@ import ( "sync" "time" - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" ) // encapsulates CRUD operations for room settings @@ -26,7 +26,7 @@ func NewLocalRoomStore() *LocalRoomStore { } } -func (p *LocalRoomStore) StoreRoom(ctx context.Context, room *livekit.Room) error { +func (p *LocalRoomStore) StoreRoom(_ context.Context, room *livekit.Room) error { if room.CreationTime == 0 { room.CreationTime = time.Now().Unix() } @@ -36,7 +36,7 @@ func (p *LocalRoomStore) StoreRoom(ctx context.Context, room *livekit.Room) erro return nil } -func (p *LocalRoomStore) LoadRoom(ctx context.Context, name string) (*livekit.Room, error) { +func (p *LocalRoomStore) LoadRoom(_ context.Context, name string) (*livekit.Room, error) { p.lock.RLock() defer p.lock.RUnlock() @@ -47,7 +47,7 @@ func (p *LocalRoomStore) LoadRoom(ctx context.Context, name string) (*livekit.Ro return room, nil } -func (p *LocalRoomStore) ListRooms(ctx context.Context) ([]*livekit.Room, error) { +func (p *LocalRoomStore) ListRooms(_ context.Context) ([]*livekit.Room, error) { p.lock.RLock() defer p.lock.RUnlock() rooms := make([]*livekit.Room, 0, len(p.rooms)) @@ -73,18 +73,18 @@ func (p *LocalRoomStore) DeleteRoom(ctx context.Context, name string) error { return nil } -func (p *LocalRoomStore) LockRoom(ctx context.Context, name string, duration time.Duration) (string, error) { +func (p *LocalRoomStore) LockRoom(_ context.Context, _ string, _ time.Duration) (string, error) { // local rooms lock & unlock globally p.globalLock.Lock() return "", nil } -func (p *LocalRoomStore) UnlockRoom(ctx context.Context, name string, uid string) error { +func (p *LocalRoomStore) UnlockRoom(_ context.Context, _ string, _ string) error { p.globalLock.Unlock() return nil } -func (p *LocalRoomStore) StoreParticipant(ctx context.Context, roomName string, participant *livekit.ParticipantInfo) error { +func (p *LocalRoomStore) StoreParticipant(_ context.Context, roomName string, participant *livekit.ParticipantInfo) error { p.lock.Lock() defer p.lock.Unlock() roomParticipants := p.participants[roomName] @@ -96,7 +96,7 @@ func (p *LocalRoomStore) StoreParticipant(ctx context.Context, roomName string, return nil } -func (p *LocalRoomStore) LoadParticipant(ctx context.Context, roomName, identity string) (*livekit.ParticipantInfo, error) { +func (p *LocalRoomStore) LoadParticipant(_ context.Context, roomName, identity string) (*livekit.ParticipantInfo, error) { p.lock.RLock() defer p.lock.RUnlock() @@ -111,7 +111,7 @@ func (p *LocalRoomStore) LoadParticipant(ctx context.Context, roomName, identity return participant, nil } -func (p *LocalRoomStore) ListParticipants(ctx context.Context, roomName string) ([]*livekit.ParticipantInfo, error) { +func (p *LocalRoomStore) ListParticipants(_ context.Context, roomName string) ([]*livekit.ParticipantInfo, error) { p.lock.RLock() defer p.lock.RUnlock() @@ -129,7 +129,7 @@ func (p *LocalRoomStore) ListParticipants(ctx context.Context, roomName string) return items, nil } -func (p *LocalRoomStore) DeleteParticipant(ctx context.Context, roomName, identity string) error { +func (p *LocalRoomStore) DeleteParticipant(_ context.Context, roomName, identity string) error { p.lock.Lock() defer p.lock.Unlock() diff --git a/pkg/service/recordingservice.go b/pkg/service/recordingservice.go index 7712098bf..ab84a37e3 100644 --- a/pkg/service/recordingservice.go +++ b/pkg/service/recordingservice.go @@ -4,7 +4,7 @@ import ( "context" "errors" - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/recording" "github.com/livekit/protocol/utils" @@ -46,14 +46,14 @@ func (s *RecordingService) StartRecording(ctx context.Context, req *livekit.Star return nil, errors.New("recording not configured (redis required)") } - // reserve a recorde - recordingId, err := recording.ReserveRecorder(s.bus) + // reserve a recorder + recordingID, err := recording.ReserveRecorder(s.bus) if err != nil { return nil, err } // start the recording - err = recording.RPC(ctx, s.bus, recordingId, &livekit.RecordingRequest{ + err = recording.RPC(ctx, s.bus, recordingID, &livekit.RecordingRequest{ Request: &livekit.RecordingRequest_Start{ Start: req, }, @@ -63,16 +63,16 @@ func (s *RecordingService) StartRecording(ctx context.Context, req *livekit.Star } ri := &livekit.RecordingInfo{ - Id: recordingId, + Id: recordingID, Active: true, } if template := req.Input.(*livekit.StartRecordingRequest_Template); template != nil { ri.RoomName = template.Template.RoomName } - logger.Debugw("recording started", "recordingID", recordingId) + logger.Debugw("recording started", "recordingID", recordingID) s.telemetry.RecordingStarted(ctx, ri) - return &livekit.StartRecordingResponse{RecordingId: recordingId}, nil + return &livekit.StartRecordingResponse{RecordingId: recordingID}, nil } func (s *RecordingService) AddOutput(ctx context.Context, req *livekit.AddOutputRequest) (*emptypb.Empty, error) { diff --git a/pkg/service/redisroomstore.go b/pkg/service/redisroomstore.go index 49123b976..904ca39c3 100644 --- a/pkg/service/redisroomstore.go +++ b/pkg/service/redisroomstore.go @@ -5,7 +5,7 @@ import ( "time" "github.com/go-redis/redis/v8" - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/utils" "github.com/pkg/errors" "google.golang.org/protobuf/proto" @@ -35,7 +35,7 @@ func NewRedisRoomStore(rc *redis.Client) *RedisRoomStore { } } -func (p *RedisRoomStore) StoreRoom(ctx context.Context, room *livekit.Room) error { +func (p *RedisRoomStore) StoreRoom(_ context.Context, room *livekit.Room) error { if room.CreationTime == 0 { room.CreationTime = time.Now().Unix() } @@ -54,7 +54,7 @@ func (p *RedisRoomStore) StoreRoom(ctx context.Context, room *livekit.Room) erro return nil } -func (p *RedisRoomStore) LoadRoom(ctx context.Context, name string) (*livekit.Room, error) { +func (p *RedisRoomStore) LoadRoom(_ context.Context, name string) (*livekit.Room, error) { data, err := p.rc.HGet(p.ctx, RoomsKey, name).Result() if err != nil { if err == redis.Nil { @@ -72,7 +72,7 @@ func (p *RedisRoomStore) LoadRoom(ctx context.Context, name string) (*livekit.Ro return &room, nil } -func (p *RedisRoomStore) ListRooms(ctx context.Context) ([]*livekit.Room, error) { +func (p *RedisRoomStore) ListRooms(_ context.Context) ([]*livekit.Room, error) { items, err := p.rc.HVals(p.ctx, RoomsKey).Result() if err != nil && err != redis.Nil { return nil, errors.Wrap(err, "could not get rooms") @@ -105,7 +105,7 @@ func (p *RedisRoomStore) DeleteRoom(ctx context.Context, name string) error { return err } -func (p *RedisRoomStore) LockRoom(ctx context.Context, name string, duration time.Duration) (string, error) { +func (p *RedisRoomStore) LockRoom(_ context.Context, name string, duration time.Duration) (string, error) { token := utils.NewGuid("LOCK") key := RoomLockPrefix + name @@ -130,7 +130,7 @@ func (p *RedisRoomStore) LockRoom(ctx context.Context, name string, duration tim return "", ErrRoomLockFailed } -func (p *RedisRoomStore) UnlockRoom(ctx context.Context, name string, uid string) error { +func (p *RedisRoomStore) UnlockRoom(_ context.Context, name string, uid string) error { key := RoomLockPrefix + name val, err := p.rc.Get(p.ctx, key).Result() @@ -147,7 +147,7 @@ func (p *RedisRoomStore) UnlockRoom(ctx context.Context, name string, uid string return p.rc.Del(p.ctx, key).Err() } -func (p *RedisRoomStore) StoreParticipant(ctx context.Context, roomName string, participant *livekit.ParticipantInfo) error { +func (p *RedisRoomStore) StoreParticipant(_ context.Context, roomName string, participant *livekit.ParticipantInfo) error { key := RoomParticipantsPrefix + roomName data, err := proto.Marshal(participant) @@ -158,7 +158,7 @@ func (p *RedisRoomStore) StoreParticipant(ctx context.Context, roomName string, return p.rc.HSet(p.ctx, key, participant.Identity, data).Err() } -func (p *RedisRoomStore) LoadParticipant(ctx context.Context, roomName, identity string) (*livekit.ParticipantInfo, error) { +func (p *RedisRoomStore) LoadParticipant(_ context.Context, roomName, identity string) (*livekit.ParticipantInfo, error) { key := RoomParticipantsPrefix + roomName data, err := p.rc.HGet(p.ctx, key, identity).Result() if err == redis.Nil { @@ -174,7 +174,7 @@ func (p *RedisRoomStore) LoadParticipant(ctx context.Context, roomName, identity return &pi, nil } -func (p *RedisRoomStore) ListParticipants(ctx context.Context, roomName string) ([]*livekit.ParticipantInfo, error) { +func (p *RedisRoomStore) ListParticipants(_ context.Context, roomName string) ([]*livekit.ParticipantInfo, error) { key := RoomParticipantsPrefix + roomName items, err := p.rc.HVals(p.ctx, key).Result() if err == redis.Nil { @@ -194,7 +194,7 @@ func (p *RedisRoomStore) ListParticipants(ctx context.Context, roomName string) return participants, nil } -func (p *RedisRoomStore) DeleteParticipant(ctx context.Context, roomName, identity string) error { +func (p *RedisRoomStore) DeleteParticipant(_ context.Context, roomName, identity string) error { key := RoomParticipantsPrefix + roomName return p.rc.HDel(p.ctx, key, identity).Err() diff --git a/pkg/service/redisroomstore_test.go b/pkg/service/redisroomstore_test.go index 71d974775..6f6c08962 100644 --- a/pkg/service/redisroomstore_test.go +++ b/pkg/service/redisroomstore_test.go @@ -7,7 +7,7 @@ import ( "testing" "time" - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" "github.com/stretchr/testify/require" "github.com/livekit/livekit-server/pkg/service" @@ -18,7 +18,7 @@ func TestParticipantPersistence(t *testing.T) { rs := service.NewRedisRoomStore(redisClient()) roomName := "room1" - rs.DeleteRoom(ctx, roomName) + _ = rs.DeleteRoom(ctx, roomName) p := &livekit.ParticipantInfo{ Sid: "PA_test", @@ -93,7 +93,7 @@ func TestRoomLock(t *testing.T) { // release after 2 ms time.Sleep(2 * time.Millisecond) atomic.StoreUint32(&unlocked, 1) - rs.UnlockRoom(ctx, roomName, token) + _ = rs.UnlockRoom(ctx, roomName, token) wg.Wait() }) @@ -106,6 +106,6 @@ func TestRoomLock(t *testing.T) { time.Sleep(lockInterval + time.Millisecond) token2, err := rs.LockRoom(ctx, roomName, lockInterval) require.NoError(t, err) - rs.UnlockRoom(ctx, roomName, token2) + _ = rs.UnlockRoom(ctx, roomName, token2) }) } diff --git a/pkg/service/roomallocator.go b/pkg/service/roomallocator.go index 93c2f452f..670cc9d22 100644 --- a/pkg/service/roomallocator.go +++ b/pkg/service/roomallocator.go @@ -4,7 +4,7 @@ import ( "context" "time" - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils" @@ -86,8 +86,8 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre } // select a new node - nodeId := req.NodeId - if nodeId == "" { + nodeID := req.NodeId + if nodeID == "" { nodes, err := r.router.ListNodes() if err != nil { return nil, err @@ -98,11 +98,11 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre return nil, err } - nodeId = node.Id + nodeID = node.Id } - logger.Debugw("selected node for room", "room", rm.Name, "roomID", rm.Sid, "nodeID", nodeId) - err = r.router.SetNodeForRoom(ctx, rm.Name, nodeId) + logger.Debugw("selected node for room", "room", rm.Name, "roomID", rm.Sid, "nodeID", nodeID) + err = r.router.SetNodeForRoom(ctx, rm.Name, nodeID) if err != nil { return nil, err } diff --git a/pkg/service/roomallocator_test.go b/pkg/service/roomallocator_test.go index d851030f2..a7a28955c 100644 --- a/pkg/service/roomallocator_test.go +++ b/pkg/service/roomallocator_test.go @@ -4,7 +4,7 @@ import ( "context" "testing" - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" "github.com/stretchr/testify/require" "github.com/livekit/livekit-server/pkg/config" diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 55be6e73a..3992ed61a 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -66,7 +66,7 @@ func NewLocalRoomManager( return r, nil } -func (r *RoomManager) GetRoom(ctx context.Context, roomName string) *rtc.Room { +func (r *RoomManager) GetRoom(_ context.Context, roomName string) *rtc.Room { r.lock.RLock() defer r.lock.RUnlock() return r.rooms[roomName] @@ -101,7 +101,7 @@ func (r *RoomManager) DeleteRoom(ctx context.Context, roomName string) error { return err } -// CleanupRooms cleans up after old rooms that have been around for awhile +// CleanupRooms cleans up after old rooms that have been around for a while func (r *RoomManager) CleanupRooms() error { // cleanup rooms that have been left for over a day ctx := context.Background() @@ -381,7 +381,7 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.Partici } // handles RTC messages resulted from Room API calls -func (r *RoomManager) handleRTCMessage(ctx context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) { +func (r *RoomManager) handleRTCMessage(_ context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) { r.lock.RLock() room := r.rooms[roomName] r.lock.RUnlock() diff --git a/pkg/service/roomservice.go b/pkg/service/roomservice.go index f0898b983..15d37257a 100644 --- a/pkg/service/roomservice.go +++ b/pkg/service/roomservice.go @@ -3,7 +3,7 @@ package service import ( "context" - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" "github.com/pkg/errors" "github.com/thoas/go-funk" "github.com/twitchtv/twirp" @@ -40,7 +40,7 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq return } -func (s *RoomService) ListRooms(ctx context.Context, req *livekit.ListRoomsRequest) (res *livekit.ListRoomsResponse, err error) { +func (s *RoomService) ListRooms(ctx context.Context, _ *livekit.ListRoomsRequest) (res *livekit.ListRoomsResponse, err error) { err = EnsureListPermission(ctx) if err != nil { return nil, twirpAuthError(err) diff --git a/pkg/service/roomservice_test.go b/pkg/service/roomservice_test.go index 4ec3e9770..3c035dc90 100644 --- a/pkg/service/roomservice_test.go +++ b/pkg/service/roomservice_test.go @@ -4,12 +4,13 @@ import ( "context" "testing" + "github.com/livekit/protocol/auth" + "github.com/livekit/protocol/livekit" + "github.com/stretchr/testify/require" + "github.com/livekit/livekit-server/pkg/routing/routingfakes" "github.com/livekit/livekit-server/pkg/service" "github.com/livekit/livekit-server/pkg/service/servicefakes" - "github.com/livekit/protocol/auth" - livekit "github.com/livekit/protocol/livekit" - "github.com/stretchr/testify/require" ) const grantsKey = "grants" diff --git a/pkg/service/utils.go b/pkg/service/utils.go index cb83508ac..c7c4c0475 100644 --- a/pkg/service/utils.go +++ b/pkg/service/utils.go @@ -5,7 +5,7 @@ import ( "regexp" "github.com/livekit/protocol/auth" - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" ) diff --git a/pkg/service/utils_test.go b/pkg/service/utils_test.go index c664540b9..20114296e 100644 --- a/pkg/service/utils_test.go +++ b/pkg/service/utils_test.go @@ -4,8 +4,9 @@ import ( "testing" "github.com/go-redis/redis/v8" - "github.com/livekit/livekit-server/pkg/service" "github.com/stretchr/testify/require" + + "github.com/livekit/livekit-server/pkg/service" ) func redisClient() *redis.Client { diff --git a/pkg/service/wire.go b/pkg/service/wire.go index df3827ba1..11ebce294 100644 --- a/pkg/service/wire.go +++ b/pkg/service/wire.go @@ -10,9 +10,8 @@ import ( "github.com/go-redis/redis/v8" "github.com/google/wire" - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" "github.com/pkg/errors" - "github.com/livekit/protocol/auth" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils" diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 29265178c..0570b3e6f 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -1,8 +1,7 @@ // Code generated by Wire. DO NOT EDIT. //go:generate go run github.com/google/wire/cmd/wire -//go:build !wireinject -// +build !wireinject +//+build !wireinject package service @@ -28,7 +27,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live if err != nil { return nil, err } - router := routing.CreateRouter(conf, client, currentNode) + router := routing.CreateRouter(client, currentNode) roomStore := createStore(client) roomAllocator, err := NewRoomAllocator(conf, router, roomStore) if err != nil { @@ -72,7 +71,7 @@ func InitializeRouter(conf *config.Config, currentNode routing.LocalNode) (routi if err != nil { return nil, err } - router := routing.CreateRouter(conf, client, currentNode) + router := routing.CreateRouter(client, currentNode) return router, nil } diff --git a/pkg/service/wsprotocol.go b/pkg/service/wsprotocol.go index 65e536364..d67d775c8 100644 --- a/pkg/service/wsprotocol.go +++ b/pkg/service/wsprotocol.go @@ -5,7 +5,7 @@ import ( "time" "github.com/gorilla/websocket" - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" diff --git a/pkg/sfu/atomic.go b/pkg/sfu/atomic.go index 058a5d86e..c02acd3ff 100644 --- a/pkg/sfu/atomic.go +++ b/pkg/sfu/atomic.go @@ -25,30 +25,6 @@ func (a *atomicUint8) get() uint8 { return uint8(atomic.LoadUint32((*uint32)(a))) } -type atomicUint16 uint32 - -func (a *atomicUint16) set(value uint16) { - atomic.StoreUint32((*uint32)(a), uint32(value)) -} - -func (a *atomicUint16) get() uint16 { - return uint16(atomic.LoadUint32((*uint32)(a))) -} - -func (a *atomicUint16) add(value uint16) { - atomic.AddUint32((*uint32)(a), uint32(value)) -} - -type atomicInt32 int32 - -func (a *atomicInt32) set(value int32) { - atomic.StoreInt32((*int32)(a), value) -} - -func (a *atomicInt32) get() int32 { - return atomic.LoadInt32((*int32)(a)) -} - type atomicUint32 uint32 func (a *atomicUint32) set(value uint32) { diff --git a/pkg/sfu/buffer/bucket_test.go b/pkg/sfu/buffer/bucket_test.go index 9943cb73a..b9079f95a 100644 --- a/pkg/sfu/buffer/bucket_test.go +++ b/pkg/sfu/buffer/bucket_test.go @@ -49,7 +49,7 @@ func Test_queue(t *testing.T) { buf, err := p.Marshal() assert.NoError(t, err) assert.NotPanics(t, func() { - q.AddPacket(buf, p.SequenceNumber, true) + _, _ = q.AddPacket(buf, p.SequenceNumber, true) }) } var expectedSN uint16 @@ -70,7 +70,7 @@ func Test_queue(t *testing.T) { buf, err := np2.Marshal() assert.NoError(t, err) expectedSN = 8 - q.AddPacket(buf, 8, false) + _, _ = q.AddPacket(buf, 8, false) i, err = q.GetPacket(buff, expectedSN) assert.NoError(t, err) err = np.Unmarshal(buff[:i]) @@ -109,7 +109,7 @@ func Test_queue_edges(t *testing.T) { buf, err := p.Marshal() assert.NoError(t, err) assert.NotPanics(t, func() { - q.AddPacket(buf, p.SequenceNumber, true) + _, _ = q.AddPacket(buf, p.SequenceNumber, true) }) }) } @@ -130,7 +130,7 @@ func Test_queue_edges(t *testing.T) { } buf, err := np2.Marshal() assert.NoError(t, err) - q.AddPacket(buf, np2.SequenceNumber, false) + _, _ = q.AddPacket(buf, np2.SequenceNumber, false) i, err = q.GetPacket(buff, expectedSN+1) assert.NoError(t, err) err = np.Unmarshal(buff[:i]) diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index 2a211f366..ca4eccb89 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -17,13 +17,11 @@ import ( ) const ( - MaxSN = 1 << 16 - ReportDelta = 1e9 ) // Logger is an implementation of logr.Logger. If is not provided - will be turned off. -var Logger logr.Logger = logr.Discard() +var Logger = logr.Discard() type pendingPackets struct { arrivalTime int64 @@ -36,7 +34,7 @@ type ExtPacket struct { Packet rtp.Packet Payload interface{} KeyFrame bool - //audio level for voice, l&0x80 == 0 means audio level not present + // audio level for voice, l&0x80 == 0 means audio level not present AudioLevel uint8 RawPacket []byte } @@ -183,7 +181,7 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapabili b.logger.V(1).Info("NewBuffer", "MaxBitRate", o.MaxBitRate) } -// Write adds a RTP Packet, out of order, new packet may be arrived later +// Write adds an RTP Packet, out of order, new packet may be arrived later func (b *Buffer) Write(pkt []byte) (n int, err error) { b.Lock() defer b.Unlock() @@ -416,7 +414,7 @@ func (b *Buffer) calc(pkt []byte, arrivalTime int64) { bitrates = make([]int64, len(b.bitrateHelper)) } for i := 0; i < len(b.bitrateHelper); i++ { - br := (8 * b.bitrateHelper[i] * int64(ReportDelta)) / int64(diff) + br := (8 * b.bitrateHelper[i] * int64(ReportDelta)) / diff bitrates[i] = br b.bitrateHelper[i] = 0 } @@ -681,7 +679,7 @@ func (s *SeqWrapHandler) Unwrap(seq uint16) (uint32, bool) { if delta > 0 && (int32(s.maxSeqNo)+delta-0x10000) >= 0 { // wrap backwards, should not less than 0 in this case: // at start time, received seq 1, set s.maxSeqNo =1 , - // then a out of order seq 65534 coming, we can't unwrap + // then an out of order seq 65534 coming, we can't unwrap // the seq to -2 delta -= 0x10000 } diff --git a/pkg/sfu/buffer/buffer_test.go b/pkg/sfu/buffer/buffer_test.go index ea23a90a9..4f150ba22 100644 --- a/pkg/sfu/buffer/buffer_test.go +++ b/pkg/sfu/buffer/buffer_test.go @@ -6,44 +6,12 @@ import ( "time" "github.com/pion/rtcp" - "github.com/pion/rtp" "github.com/pion/webrtc/v3" "github.com/stretchr/testify/assert" ) -func CreateTestPacket(pktStamp *SequenceNumberAndTimeStamp) *rtp.Packet { - if pktStamp == nil { - return &rtp.Packet{ - Header: rtp.Header{}, - Payload: []byte{1, 2, 3}, - } - } - - return &rtp.Packet{ - Header: rtp.Header{ - SequenceNumber: pktStamp.SequenceNumber, - Timestamp: pktStamp.Timestamp, - }, - Payload: []byte{1, 2, 3}, - } -} - -type SequenceNumberAndTimeStamp struct { - SequenceNumber uint16 - Timestamp uint32 -} - -func CreateTestListPackets(snsAndTSs []SequenceNumberAndTimeStamp) (packetList []*rtp.Packet) { - for _, item := range snsAndTSs { - item := item - packetList = append(packetList, CreateTestPacket(&item)) - } - - return packetList -} - -var vp8Codec webrtc.RTPCodecParameters = webrtc.RTPCodecParameters{ +var vp8Codec = webrtc.RTPCodecParameters{ RTPCodecCapability: webrtc.RTPCodecCapability{ MimeType: "video/vp8", ClockRate: 90000, @@ -54,7 +22,7 @@ var vp8Codec webrtc.RTPCodecParameters = webrtc.RTPCodecParameters{ PayloadType: 96, } -var opusCodec webrtc.RTPCodecParameters = webrtc.RTPCodecParameters{ +var opusCodec = webrtc.RTPCodecParameters{ RTPCodecCapability: webrtc.RTPCodecCapability{ MimeType: "audio/opus", ClockRate: 48000, @@ -233,7 +201,7 @@ func TestNewBuffer(t *testing.T) { for _, p := range TestPackets { buf, _ := p.Marshal() - buff.Write(buf) + _, _ = buff.Write(buf) } // assert.Equal(t, 6, buff.PacketQueue.size) assert.Equal(t, uint32(1<<16), buff.seqHdlr.Cycles()) @@ -292,7 +260,7 @@ func TestSeqWrapHandler(t *testing.T) { assert.Equal(t, uint32(1), s.MaxSeqNo()) type caseInfo struct { - seqs []uint32 //{seq1, seq2, unwrap of seq2} + seqs []uint32 // {seq1, seq2, unwrap of seq2} newer bool // seq2 is newer than seq1 } // test normal case, name -> {seq1, seq2, unwrap of seq2} diff --git a/pkg/sfu/buffer/helpers.go b/pkg/sfu/buffer/helpers.go index 905b765a7..5cdd1cc13 100644 --- a/pkg/sfu/buffer/helpers.go +++ b/pkg/sfu/buffer/helpers.go @@ -70,7 +70,7 @@ type VP8 struct { } // Unmarshal parses the passed byte slice and stores the result in the VP8 this method is called upon -func (p *VP8) Unmarshal(payload []byte) error { +func (v *VP8) Unmarshal(payload []byte) error { if payload == nil { return errNilPacket } @@ -82,7 +82,7 @@ func (p *VP8) Unmarshal(payload []byte) error { } idx := 0 - p.FirstByte = payload[idx] + v.FirstByte = payload[idx] S := payload[idx]&0x10 > 0 // Check for extended bit control if payload[idx]&0x80 > 0 { @@ -103,7 +103,7 @@ func (p *VP8) Unmarshal(payload []byte) error { if payloadLen < idx+1 { return errShortPacket } - p.PictureIDPresent = 1 + v.PictureIDPresent = 1 pid := payload[idx] & 0x7f // Check if m is 1, then Picture ID is 15 bits if payload[idx]&0x80 > 0 { @@ -111,10 +111,10 @@ func (p *VP8) Unmarshal(payload []byte) error { if payloadLen < idx+1 { return errShortPacket } - p.MBit = true - p.PictureID = binary.BigEndian.Uint16([]byte{pid, payload[idx]}) + v.MBit = true + v.PictureID = binary.BigEndian.Uint16([]byte{pid, payload[idx]}) } else { - p.PictureID = uint16(pid) + v.PictureID = uint16(pid) } } // Check if TL0PICIDX is present @@ -123,12 +123,12 @@ func (p *VP8) Unmarshal(payload []byte) error { if payloadLen < idx+1 { return errShortPacket } - p.TL0PICIDXPresent = 1 + v.TL0PICIDXPresent = 1 - if int(idx) >= payloadLen { + if idx >= payloadLen { return errShortPacket } - p.TL0PICIDX = payload[idx] + v.TL0PICIDX = payload[idx] } if T || K { idx++ @@ -136,13 +136,13 @@ func (p *VP8) Unmarshal(payload []byte) error { return errShortPacket } if T { - p.TIDPresent = 1 - p.TID = (payload[idx] & 0xc0) >> 6 - p.Y = (payload[idx] & 0x20) >> 5 + v.TIDPresent = 1 + v.TID = (payload[idx] & 0xc0) >> 6 + v.Y = (payload[idx] & 0x20) >> 5 } if K { - p.KEYIDXPresent = 1 - p.KEYIDX = payload[idx] & 0x1f + v.KEYIDXPresent = 1 + v.KEYIDX = payload[idx] & 0x1f } } if idx >= payloadLen { @@ -153,16 +153,16 @@ func (p *VP8) Unmarshal(payload []byte) error { return errShortPacket } // Check is packet is a keyframe by looking at P bit in vp8 payload - p.IsKeyFrame = payload[idx]&0x01 == 0 && S + v.IsKeyFrame = payload[idx]&0x01 == 0 && S } else { idx++ if payloadLen < idx+1 { return errShortPacket } // Check is packet is a keyframe by looking at P bit in vp8 payload - p.IsKeyFrame = payload[idx]&0x01 == 0 && S + v.IsKeyFrame = payload[idx]&0x01 == 0 && S } - p.HeaderSize = idx + v.HeaderSize = idx return nil } @@ -189,16 +189,16 @@ func (v *VP8) MarshalTo(buf []byte) error { } } if v.TL0PICIDXPresent == 1 { - buf[idx] = byte(v.TL0PICIDX) + buf[idx] = v.TL0PICIDX idx++ } if v.TIDPresent == 1 || v.KEYIDXPresent == 1 { buf[idx] = 0 if v.TIDPresent == 1 { - buf[idx] = byte(v.TID<<6) | byte(v.Y<<5) + buf[idx] = v.TID<<6 | v.Y<<5 } if v.KEYIDXPresent == 1 { - buf[idx] |= byte(v.KEYIDX & 0x1f) + buf[idx] |= v.KEYIDX & 0x1f } idx++ } diff --git a/pkg/sfu/buffer/rtcpreader.go b/pkg/sfu/buffer/rtcpreader.go index 7472a2e96..16a58b923 100644 --- a/pkg/sfu/buffer/rtcpreader.go +++ b/pkg/sfu/buffer/rtcpreader.go @@ -8,7 +8,7 @@ import ( type RTCPReader struct { ssrc uint32 closed atomicBool - onPacket atomic.Value //func([]byte) + onPacket atomic.Value // func([]byte) onClose func() } diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index b3c267acb..59c7d58ed 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -19,7 +19,7 @@ import ( "github.com/livekit/livekit-server/pkg/sfu/buffer" ) -// TrackSender defines a interface send media to remote peer +// TrackSender defines an interface send media to remote peer type TrackSender interface { UptrackLayersChange(availableLayers []uint16) WriteRTP(p *buffer.ExtPacket, layer int32) error @@ -45,7 +45,6 @@ var ( ErrNotVP8 = errors.New("not VP8") ErrOutOfOrderVP8PictureIdCacheMiss = errors.New("out-of-order VP8 picture id not found in cache") ErrFilteredVP8TemporalLayer = errors.New("filtered VP8 temporal layer") - ErrNoRequiredBuff = errors.New("buff size if less than required") ) var ( @@ -156,7 +155,7 @@ func NewDownTrack(c webrtc.RTPCodecCapability, r TrackReceiver, bf *buffer.Facto // Bind is called by the PeerConnection after negotiation is complete // This asserts that the code requested is supported by the remote peer. -// If so it setups all the state (SSRC and PayloadType) to have a call +// If so it sets up all the state (SSRC and PayloadType) to have a call func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, error) { parameters := webrtc.RTPCodecParameters{RTPCodecCapability: d.codec} if codec, err := codecParametersFuzzySearch(parameters, t.CodecParameters()); err == nil { @@ -227,7 +226,7 @@ func (d *DownTrack) SetTransceiver(transceiver *webrtc.RTPTransceiver) { d.transceiver = transceiver } -// WriteRTP writes a RTP Packet to the DownTrack +// WriteRTP writes an RTP Packet to the DownTrack func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { var pool *[]byte defer func() { @@ -420,7 +419,7 @@ func (d *DownTrack) Close() { // Idea here is to send blank 1x1 key frames to flush the decoder buffer at the remote end. // Otherwise, with transceiver re-use last frame from previous stream is held in the // display buffer and there could be a brief moment where the previous stream is displayed. - d.writeBlankFrameRTP() + _ = d.writeBlankFrameRTP() d.closeOnce.Do(func() { Logger.V(1).Info("Closing sender", "peer_id", d.peerID, "kind", d.kind) diff --git a/pkg/sfu/errors.go b/pkg/sfu/errors.go index 891a19a28..22831db4c 100644 --- a/pkg/sfu/errors.go +++ b/pkg/sfu/errors.go @@ -1,8 +1 @@ package sfu - -import "errors" - -var ( - ErrSpatialNotSupported = errors.New("current track does not support simulcast/SVC") - ErrSpatialLayerNotFound = errors.New("the requested layer does not exist") -) diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index f4a034254..55298b73d 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -26,13 +26,6 @@ const ( ForwardingStatusOptimal ) -type LayerDirection int - -const ( - LayerDirectionLowToHigh LayerDirection = iota - LayerDirectionHighToLow -) - type VideoStreamingChange int const ( @@ -154,11 +147,6 @@ var ( temporal: -1, } - MinLayers = VideoLayers{ - spatial: 0, - temporal: 0, - } - DefaultMaxLayers = VideoLayers{ spatial: DefaultMaxLayerSpatial, temporal: DefaultMaxLayerTemporal, @@ -539,7 +527,7 @@ func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition() VideoTransitio // 4. If not currently streaming, find the minimum layers that can unpause the stream. // // To summarize, co-operative streaming means - // - Try to keep tracks streaming, i. e. no pauses even if not at optimal layers + // - Try to keep tracks streaming, i.e. no pauses even if not at optimal layers // - Do not make an upgrade as it could affect other tracks // f.lock.Lock() @@ -1097,7 +1085,7 @@ func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer in // this will take client subscription as the winning vote and // continue to stream current spatial layer till switch point. // That could lead to congesting the channel. - // LK-TODO: Improve the above case, i. e. distinguish server + // LK-TODO: Improve the above case, i.e. distinguish server // applied restriction from client requested restriction. // tp.shouldDrop = true diff --git a/pkg/sfu/forwarder_test.go b/pkg/sfu/forwarder_test.go index 56a947c01..046f164ae 100644 --- a/pkg/sfu/forwarder_test.go +++ b/pkg/sfu/forwarder_test.go @@ -4,12 +4,11 @@ import ( "reflect" "testing" + "github.com/pion/webrtc/v3" + "github.com/stretchr/testify/require" + "github.com/livekit/livekit-server/pkg/sfu/buffer" "github.com/livekit/livekit-server/pkg/sfu/testutils" - - "github.com/pion/webrtc/v3" - - "github.com/stretchr/testify/require" ) func disable(f *Forwarder) { @@ -180,7 +179,7 @@ func TestForwarderAllocate(t *testing.T) { require.Equal(t, expectedResult, result) require.Equal(t, expectedResult, f.lastAllocation) - // awaiting measurement, i. e. bitrates are not available, but layers available + // awaiting measurement, i.e. bitrates are not available, but layers available f.lastAllocation.state = VideoAllocationStateNone disable(f) f.UptrackLayersChange([]uint16{0}) @@ -494,7 +493,7 @@ func TestForwarderFinalizeAllocate(t *testing.T) { bandwidthRequested: 0, bandwidthDelta: 0, availableLayers: nil, - bitrates: [3][4]int64{}, + bitrates: Bitrates{{0, 0, 0, 0}, {0, 0, 0, 0}, {0, 0, 0, 0}}, targetLayers: InvalidLayers, distanceToDesired: 0, } @@ -653,7 +652,7 @@ func TestForwarderAllocateNextHigher(t *testing.T) { require.Equal(t, expectedResult, f.lastAllocation) require.False(t, boosted) - // move from (0, 0) -> (0, 1), i. e. a higher temporal layer is available in the same spatial layer + // move from (0, 0) -> (0, 1), i.e. a higher temporal layer is available in the same spatial layer expectedTargetLayers := VideoLayers{ spatial: 0, temporal: 1, @@ -674,7 +673,7 @@ func TestForwarderAllocateNextHigher(t *testing.T) { require.Equal(t, expectedTargetLayers, f.TargetLayers()) require.True(t, boosted) - // move from (0, 1) -> (1, 0), i. e. a higher spatial layer is available + // move from (0, 1) -> (1, 0), i.e. a higher spatial layer is available f.currentLayers.temporal = 1 expectedTargetLayers = VideoLayers{ spatial: 1, @@ -1399,7 +1398,7 @@ func TestForwardGetSnTsForPadding(t *testing.T) { f.currentLayers = InvalidLayers // send it through so that forwarder locks onto stream - f.GetTranslationParams(extPkt, 0) + _, _ = f.GetTranslationParams(extPkt, 0) // pause stream and get padding, it should still work disable(f) @@ -1411,7 +1410,7 @@ func TestForwardGetSnTsForPadding(t *testing.T) { numPadding := 5 clockRate := uint32(0) frameRate := uint32(5) - var sntsExpected []SnTs = make([]SnTs, numPadding) + var sntsExpected = make([]SnTs, numPadding) for i := 0; i < numPadding; i++ { sntsExpected[i] = SnTs{ sequenceNumber: 23333 + uint16(i) + 1, @@ -1467,7 +1466,7 @@ func TestForwardGetSnTsForBlankFrames(t *testing.T) { f.currentLayers = InvalidLayers // send it through so that forwarder locks onto stream - f.GetTranslationParams(extPkt, 0) + _, _ = f.GetTranslationParams(extPkt, 0) // should get back frame end needed as the last packet did not have RTP marker set snts, frameEndNeeded, err := f.GetSnTsForBlankFrames() @@ -1478,7 +1477,7 @@ func TestForwardGetSnTsForBlankFrames(t *testing.T) { numPadding := RTPBlankFramesMax + 1 clockRate := testutils.TestVP8Codec.ClockRate frameRate := uint32(30) - var sntsExpected []SnTs = make([]SnTs, numPadding) + var sntsExpected = make([]SnTs, numPadding) for i := 0; i < numPadding; i++ { sntsExpected[i] = SnTs{ sequenceNumber: 23333 + uint16(i) + 1, @@ -1538,7 +1537,7 @@ func TestForwardGetPaddingVP8(t *testing.T) { f.currentLayers = InvalidLayers // send it through so that forwarder locks onto stream - f.GetTranslationParams(extPkt, 0) + _, _ = f.GetTranslationParams(extPkt, 0) // getting padding with frame end needed, should repeat the last picture id expectedVP8 := buffer.VP8{ diff --git a/pkg/sfu/helpers.go b/pkg/sfu/helpers.go index bc67c1e2b..022eb4baf 100644 --- a/pkg/sfu/helpers.go +++ b/pkg/sfu/helpers.go @@ -4,7 +4,6 @@ import ( "strings" "time" - "github.com/livekit/livekit-server/pkg/sfu/buffer" "github.com/pion/webrtc/v3" ) @@ -41,23 +40,6 @@ func codecParametersFuzzySearch(needle webrtc.RTPCodecParameters, haystack []web return webrtc.RTPCodecParameters{}, webrtc.ErrCodecNotFound } -func ntpToMillisSinceEpoch(ntp uint64) uint64 { - // ntp time since epoch calculate fractional ntp as milliseconds - // (lower 32 bits stored as 1/2^32 seconds) and add - // ntp seconds (stored in higher 32 bits) as milliseconds - return (((ntp & 0xFFFFFFFF) * 1000) >> 32) + ((ntp >> 32) * 1000) -} - -func fastForwardTimestampAmount(newestTimestamp uint32, referenceTimestamp uint32) uint32 { - if buffer.IsTimestampWrapAround(newestTimestamp, referenceTimestamp) { - return uint32(uint64(newestTimestamp) + 0x100000000 - uint64(referenceTimestamp)) - } - if newestTimestamp < referenceTimestamp { - return 0 - } - return newestTimestamp - referenceTimestamp -} - func (t ntpTime) Duration() time.Duration { sec := (t >> 32) * 1e9 frac := (t & 0xffffffff) * 1e9 diff --git a/pkg/sfu/prober.go b/pkg/sfu/prober.go index 0b9d40a08..59aa8c4a2 100644 --- a/pkg/sfu/prober.go +++ b/pkg/sfu/prober.go @@ -1,7 +1,7 @@ // // Design of Prober // -// Probing is to used to check for existence of excess channel capacity. +// Probing is used to to check for existence of excess channel capacity. // This is especially useful in the downstream direction of SFU. // SFU forwards audio/video streams from one or more publishers to // all the subscribers. But, the downstream channel of a subscriber @@ -75,7 +75,7 @@ // // A further assumption is that if there are multiple publishers for // a subscriber peer connection, all the publishers are not pacing -// in sync, i. e. each publisher's pacer is completely independent +// in sync, i.e. each publisher's pacer is completely independent // and SFU will be receiving the media packets with a good spread and // not clumped together. // @@ -112,7 +112,6 @@ import ( "time" "github.com/gammazero/deque" - "github.com/livekit/protocol/logger" ) @@ -262,7 +261,7 @@ func (p *Prober) run() { } } -//--------------------------------- +// --------------------------------- type Cluster struct { // LK-TODO-START @@ -290,7 +289,7 @@ func NewCluster(desiredRateBps int, expectedRateBps int, minDuration time.Durati expectedBytes := int((int64(expectedRateBps)*minDurationMs/time.Second.Milliseconds() + 7) / 8) // pace based on sending approximately 1000 bytes per probe - numProbes := int((desiredBytes - expectedBytes + 999) / 1000) + numProbes := (desiredBytes - expectedBytes + 999) / 1000 sleepDurationMicroSeconds := int(float64(minDurationMs*1000)/float64(numProbes) + 0.5) c := &Cluster{ desiredBytes: desiredBytes, diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index ad76b03ed..37ba438a8 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -18,7 +18,7 @@ import ( type Bitrates [DefaultMaxLayerSpatial + 1][DefaultMaxLayerTemporal + 1]int64 -// TrackReceiver defines a interface receive media from remote peer +// TrackReceiver defines an interface receive media from remote peer type TrackReceiver interface { TrackID() string StreamID() string @@ -31,7 +31,7 @@ type TrackReceiver interface { Codec() webrtc.RTPCodecCapability } -// Receiver defines a interface for a track receivers +// Receiver defines an interface for a track receivers type Receiver interface { TrackID() string StreamID() string diff --git a/pkg/sfu/rtpmunger.go b/pkg/sfu/rtpmunger.go index b09f44fd4..f758ccb1b 100644 --- a/pkg/sfu/rtpmunger.go +++ b/pkg/sfu/rtpmunger.go @@ -121,7 +121,7 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara // if padding only packet, can be dropped and sequence number adjusted // as it is contiguous and in order. That means this is the highest - // incoming sequence number and it is a good point to adjust + // incoming sequence number, and it is a good point to adjust // sequence number offset. if len(extPkt.Packet.Payload) == 0 { r.highestIncomingSN = extPkt.Packet.SequenceNumber @@ -134,10 +134,10 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara } // in-order incoming packet, may or may not be contiguous. - // In the case of loss (i. e. incoming sequence number is not contiguous), + // In the case of loss (i.e. incoming sequence number is not contiguous), // forward even if it is a padding only packet. With temporal scalability, // it is unclear if the current packet should be dropped if it is not - // contiguous. Hence forward anything that is not contiguous. + // contiguous. Hence, forward anything that is not contiguous. // Reference: http://www.rtcbits.com/2017/04/howto-implement-temporal-scalability.html mungedSN := extPkt.Packet.SequenceNumber - r.snOffset mungedTS := extPkt.Packet.Timestamp - r.tsOffset diff --git a/pkg/sfu/rtpmunger_test.go b/pkg/sfu/rtpmunger_test.go index 9c0366006..648db709f 100644 --- a/pkg/sfu/rtpmunger_test.go +++ b/pkg/sfu/rtpmunger_test.go @@ -4,9 +4,9 @@ import ( "reflect" "testing" - "github.com/livekit/livekit-server/pkg/sfu/testutils" - "github.com/stretchr/testify/require" + + "github.com/livekit/livekit-server/pkg/sfu/testutils" ) func TestSetLastSnTs(t *testing.T) { @@ -314,7 +314,7 @@ func TestUpdateAndGetPaddingSnTs(t *testing.T) { numPadding := 10 clockRate := uint32(10) frameRate := uint32(5) - var sntsExpected []SnTs = make([]SnTs, numPadding) + var sntsExpected = make([]SnTs, numPadding) for i := 0; i < numPadding; i++ { sntsExpected[i] = SnTs{ sequenceNumber: 23333 + uint16(i) + 1, diff --git a/pkg/sfu/sequencer_test.go b/pkg/sfu/sequencer_test.go index bd58bd959..9896108e0 100644 --- a/pkg/sfu/sequencer_test.go +++ b/pkg/sfu/sequencer_test.go @@ -5,9 +5,9 @@ import ( "testing" "time" - "github.com/livekit/livekit-server/pkg/sfu/buffer" - "github.com/stretchr/testify/require" + + "github.com/livekit/livekit-server/pkg/sfu/buffer" ) func Test_sequencer(t *testing.T) { diff --git a/pkg/sfu/sfu.go b/pkg/sfu/sfu.go index 92f409efa..ca5b3585f 100644 --- a/pkg/sfu/sfu.go +++ b/pkg/sfu/sfu.go @@ -7,7 +7,7 @@ import ( ) // Logger is an implementation of logr.Logger. If is not provided - will be turned off. -var Logger logr.Logger = logr.Discard() +var Logger = logr.Discard() var ( PacketFactory *sync.Pool diff --git a/pkg/sfu/streamallocator.go b/pkg/sfu/streamallocator.go index 57ebd8ac8..a53891b4c 100644 --- a/pkg/sfu/streamallocator.go +++ b/pkg/sfu/streamallocator.go @@ -8,7 +8,6 @@ import ( "time" "github.com/livekit/protocol/logger" - "github.com/pion/rtcp" "github.com/pion/webrtc/v3" ) @@ -18,25 +17,25 @@ const ( EstimateEpsilon = 2000 // 2 kbps - GratuitousProbeHeadroomBps = 1 * 1000 * 1000 // if headroom > 1 Mbps, don't probe - GratuitousProbePct = 10 - GratuitousProbeMinBps = 100 * 1000 // 100 kbps - GratuitousProbeMaxBps = 300 * 1000 // 300 kbps - GratuitousProbeMinDurationMs = 500 * time.Millisecond - GratuitousProbeMaxDurationMs = 600 * time.Millisecond + GratuitousProbeHeadroomBps = 1 * 1000 * 1000 // if headroom > 1 Mbps, don't probe + GratuitousProbePct = 10 + GratuitousProbeMinBps = 100 * 1000 // 100 kbps + GratuitousProbeMaxBps = 300 * 1000 // 300 kbps + GratuitousProbeMinDuration = 500 * time.Millisecond + GratuitousProbeMaxDuration = 600 * time.Millisecond AudioLossWeight = 0.75 VideoLossWeight = 0.25 // LK-TODO-START // These constants will definitely require more tweaking. - // In fact, simple time tresholded rules most proably will not be enough. + // In fact, simple time threshold rules most probably will not be enough. // LK-TODO-END - EstimateCommitMs = 2 * 1000 * time.Millisecond // 2 seconds - ProbeWaitMs = 8 * 1000 * time.Millisecond // 8 seconds - BoostWaitMs = 5 * 1000 * time.Millisecond // 5 seconds - GratuitousProbeWaitMs = 8 * 1000 * time.Millisecond // 8 seconds - GratuitousProbeMoreWaitMs = 5 * 1000 * time.Millisecond // 5 seconds + EstimateCommit = 2 * 1000 * time.Millisecond // 2 seconds + ProbeWait = 8 * 1000 * time.Millisecond // 8 seconds + BoostWait = 5 * 1000 * time.Millisecond // 5 seconds + GratuitousProbeWait = 8 * 1000 * time.Millisecond // 8 seconds + GratuitousProbeMoreWait = 5 * 1000 * time.Millisecond // 5 seconds ) type State int @@ -200,7 +199,7 @@ func (s *StreamAllocator) RemoveTrack(downTrack *DownTrack) { func (s *StreamAllocator) initializeEstimate() { s.committedChannelCapacity = ChannelCapacityInfinity - s.lastCommitTime = time.Now().Add(-EstimateCommitMs) + s.lastCommitTime = time.Now().Add(-EstimateCommit) s.receivedEstimate = ChannelCapacityInfinity s.lastEstimateDecreaseTime = time.Now() @@ -406,7 +405,7 @@ func (s *StreamAllocator) handleSignalEstimate(event *Event) { // // A couple of things to keep in mind // - REMB reports could be sent gratuitously as a way of providing - // periodic feedback, i. e. even if the estimated capacity does not + // periodic feedback, i.e. even if the estimated capacity does not // change, there could be REMB packets on the wire. Those gratuitous // REMBs should not trigger anything bad. // - As each down track will issue this callback for the same REMB packet @@ -594,14 +593,14 @@ func (s *StreamAllocator) maybeCommitEstimate() (isDecreasing bool) { if math.Abs(float64(s.receivedEstimate)-float64(s.prevReceivedEstimate)) > EstimateEpsilon { // too large a change, wait for estimate to settle. // Unless estimate has been oscillating for too long. - if time.Since(s.lastCommitTime) < EstimateCommitMs { + if time.Since(s.lastCommitTime) < EstimateCommit { return } } // don't commit too often even if the change is small. // Small changes will also get picked up during periodic check. - if time.Since(s.lastCommitTime) < EstimateCommitMs { + if time.Since(s.lastCommitTime) < EstimateCommit { return } @@ -664,7 +663,7 @@ func (s *StreamAllocator) allocateTrack(track *Track) { return // LK-TODO-START // Should use the bits given back to start any paused track. - // Note layer downgrade may actually have positive delta (i. e. consume more bits) + // Note layer downgrade may actually have positive delta (i.e. consume more bits) // because of when the measurement is done. Watch for that. // LK-TODO-END } @@ -728,7 +727,7 @@ func (s *StreamAllocator) allocateAllTracks() { // // Goals: - // 1. Stream as many tracks as possible, i. e. no pauses. + // 1. Stream as many tracks as possible, i.e. no pauses. // 2. Try to give fair allocation to all track. // // Start with the lowest layers and give each track a chance at that layer and keep going up. @@ -905,9 +904,9 @@ func (s *StreamAllocator) isTimeToBoost() bool { // Checking against last estimate boost prevents multiple artificial boosts // in situations where multiple tracks become available in a short span. if !s.lastBoostTime.IsZero() { - return time.Since(s.lastBoostTime) > BoostWaitMs + return time.Since(s.lastBoostTime) > BoostWait } else { - return time.Since(s.lastEstimateDecreaseTime) > ProbeWaitMs + return time.Since(s.lastEstimateDecreaseTime) > ProbeWait } } @@ -916,12 +915,12 @@ func (s *StreamAllocator) resetBoost() { } func (s *StreamAllocator) maybeGratuitousProbe() bool { - if time.Since(s.lastEstimateDecreaseTime) < GratuitousProbeWaitMs || len(s.managedVideoTracksSorted) == 0 { + if time.Since(s.lastEstimateDecreaseTime) < GratuitousProbeWait || len(s.managedVideoTracksSorted) == 0 { return false } // don't gratuitously probe too often - if time.Since(s.lastGratuitousProbeTime) < GratuitousProbeMoreWaitMs { + if time.Since(s.lastGratuitousProbeTime) < GratuitousProbeMoreWait { return false } @@ -944,8 +943,8 @@ func (s *StreamAllocator) maybeGratuitousProbe() bool { s.prober.AddCluster( int(s.receivedEstimate+probeRateBps), int(expectedRateBps), - GratuitousProbeMinDurationMs, - GratuitousProbeMaxDurationMs, + GratuitousProbeMinDuration, + GratuitousProbeMaxDuration, ) s.lastGratuitousProbeTime = time.Now() @@ -957,7 +956,7 @@ func (s *StreamAllocator) resetGratuitousProbe() { s.lastGratuitousProbeTime = time.Now() } -//------------------------------------------------ +// ------------------------------------------------ type StreamState int @@ -1001,7 +1000,7 @@ func (s *StreamStateUpdate) Empty() bool { return len(s.StreamStates) == 0 } -//------------------------------------------------ +// ------------------------------------------------ type Track struct { downTrack *DownTrack @@ -1116,7 +1115,7 @@ func (t *Track) DistanceToDesired() int32 { return t.downTrack.DistanceToDesired() } -//------------------------------------------------ +// ------------------------------------------------ type TrackSorter []*Track @@ -1136,7 +1135,7 @@ func (t TrackSorter) Less(i, j int) bool { return t[i].maxLayers.temporal > t[j].maxLayers.temporal } -//------------------------------------------------ +// ------------------------------------------------ type MaxDistanceSorter []*Track @@ -1152,7 +1151,7 @@ func (m MaxDistanceSorter) Less(i, j int) bool { return m[i].DistanceToDesired() > m[j].DistanceToDesired() } -//------------------------------------------------ +// ------------------------------------------------ type MinDistanceSorter []*Track @@ -1168,4 +1167,4 @@ func (m MinDistanceSorter) Less(i, j int) bool { return m[i].DistanceToDesired() < m[j].DistanceToDesired() } -//------------------------------------------------ +// ------------------------------------------------ diff --git a/pkg/sfu/streamtracker_test.go b/pkg/sfu/streamtracker_test.go index d96ff6e7f..eef79edd3 100644 --- a/pkg/sfu/streamtracker_test.go +++ b/pkg/sfu/streamtracker_test.go @@ -4,9 +4,9 @@ import ( "testing" "time" - "github.com/livekit/livekit-server/pkg/testutils" - "github.com/stretchr/testify/require" + + "github.com/livekit/livekit-server/pkg/testutils" ) func TestStreamTracker(t *testing.T) { @@ -44,7 +44,7 @@ func TestStreamTracker(t *testing.T) { }) require.Equal(t, StreamStatusActive, tracker.Status()) - // run a single interation + // run a single iteration tracker.detectChanges() require.Equal(t, StreamStatusStopped, tracker.Status()) require.True(t, callbackCalled.get()) diff --git a/pkg/sfu/testutils/data.go b/pkg/sfu/testutils/data.go index 09ec3e8b7..e9dc81eac 100644 --- a/pkg/sfu/testutils/data.go +++ b/pkg/sfu/testutils/data.go @@ -1,13 +1,13 @@ package testutils import ( - "github.com/livekit/livekit-server/pkg/sfu/buffer" - "github.com/pion/rtp" "github.com/pion/webrtc/v3" + + "github.com/livekit/livekit-server/pkg/sfu/buffer" ) -//----------------------------------------------------------- +// ----------------------------------------------------------- type TestExtPacketParams struct { SetMarker bool @@ -23,7 +23,7 @@ type TestExtPacketParams struct { ArrivalTime int64 } -//----------------------------------------------------------- +// ----------------------------------------------------------- func GetTestExtPacket(params *TestExtPacketParams) (*buffer.ExtPacket, error) { packet := rtp.Packet{ @@ -56,7 +56,7 @@ func GetTestExtPacket(params *TestExtPacketParams) (*buffer.ExtPacket, error) { return ep, nil } -//-------------------------------------- +// -------------------------------------- func GetTestExtPacketVP8(params *TestExtPacketParams, vp8 *buffer.VP8) (*buffer.ExtPacket, error) { ep, err := GetTestExtPacket(params) @@ -69,7 +69,7 @@ func GetTestExtPacketVP8(params *TestExtPacketParams, vp8 *buffer.VP8) (*buffer. return ep, nil } -//-------------------------------------- +// -------------------------------------- var TestVP8Codec = webrtc.RTPCodecCapability{ MimeType: "video/vp8", @@ -81,4 +81,4 @@ var TestOpusCodec = webrtc.RTPCodecCapability{ ClockRate: 48000, } -//-------------------------------------- +// -------------------------------------- diff --git a/pkg/sfu/vp8munger.go b/pkg/sfu/vp8munger.go index 80f9210cc..ddebd3e01 100644 --- a/pkg/sfu/vp8munger.go +++ b/pkg/sfu/vp8munger.go @@ -185,7 +185,7 @@ func (v *VP8Munger) UpdateAndGet(extPkt *buffer.ExtPacket, ordering SequenceNumb } // in-order incoming sequence number, may or may not be contiguous. - // In the case of loss (i. e. incoming sequence number is not contiguous), + // In the case of loss (i.e. incoming sequence number is not contiguous), // forward even if it is a filtered layer. With temporal scalability, // it is unclear if the current packet should be dropped if it is not // contiguous. Hence forward anything that is not contiguous. @@ -286,7 +286,7 @@ func (v *VP8Munger) PictureIdOffset(extPictureId int32) (int32, bool) { return value.(int32), ok } -//----------------------------- +// ----------------------------- // // VP8PictureIdWrapHandler diff --git a/pkg/sfu/vp8munger_test.go b/pkg/sfu/vp8munger_test.go index 472c8b950..95c5336ec 100644 --- a/pkg/sfu/vp8munger_test.go +++ b/pkg/sfu/vp8munger_test.go @@ -4,10 +4,10 @@ import ( "reflect" "testing" + "github.com/stretchr/testify/require" + "github.com/livekit/livekit-server/pkg/sfu/buffer" "github.com/livekit/livekit-server/pkg/sfu/testutils" - - "github.com/stretchr/testify/require" ) func compare(expected *VP8Munger, actual *VP8Munger) bool { diff --git a/pkg/telemetry/analyticsservice.go b/pkg/telemetry/analyticsservice.go index 5c5d49c6c..0373c33b6 100644 --- a/pkg/telemetry/analyticsservice.go +++ b/pkg/telemetry/analyticsservice.go @@ -3,7 +3,7 @@ package telemetry import ( "context" - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/livekit-server/pkg/config" @@ -24,14 +24,14 @@ type analyticsService struct { stats livekit.AnalyticsRecorderService_IngestStatsClient } -func NewAnalyticsService(conf *config.Config, currentNode routing.LocalNode) AnalyticsService { +func NewAnalyticsService(_ *config.Config, currentNode routing.LocalNode) AnalyticsService { return &analyticsService{ analyticsKey: "", // TODO: conf.AnalyticsKey nodeID: currentNode.Id, } } -func (a *analyticsService) SendStats(ctx context.Context, stats []*livekit.AnalyticsStat) { +func (a *analyticsService) SendStats(_ context.Context, stats []*livekit.AnalyticsStat) { if a.stats == nil { return } @@ -45,7 +45,7 @@ func (a *analyticsService) SendStats(ctx context.Context, stats []*livekit.Analy } } -func (a *analyticsService) SendEvent(ctx context.Context, event *livekit.AnalyticsEvent) { +func (a *analyticsService) SendEvent(_ context.Context, event *livekit.AnalyticsEvent) { if a.events == nil { return } diff --git a/pkg/telemetry/interceptor.go b/pkg/telemetry/interceptor.go index daf8eecdb..c3ad054a1 100644 --- a/pkg/telemetry/interceptor.go +++ b/pkg/telemetry/interceptor.go @@ -1,7 +1,7 @@ package telemetry import ( - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" "github.com/pion/interceptor" "github.com/pion/rtcp" ) @@ -20,7 +20,7 @@ type StatsInterceptorFactory struct { identity string } -func (f *StatsInterceptorFactory) NewInterceptor(id string) (interceptor.Interceptor, error) { +func (f *StatsInterceptorFactory) NewInterceptor(_ string) (interceptor.Interceptor, error) { return &StatsInterceptor{ t: f.t, participantID: f.participantID, diff --git a/pkg/telemetry/prometheus/node.go b/pkg/telemetry/prometheus/node.go index ba6b9f89c..eb049af05 100644 --- a/pkg/telemetry/prometheus/node.go +++ b/pkg/telemetry/prometheus/node.go @@ -4,7 +4,7 @@ import ( "sync/atomic" "time" - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/utils" "github.com/prometheus/client_golang/prometheus" ) diff --git a/pkg/telemetry/prometheus/node_linux.go b/pkg/telemetry/prometheus/node_linux.go index 3ac786f65..f1805ab31 100644 --- a/pkg/telemetry/prometheus/node_linux.go +++ b/pkg/telemetry/prometheus/node_linux.go @@ -5,7 +5,7 @@ package prometheus import ( linuxproc "github.com/c9s/goprocinfo/linux" - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" ) func updateCurrentNodeSystemStats(nodeStats *livekit.NodeStats) error { diff --git a/pkg/telemetry/prometheus/node_nonlinux.go b/pkg/telemetry/prometheus/node_nonlinux.go index 48c675833..448b48f79 100644 --- a/pkg/telemetry/prometheus/node_nonlinux.go +++ b/pkg/telemetry/prometheus/node_nonlinux.go @@ -3,8 +3,8 @@ package prometheus -import livekit "github.com/livekit/protocol/livekit" +import "github.com/livekit/protocol/livekit" -func updateCurrentNodeSystemStats(nodeStats *livekit.NodeStats) error { +func updateCurrentNodeSystemStats(_ *livekit.NodeStats) error { return nil } diff --git a/pkg/telemetry/statsworker.go b/pkg/telemetry/statsworker.go index 726ca8d99..368dedfa6 100644 --- a/pkg/telemetry/statsworker.go +++ b/pkg/telemetry/statsworker.go @@ -5,7 +5,7 @@ import ( "sync" "time" - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" "google.golang.org/protobuf/types/known/timestamppb" "github.com/livekit/livekit-server/pkg/sfu/buffer" diff --git a/pkg/telemetry/telemetryservice.go b/pkg/telemetry/telemetryservice.go index fb4e41d79..1ba2de1e9 100644 --- a/pkg/telemetry/telemetryservice.go +++ b/pkg/telemetry/telemetryservice.go @@ -5,7 +5,7 @@ import ( "sync" "github.com/gammazero/workerpool" - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/webhook" "github.com/pion/rtcp" diff --git a/pkg/telemetry/telemetryserviceevents.go b/pkg/telemetry/telemetryserviceevents.go index 254be4e05..9eb52ed10 100644 --- a/pkg/telemetry/telemetryserviceevents.go +++ b/pkg/telemetry/telemetryserviceevents.go @@ -4,7 +4,7 @@ import ( "context" "time" - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/webhook" "google.golang.org/protobuf/types/known/timestamppb" diff --git a/pkg/telemetry/test/telemetry_service_test.go b/pkg/telemetry/test/telemetry_service_test.go index 9b54e827b..90a16db06 100644 --- a/pkg/telemetry/test/telemetry_service_test.go +++ b/pkg/telemetry/test/telemetry_service_test.go @@ -5,10 +5,11 @@ import ( "testing" "time" - "github.com/livekit/livekit-server/pkg/telemetry" - "github.com/livekit/livekit-server/pkg/telemetry/telemetryfakes" "github.com/livekit/protocol/livekit" "github.com/stretchr/testify/require" + + "github.com/livekit/livekit-server/pkg/telemetry" + "github.com/livekit/livekit-server/pkg/telemetry/telemetryfakes" ) type telemetryServiceFixture struct { @@ -23,7 +24,7 @@ func createFixture() *telemetryServiceFixture { return fixture } -func Test_TelemetrySerice_Downstream_Stats(t *testing.T) { +func Test_TelemetryService_Downstream_Stats(t *testing.T) { fixture := createFixture() room := &livekit.Room{} @@ -33,7 +34,7 @@ func Test_TelemetrySerice_Downstream_Stats(t *testing.T) { totalBytes := 33 fixture.sut.OnDownstreamPacket(partSID, totalBytes) - //call participant left to trigget sending of analytics + // call participant left to trigget sending of analytics fixture.sut.ParticipantLeft(context.Background(), room, participantInfo) time.Sleep(time.Millisecond * 100) // wait for Update function to be called in go routine diff --git a/pkg/testutils/timeout.go b/pkg/testutils/timeout.go index eb5e56407..377daa90c 100644 --- a/pkg/testutils/timeout.go +++ b/pkg/testutils/timeout.go @@ -9,7 +9,6 @@ import ( ) var ( - SyncDelay = 100 * time.Millisecond ConnectTimeout = 30 * time.Second ) diff --git a/test/client/client.go b/test/client/client.go index d1e4ea16e..390a781f4 100644 --- a/test/client/client.go +++ b/test/client/client.go @@ -12,7 +12,8 @@ import ( "time" "github.com/gorilla/websocket" - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils" "github.com/pion/rtcp" "github.com/pion/rtp" @@ -20,8 +21,6 @@ import ( "github.com/thoas/go-funk" "google.golang.org/protobuf/proto" - "github.com/livekit/protocol/logger" - "github.com/livekit/livekit-server/pkg/rtc" ) @@ -62,7 +61,7 @@ type RTCClient struct { OnConnected func() OnDataReceived func(data []byte, sid string) - // map of track Id and last packet + // map of track ID and last packet lastPackets map[string]*rtp.Packet bytesReceived map[string]uint64 } @@ -176,13 +175,13 @@ func NewRTCClient(conn *websocket.Conn) (*RTCClient, error) { if ic == nil { return } - c.SendIceCandidate(ic, livekit.SignalTarget_PUBLISHER) + _ = c.SendIceCandidate(ic, livekit.SignalTarget_PUBLISHER) }) c.subscriber.PeerConnection().OnICECandidate(func(ic *webrtc.ICECandidate) { if ic == nil { return } - c.SendIceCandidate(ic, livekit.SignalTarget_SUBSCRIBER) + _ = c.SendIceCandidate(ic, livekit.SignalTarget_SUBSCRIBER) }) c.subscriber.PeerConnection().OnTrack(func(track *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver) { @@ -281,10 +280,10 @@ func (c *RTCClient) Run() error { logger.Infow("join accepted, awaiting offer", "participant", msg.Join.Participant.Identity) case *livekit.SignalResponse_Answer: - //logger.Debugw("received server answer", + // logger.Debugw("received server answer", // "participant", c.localParticipant.Identity, // "answer", msg.Answer.Sdp) - c.handleAnswer(rtc.FromProtoSessionDescription(msg.Answer)) + _ = c.handleAnswer(rtc.FromProtoSessionDescription(msg.Answer)) case *livekit.SignalResponse_Offer: logger.Infow("received server offer", "participant", c.localParticipant.Identity, @@ -361,7 +360,7 @@ func (c *RTCClient) ReadResponse() (*livekit.SignalResponse, error) { msg := &livekit.SignalResponse{} switch messageType { case websocket.PingMessage: - c.conn.WriteMessage(websocket.PongMessage, nil) + _ = c.conn.WriteMessage(websocket.PongMessage, nil) continue case websocket.BinaryMessage: // protobuf encoded @@ -399,7 +398,7 @@ func (c *RTCClient) Stop() { }) c.connected.TrySet(false) c.iceConnected.TrySet(false) - c.conn.Close() + _ = c.conn.Close() c.publisher.Close() c.subscriber.Close() c.cancel() @@ -614,7 +613,7 @@ func (c *RTCClient) handleOffer(desc webrtc.SessionDescription) error { // send remote an answer logger.Infow("sending subscriber answer", "participant", c.localParticipant.Identity, - //"sdp", answer, + // "sdp", answer, ) return c.SendRequest(&livekit.SignalRequest{ Message: &livekit.SignalRequest_Answer{ @@ -723,5 +722,5 @@ func (c *RTCClient) SendNacks(count int) { } c.lock.Unlock() - c.subscriber.PeerConnection().WriteRTCP(packets) + _ = c.subscriber.PeerConnection().WriteRTCP(packets) } diff --git a/test/integration_helpers.go b/test/integration_helpers.go index c221f705d..6bd5c095e 100644 --- a/test/integration_helpers.go +++ b/test/integration_helpers.go @@ -10,7 +10,7 @@ import ( "github.com/go-redis/redis/v8" "github.com/livekit/protocol/auth" - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils" "github.com/twitchtv/twirp" @@ -28,9 +28,9 @@ const ( testApiSecret = "apiSecret" testRoom = "mytestroom" defaultServerPort = 7880 - secondServerPort = 8880 - nodeId1 = "node-1" - nodeId2 = "node-2" + secondServerPort = 8880 + nodeID1 = "node-1" + nodeID2 = "node-2" syncDelay = 100 * time.Millisecond // if there are deadlocks, it's helpful to set a short test timeout (i.e. go test -timeout=30s) @@ -70,8 +70,8 @@ func setupSingleNodeTest(name string, roomName string) (*service.LivekitServer, func setupMultiNodeTest(name string) (*service.LivekitServer, *service.LivekitServer, func()) { logger.Infow("----------------STARTING TEST----------------", "test", name) - s1 := createMultiNodeServer(utils.NewGuid(nodeId1), defaultServerPort) - s2 := createMultiNodeServer(utils.NewGuid(nodeId2), secondServerPort) + s1 := createMultiNodeServer(utils.NewGuid(nodeID1), defaultServerPort) + s2 := createMultiNodeServer(utils.NewGuid(nodeID2), secondServerPort) go s1.Start() go s2.Start() @@ -145,7 +145,7 @@ func createSingleNodeServer() *service.LivekitServer { if err != nil { panic(fmt.Sprintf("could not create local node: %v", err)) } - currentNode.Id = utils.NewGuid(nodeId1) + currentNode.Id = utils.NewGuid(nodeID1) s, err := service.InitializeServer(conf, currentNode) if err != nil { @@ -156,7 +156,7 @@ func createSingleNodeServer() *service.LivekitServer { return s } -func createMultiNodeServer(nodeId string, port uint32) *service.LivekitServer { +func createMultiNodeServer(nodeID string, port uint32) *service.LivekitServer { var err error conf, err := config.NewConfig("", nil) if err != nil { @@ -173,7 +173,7 @@ func createMultiNodeServer(nodeId string, port uint32) *service.LivekitServer { if err != nil { panic(err) } - currentNode.Id = nodeId + currentNode.Id = nodeID // redis routing and store s, err := service.InitializeServer(conf, currentNode) diff --git a/test/multinode_test.go b/test/multinode_test.go index 30919bbce..1be2f8c83 100644 --- a/test/multinode_test.go +++ b/test/multinode_test.go @@ -4,10 +4,10 @@ import ( "testing" "time" - "github.com/livekit/livekit-server/pkg/rtc" - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" "github.com/stretchr/testify/require" + "github.com/livekit/livekit-server/pkg/rtc" "github.com/livekit/livekit-server/pkg/testutils" ) @@ -48,8 +48,8 @@ func TestMultiNodeRouting(t *testing.T) { } tr1 := c2.SubscribedTracks()[c1.ID()][0] - streamId, _ := rtc.UnpackStreamID(tr1.StreamID()) - require.Equal(t, c1.ID(), streamId) + streamID, _ := rtc.UnpackStreamID(tr1.StreamID()) + require.Equal(t, c1.ID(), streamID) return true }) } @@ -78,7 +78,7 @@ func TestMultinodePublishingUponJoining(t *testing.T) { _, _, finish := setupMultiNodeTest("TestMultinodePublishingUponJoining") defer finish() - scenarioPublishingUponJoining(t, defaultServerPort, secondServerPort) + scenarioPublishingUponJoining(t) } func TestMultinodeReceiveBeforePublish(t *testing.T) { diff --git a/test/scenarios.go b/test/scenarios.go index d36325ccd..70c31ed52 100644 --- a/test/scenarios.go +++ b/test/scenarios.go @@ -4,7 +4,7 @@ import ( "testing" "time" - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils" "github.com/stretchr/testify/require" @@ -14,7 +14,7 @@ import ( ) // a scenario with lots of clients connecting, publishing, and leaving at random periods -func scenarioPublishingUponJoining(t *testing.T, ports ...int) { +func scenarioPublishingUponJoining(t *testing.T) { c1 := createRTCClient("puj_1", defaultServerPort, nil) c2 := createRTCClient("puj_2", secondServerPort, &testclient.Options{AutoSubscribe: true}) c3 := createRTCClient("puj_3", defaultServerPort, &testclient.Options{AutoSubscribe: true}) @@ -142,14 +142,6 @@ func scenarioDataPublish(t *testing.T) { } // websocket reconnects -func scenarioWSReconnect(t *testing.T) { - c1 := createRTCClient("wsr_1", defaultServerPort, nil) - c2 := createRTCClient("wsr_2", defaultServerPort, nil) - - waitUntilConnected(t, c1, c2) - - // c1 publishes track, but disconnects websockets and reconnects -} func publishTracksForClients(t *testing.T, clients ...*testclient.RTCClient) []*testclient.TrackWriter { logger.Infow("publishing tracks for clients") diff --git a/test/singlenode_test.go b/test/singlenode_test.go index b4b2bf456..591132261 100644 --- a/test/singlenode_test.go +++ b/test/singlenode_test.go @@ -7,11 +7,10 @@ import ( "time" "github.com/livekit/protocol/auth" - - "github.com/livekit/livekit-server/pkg/rtc" "github.com/pion/webrtc/v3" "github.com/stretchr/testify/require" + "github.com/livekit/livekit-server/pkg/rtc" "github.com/livekit/livekit-server/pkg/testutils" testclient "github.com/livekit/livekit-server/test/client" ) @@ -74,7 +73,7 @@ func TestClientConnectDuplicate(t *testing.T) { return false } - //participant ID can be appended with '#..' . but should contain orig id as prefix + // participant ID can be appended with '#..' . but should contain orig id as prefix tr1 := c2.SubscribedTracks()[c1.ID()][0] participantId1, _ := rtc.UnpackStreamID(tr1.StreamID()) require.Equal(t, c1.ID(), participantId1) diff --git a/test/turn_test.go b/test/turn_test.go deleted file mode 100644 index f02b3a713..000000000 --- a/test/turn_test.go +++ /dev/null @@ -1,104 +0,0 @@ -package test - -import ( - "fmt" - "net" - "testing" - "time" - - livekit "github.com/livekit/protocol/livekit" - "github.com/livekit/protocol/utils" - "github.com/pion/turn/v2" - "github.com/stretchr/testify/require" - - "github.com/livekit/livekit-server/pkg/config" - "github.com/livekit/livekit-server/pkg/routing" - "github.com/livekit/livekit-server/pkg/service" -) - -func testTurnServer(t *testing.T) { - conf, err := config.NewConfig("", nil) - require.NoError(t, err) - - conf.TURN.Enabled = true - conf.Keys = map[string]string{testApiKey: testApiSecret} - - currentNode, err := routing.NewLocalNode(conf) - require.NoError(t, err) - currentNode.Id = utils.NewGuid(nodeId1) - - // local routing and store - s, err := service.InitializeServer(conf, currentNode) - require.NoError(t, err) - go s.Start() - waitForServerToStart(s) - defer s.Stop(true) - - time.Sleep(syncDelay) - - // create a room - rm := &livekit.Room{ - Sid: utils.NewGuid(utils.RoomPrefix), - Name: "testroom", - TurnPassword: utils.RandomSecret(), - } - // require.NoError(t, roomStore.CreateRoom(rm)) - - turnConf := &turn.ClientConfig{ - STUNServerAddr: fmt.Sprintf("localhost:%d", conf.TURN.TLSPort), - TURNServerAddr: fmt.Sprintf("%s:%d", currentNode.Ip, conf.TURN.TLSPort), - Username: rm.Name, - Password: rm.TurnPassword, - Realm: "livekit", - } - - t.Run("TURN works over TCP", func(t *testing.T) { - conn, err := net.Dial("tcp", fmt.Sprintf("localhost:%d", conf.TURN.TLSPort)) - require.NoError(t, err) - - tc := *turnConf - tc.Conn = turn.NewSTUNConn(conn) - c, err := turn.NewClient(&tc) - require.NoError(t, err) - defer c.Close() - - require.NoError(t, c.Listen()) - - // Allocate a relay socket on the TURN server. On success, it - // will return a net.PacketConn which represents the remote - // socket. - relayConn, err := c.Allocate() - require.NoError(t, err) - - defer func() { - require.NoError(t, relayConn.Close()) - }() - }) - // UDP test doesn't pass - //t.Run("TURN connects over UDP", func(t *testing.T) { - // conn, err := net.ListenPacket("udp4", "0.0.0.0:0") - // require.NoError(t, err) - // defer func() { - // require.NoError(t, conn.Close()) - // }() - // - // tc := *turnConf - // tc.Conn = conn - // - // client, err := turn.NewClient(&tc) - // require.NoError(t, err) - // defer client.Close() - // - // // Start listening on the conn provided. - // require.NoError(t, client.Listen()) - // - // // Allocate a relay socket on the TURN server. On success, it - // // will return a net.PacketConn which represents the remote - // // socket. - // relayConn, err := client.Allocate() - // require.NoError(t, err) - // defer func() { - // require.NoError(t, relayConn.Close()) - // }() - //}) -} diff --git a/test/webhook_test.go b/test/webhook_test.go index 57b8fdca6..feb10cf12 100644 --- a/test/webhook_test.go +++ b/test/webhook_test.go @@ -9,7 +9,7 @@ import ( "testing" "github.com/livekit/protocol/auth" - livekit "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils" "github.com/livekit/protocol/webhook" @@ -84,7 +84,7 @@ func TestWebhooks(t *testing.T) { require.Equal(t, testRoom, ts.GetEvent(webhook.EventRoomFinished).Room.Name) } -func setupServerWithWebhook() (server *service.LivekitServer, testServer *webookTestServer, finishFunc func(), err error) { +func setupServerWithWebhook() (server *service.LivekitServer, testServer *webhookTestServer, finishFunc func(), err error) { conf, err := config.NewConfig("", nil) if err != nil { panic(fmt.Sprintf("could not create config: %v", err)) @@ -103,7 +103,7 @@ func setupServerWithWebhook() (server *service.LivekitServer, testServer *webook if err != nil { return } - currentNode.Id = utils.NewGuid(nodeId1) + currentNode.Id = utils.NewGuid(nodeID1) server, err = service.InitializeServer(conf, currentNode) if err != nil { @@ -125,15 +125,15 @@ func setupServerWithWebhook() (server *service.LivekitServer, testServer *webook return } -type webookTestServer struct { +type webhookTestServer struct { server *http.Server events map[string]*livekit.WebhookEvent lock sync.Mutex provider auth.KeyProvider } -func newTestServer(addr string) *webookTestServer { - s := &webookTestServer{ +func newTestServer(addr string) *webhookTestServer { + s := &webhookTestServer{ events: make(map[string]*livekit.WebhookEvent), provider: auth.NewFileBasedKeyProviderFromMap(map[string]string{testApiKey: testApiSecret}), } @@ -144,7 +144,7 @@ func newTestServer(addr string) *webookTestServer { return s } -func (s *webookTestServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func (s *webhookTestServer) ServeHTTP(_ http.ResponseWriter, r *http.Request) { data, err := webhook.Receive(r, s.provider) if err != nil { logger.Errorw("could not receive webhook", err) @@ -162,19 +162,19 @@ func (s *webookTestServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.lock.Unlock() } -func (s *webookTestServer) GetEvent(name string) *livekit.WebhookEvent { +func (s *webhookTestServer) GetEvent(name string) *livekit.WebhookEvent { s.lock.Lock() defer s.lock.Unlock() return s.events[name] } -func (s *webookTestServer) ClearEvents() { +func (s *webhookTestServer) ClearEvents() { s.lock.Lock() s.events = make(map[string]*livekit.WebhookEvent) s.lock.Unlock() } -func (s *webookTestServer) Start() error { +func (s *webhookTestServer) Start() error { l, err := net.Listen("tcp", s.server.Addr) if err != nil { return err @@ -183,6 +183,6 @@ func (s *webookTestServer) Start() error { return nil } -func (s *webookTestServer) Stop() { +func (s *webhookTestServer) Stop() { _ = s.server.Shutdown(context.Background()) }