diff --git a/go.mod b/go.mod index e0d1acf60..01988518b 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 - github.com/livekit/protocol v1.21.1-0.20240913074525-1f5de7d620c4 + github.com/livekit/protocol v1.21.1-0.20240919052504-1874ac067983 github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a github.com/mackerelio/go-osstat v0.2.5 github.com/magefile/mage v1.15.0 diff --git a/go.sum b/go.sum index 4002f6667..c3d22a889 100644 --- a/go.sum +++ b/go.sum @@ -169,8 +169,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 h1:yLlkHk2feSLHstD9n4VKg7YEBR4rLODTI4WE8gNBEnQ= github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= -github.com/livekit/protocol v1.21.1-0.20240913074525-1f5de7d620c4 h1:JhLovaAv+UxOTYpjQxN3IRV3kuJFb9f3LZmg/KqvHkc= -github.com/livekit/protocol v1.21.1-0.20240913074525-1f5de7d620c4/go.mod h1:AFuwk3+uIWFeO5ohKjx5w606Djl940+wktaZ441VoCI= +github.com/livekit/protocol v1.21.1-0.20240919052504-1874ac067983 h1:zIxjMDJlt4YXqRWkNZEp5y7gIMjACDve3m3bD4s4lug= +github.com/livekit/protocol v1.21.1-0.20240919052504-1874ac067983/go.mod h1:AFuwk3+uIWFeO5ohKjx5w606Djl940+wktaZ441VoCI= github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a h1:EQAHmcYEGlc6V517cQ3Iy0+jHgP6+tM/B4l2vGuLpQo= github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o= diff --git a/pkg/clientconfiguration/staticconfiguration.go b/pkg/clientconfiguration/staticconfiguration.go index 4fb67bae4..d8145c260 100644 --- a/pkg/clientconfiguration/staticconfiguration.go +++ b/pkg/clientconfiguration/staticconfiguration.go @@ -20,6 +20,7 @@ import ( "github.com/livekit/livekit-server/pkg/utils" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" + protoutils "github.com/livekit/protocol/utils" ) type ConfigurationItem struct { @@ -58,7 +59,7 @@ func (s *StaticClientConfigurationManager) GetConfiguration(clientInfo *livekit. var conf *livekit.ClientConfiguration for k, v := range matchedConf { if k == 0 { - conf = proto.Clone(matchedConf[0]).(*livekit.ClientConfiguration) + conf = protoutils.CloneProto(matchedConf[0]) } else { // TODO : there is a problem use protobuf merge, we don't have flag to indicate 'no value', // don't override default behavior or other configuration's field. So a bool value = false or diff --git a/pkg/rtc/mediatrackreceiver.go b/pkg/rtc/mediatrackreceiver.go index 467c2a0f2..363d45250 100644 --- a/pkg/rtc/mediatrackreceiver.go +++ b/pkg/rtc/mediatrackreceiver.go @@ -30,6 +30,7 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" + "github.com/livekit/protocol/utils" "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/rtc/types" @@ -116,7 +117,7 @@ func NewMediaTrackReceiver(params MediaTrackReceiverParams, ti *livekit.TrackInf params: params, state: mediaTrackReceiverStateOpen, } - t.trackInfo.Store(proto.Clone(ti).(*livekit.TrackInfo)) + t.trackInfo.Store(utils.CloneProto(ti)) t.MediaTrackSubscriptions = NewMediaTrackSubscriptions(MediaTrackSubscriptionsParams{ MediaTrack: params.MediaTrack, @@ -185,7 +186,7 @@ func (t *MediaTrackReceiver) SetupReceiver(receiver sfu.TrackReceiver, priority trackInfo := t.TrackInfo() if priority == 0 { - trackInfo = proto.Clone(trackInfo).(*livekit.TrackInfo) + trackInfo = utils.CloneProto(trackInfo) trackInfo.MimeType = receiver.Codec().MimeType trackInfo.Mid = mid t.trackInfo.Store(trackInfo) @@ -605,7 +606,7 @@ func (t *MediaTrackReceiver) UpdateCodecCid(codecs []*livekit.SimulcastCodec) { func (t *MediaTrackReceiver) UpdateTrackInfo(ti *livekit.TrackInfo) { updateMute := false - clonedInfo := proto.Clone(ti).(*livekit.TrackInfo) + clonedInfo := utils.CloneProto(ti) t.lock.Lock() trackInfo := t.TrackInfo() @@ -658,7 +659,7 @@ func (t *MediaTrackReceiver) UpdateAudioTrack(update *livekit.UpdateLocalAudioTr t.lock.Lock() trackInfo := t.TrackInfo() - clonedInfo := proto.Clone(trackInfo).(*livekit.TrackInfo) + clonedInfo := utils.CloneProto(trackInfo) clonedInfo.AudioFeatures = update.Features clonedInfo.Stereo = false clonedInfo.DisableDtx = false @@ -690,7 +691,7 @@ func (t *MediaTrackReceiver) UpdateVideoTrack(update *livekit.UpdateLocalVideoTr t.lock.Lock() trackInfo := t.TrackInfo() - clonedInfo := proto.Clone(trackInfo).(*livekit.TrackInfo) + clonedInfo := utils.CloneProto(trackInfo) clonedInfo.Width = update.Width clonedInfo.Height = update.Height if proto.Equal(trackInfo, clonedInfo) { diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 6bc61e523..48776d0ba 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -1949,7 +1949,7 @@ func (p *ParticipantImpl) addPendingTrackLocked(req *livekit.AddTrackRequest) *l clonedLayers := make([]*livekit.VideoLayer, 0, len(req.Layers)) for _, l := range req.Layers { - clonedLayers = append(clonedLayers, proto.Clone(l).(*livekit.VideoLayer)) + clonedLayers = append(clonedLayers, utils.CloneProto(l)) } ti.Codecs = append(ti.Codecs, &livekit.SimulcastCodecInfo{ MimeType: mime, diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 481744315..a64a3f028 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -246,7 +246,7 @@ func NewRoom( egressLauncher EgressLauncher, ) *Room { r := &Room{ - protoRoom: proto.Clone(room).(*livekit.Room), + protoRoom: utils.CloneProto(room), internal: internal, Logger: LoggerWithRoom( logger.GetLogger().WithComponent(sutils.ComponentRoom), @@ -964,7 +964,7 @@ func (r *Room) GetAgentDispatches(dispatchID string) ([]*livekit.AgentDispatch, for _, ad := range r.agentDispatches { if dispatchID == "" || ad.Id == dispatchID { - ret = append(ret, proto.Clone(ad.AgentDispatch).(*livekit.AgentDispatch)) + ret = append(ret, utils.CloneProto(ad.AgentDispatch)) } } @@ -1427,7 +1427,7 @@ func (r *Room) pushAndDequeueUpdates( func (r *Room) updateProto() *livekit.Room { r.lock.RLock() - room := proto.Clone(r.protoRoom).(*livekit.Room) + room := utils.CloneProto(r.protoRoom) r.lock.RUnlock() room.NumPublishers = 0 @@ -1493,7 +1493,7 @@ func (r *Room) audioUpdateWorker() { // changedSpeakers need to include previous speakers that are no longer speaking for sid, speaker := range lastActiveMap { if nextActiveMap[sid] == nil { - inactiveSpeaker := proto.Clone(speaker).(*livekit.SpeakerInfo) + inactiveSpeaker := utils.CloneProto(speaker) inactiveSpeaker.Level = 0 inactiveSpeaker.Active = false changedSpeakers = append(changedSpeakers, inactiveSpeaker) diff --git a/pkg/rtc/room_test.go b/pkg/rtc/room_test.go index a85f361c1..37b0df7a6 100644 --- a/pkg/rtc/room_test.go +++ b/pkg/rtc/room_test.go @@ -23,6 +23,7 @@ import ( "google.golang.org/protobuf/proto" "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/utils" "github.com/livekit/protocol/webhook" "github.com/livekit/livekit-server/version" @@ -284,7 +285,7 @@ func TestPushAndDequeueUpdates(t *testing.T) { { name: "last version is enqueued", pi: subscriber1v2, - existing: &participantUpdate{pi: proto.Clone(subscriber1v1).(*livekit.ParticipantInfo)}, // clone the existing value since it can be modified when setting to disconnected + existing: &participantUpdate{pi: utils.CloneProto(subscriber1v1)}, // clone the existing value since it can be modified when setting to disconnected validate: func(t *testing.T, rm *Room, _ []*participantUpdate) { queued := rm.batchedUpdates[livekit.ParticipantIdentity(identity)] require.NotNil(t, queued) @@ -294,7 +295,7 @@ func TestPushAndDequeueUpdates(t *testing.T) { { name: "latest version when immediate", pi: subscriber1v2, - existing: &participantUpdate{pi: proto.Clone(subscriber1v1).(*livekit.ParticipantInfo)}, + existing: &participantUpdate{pi: utils.CloneProto(subscriber1v1)}, immediate: true, expected: []*participantUpdate{{pi: subscriber1v2}}, validate: func(t *testing.T, rm *Room, _ []*participantUpdate) { @@ -305,7 +306,7 @@ func TestPushAndDequeueUpdates(t *testing.T) { { name: "out of order updates are rejected", pi: subscriber1v1, - existing: &participantUpdate{pi: proto.Clone(subscriber1v2).(*livekit.ParticipantInfo)}, + existing: &participantUpdate{pi: utils.CloneProto(subscriber1v2)}, validate: func(t *testing.T, rm *Room, updates []*participantUpdate) { queued := rm.batchedUpdates[livekit.ParticipantIdentity(identity)] requirePIEquals(t, subscriber1v2, queued.pi) @@ -315,7 +316,7 @@ func TestPushAndDequeueUpdates(t *testing.T) { name: "sid change is broadcasted immediately with synthsized disconnect", pi: publisher2, closeReason: types.ParticipantCloseReasonServiceRequestRemoveParticipant, // just to test if update contain the close reason - existing: &participantUpdate{pi: proto.Clone(subscriber1v2).(*livekit.ParticipantInfo), closeReason: types.ParticipantCloseReasonStale}, + existing: &participantUpdate{pi: utils.CloneProto(subscriber1v2), closeReason: types.ParticipantCloseReasonStale}, expected: []*participantUpdate{ { pi: &livekit.ParticipantInfo{ @@ -333,7 +334,7 @@ func TestPushAndDequeueUpdates(t *testing.T) { { name: "when switching to publisher, queue is cleared", pi: publisher1v2, - existing: &participantUpdate{pi: proto.Clone(subscriber1v1).(*livekit.ParticipantInfo)}, + existing: &participantUpdate{pi: utils.CloneProto(subscriber1v1)}, expected: []*participantUpdate{{pi: publisher1v2}}, validate: func(t *testing.T, rm *Room, updates []*participantUpdate) { require.Empty(t, rm.batchedUpdates) @@ -622,7 +623,7 @@ func TestDataChannel(t *testing.T) { } setSource(mode, packet, p) - packetExp := proto.Clone(packet).(*livekit.DataPacket) + packetExp := utils.CloneProto(packet) if mode != legacySID { packetExp.ParticipantIdentity = string(p.Identity()) packetExp.GetUser().ParticipantIdentity = string(p.Identity()) @@ -667,7 +668,7 @@ func TestDataChannel(t *testing.T) { setSource(mode, packet, p) setDest(mode, packet, p1) - packetExp := proto.Clone(packet).(*livekit.DataPacket) + packetExp := utils.CloneProto(packet) if mode != legacySID { packetExp.ParticipantIdentity = string(p.Identity()) packetExp.GetUser().ParticipantIdentity = string(p.Identity()) diff --git a/pkg/rtc/subscribedtrack.go b/pkg/rtc/subscribedtrack.go index f5dd9e995..c117885e9 100644 --- a/pkg/rtc/subscribedtrack.go +++ b/pkg/rtc/subscribedtrack.go @@ -220,7 +220,7 @@ func (t *SubscribedTrack) UpdateSubscriberSettings(settings *livekit.UpdateTrack } isImmediate = isImmediate || (!settings.Disabled && settings.Disabled != t.isMutedLocked()) - t.settings = proto.Clone(settings).(*livekit.UpdateTrackSettings) + t.settings = utils.CloneProto(settings) t.settingsLock.Unlock() if isImmediate { diff --git a/pkg/rtc/transportmanager.go b/pkg/rtc/transportmanager.go index 93dfc8b86..e7fedf9e2 100644 --- a/pkg/rtc/transportmanager.go +++ b/pkg/rtc/transportmanager.go @@ -29,6 +29,7 @@ import ( "github.com/livekit/mediatransportutil/pkg/twcc" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" + "github.com/livekit/protocol/utils" "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/rtc/transport" @@ -439,7 +440,7 @@ func (t *TransportManager) GetICEConfig() *livekit.ICEConfig { if t.iceConfig == nil { return nil } - return proto.Clone(t.iceConfig).(*livekit.ICEConfig) + return utils.CloneProto(t.iceConfig) } func (t *TransportManager) resetTransportConfigureLocked(reconfigured bool) { diff --git a/pkg/service/localstore.go b/pkg/service/localstore.go index b1b498509..65013c5ea 100644 --- a/pkg/service/localstore.go +++ b/pkg/service/localstore.go @@ -20,9 +20,9 @@ import ( "time" "github.com/thoas/go-funk" - "google.golang.org/protobuf/proto" "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/utils" ) // encapsulates CRUD operations for room settings @@ -184,7 +184,7 @@ func (s *LocalStore) StoreAgentDispatch(ctx context.Context, dispatch *livekit.A s.lock.Lock() defer s.lock.Unlock() - clone := proto.Clone(dispatch).(*livekit.AgentDispatch) + clone := utils.CloneProto(dispatch) if clone.State != nil { clone.State.Jobs = nil } @@ -224,14 +224,14 @@ func (s *LocalStore) ListAgentDispatches(ctx context.Context, roomName livekit.R var js []*livekit.Job if agentJobs != nil { for _, j := range agentJobs { - js = append(js, proto.Clone(j).(*livekit.Job)) + js = append(js, utils.CloneProto(j)) } } var ds []*livekit.AgentDispatch m := make(map[string]*livekit.AgentDispatch) for _, d := range agentDispatches { - clone := proto.Clone(d).(*livekit.AgentDispatch) + clone := utils.CloneProto(d) m[d.Id] = clone ds = append(ds, clone) } @@ -239,7 +239,7 @@ func (s *LocalStore) ListAgentDispatches(ctx context.Context, roomName livekit.R for _, j := range js { d := m[j.DispatchId] if d != nil { - d.State.Jobs = append(d.State.Jobs, proto.Clone(j).(*livekit.Job)) + d.State.Jobs = append(d.State.Jobs, utils.CloneProto(j)) } } @@ -250,7 +250,7 @@ func (s *LocalStore) StoreAgentJob(ctx context.Context, job *livekit.Job) error s.lock.Lock() defer s.lock.Unlock() - clone := proto.Clone(job).(*livekit.Job) + clone := utils.CloneProto(job) clone.Room = nil if clone.Participant != nil { clone.Participant = &livekit.ParticipantInfo{ diff --git a/pkg/service/redisstore.go b/pkg/service/redisstore.go index 8326e36f1..35d02d892 100644 --- a/pkg/service/redisstore.go +++ b/pkg/service/redisstore.go @@ -29,6 +29,7 @@ import ( "github.com/livekit/protocol/ingress" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" + "github.com/livekit/protocol/utils" "github.com/livekit/protocol/utils/guid" "github.com/livekit/psrpc" @@ -527,7 +528,7 @@ func (s *RedisStore) storeIngress(_ context.Context, info *livekit.IngressInfo) } // ignore state - infoCopy := proto.Clone(info).(*livekit.IngressInfo) + infoCopy := utils.CloneProto(info) infoCopy.State = nil data, err := proto.Marshal(infoCopy) @@ -830,7 +831,7 @@ func (s *RedisStore) DeleteIngress(_ context.Context, info *livekit.IngressInfo) } func (s *RedisStore) StoreAgentDispatch(_ context.Context, dispatch *livekit.AgentDispatch) error { - di := proto.Clone(dispatch).(*livekit.AgentDispatch) + di := utils.CloneProto(dispatch) // Do not store jobs with the dispatch if di.State != nil { @@ -893,7 +894,7 @@ func (s *RedisStore) StoreAgentJob(_ context.Context, job *livekit.Job) error { key := AgentJobPrefix + string(job.Room.Name) - jb := proto.Clone(job).(*livekit.Job) + jb := utils.CloneProto(job) // Do not store room with the job jb.Room = nil diff --git a/pkg/service/redisstore_test.go b/pkg/service/redisstore_test.go index 13a6905e6..d584c4102 100644 --- a/pkg/service/redisstore_test.go +++ b/pkg/service/redisstore_test.go @@ -343,7 +343,7 @@ func TestAgentStore(t *testing.T) { require.NoError(t, err) require.Equal(t, 1, len(rd)) - expected := proto.Clone(ad).(*livekit.AgentDispatch) + expected := utils.CloneProto(ad) expected.State.Jobs = nil require.True(t, proto.Equal(expected, rd[0])) @@ -354,7 +354,7 @@ func TestAgentStore(t *testing.T) { require.NoError(t, err) require.Equal(t, 1, len(rd)) - expected = proto.Clone(ad).(*livekit.AgentDispatch) + expected = utils.CloneProto(ad) expected.State.Jobs[0].Room = nil expected.State.Jobs[0].Participant = &livekit.ParticipantInfo{ Identity: "identity", @@ -368,7 +368,7 @@ func TestAgentStore(t *testing.T) { require.NoError(t, err) require.Equal(t, 1, len(rd)) - expected = proto.Clone(ad).(*livekit.AgentDispatch) + expected = utils.CloneProto(ad) expected.State.Jobs = nil require.True(t, proto.Equal(expected, rd[0])) diff --git a/pkg/service/roomallocator.go b/pkg/service/roomallocator.go index 93ddec420..dce8f1d2f 100644 --- a/pkg/service/roomallocator.go +++ b/pkg/service/roomallocator.go @@ -19,8 +19,6 @@ import ( "errors" "time" - "google.golang.org/protobuf/proto" - "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils" @@ -211,7 +209,7 @@ func (r *StandardRoomAllocator) applyNamedRoomConfiguration(req *livekit.CreateR return req, psrpc.NewErrorf(psrpc.InvalidArgument, "unknown room confguration in create room request") } - clone := proto.Clone(req).(*livekit.CreateRoomRequest) + clone := utils.CloneProto(req) // Request overwrites conf if clone.EmptyTimeout == 0 { @@ -224,10 +222,10 @@ func (r *StandardRoomAllocator) applyNamedRoomConfiguration(req *livekit.CreateR clone.MaxParticipants = conf.MaxParticipants } if clone.Egress == nil { - clone.Egress = proto.Clone(conf.Egress).(*livekit.RoomEgress) + clone.Egress = utils.CloneProto(conf.Egress) } if clone.Agent == nil { - clone.Agent = proto.Clone(conf.Agent).(*livekit.RoomAgent) + clone.Agent = utils.CloneProto(conf.Agent) } if clone.MinPlayoutDelay == 0 { clone.MinPlayoutDelay = conf.MinPlayoutDelay diff --git a/pkg/service/roomservice.go b/pkg/service/roomservice.go index 76875b75d..bd76e5ca3 100644 --- a/pkg/service/roomservice.go +++ b/pkg/service/roomservice.go @@ -21,7 +21,6 @@ import ( "github.com/pkg/errors" "github.com/twitchtv/twirp" - "google.golang.org/protobuf/proto" "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" @@ -30,6 +29,7 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/rpc" + "github.com/livekit/protocol/utils" ) type RoomService struct { @@ -323,7 +323,7 @@ func redactCreateRoomRequest(req *livekit.CreateRoomRequest) *livekit.CreateRoom return req } - clone := proto.Clone(req).(*livekit.CreateRoomRequest) + clone := utils.CloneProto(req) if clone.Egress.Room != nil { egress.RedactEncodedOutputs(clone.Egress.Room) diff --git a/pkg/sfu/buffer/rtpstats_base.go b/pkg/sfu/buffer/rtpstats_base.go index 6142f10c7..812fcacc3 100644 --- a/pkg/sfu/buffer/rtpstats_base.go +++ b/pkg/sfu/buffer/rtpstats_base.go @@ -21,7 +21,6 @@ import ( "time" "go.uber.org/zap/zapcore" - "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" "github.com/livekit/mediatransportutil" @@ -302,8 +301,8 @@ func (r *rtpStatsBase) seed(from *rtpStatsBase) bool { r.rtt = from.rtt r.maxRtt = from.maxRtt - r.srFirst = proto.Clone(from.srFirst).(*livekit.RTCPSenderReportState) - r.srNewest = proto.Clone(from.srNewest).(*livekit.RTCPSenderReportState) + r.srFirst = utils.CloneProto(from.srFirst) + r.srNewest = utils.CloneProto(from.srNewest) r.nextSnapshotID = from.nextSnapshotID r.snapshots = make([]snapshot, cap(from.snapshots)) diff --git a/pkg/sfu/buffer/rtpstats_receiver.go b/pkg/sfu/buffer/rtpstats_receiver.go index 81d3f1178..504de9b3b 100644 --- a/pkg/sfu/buffer/rtpstats_receiver.go +++ b/pkg/sfu/buffer/rtpstats_receiver.go @@ -21,7 +21,6 @@ import ( "github.com/pion/rtcp" "go.uber.org/zap/zapcore" - "google.golang.org/protobuf/proto" "github.com/livekit/livekit-server/pkg/sfu/utils" "github.com/livekit/mediatransportutil" @@ -413,7 +412,7 @@ func (r *RTPStatsReceiver) getExtendedSenderReport(srData *livekit.RTCPSenderRep } } - srDataExt := proto.Clone(srData).(*livekit.RTCPSenderReportState) + srDataExt := protoutils.CloneProto(srData) srDataExt.RtpTimestampExt = uint64(srDataExt.RtpTimestamp) + tsCycles return srDataExt } @@ -567,7 +566,7 @@ func (r *RTPStatsReceiver) GetRtcpSenderReportData() *livekit.RTCPSenderReportSt r.lock.RLock() defer r.lock.RUnlock() - return proto.Clone(r.srNewest).(*livekit.RTCPSenderReportState) + return protoutils.CloneProto(r.srNewest) } func (r *RTPStatsReceiver) LastSenderReportTime() time.Time { diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 94fd13aca..43c5d1641 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -26,11 +26,11 @@ import ( "github.com/pion/rtp" "github.com/pion/webrtc/v3" "go.uber.org/zap/zapcore" - "google.golang.org/protobuf/proto" "github.com/livekit/mediatransportutil" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" + "github.com/livekit/protocol/utils" "github.com/livekit/livekit-server/pkg/sfu/buffer" "github.com/livekit/livekit-server/pkg/sfu/codecmunger" @@ -406,7 +406,7 @@ func (f *Forwarder) GetState() *livekit.RTPForwarderState { state.SenderReportState = make([]*livekit.RTCPSenderReportState, len(f.refInfos)) for layer, refInfo := range f.refInfos { - state.SenderReportState[layer] = proto.Clone(refInfo.senderReport).(*livekit.RTCPSenderReportState) + state.SenderReportState[layer] = utils.CloneProto(refInfo.senderReport) } return state } @@ -421,7 +421,7 @@ func (f *Forwarder) SeedState(state *livekit.RTPForwarderState) { for layer, rtcpSenderReportState := range state.SenderReportState { f.refInfos[layer] = refInfo{} - if senderReport := proto.Clone(rtcpSenderReportState).(*livekit.RTCPSenderReportState); senderReport != nil && senderReport.NtpTimestamp != 0 { + if senderReport := utils.CloneProto(rtcpSenderReportState); senderReport != nil && senderReport.NtpTimestamp != 0 { f.refInfos[layer].senderReport = senderReport } } diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 0a565661f..9e015360b 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -24,11 +24,11 @@ import ( "github.com/pion/rtcp" "github.com/pion/webrtc/v3" "go.uber.org/atomic" - "google.golang.org/protobuf/proto" "github.com/livekit/mediatransportutil/pkg/bucket" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" + "github.com/livekit/protocol/utils" "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/sfu/audio" @@ -209,7 +209,7 @@ func NewWebRTCReceiver( for _, opt := range opts { w = opt(w) } - w.trackInfo.Store(proto.Clone(trackInfo).(*livekit.TrackInfo)) + w.trackInfo.Store(utils.CloneProto(trackInfo)) w.downTrackSpreader = NewDownTrackSpreader(DownTrackSpreaderParams{ Threshold: w.lbThreshold, @@ -249,7 +249,7 @@ func (w *WebRTCReceiver) TrackInfo() *livekit.TrackInfo { } func (w *WebRTCReceiver) UpdateTrackInfo(ti *livekit.TrackInfo) { - w.trackInfo.Store(proto.Clone(ti).(*livekit.TrackInfo)) + w.trackInfo.Store(utils.CloneProto(ti)) w.streamTrackerManager.UpdateTrackInfo(ti) } diff --git a/pkg/sfu/streamtrackermanager.go b/pkg/sfu/streamtrackermanager.go index d106dd9e5..b25544b29 100644 --- a/pkg/sfu/streamtrackermanager.go +++ b/pkg/sfu/streamtrackermanager.go @@ -21,10 +21,10 @@ import ( "github.com/frostbyte73/core" "go.uber.org/atomic" - "google.golang.org/protobuf/proto" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" + "github.com/livekit/protocol/utils" "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/sfu/buffer" @@ -82,7 +82,7 @@ func NewStreamTrackerManager( maxTemporalLayerSeen: buffer.InvalidLayerTemporal, clockRate: clockRate, } - s.trackInfo.Store(proto.Clone(trackInfo).(*livekit.TrackInfo)) + s.trackInfo.Store(utils.CloneProto(trackInfo)) switch trackInfo.Source { case livekit.TrackSource_SCREEN_SHARE: @@ -312,7 +312,7 @@ func (s *StreamTrackerManager) IsPaused() bool { } func (s *StreamTrackerManager) UpdateTrackInfo(ti *livekit.TrackInfo) { - s.trackInfo.Store(proto.Clone(ti).(*livekit.TrackInfo)) + s.trackInfo.Store(utils.CloneProto(ti)) s.maxExpectedLayerFromTrackInfo() } diff --git a/pkg/telemetry/statsworker.go b/pkg/telemetry/statsworker.go index 150a62bde..a72ceaf8c 100644 --- a/pkg/telemetry/statsworker.go +++ b/pkg/telemetry/statsworker.go @@ -19,12 +19,12 @@ import ( "sync" "time" - "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" "github.com/livekit/livekit-server/pkg/utils" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" + protoutils "github.com/livekit/protocol/utils" ) // StatsWorker handles participant stats @@ -227,7 +227,7 @@ func coalesce(stats []*livekit.AnalyticsStat) *livekit.AnalyticsStat { for _, videoLayer := range analyticsStream.VideoLayers { coalescedVideoLayer := coalescedVideoLayers[videoLayer.Layer] if coalescedVideoLayer == nil { - coalescedVideoLayer = proto.Clone(videoLayer).(*livekit.AnalyticsVideoLayer) + coalescedVideoLayer = protoutils.CloneProto(videoLayer) coalescedVideoLayers[videoLayer.Layer] = coalescedVideoLayer } else { coalescedVideoLayer.Packets += videoLayer.Packets diff --git a/pkg/utils/protocol.go b/pkg/utils/protocol.go index 23844742f..77d324b94 100644 --- a/pkg/utils/protocol.go +++ b/pkg/utils/protocol.go @@ -17,16 +17,15 @@ package utils import ( - "google.golang.org/protobuf/proto" - "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/utils" ) func ClientInfoWithoutAddress(c *livekit.ClientInfo) *livekit.ClientInfo { if c == nil { return nil } - clone := proto.Clone(c).(*livekit.ClientInfo) + clone := utils.CloneProto(c) clone.Address = "" return clone }