diff --git a/go.mod b/go.mod index 4678ccc89..90f198820 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731 github.com/livekit/mediatransportutil v0.0.0-20250519131108-fb90f5acfded - github.com/livekit/protocol v1.39.4-0.20250721063419-93319bf9e30a + github.com/livekit/protocol v1.39.4-0.20250721114233-52633eee694f github.com/livekit/psrpc v0.6.1-0.20250511053145-465289d72c3c github.com/mackerelio/go-osstat v0.2.5 github.com/magefile/mage v1.15.0 diff --git a/go.sum b/go.sum index 13f9ef2f8..ca08aaec7 100644 --- a/go.sum +++ b/go.sum @@ -167,8 +167,8 @@ github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731 h1:9x+U2HGLrSw5AT github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20250519131108-fb90f5acfded h1:ylZPdnlX1RW9Z15SD4mp87vT2D2shsk0hpLJwSPcq3g= github.com/livekit/mediatransportutil v0.0.0-20250519131108-fb90f5acfded/go.mod h1:mSNtYzSf6iY9xM3UX42VEI+STHvMgHmrYzEHPcdhB8A= -github.com/livekit/protocol v1.39.4-0.20250721063419-93319bf9e30a h1:gJaHYMRz7ZJCT1qLIfReHEL27zD8L+pRVELMjAatdJI= -github.com/livekit/protocol v1.39.4-0.20250721063419-93319bf9e30a/go.mod h1:6l+zgRJZ9sY96LM7DA3EMcKQC5zsVyZVP73c+9wgvCA= +github.com/livekit/protocol v1.39.4-0.20250721114233-52633eee694f h1:Cwe38+/ld3r5dnNmIZSALSoZPWNEMeYPZIi/qjpplLo= +github.com/livekit/protocol v1.39.4-0.20250721114233-52633eee694f/go.mod h1:YlgUxAegtU8jZ0tVXoIV/4fHeHqqLvS+6JnPKDbpFPU= github.com/livekit/psrpc v0.6.1-0.20250511053145-465289d72c3c h1:WwEr0YBejYbKzk8LSaO9h8h0G9MnE7shyDu8yXQWmEc= github.com/livekit/psrpc v0.6.1-0.20250511053145-465289d72c3c/go.mod h1:kmD+AZPkWu0MaXIMv57jhNlbiSZZ/Jx4bzlxBDVmJes= github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o= diff --git a/pkg/rtc/participant_internal_test.go b/pkg/rtc/participant_internal_test.go index 3338d4c02..213d11511 100644 --- a/pkg/rtc/participant_internal_test.go +++ b/pkg/rtc/participant_internal_test.go @@ -33,13 +33,13 @@ import ( "github.com/livekit/protocol/logger" "github.com/livekit/protocol/observability/roomobs" lksdp "github.com/livekit/protocol/sdp" + "github.com/livekit/protocol/signalling" "github.com/livekit/protocol/utils" "github.com/livekit/protocol/utils/guid" "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" "github.com/livekit/livekit-server/pkg/routing/routingfakes" - "github.com/livekit/livekit-server/pkg/rtc/signalling" "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/rtc/types/typesfakes" "github.com/livekit/livekit-server/pkg/testutils" diff --git a/pkg/rtc/participant_signal.go b/pkg/rtc/participant_signal.go index fd8aa3473..7eaba1467 100644 --- a/pkg/rtc/participant_signal.go +++ b/pkg/rtc/participant_signal.go @@ -21,9 +21,9 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" + protosignalling "github.com/livekit/protocol/signalling" "github.com/livekit/livekit-server/pkg/routing" - "github.com/livekit/livekit-server/pkg/rtc/signalling" "github.com/livekit/livekit-server/pkg/rtc/types" "google.golang.org/protobuf/proto" @@ -222,7 +222,7 @@ func (p *ParticipantImpl) sendICECandidate(ic *webrtc.ICECandidate, target livek return nil } - trickle := signalling.ToProtoTrickle(prevIC.ToJSON(), target, ic == nil) + trickle := protosignalling.ToProtoTrickle(prevIC.ToJSON(), target, ic == nil) p.params.Logger.Debugw("sending ICE candidate", "transport", target, "trickle", logger.Proto(trickle)) return p.signaller.WriteMessage(p.signalling.SignalICECandidate(trickle)) @@ -294,11 +294,11 @@ func (p *ParticipantImpl) sendLeaveRequest( } func (p *ParticipantImpl) sendSdpAnswer(answer webrtc.SessionDescription, answerId uint32) error { - return p.signaller.WriteMessage(p.signalling.SignalSdpAnswer(signalling.ToProtoSessionDescription(answer, answerId))) + return p.signaller.WriteMessage(p.signalling.SignalSdpAnswer(protosignalling.ToProtoSessionDescription(answer, answerId))) } func (p *ParticipantImpl) sendSdpOffer(offer webrtc.SessionDescription, offerId uint32) error { - return p.signaller.WriteMessage(p.signalling.SignalSdpOffer(signalling.ToProtoSessionDescription(offer, offerId))) + return p.signaller.WriteMessage(p.signalling.SignalSdpOffer(protosignalling.ToProtoSessionDescription(offer, offerId))) } func (p *ParticipantImpl) sendStreamStateUpdate(streamStateUpdate *livekit.StreamStateUpdate) error { diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 0f8d25e46..6857a26bf 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -32,6 +32,7 @@ import ( "github.com/livekit/protocol/logger" "github.com/livekit/protocol/observability/roomobs" "github.com/livekit/protocol/rpc" + protosignalling "github.com/livekit/protocol/signalling" "github.com/livekit/protocol/utils" "github.com/livekit/protocol/utils/guid" "github.com/livekit/psrpc" @@ -39,7 +40,6 @@ import ( "github.com/livekit/livekit-server/pkg/agent" "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" - "github.com/livekit/livekit-server/pkg/rtc/signalling" "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/sfu" "github.com/livekit/livekit-server/pkg/sfu/buffer" @@ -797,7 +797,7 @@ func (r *Room) Joinv2( prometheus.ServiceOperationCounter.WithLabelValues("participant_join", "error", "get_subscriber_offer").Add(1) return nil, err } - connectResponse.SubscriberSdp = signalling.ToProtoSessionDescription(offer, 0) // SIGNALLING-V2-TODO - need to proper offerId? + connectResponse.SubscriberSdp = protosignalling.ToProtoSessionDescription(offer, 0) // SIGNALLING-V2-TODO - need to proper offerId? // for sync response, this does not actually send, only generates messageId and caches the message if err := participant.SendConnectResponse(connectResponse); err != nil { prometheus.ServiceOperationCounter.WithLabelValues("participant_join", "error", "send_response").Add(1) diff --git a/pkg/rtc/signalling/signalcache.go b/pkg/rtc/signalling/signalcache.go deleted file mode 100644 index b51b41a30..000000000 --- a/pkg/rtc/signalling/signalcache.go +++ /dev/null @@ -1,125 +0,0 @@ -// Copyright 2023 LiveKit, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package signalling - -import ( - "math/rand" - "sync" - - "github.com/gammazero/deque" - - "github.com/livekit/protocol/livekit" - "github.com/livekit/protocol/logger" - "github.com/livekit/protocol/utils" -) - -type SignalCacheParams struct { - Logger logger.Logger - FirstMessageId uint32 // should be used for testing only -} - -type SignalCache struct { - params SignalCacheParams - - lock sync.Mutex - messageId uint32 - lastProcessedRemoteMessageId uint32 - messages deque.Deque[*livekit.Signalv2ServerMessage] -} - -func NewSignalCache(params SignalCacheParams) *SignalCache { - s := &SignalCache{ - params: params, - messageId: params.FirstMessageId, - } - if s.messageId == 0 { - s.messageId = uint32(rand.Intn(1<<8) + 1) - } - s.messages.SetBaseCap(16) - return s -} - -func (s *SignalCache) SetLastProcessedRemoteMessageId(lastProcessedRemoteMessageId uint32) { - s.lock.Lock() - defer s.lock.Unlock() - - s.lastProcessedRemoteMessageId = lastProcessedRemoteMessageId -} - -func (s *SignalCache) Add(msg *livekit.Signalv2ServerMessage) *livekit.Signalv2ServerMessage { - if msg != nil { - s.AddBatch([]*livekit.Signalv2ServerMessage{msg}) - } - - return msg -} - -// SIGNALLING-V2-TODO: may not need this API -func (s *SignalCache) AddBatch(msgs []*livekit.Signalv2ServerMessage) { - s.lock.Lock() - defer s.lock.Unlock() - - for _, msg := range msgs { - msg.Sequencer = &livekit.Sequencer{ - MessageId: s.messageId, - } - s.messageId++ - - s.messages.PushBack(msg) - } -} - -func (s *SignalCache) Clear(till uint32) { - s.lock.Lock() - defer s.lock.Unlock() - - s.clearLocked(till) -} - -func (s *SignalCache) clearLocked(till uint32) { - for s.messages.Len() != 0 { - front := s.messages.Front() - if front.Sequencer.GetMessageId() > till { - break - } - s.messages.PopFront() - } -} - -func (s *SignalCache) GetFromFront() []*livekit.Signalv2ServerMessage { - s.lock.Lock() - defer s.lock.Unlock() - - return s.getFromFrontLocked() -} - -func (s *SignalCache) getFromFrontLocked() []*livekit.Signalv2ServerMessage { - var msgs []*livekit.Signalv2ServerMessage - for msg := range s.messages.Iter() { - clone := utils.CloneProto(msg) - clone.Sequencer.LastProcessedRemoteMessageId = s.lastProcessedRemoteMessageId - msgs = append(msgs, clone) - } - - return msgs -} - -func (s *SignalCache) ClearAndGetFrom(from uint32) []*livekit.Signalv2ServerMessage { - s.lock.Lock() - defer s.lock.Unlock() - - s.clearLocked(from - 1) - return s.getFromFrontLocked() -} diff --git a/pkg/rtc/signalling/signalcache_test.go b/pkg/rtc/signalling/signalcache_test.go deleted file mode 100644 index e7e2a669c..000000000 --- a/pkg/rtc/signalling/signalcache_test.go +++ /dev/null @@ -1,174 +0,0 @@ -// Copyright 2023 LiveKit, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package signalling - -import ( - "testing" - - "github.com/stretchr/testify/require" - "google.golang.org/protobuf/proto" - - "github.com/livekit/protocol/livekit" -) - -func TestSignalCache(t *testing.T) { - firstMessageId := uint32(10) - lastProcessedRemoteMessageId := uint32(2345) - cache := NewSignalCache(SignalCacheParams{ - FirstMessageId: firstMessageId, - }) - - inputMessages := []*livekit.Signalv2ServerMessage{ - &livekit.Signalv2ServerMessage{ - Message: &livekit.Signalv2ServerMessage_ConnectResponse{}, - }, - // SIGNALLING-V2-TODO: replace with other kinds of messages when more types are added - &livekit.Signalv2ServerMessage{ - Message: &livekit.Signalv2ServerMessage_ConnectResponse{}, - }, - &livekit.Signalv2ServerMessage{ - Message: &livekit.Signalv2ServerMessage_ConnectResponse{}, - }, - &livekit.Signalv2ServerMessage{ - Message: &livekit.Signalv2ServerMessage_ConnectResponse{}, - }, - } - - expectedOutputMessages := []*livekit.Signalv2ServerMessage{ - &livekit.Signalv2ServerMessage{ - Sequencer: &livekit.Sequencer{ - MessageId: firstMessageId, - LastProcessedRemoteMessageId: lastProcessedRemoteMessageId, - }, - Message: &livekit.Signalv2ServerMessage_ConnectResponse{}, - }, - // SIGNALLING-V2-TODO: replace with other kinds of messages when more types are added - &livekit.Signalv2ServerMessage{ - Sequencer: &livekit.Sequencer{ - MessageId: firstMessageId + 1, - LastProcessedRemoteMessageId: lastProcessedRemoteMessageId, - }, - Message: &livekit.Signalv2ServerMessage_ConnectResponse{}, - }, - &livekit.Signalv2ServerMessage{ - Sequencer: &livekit.Sequencer{ - MessageId: firstMessageId + 2, - LastProcessedRemoteMessageId: lastProcessedRemoteMessageId, - }, - Message: &livekit.Signalv2ServerMessage_ConnectResponse{}, - }, - &livekit.Signalv2ServerMessage{ - Sequencer: &livekit.Sequencer{ - MessageId: firstMessageId + 3, - LastProcessedRemoteMessageId: lastProcessedRemoteMessageId, - }, - Message: &livekit.Signalv2ServerMessage_ConnectResponse{}, - }, - } - - cache.SetLastProcessedRemoteMessageId(lastProcessedRemoteMessageId) - - // Add() - add one message at a time - for _, inputMessage := range inputMessages { - cache.Add(inputMessage) - } - - // get all messages in cache - outputMessages := cache.GetFromFront() - require.True(t, compareProtoSlices(expectedOutputMessages, outputMessages)) - - // clear one and get again - cache.Clear(firstMessageId) - - outputMessages = cache.GetFromFront() - require.True(t, compareProtoSlices(expectedOutputMessages[1:], outputMessages)) - - // clearing some evicted messages should not clear anything - cache.Clear(firstMessageId) // firstMessageId has been cleared already at this point - - outputMessages = cache.GetFromFront() - require.True(t, compareProtoSlices(expectedOutputMessages[1:], outputMessages)) - - // clear some and get rest in one go - outputMessages = cache.ClearAndGetFrom(firstMessageId + 3) - require.Equal(t, 1, len(outputMessages)) - require.True(t, compareProtoSlices(expectedOutputMessages[3:], outputMessages)) - - // getting again should get the same messages again as they sill should in cache - outputMessages = cache.GetFromFront() - require.True(t, compareProtoSlices(expectedOutputMessages[3:], outputMessages)) - - // clearing all and getting should return nil - require.Nil(t, cache.ClearAndGetFrom(firstMessageId+uint32(len(inputMessages)))) - - // getting again should return nil as the cache is fully cleared above - require.Nil(t, cache.GetFromFront()) - - lastProcessedRemoteMessageId = 4567 - cache.SetLastProcessedRemoteMessageId(lastProcessedRemoteMessageId) - - expectedOutputMessages = []*livekit.Signalv2ServerMessage{ - &livekit.Signalv2ServerMessage{ - Sequencer: &livekit.Sequencer{ - MessageId: firstMessageId + 4, - LastProcessedRemoteMessageId: lastProcessedRemoteMessageId, - }, - Message: &livekit.Signalv2ServerMessage_ConnectResponse{}, - }, - // SIGNALLING-V2-TODO: replace with other kinds of messages when more types are added - &livekit.Signalv2ServerMessage{ - Sequencer: &livekit.Sequencer{ - MessageId: firstMessageId + 1 + 4, - LastProcessedRemoteMessageId: lastProcessedRemoteMessageId, - }, - Message: &livekit.Signalv2ServerMessage_ConnectResponse{}, - }, - &livekit.Signalv2ServerMessage{ - Sequencer: &livekit.Sequencer{ - MessageId: firstMessageId + 2 + 4, - LastProcessedRemoteMessageId: lastProcessedRemoteMessageId, - }, - Message: &livekit.Signalv2ServerMessage_ConnectResponse{}, - }, - &livekit.Signalv2ServerMessage{ - Sequencer: &livekit.Sequencer{ - MessageId: firstMessageId + 3 + 4, - LastProcessedRemoteMessageId: lastProcessedRemoteMessageId, - }, - Message: &livekit.Signalv2ServerMessage_ConnectResponse{}, - }, - } - - // AddBatch() - add all messages at once - cache.AddBatch(inputMessages) - - // get all messages in cache - outputMessages = cache.GetFromFront() - require.True(t, compareProtoSlices(expectedOutputMessages, outputMessages)) -} - -func compareProtoSlices(a []*livekit.Signalv2ServerMessage, b []*livekit.Signalv2ServerMessage) bool { - if len(a) != len(b) { - return false - } - - for i := 0; i < len(a); i++ { - if !proto.Equal(a[i], b[i]) { - return false - } - } - - return true -} diff --git a/pkg/rtc/signalling/signalfragment_test.go b/pkg/rtc/signalling/signalfragment_test.go deleted file mode 100644 index 5237a32df..000000000 --- a/pkg/rtc/signalling/signalfragment_test.go +++ /dev/null @@ -1,142 +0,0 @@ -// Copyright 2023 LiveKit, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package signalling - -import ( - "testing" - - "github.com/stretchr/testify/require" - "google.golang.org/protobuf/proto" - - "github.com/livekit/protocol/livekit" -) - -func TestSignalFragment(t *testing.T) { - inputMessage := &livekit.Envelope{ - ServerMessages: []*livekit.Signalv2ServerMessage{ - { - Message: &livekit.Signalv2ServerMessage_ConnectResponse{ - ConnectResponse: &livekit.ConnectResponse{ - SifTrailer: []byte("abcdefghijklmnopqrstuvwxyz0123456789"), - }, - }, - }, - { - Message: &livekit.Signalv2ServerMessage_ConnectResponse{ - ConnectResponse: &livekit.ConnectResponse{ - SifTrailer: []byte("0123456789abcdefghijklmnopqrstuvwxyz0123456789"), - }, - }, - }, - { - Message: &livekit.Signalv2ServerMessage_ConnectResponse{ - ConnectResponse: &livekit.ConnectResponse{ - SifTrailer: []byte("ABCDEFGHIJKLMNOPQRSTabcdefghijklmnopqrstuvwxyz0123456789"), - }, - }, - }, - }, - } - - t.Run("no segmentation needed", func(t *testing.T) { - sr := NewSignalSegmenter(SignalSegmenterParams{ - MaxFragmentSize: 5_000_000, - }) - - marshalled, err := proto.Marshal(inputMessage) - require.NoError(t, err) - require.Nil(t, sr.Segment(marshalled)) - }) - - t.Run("segmentation + reassembly", func(t *testing.T) { - maxFragmentSize := 5 - sr := NewSignalSegmenter(SignalSegmenterParams{ - MaxFragmentSize: maxFragmentSize, - }) - - marshalled, err := proto.Marshal(inputMessage) - require.NoError(t, err) - - expectedNumFragments := (len(marshalled) + maxFragmentSize - 1) / maxFragmentSize - - fragments := sr.Segment(marshalled) - require.NotZero(t, len(fragments)) - require.Equal(t, uint32(len(marshalled)), fragments[0].TotalSize) - - rr := NewSignalReassembler(SignalReassemblerParams{}) - var reassembled []byte - for idx, fragment := range fragments { - require.Equal(t, uint32(idx+1), fragment.FragmentNumber) - require.NotZero(t, fragment.FragmentSize) - require.Equal(t, uint32(expectedNumFragments), fragment.NumFragments) - require.Equal(t, fragment.FragmentSize, uint32(len(fragment.Data))) - - reassembled = rr.Reassemble(fragment) - } - require.Equal(t, marshalled, reassembled) - }) - - t.Run("runt", func(t *testing.T) { - maxFragmentSize := 5 - sr := NewSignalSegmenter(SignalSegmenterParams{ - MaxFragmentSize: maxFragmentSize, - }) - - marshalled, err := proto.Marshal(inputMessage) - require.NoError(t, err) - - fragments := sr.Segment(marshalled) - - rr := NewSignalReassembler(SignalReassemblerParams{}) - var reassembled []byte - for idx, fragment := range fragments { - // do not send one packet into re-assembly initially, re-assembly should not succeed - if idx == 0 { - continue - } - - reassembled = rr.Reassemble(fragment) - } - require.Zero(t, len(reassembled)) - - // submit 1st fragment and ensure reassembly completes - reassembled = rr.Reassemble(fragments[0]) - require.Equal(t, marshalled, reassembled) - }) - - t.Run("corrupted", func(t *testing.T) { - maxFragmentSize := 5 - sr := NewSignalSegmenter(SignalSegmenterParams{ - MaxFragmentSize: maxFragmentSize, - }) - - marshalled, err := proto.Marshal(inputMessage) - require.NoError(t, err) - - fragments := sr.Segment(marshalled) - - rr := NewSignalReassembler(SignalReassemblerParams{}) - var reassembled []byte - for idx, fragment := range fragments { - // corrupt a fragment, re-assembly should fail - if idx == 0 { - fragment.FragmentSize += 1 - } - - reassembled = rr.Reassemble(fragment) - } - require.Zero(t, len(reassembled)) - }) -} diff --git a/pkg/rtc/signalling/signalhandler.go b/pkg/rtc/signalling/signalhandler.go index 63ef6bbdd..9f4b445e1 100644 --- a/pkg/rtc/signalling/signalhandler.go +++ b/pkg/rtc/signalling/signalhandler.go @@ -19,6 +19,7 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" + protosignalling "github.com/livekit/protocol/signalling" "google.golang.org/protobuf/proto" "github.com/livekit/livekit-server/pkg/rtc/types" @@ -57,13 +58,13 @@ func (s *signalhandler) HandleRequest(msg proto.Message) error { switch msg := req.GetMessage().(type) { case *livekit.SignalRequest_Offer: - s.params.Participant.HandleOffer(FromProtoSessionDescription(msg.Offer)) + s.params.Participant.HandleOffer(protosignalling.FromProtoSessionDescription(msg.Offer)) case *livekit.SignalRequest_Answer: - s.params.Participant.HandleAnswer(FromProtoSessionDescription(msg.Answer)) + s.params.Participant.HandleAnswer(protosignalling.FromProtoSessionDescription(msg.Answer)) case *livekit.SignalRequest_Trickle: - candidateInit, err := FromProtoTrickle(msg.Trickle) + candidateInit, err := protosignalling.FromProtoTrickle(msg.Trickle) if err != nil { s.params.Logger.Warnw("could not decode trickle", err) return err diff --git a/pkg/rtc/signalling/signalhandlerv2.go b/pkg/rtc/signalling/signalhandlerv2.go index 5f2e19783..b0f722da3 100644 --- a/pkg/rtc/signalling/signalhandlerv2.go +++ b/pkg/rtc/signalling/signalhandlerv2.go @@ -19,6 +19,7 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" + protosignalling "github.com/livekit/protocol/signalling" "google.golang.org/protobuf/proto" "github.com/livekit/livekit-server/pkg/rtc/types" @@ -41,13 +42,13 @@ type signalhandlerv2 struct { // SIGNALLING-V2-TODO: have to set this properly for `ConnectRequest` coming via sync HTTP path lastProcessedRemoteMessageId atomic.Uint32 - signalReassembler *SignalReassembler + signalReassembler *protosignalling.SignalReassembler } func NewSignalHandlerv2(params SignalHandlerv2Params) ParticipantSignalHandler { return &signalhandlerv2{ params: params, - signalReassembler: NewSignalReassembler(SignalReassemblerParams{ + signalReassembler: protosignalling.NewSignalReassembler(protosignalling.SignalReassemblerParams{ Logger: params.Logger, }), } @@ -69,6 +70,7 @@ func (s *signalhandlerv2) HandleRequest(msg proto.Message) error { switch msg := req.GetMessage().(type) { case *livekit.Signalv2WireMessage_Envelope: for _, clientMessage := range msg.Envelope.ClientMessages { + // SIGNAL-V2-TODO: cannot do this comparison for very first message if clientMessage.Sequencer.MessageId != s.lastProcessedRemoteMessageId.Load()+1 { s.params.Logger.Infow( "gap in message stream", @@ -77,15 +79,15 @@ func (s *signalhandlerv2) HandleRequest(msg proto.Message) error { ) } - // SIGNALLING-V2-TODO: process messages switch payload := clientMessage.GetMessage().(type) { case *livekit.Signalv2ClientMessage_PublisherSdp: - s.params.Participant.HandleOffer(FromProtoSessionDescription(payload.PublisherSdp)) + s.params.Participant.HandleOffer(protosignalling.FromProtoSessionDescription(payload.PublisherSdp)) case *livekit.Signalv2ClientMessage_SubscriberSdp: - s.params.Participant.HandleAnswer(FromProtoSessionDescription(payload.SubscriberSdp)) + s.params.Participant.HandleAnswer(protosignalling.FromProtoSessionDescription(payload.SubscriberSdp)) } + s.lastProcessedRemoteMessageId.Store(clientMessage.Sequencer.MessageId) s.params.Signalling.AckMessageId(clientMessage.Sequencer.LastProcessedRemoteMessageId) s.params.Signalling.SetLastProcessedRemoteMessageId(clientMessage.Sequencer.MessageId) } diff --git a/pkg/rtc/signalling/signallerv2async.go b/pkg/rtc/signalling/signallerv2async.go index 7ea150516..578a1853d 100644 --- a/pkg/rtc/signalling/signallerv2async.go +++ b/pkg/rtc/signalling/signallerv2async.go @@ -19,6 +19,7 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" + protosignalling "github.com/livekit/protocol/signalling" "github.com/livekit/protocol/utils" "github.com/livekit/psrpc" @@ -40,14 +41,14 @@ type signallerv2Async struct { *signallerAsyncBase - signalSegmenter *SignalSegmenter + signalSegmenter *protosignalling.SignalSegmenter } func NewSignallerv2Async(params Signallerv2AsyncParams) ParticipantSignaller { return &signallerv2Async{ params: params, signallerAsyncBase: newSignallerAsyncBase(signallerAsyncBaseParams{Logger: params.Logger}), - signalSegmenter: NewSignalSegmenter(SignalSegmenterParams{ + signalSegmenter: protosignalling.NewSignalSegmenter(protosignalling.SignalSegmenterParams{ Logger: params.Logger, }), } diff --git a/pkg/rtc/signalling/signallingv2.go b/pkg/rtc/signalling/signallingv2.go index ce127094a..9df2b5b3d 100644 --- a/pkg/rtc/signalling/signallingv2.go +++ b/pkg/rtc/signalling/signallingv2.go @@ -17,6 +17,7 @@ package signalling import ( "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" + protosignalling "github.com/livekit/protocol/signalling" "google.golang.org/protobuf/proto" ) @@ -31,13 +32,13 @@ type signallingv2 struct { params Signallingv2Params - signalCache *SignalCache + signalCache *protosignalling.Signalv2Cache } func NewSignallingv2(params Signallingv2Params) ParticipantSignalling { return &signallingv2{ params: params, - signalCache: NewSignalCache(SignalCacheParams{ + signalCache: protosignalling.NewSignalv2Cache(protosignalling.Signalv2CacheParams{ Logger: params.Logger, }), } diff --git a/pkg/rtc/signalling/signalreassembler.go b/pkg/rtc/signalling/signalreassembler.go deleted file mode 100644 index 54fd2beb7..000000000 --- a/pkg/rtc/signalling/signalreassembler.go +++ /dev/null @@ -1,156 +0,0 @@ -// Copyright 2023 LiveKit, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package signalling - -import ( - "sync" - "time" - - "github.com/livekit/protocol/livekit" - "github.com/livekit/protocol/logger" - "github.com/livekit/protocol/utils" - "go.uber.org/zap/zapcore" -) - -const ( - reassemblerTimeout = time.Minute -) - -type reassembly struct { - packetId uint32 - startedAt time.Time - fragments []*livekit.Fragment - isCorrupted bool - tqi *utils.TimeoutQueueItem[*reassembly] -} - -func (r *reassembly) MarshalLogObject(e zapcore.ObjectEncoder) error { - if r == nil { - return nil - } - - e.AddUint32("packetId", r.packetId) - e.AddTime("startAt", r.startedAt) - e.AddDuration("age", time.Since(r.startedAt)) - - expectedNumberOfFragments := len(r.fragments) - expectedTotalSize := uint32(0) - availableSize := uint32(0) - var availableFragments []uint32 - for _, fragment := range r.fragments { - if fragment == nil { - continue - } - - expectedTotalSize = fragment.TotalSize - availableSize += fragment.FragmentSize - availableFragments = append(availableFragments, fragment.FragmentNumber) - } - e.AddInt("expectedNumberOfFragments", expectedNumberOfFragments) - e.AddUint32("expectedTotalSize", expectedTotalSize) - e.AddUint32("availableSize", availableSize) - e.AddArray("availableFragments", logger.Uint32Slice(availableFragments)) - - e.AddBool("isCorrupted", r.isCorrupted) - return nil -} - -// ------------------------------------------------ - -type SignalReassemblerParams struct { - Logger logger.Logger -} - -type SignalReassembler struct { - params SignalReassemblerParams - - lock sync.Mutex - reassemblies map[uint32]*reassembly - - timeoutQueue utils.TimeoutQueue[*reassembly] -} - -func NewSignalReassembler(params SignalReassemblerParams) *SignalReassembler { - return &SignalReassembler{ - params: params, - reassemblies: make(map[uint32]*reassembly), - } -} - -func (s *SignalReassembler) Reassemble(fragment *livekit.Fragment) []byte { - s.lock.Lock() - defer s.lock.Unlock() - - re, ok := s.reassemblies[fragment.PacketId] - if !ok { - re = &reassembly{ - packetId: fragment.PacketId, - startedAt: time.Now(), - fragments: make([]*livekit.Fragment, fragment.NumFragments), - } - re.tqi = &utils.TimeoutQueueItem[*reassembly]{Value: re} - - s.reassemblies[fragment.PacketId] = re - } - if int(fragment.FragmentNumber) <= len(re.fragments) { - if int(fragment.FragmentSize) != len(fragment.Data) { - re.isCorrupted = true // runt packet, data size of blob does not match fragment size - } else { - re.fragments[fragment.FragmentNumber-1] = fragment - } - } else { - re.isCorrupted = true - } - - if re.isCorrupted { - return nil - } - - // try to reassemble - expectedTotalSize := uint32(0) - totalSize := 0 - for _, fr := range re.fragments { - if fr == nil { - return nil // not received all fragments of packet yet - } - - expectedTotalSize = fr.TotalSize // can read this from any fragment of packet - totalSize += len(fr.Data) - } - if expectedTotalSize != 0 && uint32(totalSize) != expectedTotalSize { - re.isCorrupted = true - return nil - } - - data := make([]byte, 0, expectedTotalSize) - for _, fr := range re.fragments { - data = append(data, fr.Data...) - } - delete(s.reassemblies, re.packetId) // fully re-assembled, can be deleted from cache - return data -} - -func (s *SignalReassembler) Prune() { - for it := s.timeoutQueue.IterateRemoveAfter(reassemblerTimeout); it.Next(); { - re := it.Item().Value - s.params.Logger.Infow("pruning stale reassembly packet", "reassembly", re) - - s.lock.Lock() - delete(s.reassemblies, re.packetId) - s.lock.Unlock() - } -} - -// SIGNALLING-V2-TODO: maybe do a prune worker? will need a way to stop/clean up the goroutine then diff --git a/pkg/rtc/signalling/signalsegmenter.go b/pkg/rtc/signalling/signalsegmenter.go deleted file mode 100644 index d187c6acf..000000000 --- a/pkg/rtc/signalling/signalsegmenter.go +++ /dev/null @@ -1,81 +0,0 @@ -// Copyright 2023 LiveKit, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package signalling - -import ( - "math/rand" - - "github.com/livekit/protocol/livekit" - "github.com/livekit/protocol/logger" - "go.uber.org/atomic" -) - -const ( - defaultMaxFragmentSize = 8192 -) - -type SignalSegmenterParams struct { - Logger logger.Logger - MaxFragmentSize int - FirstPacketId uint32 // should be used for testing only -} - -type SignalSegmenter struct { - params SignalSegmenterParams - - packetId atomic.Uint32 -} - -func NewSignalSegmenter(params SignalSegmenterParams) *SignalSegmenter { - s := &SignalSegmenter{ - params: params, - } - if s.params.MaxFragmentSize == 0 { - s.params.MaxFragmentSize = defaultMaxFragmentSize - } - s.packetId.Store(params.FirstPacketId) - if s.packetId.Load() == 0 { - s.packetId.Store(uint32(rand.Intn(1<<8) + 1)) - } - return s -} - -func (s *SignalSegmenter) Segment(data []byte) []*livekit.Fragment { - if len(data) <= s.params.MaxFragmentSize { - return nil - } - - var fragments []*livekit.Fragment - numFragments := uint32((len(data) + s.params.MaxFragmentSize - 1) / s.params.MaxFragmentSize) - fragmentNumber := uint32(1) - consumed := 0 - packetId := s.packetId.Inc() - for len(data[consumed:]) != 0 { - fragmentSize := min(len(data[consumed:]), s.params.MaxFragmentSize) - fragment := &livekit.Fragment{ - PacketId: packetId, - FragmentNumber: fragmentNumber, - NumFragments: numFragments, - FragmentSize: uint32(fragmentSize), - TotalSize: uint32(len(data)), - Data: data[consumed : consumed+fragmentSize], - } - fragments = append(fragments, fragment) - fragmentNumber++ - consumed += fragmentSize - } - - return fragments -} diff --git a/pkg/rtc/signalling/utils.go b/pkg/rtc/signalling/utils.go deleted file mode 100644 index bbe71fdbf..000000000 --- a/pkg/rtc/signalling/utils.go +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright 2023 LiveKit, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package signalling - -import ( - "encoding/json" - - "github.com/livekit/protocol/livekit" - "github.com/pion/webrtc/v4" -) - -func ToProtoSessionDescription(sd webrtc.SessionDescription, id uint32) *livekit.SessionDescription { - return &livekit.SessionDescription{ - Type: sd.Type.String(), - Sdp: sd.SDP, - Id: id, - } -} - -func FromProtoSessionDescription(sd *livekit.SessionDescription) (webrtc.SessionDescription, uint32) { - var sdType webrtc.SDPType - switch sd.Type { - case webrtc.SDPTypeOffer.String(): - sdType = webrtc.SDPTypeOffer - case webrtc.SDPTypeAnswer.String(): - sdType = webrtc.SDPTypeAnswer - case webrtc.SDPTypePranswer.String(): - sdType = webrtc.SDPTypePranswer - case webrtc.SDPTypeRollback.String(): - sdType = webrtc.SDPTypeRollback - } - return webrtc.SessionDescription{ - Type: sdType, - SDP: sd.Sdp, - }, sd.Id -} - -func ToProtoTrickle(candidateInit webrtc.ICECandidateInit, target livekit.SignalTarget, final bool) *livekit.TrickleRequest { - data, _ := json.Marshal(candidateInit) - return &livekit.TrickleRequest{ - CandidateInit: string(data), - Target: target, - Final: final, - } -} - -func FromProtoTrickle(trickle *livekit.TrickleRequest) (webrtc.ICECandidateInit, error) { - ci := webrtc.ICECandidateInit{} - err := json.Unmarshal([]byte(trickle.CandidateInit), &ci) - if err != nil { - return webrtc.ICECandidateInit{}, err - } - return ci, nil -} diff --git a/test/client/client.go b/test/client/client.go index acfd12916..6d479aba7 100644 --- a/test/client/client.go +++ b/test/client/client.go @@ -39,9 +39,9 @@ import ( "github.com/livekit/protocol/auth" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" + "github.com/livekit/protocol/signalling" "github.com/livekit/livekit-server/pkg/rtc" - "github.com/livekit/livekit-server/pkg/rtc/signalling" "github.com/livekit/livekit-server/pkg/rtc/transport/transportfakes" "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/sfu/buffer"