Use signalling utils from protocol (#3807)

* Use signalling utils from protocol

* staticcheck
This commit is contained in:
Raja Subramanian
2025-07-21 18:15:06 +05:30
committed by GitHub
parent f5fc82d344
commit fffc2ac090
16 changed files with 28 additions and 767 deletions
+1 -1
View File
@@ -23,7 +23,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731
github.com/livekit/mediatransportutil v0.0.0-20250519131108-fb90f5acfded
github.com/livekit/protocol v1.39.4-0.20250721063419-93319bf9e30a
github.com/livekit/protocol v1.39.4-0.20250721114233-52633eee694f
github.com/livekit/psrpc v0.6.1-0.20250511053145-465289d72c3c
github.com/mackerelio/go-osstat v0.2.5
github.com/magefile/mage v1.15.0
+2 -2
View File
@@ -167,8 +167,8 @@ github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731 h1:9x+U2HGLrSw5AT
github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20250519131108-fb90f5acfded h1:ylZPdnlX1RW9Z15SD4mp87vT2D2shsk0hpLJwSPcq3g=
github.com/livekit/mediatransportutil v0.0.0-20250519131108-fb90f5acfded/go.mod h1:mSNtYzSf6iY9xM3UX42VEI+STHvMgHmrYzEHPcdhB8A=
github.com/livekit/protocol v1.39.4-0.20250721063419-93319bf9e30a h1:gJaHYMRz7ZJCT1qLIfReHEL27zD8L+pRVELMjAatdJI=
github.com/livekit/protocol v1.39.4-0.20250721063419-93319bf9e30a/go.mod h1:6l+zgRJZ9sY96LM7DA3EMcKQC5zsVyZVP73c+9wgvCA=
github.com/livekit/protocol v1.39.4-0.20250721114233-52633eee694f h1:Cwe38+/ld3r5dnNmIZSALSoZPWNEMeYPZIi/qjpplLo=
github.com/livekit/protocol v1.39.4-0.20250721114233-52633eee694f/go.mod h1:YlgUxAegtU8jZ0tVXoIV/4fHeHqqLvS+6JnPKDbpFPU=
github.com/livekit/psrpc v0.6.1-0.20250511053145-465289d72c3c h1:WwEr0YBejYbKzk8LSaO9h8h0G9MnE7shyDu8yXQWmEc=
github.com/livekit/psrpc v0.6.1-0.20250511053145-465289d72c3c/go.mod h1:kmD+AZPkWu0MaXIMv57jhNlbiSZZ/Jx4bzlxBDVmJes=
github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o=
+1 -1
View File
@@ -33,13 +33,13 @@ import (
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/observability/roomobs"
lksdp "github.com/livekit/protocol/sdp"
"github.com/livekit/protocol/signalling"
"github.com/livekit/protocol/utils"
"github.com/livekit/protocol/utils/guid"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/routing/routingfakes"
"github.com/livekit/livekit-server/pkg/rtc/signalling"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/rtc/types/typesfakes"
"github.com/livekit/livekit-server/pkg/testutils"
+4 -4
View File
@@ -21,9 +21,9 @@ import (
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
protosignalling "github.com/livekit/protocol/signalling"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/rtc/signalling"
"github.com/livekit/livekit-server/pkg/rtc/types"
"google.golang.org/protobuf/proto"
@@ -222,7 +222,7 @@ func (p *ParticipantImpl) sendICECandidate(ic *webrtc.ICECandidate, target livek
return nil
}
trickle := signalling.ToProtoTrickle(prevIC.ToJSON(), target, ic == nil)
trickle := protosignalling.ToProtoTrickle(prevIC.ToJSON(), target, ic == nil)
p.params.Logger.Debugw("sending ICE candidate", "transport", target, "trickle", logger.Proto(trickle))
return p.signaller.WriteMessage(p.signalling.SignalICECandidate(trickle))
@@ -294,11 +294,11 @@ func (p *ParticipantImpl) sendLeaveRequest(
}
func (p *ParticipantImpl) sendSdpAnswer(answer webrtc.SessionDescription, answerId uint32) error {
return p.signaller.WriteMessage(p.signalling.SignalSdpAnswer(signalling.ToProtoSessionDescription(answer, answerId)))
return p.signaller.WriteMessage(p.signalling.SignalSdpAnswer(protosignalling.ToProtoSessionDescription(answer, answerId)))
}
func (p *ParticipantImpl) sendSdpOffer(offer webrtc.SessionDescription, offerId uint32) error {
return p.signaller.WriteMessage(p.signalling.SignalSdpOffer(signalling.ToProtoSessionDescription(offer, offerId)))
return p.signaller.WriteMessage(p.signalling.SignalSdpOffer(protosignalling.ToProtoSessionDescription(offer, offerId)))
}
func (p *ParticipantImpl) sendStreamStateUpdate(streamStateUpdate *livekit.StreamStateUpdate) error {
+2 -2
View File
@@ -32,6 +32,7 @@ import (
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/observability/roomobs"
"github.com/livekit/protocol/rpc"
protosignalling "github.com/livekit/protocol/signalling"
"github.com/livekit/protocol/utils"
"github.com/livekit/protocol/utils/guid"
"github.com/livekit/psrpc"
@@ -39,7 +40,6 @@ import (
"github.com/livekit/livekit-server/pkg/agent"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/rtc/signalling"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/sfu"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
@@ -797,7 +797,7 @@ func (r *Room) Joinv2(
prometheus.ServiceOperationCounter.WithLabelValues("participant_join", "error", "get_subscriber_offer").Add(1)
return nil, err
}
connectResponse.SubscriberSdp = signalling.ToProtoSessionDescription(offer, 0) // SIGNALLING-V2-TODO - need to proper offerId?
connectResponse.SubscriberSdp = protosignalling.ToProtoSessionDescription(offer, 0) // SIGNALLING-V2-TODO - need to proper offerId?
// for sync response, this does not actually send, only generates messageId and caches the message
if err := participant.SendConnectResponse(connectResponse); err != nil {
prometheus.ServiceOperationCounter.WithLabelValues("participant_join", "error", "send_response").Add(1)
-125
View File
@@ -1,125 +0,0 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package signalling
import (
"math/rand"
"sync"
"github.com/gammazero/deque"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
)
type SignalCacheParams struct {
Logger logger.Logger
FirstMessageId uint32 // should be used for testing only
}
type SignalCache struct {
params SignalCacheParams
lock sync.Mutex
messageId uint32
lastProcessedRemoteMessageId uint32
messages deque.Deque[*livekit.Signalv2ServerMessage]
}
func NewSignalCache(params SignalCacheParams) *SignalCache {
s := &SignalCache{
params: params,
messageId: params.FirstMessageId,
}
if s.messageId == 0 {
s.messageId = uint32(rand.Intn(1<<8) + 1)
}
s.messages.SetBaseCap(16)
return s
}
func (s *SignalCache) SetLastProcessedRemoteMessageId(lastProcessedRemoteMessageId uint32) {
s.lock.Lock()
defer s.lock.Unlock()
s.lastProcessedRemoteMessageId = lastProcessedRemoteMessageId
}
func (s *SignalCache) Add(msg *livekit.Signalv2ServerMessage) *livekit.Signalv2ServerMessage {
if msg != nil {
s.AddBatch([]*livekit.Signalv2ServerMessage{msg})
}
return msg
}
// SIGNALLING-V2-TODO: may not need this API
func (s *SignalCache) AddBatch(msgs []*livekit.Signalv2ServerMessage) {
s.lock.Lock()
defer s.lock.Unlock()
for _, msg := range msgs {
msg.Sequencer = &livekit.Sequencer{
MessageId: s.messageId,
}
s.messageId++
s.messages.PushBack(msg)
}
}
func (s *SignalCache) Clear(till uint32) {
s.lock.Lock()
defer s.lock.Unlock()
s.clearLocked(till)
}
func (s *SignalCache) clearLocked(till uint32) {
for s.messages.Len() != 0 {
front := s.messages.Front()
if front.Sequencer.GetMessageId() > till {
break
}
s.messages.PopFront()
}
}
func (s *SignalCache) GetFromFront() []*livekit.Signalv2ServerMessage {
s.lock.Lock()
defer s.lock.Unlock()
return s.getFromFrontLocked()
}
func (s *SignalCache) getFromFrontLocked() []*livekit.Signalv2ServerMessage {
var msgs []*livekit.Signalv2ServerMessage
for msg := range s.messages.Iter() {
clone := utils.CloneProto(msg)
clone.Sequencer.LastProcessedRemoteMessageId = s.lastProcessedRemoteMessageId
msgs = append(msgs, clone)
}
return msgs
}
func (s *SignalCache) ClearAndGetFrom(from uint32) []*livekit.Signalv2ServerMessage {
s.lock.Lock()
defer s.lock.Unlock()
s.clearLocked(from - 1)
return s.getFromFrontLocked()
}
-174
View File
@@ -1,174 +0,0 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package signalling
import (
"testing"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
"github.com/livekit/protocol/livekit"
)
func TestSignalCache(t *testing.T) {
firstMessageId := uint32(10)
lastProcessedRemoteMessageId := uint32(2345)
cache := NewSignalCache(SignalCacheParams{
FirstMessageId: firstMessageId,
})
inputMessages := []*livekit.Signalv2ServerMessage{
&livekit.Signalv2ServerMessage{
Message: &livekit.Signalv2ServerMessage_ConnectResponse{},
},
// SIGNALLING-V2-TODO: replace with other kinds of messages when more types are added
&livekit.Signalv2ServerMessage{
Message: &livekit.Signalv2ServerMessage_ConnectResponse{},
},
&livekit.Signalv2ServerMessage{
Message: &livekit.Signalv2ServerMessage_ConnectResponse{},
},
&livekit.Signalv2ServerMessage{
Message: &livekit.Signalv2ServerMessage_ConnectResponse{},
},
}
expectedOutputMessages := []*livekit.Signalv2ServerMessage{
&livekit.Signalv2ServerMessage{
Sequencer: &livekit.Sequencer{
MessageId: firstMessageId,
LastProcessedRemoteMessageId: lastProcessedRemoteMessageId,
},
Message: &livekit.Signalv2ServerMessage_ConnectResponse{},
},
// SIGNALLING-V2-TODO: replace with other kinds of messages when more types are added
&livekit.Signalv2ServerMessage{
Sequencer: &livekit.Sequencer{
MessageId: firstMessageId + 1,
LastProcessedRemoteMessageId: lastProcessedRemoteMessageId,
},
Message: &livekit.Signalv2ServerMessage_ConnectResponse{},
},
&livekit.Signalv2ServerMessage{
Sequencer: &livekit.Sequencer{
MessageId: firstMessageId + 2,
LastProcessedRemoteMessageId: lastProcessedRemoteMessageId,
},
Message: &livekit.Signalv2ServerMessage_ConnectResponse{},
},
&livekit.Signalv2ServerMessage{
Sequencer: &livekit.Sequencer{
MessageId: firstMessageId + 3,
LastProcessedRemoteMessageId: lastProcessedRemoteMessageId,
},
Message: &livekit.Signalv2ServerMessage_ConnectResponse{},
},
}
cache.SetLastProcessedRemoteMessageId(lastProcessedRemoteMessageId)
// Add() - add one message at a time
for _, inputMessage := range inputMessages {
cache.Add(inputMessage)
}
// get all messages in cache
outputMessages := cache.GetFromFront()
require.True(t, compareProtoSlices(expectedOutputMessages, outputMessages))
// clear one and get again
cache.Clear(firstMessageId)
outputMessages = cache.GetFromFront()
require.True(t, compareProtoSlices(expectedOutputMessages[1:], outputMessages))
// clearing some evicted messages should not clear anything
cache.Clear(firstMessageId) // firstMessageId has been cleared already at this point
outputMessages = cache.GetFromFront()
require.True(t, compareProtoSlices(expectedOutputMessages[1:], outputMessages))
// clear some and get rest in one go
outputMessages = cache.ClearAndGetFrom(firstMessageId + 3)
require.Equal(t, 1, len(outputMessages))
require.True(t, compareProtoSlices(expectedOutputMessages[3:], outputMessages))
// getting again should get the same messages again as they sill should in cache
outputMessages = cache.GetFromFront()
require.True(t, compareProtoSlices(expectedOutputMessages[3:], outputMessages))
// clearing all and getting should return nil
require.Nil(t, cache.ClearAndGetFrom(firstMessageId+uint32(len(inputMessages))))
// getting again should return nil as the cache is fully cleared above
require.Nil(t, cache.GetFromFront())
lastProcessedRemoteMessageId = 4567
cache.SetLastProcessedRemoteMessageId(lastProcessedRemoteMessageId)
expectedOutputMessages = []*livekit.Signalv2ServerMessage{
&livekit.Signalv2ServerMessage{
Sequencer: &livekit.Sequencer{
MessageId: firstMessageId + 4,
LastProcessedRemoteMessageId: lastProcessedRemoteMessageId,
},
Message: &livekit.Signalv2ServerMessage_ConnectResponse{},
},
// SIGNALLING-V2-TODO: replace with other kinds of messages when more types are added
&livekit.Signalv2ServerMessage{
Sequencer: &livekit.Sequencer{
MessageId: firstMessageId + 1 + 4,
LastProcessedRemoteMessageId: lastProcessedRemoteMessageId,
},
Message: &livekit.Signalv2ServerMessage_ConnectResponse{},
},
&livekit.Signalv2ServerMessage{
Sequencer: &livekit.Sequencer{
MessageId: firstMessageId + 2 + 4,
LastProcessedRemoteMessageId: lastProcessedRemoteMessageId,
},
Message: &livekit.Signalv2ServerMessage_ConnectResponse{},
},
&livekit.Signalv2ServerMessage{
Sequencer: &livekit.Sequencer{
MessageId: firstMessageId + 3 + 4,
LastProcessedRemoteMessageId: lastProcessedRemoteMessageId,
},
Message: &livekit.Signalv2ServerMessage_ConnectResponse{},
},
}
// AddBatch() - add all messages at once
cache.AddBatch(inputMessages)
// get all messages in cache
outputMessages = cache.GetFromFront()
require.True(t, compareProtoSlices(expectedOutputMessages, outputMessages))
}
func compareProtoSlices(a []*livekit.Signalv2ServerMessage, b []*livekit.Signalv2ServerMessage) bool {
if len(a) != len(b) {
return false
}
for i := 0; i < len(a); i++ {
if !proto.Equal(a[i], b[i]) {
return false
}
}
return true
}
-142
View File
@@ -1,142 +0,0 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package signalling
import (
"testing"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
"github.com/livekit/protocol/livekit"
)
func TestSignalFragment(t *testing.T) {
inputMessage := &livekit.Envelope{
ServerMessages: []*livekit.Signalv2ServerMessage{
{
Message: &livekit.Signalv2ServerMessage_ConnectResponse{
ConnectResponse: &livekit.ConnectResponse{
SifTrailer: []byte("abcdefghijklmnopqrstuvwxyz0123456789"),
},
},
},
{
Message: &livekit.Signalv2ServerMessage_ConnectResponse{
ConnectResponse: &livekit.ConnectResponse{
SifTrailer: []byte("0123456789abcdefghijklmnopqrstuvwxyz0123456789"),
},
},
},
{
Message: &livekit.Signalv2ServerMessage_ConnectResponse{
ConnectResponse: &livekit.ConnectResponse{
SifTrailer: []byte("ABCDEFGHIJKLMNOPQRSTabcdefghijklmnopqrstuvwxyz0123456789"),
},
},
},
},
}
t.Run("no segmentation needed", func(t *testing.T) {
sr := NewSignalSegmenter(SignalSegmenterParams{
MaxFragmentSize: 5_000_000,
})
marshalled, err := proto.Marshal(inputMessage)
require.NoError(t, err)
require.Nil(t, sr.Segment(marshalled))
})
t.Run("segmentation + reassembly", func(t *testing.T) {
maxFragmentSize := 5
sr := NewSignalSegmenter(SignalSegmenterParams{
MaxFragmentSize: maxFragmentSize,
})
marshalled, err := proto.Marshal(inputMessage)
require.NoError(t, err)
expectedNumFragments := (len(marshalled) + maxFragmentSize - 1) / maxFragmentSize
fragments := sr.Segment(marshalled)
require.NotZero(t, len(fragments))
require.Equal(t, uint32(len(marshalled)), fragments[0].TotalSize)
rr := NewSignalReassembler(SignalReassemblerParams{})
var reassembled []byte
for idx, fragment := range fragments {
require.Equal(t, uint32(idx+1), fragment.FragmentNumber)
require.NotZero(t, fragment.FragmentSize)
require.Equal(t, uint32(expectedNumFragments), fragment.NumFragments)
require.Equal(t, fragment.FragmentSize, uint32(len(fragment.Data)))
reassembled = rr.Reassemble(fragment)
}
require.Equal(t, marshalled, reassembled)
})
t.Run("runt", func(t *testing.T) {
maxFragmentSize := 5
sr := NewSignalSegmenter(SignalSegmenterParams{
MaxFragmentSize: maxFragmentSize,
})
marshalled, err := proto.Marshal(inputMessage)
require.NoError(t, err)
fragments := sr.Segment(marshalled)
rr := NewSignalReassembler(SignalReassemblerParams{})
var reassembled []byte
for idx, fragment := range fragments {
// do not send one packet into re-assembly initially, re-assembly should not succeed
if idx == 0 {
continue
}
reassembled = rr.Reassemble(fragment)
}
require.Zero(t, len(reassembled))
// submit 1st fragment and ensure reassembly completes
reassembled = rr.Reassemble(fragments[0])
require.Equal(t, marshalled, reassembled)
})
t.Run("corrupted", func(t *testing.T) {
maxFragmentSize := 5
sr := NewSignalSegmenter(SignalSegmenterParams{
MaxFragmentSize: maxFragmentSize,
})
marshalled, err := proto.Marshal(inputMessage)
require.NoError(t, err)
fragments := sr.Segment(marshalled)
rr := NewSignalReassembler(SignalReassemblerParams{})
var reassembled []byte
for idx, fragment := range fragments {
// corrupt a fragment, re-assembly should fail
if idx == 0 {
fragment.FragmentSize += 1
}
reassembled = rr.Reassemble(fragment)
}
require.Zero(t, len(reassembled))
})
}
+4 -3
View File
@@ -19,6 +19,7 @@ import (
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
protosignalling "github.com/livekit/protocol/signalling"
"google.golang.org/protobuf/proto"
"github.com/livekit/livekit-server/pkg/rtc/types"
@@ -57,13 +58,13 @@ func (s *signalhandler) HandleRequest(msg proto.Message) error {
switch msg := req.GetMessage().(type) {
case *livekit.SignalRequest_Offer:
s.params.Participant.HandleOffer(FromProtoSessionDescription(msg.Offer))
s.params.Participant.HandleOffer(protosignalling.FromProtoSessionDescription(msg.Offer))
case *livekit.SignalRequest_Answer:
s.params.Participant.HandleAnswer(FromProtoSessionDescription(msg.Answer))
s.params.Participant.HandleAnswer(protosignalling.FromProtoSessionDescription(msg.Answer))
case *livekit.SignalRequest_Trickle:
candidateInit, err := FromProtoTrickle(msg.Trickle)
candidateInit, err := protosignalling.FromProtoTrickle(msg.Trickle)
if err != nil {
s.params.Logger.Warnw("could not decode trickle", err)
return err
+7 -5
View File
@@ -19,6 +19,7 @@ import (
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
protosignalling "github.com/livekit/protocol/signalling"
"google.golang.org/protobuf/proto"
"github.com/livekit/livekit-server/pkg/rtc/types"
@@ -41,13 +42,13 @@ type signalhandlerv2 struct {
// SIGNALLING-V2-TODO: have to set this properly for `ConnectRequest` coming via sync HTTP path
lastProcessedRemoteMessageId atomic.Uint32
signalReassembler *SignalReassembler
signalReassembler *protosignalling.SignalReassembler
}
func NewSignalHandlerv2(params SignalHandlerv2Params) ParticipantSignalHandler {
return &signalhandlerv2{
params: params,
signalReassembler: NewSignalReassembler(SignalReassemblerParams{
signalReassembler: protosignalling.NewSignalReassembler(protosignalling.SignalReassemblerParams{
Logger: params.Logger,
}),
}
@@ -69,6 +70,7 @@ func (s *signalhandlerv2) HandleRequest(msg proto.Message) error {
switch msg := req.GetMessage().(type) {
case *livekit.Signalv2WireMessage_Envelope:
for _, clientMessage := range msg.Envelope.ClientMessages {
// SIGNAL-V2-TODO: cannot do this comparison for very first message
if clientMessage.Sequencer.MessageId != s.lastProcessedRemoteMessageId.Load()+1 {
s.params.Logger.Infow(
"gap in message stream",
@@ -77,15 +79,15 @@ func (s *signalhandlerv2) HandleRequest(msg proto.Message) error {
)
}
// SIGNALLING-V2-TODO: process messages
switch payload := clientMessage.GetMessage().(type) {
case *livekit.Signalv2ClientMessage_PublisherSdp:
s.params.Participant.HandleOffer(FromProtoSessionDescription(payload.PublisherSdp))
s.params.Participant.HandleOffer(protosignalling.FromProtoSessionDescription(payload.PublisherSdp))
case *livekit.Signalv2ClientMessage_SubscriberSdp:
s.params.Participant.HandleAnswer(FromProtoSessionDescription(payload.SubscriberSdp))
s.params.Participant.HandleAnswer(protosignalling.FromProtoSessionDescription(payload.SubscriberSdp))
}
s.lastProcessedRemoteMessageId.Store(clientMessage.Sequencer.MessageId)
s.params.Signalling.AckMessageId(clientMessage.Sequencer.LastProcessedRemoteMessageId)
s.params.Signalling.SetLastProcessedRemoteMessageId(clientMessage.Sequencer.MessageId)
}
+3 -2
View File
@@ -19,6 +19,7 @@ import (
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
protosignalling "github.com/livekit/protocol/signalling"
"github.com/livekit/protocol/utils"
"github.com/livekit/psrpc"
@@ -40,14 +41,14 @@ type signallerv2Async struct {
*signallerAsyncBase
signalSegmenter *SignalSegmenter
signalSegmenter *protosignalling.SignalSegmenter
}
func NewSignallerv2Async(params Signallerv2AsyncParams) ParticipantSignaller {
return &signallerv2Async{
params: params,
signallerAsyncBase: newSignallerAsyncBase(signallerAsyncBaseParams{Logger: params.Logger}),
signalSegmenter: NewSignalSegmenter(SignalSegmenterParams{
signalSegmenter: protosignalling.NewSignalSegmenter(protosignalling.SignalSegmenterParams{
Logger: params.Logger,
}),
}
+3 -2
View File
@@ -17,6 +17,7 @@ package signalling
import (
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
protosignalling "github.com/livekit/protocol/signalling"
"google.golang.org/protobuf/proto"
)
@@ -31,13 +32,13 @@ type signallingv2 struct {
params Signallingv2Params
signalCache *SignalCache
signalCache *protosignalling.Signalv2Cache
}
func NewSignallingv2(params Signallingv2Params) ParticipantSignalling {
return &signallingv2{
params: params,
signalCache: NewSignalCache(SignalCacheParams{
signalCache: protosignalling.NewSignalv2Cache(protosignalling.Signalv2CacheParams{
Logger: params.Logger,
}),
}
-156
View File
@@ -1,156 +0,0 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package signalling
import (
"sync"
"time"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
"go.uber.org/zap/zapcore"
)
const (
reassemblerTimeout = time.Minute
)
type reassembly struct {
packetId uint32
startedAt time.Time
fragments []*livekit.Fragment
isCorrupted bool
tqi *utils.TimeoutQueueItem[*reassembly]
}
func (r *reassembly) MarshalLogObject(e zapcore.ObjectEncoder) error {
if r == nil {
return nil
}
e.AddUint32("packetId", r.packetId)
e.AddTime("startAt", r.startedAt)
e.AddDuration("age", time.Since(r.startedAt))
expectedNumberOfFragments := len(r.fragments)
expectedTotalSize := uint32(0)
availableSize := uint32(0)
var availableFragments []uint32
for _, fragment := range r.fragments {
if fragment == nil {
continue
}
expectedTotalSize = fragment.TotalSize
availableSize += fragment.FragmentSize
availableFragments = append(availableFragments, fragment.FragmentNumber)
}
e.AddInt("expectedNumberOfFragments", expectedNumberOfFragments)
e.AddUint32("expectedTotalSize", expectedTotalSize)
e.AddUint32("availableSize", availableSize)
e.AddArray("availableFragments", logger.Uint32Slice(availableFragments))
e.AddBool("isCorrupted", r.isCorrupted)
return nil
}
// ------------------------------------------------
type SignalReassemblerParams struct {
Logger logger.Logger
}
type SignalReassembler struct {
params SignalReassemblerParams
lock sync.Mutex
reassemblies map[uint32]*reassembly
timeoutQueue utils.TimeoutQueue[*reassembly]
}
func NewSignalReassembler(params SignalReassemblerParams) *SignalReassembler {
return &SignalReassembler{
params: params,
reassemblies: make(map[uint32]*reassembly),
}
}
func (s *SignalReassembler) Reassemble(fragment *livekit.Fragment) []byte {
s.lock.Lock()
defer s.lock.Unlock()
re, ok := s.reassemblies[fragment.PacketId]
if !ok {
re = &reassembly{
packetId: fragment.PacketId,
startedAt: time.Now(),
fragments: make([]*livekit.Fragment, fragment.NumFragments),
}
re.tqi = &utils.TimeoutQueueItem[*reassembly]{Value: re}
s.reassemblies[fragment.PacketId] = re
}
if int(fragment.FragmentNumber) <= len(re.fragments) {
if int(fragment.FragmentSize) != len(fragment.Data) {
re.isCorrupted = true // runt packet, data size of blob does not match fragment size
} else {
re.fragments[fragment.FragmentNumber-1] = fragment
}
} else {
re.isCorrupted = true
}
if re.isCorrupted {
return nil
}
// try to reassemble
expectedTotalSize := uint32(0)
totalSize := 0
for _, fr := range re.fragments {
if fr == nil {
return nil // not received all fragments of packet yet
}
expectedTotalSize = fr.TotalSize // can read this from any fragment of packet
totalSize += len(fr.Data)
}
if expectedTotalSize != 0 && uint32(totalSize) != expectedTotalSize {
re.isCorrupted = true
return nil
}
data := make([]byte, 0, expectedTotalSize)
for _, fr := range re.fragments {
data = append(data, fr.Data...)
}
delete(s.reassemblies, re.packetId) // fully re-assembled, can be deleted from cache
return data
}
func (s *SignalReassembler) Prune() {
for it := s.timeoutQueue.IterateRemoveAfter(reassemblerTimeout); it.Next(); {
re := it.Item().Value
s.params.Logger.Infow("pruning stale reassembly packet", "reassembly", re)
s.lock.Lock()
delete(s.reassemblies, re.packetId)
s.lock.Unlock()
}
}
// SIGNALLING-V2-TODO: maybe do a prune worker? will need a way to stop/clean up the goroutine then
-81
View File
@@ -1,81 +0,0 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package signalling
import (
"math/rand"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"go.uber.org/atomic"
)
const (
defaultMaxFragmentSize = 8192
)
type SignalSegmenterParams struct {
Logger logger.Logger
MaxFragmentSize int
FirstPacketId uint32 // should be used for testing only
}
type SignalSegmenter struct {
params SignalSegmenterParams
packetId atomic.Uint32
}
func NewSignalSegmenter(params SignalSegmenterParams) *SignalSegmenter {
s := &SignalSegmenter{
params: params,
}
if s.params.MaxFragmentSize == 0 {
s.params.MaxFragmentSize = defaultMaxFragmentSize
}
s.packetId.Store(params.FirstPacketId)
if s.packetId.Load() == 0 {
s.packetId.Store(uint32(rand.Intn(1<<8) + 1))
}
return s
}
func (s *SignalSegmenter) Segment(data []byte) []*livekit.Fragment {
if len(data) <= s.params.MaxFragmentSize {
return nil
}
var fragments []*livekit.Fragment
numFragments := uint32((len(data) + s.params.MaxFragmentSize - 1) / s.params.MaxFragmentSize)
fragmentNumber := uint32(1)
consumed := 0
packetId := s.packetId.Inc()
for len(data[consumed:]) != 0 {
fragmentSize := min(len(data[consumed:]), s.params.MaxFragmentSize)
fragment := &livekit.Fragment{
PacketId: packetId,
FragmentNumber: fragmentNumber,
NumFragments: numFragments,
FragmentSize: uint32(fragmentSize),
TotalSize: uint32(len(data)),
Data: data[consumed : consumed+fragmentSize],
}
fragments = append(fragments, fragment)
fragmentNumber++
consumed += fragmentSize
}
return fragments
}
-66
View File
@@ -1,66 +0,0 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package signalling
import (
"encoding/json"
"github.com/livekit/protocol/livekit"
"github.com/pion/webrtc/v4"
)
func ToProtoSessionDescription(sd webrtc.SessionDescription, id uint32) *livekit.SessionDescription {
return &livekit.SessionDescription{
Type: sd.Type.String(),
Sdp: sd.SDP,
Id: id,
}
}
func FromProtoSessionDescription(sd *livekit.SessionDescription) (webrtc.SessionDescription, uint32) {
var sdType webrtc.SDPType
switch sd.Type {
case webrtc.SDPTypeOffer.String():
sdType = webrtc.SDPTypeOffer
case webrtc.SDPTypeAnswer.String():
sdType = webrtc.SDPTypeAnswer
case webrtc.SDPTypePranswer.String():
sdType = webrtc.SDPTypePranswer
case webrtc.SDPTypeRollback.String():
sdType = webrtc.SDPTypeRollback
}
return webrtc.SessionDescription{
Type: sdType,
SDP: sd.Sdp,
}, sd.Id
}
func ToProtoTrickle(candidateInit webrtc.ICECandidateInit, target livekit.SignalTarget, final bool) *livekit.TrickleRequest {
data, _ := json.Marshal(candidateInit)
return &livekit.TrickleRequest{
CandidateInit: string(data),
Target: target,
Final: final,
}
}
func FromProtoTrickle(trickle *livekit.TrickleRequest) (webrtc.ICECandidateInit, error) {
ci := webrtc.ICECandidateInit{}
err := json.Unmarshal([]byte(trickle.CandidateInit), &ci)
if err != nil {
return webrtc.ICECandidateInit{}, err
}
return ci, nil
}
+1 -1
View File
@@ -39,9 +39,9 @@ import (
"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/signalling"
"github.com/livekit/livekit-server/pkg/rtc"
"github.com/livekit/livekit-server/pkg/rtc/signalling"
"github.com/livekit/livekit-server/pkg/rtc/transport/transportfakes"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/sfu/buffer"