Implements protocol 3 speaker updates (#120)

* Disallow AddTrack from participants that don't have the permission

* Support protocol 3 speaker updates, client info

* update protocol

* Disallow AddTrack from participants that don't have the permission

* increase wait time for GH to pass
This commit is contained in:
David Zhao
2021-09-17 11:47:13 -07:00
committed by GitHub
parent abde72a907
commit ff47301820
12 changed files with 192 additions and 139 deletions

2
go.mod
View File

@@ -14,7 +14,7 @@ require (
github.com/google/wire v0.5.0
github.com/gorilla/websocket v1.4.2
github.com/jxskiss/base62 v0.0.0-20191017122030-4f11678b909b
github.com/livekit/protocol v0.8.9
github.com/livekit/protocol v0.9.0
github.com/magefile/mage v1.11.0
github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0
github.com/mitchellh/go-homedir v1.1.0

4
go.sum
View File

@@ -246,8 +246,8 @@ github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY
github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/livekit/ion-sfu v1.20.9 h1:Ih+tE5CXqdL1o6iXfZn+uBoddBXWceaOKVIun6aVuZM=
github.com/livekit/ion-sfu v1.20.9/go.mod h1:g8hwobZI5fvX1RXvayf4ZXkgP7spV5YGE4yTSsumpB4=
github.com/livekit/protocol v0.8.9 h1:oZNXehaeuV906sbnKtc4SvOld0Bj7lTF0AerenwvP70=
github.com/livekit/protocol v0.8.9/go.mod h1:MEKn847Iu/2U8ClZyUmEm2oHn8k9fnSHy81Wv8kkSDo=
github.com/livekit/protocol v0.9.0 h1:cxMmJAiFRvBVYa0Q0Rfd86BHSLG1zLyD+AE0paHoGAo=
github.com/livekit/protocol v0.9.0/go.mod h1:MEKn847Iu/2U8ClZyUmEm2oHn8k9fnSHy81Wv8kkSDo=
github.com/lucsky/cuid v1.0.2 h1:z4XlExeoderxoPj2/dxKOyPxe9RCOu7yNq9/XWxIUMQ=
github.com/lucsky/cuid v1.0.2/go.mod h1:QaaJqckboimOmhRSJXSx/+IT+VTfxfPGSo/6mfgUfmE=
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=

View File

@@ -25,13 +25,13 @@ type MessageSource interface {
}
type ParticipantInit struct {
Identity string
Metadata string
Reconnect bool
Permission *livekit.ParticipantPermission
ProtocolVersion int32
AutoSubscribe bool
Hidden bool
Identity string
Metadata string
Reconnect bool
Permission *livekit.ParticipantPermission
AutoSubscribe bool
Hidden bool
Client *livekit.ClientInfo
}
type NewParticipantCallback func(ctx context.Context, roomName string, pi ParticipantInit, requestSource MessageSource, responseSink MessageSink)

View File

@@ -150,12 +150,12 @@ func (r *RedisRouter) StartParticipantSignal(ctx context.Context, roomName strin
Identity: pi.Identity,
Metadata: pi.Metadata,
// connection id is to allow the RTC node to identify where to route the message back to
ConnectionId: connectionId,
Reconnect: pi.Reconnect,
Permission: pi.Permission,
ProtocolVersion: pi.ProtocolVersion,
AutoSubscribe: pi.AutoSubscribe,
Hidden: pi.Hidden,
ConnectionId: connectionId,
Reconnect: pi.Reconnect,
Permission: pi.Permission,
AutoSubscribe: pi.AutoSubscribe,
Hidden: pi.Hidden,
Client: pi.Client,
})
if err != nil {
return
@@ -219,13 +219,13 @@ func (r *RedisRouter) startParticipantRTC(ss *livekit.StartSession, participantK
}
pi := ParticipantInit{
Identity: ss.Identity,
Metadata: ss.Metadata,
Reconnect: ss.Reconnect,
Permission: ss.Permission,
ProtocolVersion: ss.ProtocolVersion,
AutoSubscribe: ss.AutoSubscribe,
Hidden: ss.Hidden,
Identity: ss.Identity,
Metadata: ss.Metadata,
Reconnect: ss.Reconnect,
Permission: ss.Permission,
Client: ss.Client,
AutoSubscribe: ss.AutoSubscribe,
Hidden: ss.Hidden,
}
reqChan := r.getOrCreateMessageChannel(r.requestChannels, participantKey)

View File

@@ -321,6 +321,12 @@ func (p *ParticipantImpl) AddTrack(req *livekit.AddTrackRequest) {
return
}
if !p.CanPublish() {
logger.Warnw("no permission to publish track", nil,
"participant", p.Identity(), "pID", p.ID())
return
}
ti := &livekit.TrackInfo{
Type: req.Type,
Name: req.Name,
@@ -526,14 +532,15 @@ func (p *ParticipantImpl) SendParticipantUpdate(participants []*livekit.Particip
})
}
func (p *ParticipantImpl) SendActiveSpeakers(speakers []*livekit.SpeakerInfo) error {
// SendSpeakerUpdate notifies participant changes to speakers. only send members that have changed since last update
func (p *ParticipantImpl) SendSpeakerUpdate(speakers []*livekit.SpeakerInfo) error {
if !p.IsReady() {
return nil
}
return p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_Speaker{
Speaker: &livekit.ActiveSpeakerUpdate{
Message: &livekit.SignalResponse_SpeakersChanged{
SpeakersChanged: &livekit.SpeakersChanged{
Speakers: speakers,
},
},
@@ -769,6 +776,7 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w
}
logger.Debugw("mediaTrack added",
"kind", track.Kind().String(),
"participant", p.Identity(),
"pID", p.ID(),
"track", track.ID(),

View File

@@ -532,7 +532,8 @@ func (r *Room) broadcastParticipantState(p types.Participant, skipSource bool) {
}
}
func (r *Room) sendSpeakerUpdates(speakers []*livekit.SpeakerInfo) {
// for protocol 2, send all active speakers
func (r *Room) sendActiveSpeakers(speakers []*livekit.SpeakerInfo) {
dp := &livekit.DataPacket{
Kind: livekit.DataPacket_LOSSY,
Value: &livekit.DataPacket_Speaker{
@@ -543,10 +544,17 @@ func (r *Room) sendSpeakerUpdates(speakers []*livekit.SpeakerInfo) {
}
for _, p := range r.GetParticipants() {
if p.ProtocolVersion().HandlesDataPackets() {
if p.ProtocolVersion().HandlesDataPackets() && !p.ProtocolVersion().SupportsSpeakerChanged() {
_ = p.SendDataPacket(dp)
} else {
_ = p.SendActiveSpeakers(speakers)
}
}
}
// for protocol 3, send only changed updates
func (r *Room) sendSpeakerChanges(speakers []*livekit.SpeakerInfo) {
for _, p := range r.GetParticipants() {
if p.ProtocolVersion().SupportsSpeakerChanged() {
_ = p.SendSpeakerUpdate(speakers)
}
}
}
@@ -562,15 +570,15 @@ func (r *Room) audioUpdateWorker() {
activeThreshold = ConvertAudioLevel(r.audioConfig.ActiveLevel)
}
var lastActiveSpeakers []*livekit.SpeakerInfo
lastActiveMap := make(map[string]*livekit.SpeakerInfo)
for {
if r.isClosed.Get() {
return
}
speakers := r.GetActiveSpeakers()
activeSpeakers := r.GetActiveSpeakers()
if smoothValues != nil {
for _, speaker := range speakers {
for _, speaker := range activeSpeakers {
sid := speaker.Sid
level := smoothValues[sid]
delete(smoothValues, sid)
@@ -584,7 +592,7 @@ func (r *Room) audioUpdateWorker() {
delete(smoothValues, sid)
level += -level * smoothFactor
if level > activeThreshold {
speakers = append(speakers, &livekit.SpeakerInfo{
activeSpeakers = append(activeSpeakers, &livekit.SpeakerInfo{
Sid: sid,
Level: level,
Active: true,
@@ -593,33 +601,45 @@ func (r *Room) audioUpdateWorker() {
}
// smoothValues map is drained, now repopulate it back
for _, speaker := range speakers {
for _, speaker := range activeSpeakers {
smoothValues[speaker.Sid] = speaker.Level
}
sort.Slice(speakers, func(i, j int) bool {
return speakers[i].Level > speakers[j].Level
sort.Slice(activeSpeakers, func(i, j int) bool {
return activeSpeakers[i].Level > activeSpeakers[j].Level
})
}
const invAudioLevelQuantization = 1.0 / AudioLevelQuantization
for _, speaker := range speakers {
for _, speaker := range activeSpeakers {
speaker.Level = float32(math.Ceil(float64(speaker.Level*AudioLevelQuantization)) * invAudioLevelQuantization)
}
// see if an update is needed
if len(speakers) == len(lastActiveSpeakers) {
for i, speaker := range speakers {
if speaker.Level != lastActiveSpeakers[i].Level || speaker.Sid != lastActiveSpeakers[i].Sid {
r.sendSpeakerUpdates(speakers)
break
}
changedSpeakers := make([]*livekit.SpeakerInfo, 0, len(activeSpeakers))
nextActiveMap := make(map[string]*livekit.SpeakerInfo, len(activeSpeakers))
for _, speaker := range activeSpeakers {
prev := lastActiveMap[speaker.Sid]
if prev == nil || prev.Level != speaker.Level {
changedSpeakers = append(changedSpeakers, speaker)
}
nextActiveMap[speaker.Sid] = speaker
}
// changedSpeakers need to include previous speakers that are no longer speaking
for sid, speaker := range lastActiveMap {
if nextActiveMap[sid] == nil {
speaker.Level = 0
speaker.Active = false
changedSpeakers = append(changedSpeakers, speaker)
}
} else {
r.sendSpeakerUpdates(speakers)
}
lastActiveSpeakers = speakers
// see if an update is needed
if len(changedSpeakers) > 0 {
r.sendActiveSpeakers(activeSpeakers)
r.sendSpeakerChanges(changedSpeakers)
}
lastActiveMap = nextActiveMap
time.Sleep(time.Duration(r.audioConfig.UpdateInterval) * time.Millisecond)
}

View File

@@ -48,7 +48,7 @@ type Participant interface {
RemoveSubscriber(peerId string)
SendJoinResponse(info *livekit.Room, otherParticipants []Participant, iceServers []*livekit.ICEServer) error
SendParticipantUpdate(participants []*livekit.ParticipantInfo) error
SendActiveSpeakers(speakers []*livekit.SpeakerInfo) error
SendSpeakerUpdate(speakers []*livekit.SpeakerInfo) error
SendDataPacket(packet *livekit.DataPacket) error
SetTrackMuted(trackId string, muted bool, fromAdmin bool)
GetAudioLevel() (level uint8, active bool)

View File

@@ -20,3 +20,8 @@ func (v ProtocolVersion) HandlesDataPackets() bool {
func (v ProtocolVersion) SubscriberAsPrimary() bool {
return v > 2
}
// SupportsSpeakerChanged - if client handles speaker info deltas, instead of a comprehensive list
func (v ProtocolVersion) SupportsSpeakerChanged() bool {
return v > 2
}

View File

@@ -290,17 +290,6 @@ type FakeParticipant struct {
removeSubscriberArgsForCall []struct {
arg1 string
}
SendActiveSpeakersStub func([]*livekit.SpeakerInfo) error
sendActiveSpeakersMutex sync.RWMutex
sendActiveSpeakersArgsForCall []struct {
arg1 []*livekit.SpeakerInfo
}
sendActiveSpeakersReturns struct {
result1 error
}
sendActiveSpeakersReturnsOnCall map[int]struct {
result1 error
}
SendDataPacketStub func(*livekit.DataPacket) error
sendDataPacketMutex sync.RWMutex
sendDataPacketArgsForCall []struct {
@@ -336,6 +325,17 @@ type FakeParticipant struct {
sendParticipantUpdateReturnsOnCall map[int]struct {
result1 error
}
SendSpeakerUpdateStub func([]*livekit.SpeakerInfo) error
sendSpeakerUpdateMutex sync.RWMutex
sendSpeakerUpdateArgsForCall []struct {
arg1 []*livekit.SpeakerInfo
}
sendSpeakerUpdateReturns struct {
result1 error
}
sendSpeakerUpdateReturnsOnCall map[int]struct {
result1 error
}
SetMetadataStub func(string)
setMetadataMutex sync.RWMutex
setMetadataArgsForCall []struct {
@@ -1917,72 +1917,6 @@ func (fake *FakeParticipant) RemoveSubscriberArgsForCall(i int) string {
return argsForCall.arg1
}
func (fake *FakeParticipant) SendActiveSpeakers(arg1 []*livekit.SpeakerInfo) error {
var arg1Copy []*livekit.SpeakerInfo
if arg1 != nil {
arg1Copy = make([]*livekit.SpeakerInfo, len(arg1))
copy(arg1Copy, arg1)
}
fake.sendActiveSpeakersMutex.Lock()
ret, specificReturn := fake.sendActiveSpeakersReturnsOnCall[len(fake.sendActiveSpeakersArgsForCall)]
fake.sendActiveSpeakersArgsForCall = append(fake.sendActiveSpeakersArgsForCall, struct {
arg1 []*livekit.SpeakerInfo
}{arg1Copy})
stub := fake.SendActiveSpeakersStub
fakeReturns := fake.sendActiveSpeakersReturns
fake.recordInvocation("SendActiveSpeakers", []interface{}{arg1Copy})
fake.sendActiveSpeakersMutex.Unlock()
if stub != nil {
return stub(arg1)
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeParticipant) SendActiveSpeakersCallCount() int {
fake.sendActiveSpeakersMutex.RLock()
defer fake.sendActiveSpeakersMutex.RUnlock()
return len(fake.sendActiveSpeakersArgsForCall)
}
func (fake *FakeParticipant) SendActiveSpeakersCalls(stub func([]*livekit.SpeakerInfo) error) {
fake.sendActiveSpeakersMutex.Lock()
defer fake.sendActiveSpeakersMutex.Unlock()
fake.SendActiveSpeakersStub = stub
}
func (fake *FakeParticipant) SendActiveSpeakersArgsForCall(i int) []*livekit.SpeakerInfo {
fake.sendActiveSpeakersMutex.RLock()
defer fake.sendActiveSpeakersMutex.RUnlock()
argsForCall := fake.sendActiveSpeakersArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeParticipant) SendActiveSpeakersReturns(result1 error) {
fake.sendActiveSpeakersMutex.Lock()
defer fake.sendActiveSpeakersMutex.Unlock()
fake.SendActiveSpeakersStub = nil
fake.sendActiveSpeakersReturns = struct {
result1 error
}{result1}
}
func (fake *FakeParticipant) SendActiveSpeakersReturnsOnCall(i int, result1 error) {
fake.sendActiveSpeakersMutex.Lock()
defer fake.sendActiveSpeakersMutex.Unlock()
fake.SendActiveSpeakersStub = nil
if fake.sendActiveSpeakersReturnsOnCall == nil {
fake.sendActiveSpeakersReturnsOnCall = make(map[int]struct {
result1 error
})
}
fake.sendActiveSpeakersReturnsOnCall[i] = struct {
result1 error
}{result1}
}
func (fake *FakeParticipant) SendDataPacket(arg1 *livekit.DataPacket) error {
fake.sendDataPacketMutex.Lock()
ret, specificReturn := fake.sendDataPacketReturnsOnCall[len(fake.sendDataPacketArgsForCall)]
@@ -2183,6 +2117,72 @@ func (fake *FakeParticipant) SendParticipantUpdateReturnsOnCall(i int, result1 e
}{result1}
}
func (fake *FakeParticipant) SendSpeakerUpdate(arg1 []*livekit.SpeakerInfo) error {
var arg1Copy []*livekit.SpeakerInfo
if arg1 != nil {
arg1Copy = make([]*livekit.SpeakerInfo, len(arg1))
copy(arg1Copy, arg1)
}
fake.sendSpeakerUpdateMutex.Lock()
ret, specificReturn := fake.sendSpeakerUpdateReturnsOnCall[len(fake.sendSpeakerUpdateArgsForCall)]
fake.sendSpeakerUpdateArgsForCall = append(fake.sendSpeakerUpdateArgsForCall, struct {
arg1 []*livekit.SpeakerInfo
}{arg1Copy})
stub := fake.SendSpeakerUpdateStub
fakeReturns := fake.sendSpeakerUpdateReturns
fake.recordInvocation("SendSpeakerUpdate", []interface{}{arg1Copy})
fake.sendSpeakerUpdateMutex.Unlock()
if stub != nil {
return stub(arg1)
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeParticipant) SendSpeakerUpdateCallCount() int {
fake.sendSpeakerUpdateMutex.RLock()
defer fake.sendSpeakerUpdateMutex.RUnlock()
return len(fake.sendSpeakerUpdateArgsForCall)
}
func (fake *FakeParticipant) SendSpeakerUpdateCalls(stub func([]*livekit.SpeakerInfo) error) {
fake.sendSpeakerUpdateMutex.Lock()
defer fake.sendSpeakerUpdateMutex.Unlock()
fake.SendSpeakerUpdateStub = stub
}
func (fake *FakeParticipant) SendSpeakerUpdateArgsForCall(i int) []*livekit.SpeakerInfo {
fake.sendSpeakerUpdateMutex.RLock()
defer fake.sendSpeakerUpdateMutex.RUnlock()
argsForCall := fake.sendSpeakerUpdateArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeParticipant) SendSpeakerUpdateReturns(result1 error) {
fake.sendSpeakerUpdateMutex.Lock()
defer fake.sendSpeakerUpdateMutex.Unlock()
fake.SendSpeakerUpdateStub = nil
fake.sendSpeakerUpdateReturns = struct {
result1 error
}{result1}
}
func (fake *FakeParticipant) SendSpeakerUpdateReturnsOnCall(i int, result1 error) {
fake.sendSpeakerUpdateMutex.Lock()
defer fake.sendSpeakerUpdateMutex.Unlock()
fake.SendSpeakerUpdateStub = nil
if fake.sendSpeakerUpdateReturnsOnCall == nil {
fake.sendSpeakerUpdateReturnsOnCall = make(map[int]struct {
result1 error
})
}
fake.sendSpeakerUpdateReturnsOnCall[i] = struct {
result1 error
}{result1}
}
func (fake *FakeParticipant) SetMetadata(arg1 string) {
fake.setMetadataMutex.Lock()
fake.setMetadataArgsForCall = append(fake.setMetadataArgsForCall, struct {
@@ -2669,14 +2669,14 @@ func (fake *FakeParticipant) Invocations() map[string][][]interface{} {
defer fake.removeSubscribedTrackMutex.RUnlock()
fake.removeSubscriberMutex.RLock()
defer fake.removeSubscriberMutex.RUnlock()
fake.sendActiveSpeakersMutex.RLock()
defer fake.sendActiveSpeakersMutex.RUnlock()
fake.sendDataPacketMutex.RLock()
defer fake.sendDataPacketMutex.RUnlock()
fake.sendJoinResponseMutex.RLock()
defer fake.sendJoinResponseMutex.RUnlock()
fake.sendParticipantUpdateMutex.RLock()
defer fake.sendParticipantUpdateMutex.RUnlock()
fake.sendSpeakerUpdateMutex.RLock()
defer fake.sendSpeakerUpdateMutex.RUnlock()
fake.setMetadataMutex.RLock()
defer fake.setMetadataMutex.RUnlock()
fake.setPermissionMutex.RLock()

View File

@@ -227,10 +227,12 @@ func (r *LocalRoomManager) StartSession(ctx context.Context, roomName string, pi
"room", roomName,
"nodeID", r.currentNode.Id,
"participant", pi.Identity,
"protocol", pi.ProtocolVersion,
"sdk", pi.Client.Sdk,
"sdkVersion", pi.Client.Version,
"protocol", pi.Client.Protocol,
)
pv := types.ProtocolVersion(pi.ProtocolVersion)
pv := types.ProtocolVersion(pi.Client.Protocol)
rtcConf := *r.rtcConfig
rtcConf.SetBufferFactory(room.GetBufferFactor())
participant, err = rtc.NewParticipant(rtc.ParticipantParams{

View File

@@ -4,6 +4,7 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"strconv"
"strings"
@@ -66,7 +67,6 @@ func (s *RTCService) validate(r *http.Request) (string, routing.ParticipantInit,
roomName := r.FormValue("room")
reconnectParam := r.FormValue("reconnect")
protocolParam := r.FormValue("protocol")
autoSubParam := r.FormValue("auto_subscribe")
if onlyName != "" {
@@ -79,13 +79,11 @@ func (s *RTCService) validate(r *http.Request) (string, routing.ParticipantInit,
AutoSubscribe: true,
Metadata: claims.Metadata,
Hidden: claims.Video.Hidden,
Client: s.parseClientInfo(r.Form),
}
if autoSubParam != "" {
pi.AutoSubscribe = boolValue(autoSubParam)
}
if pv, err := strconv.Atoi(protocolParam); err == nil {
pi.ProtocolVersion = int32(pv)
}
pi.Permission = permissionFromGrant(claims.Video)
return roomName, pi, http.StatusOK, nil
@@ -134,7 +132,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
sigConn := NewWSSignalConnection(conn)
if types.ProtocolVersion(pi.ProtocolVersion).SupportsProtobuf() {
if types.ProtocolVersion(pi.Client.Protocol).SupportsProtobuf() {
sigConn.useJSON = false
}
@@ -201,3 +199,23 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}
}
func (s *RTCService) parseClientInfo(values url.Values) *livekit.ClientInfo {
ci := &livekit.ClientInfo{}
if pv, err := strconv.Atoi(values.Get("protocol")); err == nil {
ci.Protocol = int32(pv)
}
sdkString := values.Get("sdk")
switch sdkString {
case "js":
ci.Sdk = livekit.ClientInfo_JS
case "ios":
ci.Sdk = livekit.ClientInfo_IOS
case "android":
ci.Sdk = livekit.ClientInfo_ANDROID
case "flutter":
ci.Sdk = livekit.ClientInfo_FLUTTER
}
ci.Version = values.Get("version")
return ci
}

View File

@@ -540,15 +540,15 @@ func (c *RTCClient) ensurePublisherConnected() error {
}
dcOpen := utils.AtomicFlag{}
c.lossyDC.OnOpen(func() {
c.reliableDC.OnOpen(func() {
dcOpen.TrySet(true)
})
if c.lossyDC.ReadyState() == webrtc.DataChannelStateOpen {
if c.reliableDC.ReadyState() == webrtc.DataChannelStateOpen {
dcOpen.TrySet(true)
}
// wait until connected
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
// wait until connected, increase wait time since it takes more than 10s sometimes on GH
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
for {
select {