mirror of
https://github.com/livekit/livekit.git
synced 2026-04-07 14:15:41 +00:00
600 lines
15 KiB
Go
600 lines
15 KiB
Go
package rtc
|
|
|
|
import (
|
|
"math"
|
|
"sort"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/go-logr/zapr"
|
|
"github.com/livekit/protocol/utils"
|
|
"github.com/pion/ion-sfu/pkg/buffer"
|
|
"google.golang.org/protobuf/proto"
|
|
|
|
"github.com/livekit/livekit-server/pkg/config"
|
|
"github.com/livekit/livekit-server/pkg/logger"
|
|
"github.com/livekit/livekit-server/pkg/rtc/types"
|
|
livekit "github.com/livekit/livekit-server/proto"
|
|
)
|
|
|
|
const (
|
|
DefaultEmptyTimeout = 5 * 60 // 5m
|
|
DefaultRoomDepartureGrace = 20
|
|
AudioLevelQuantization = 8 // ideally power of 2 to minimize float decimal
|
|
)
|
|
|
|
type Room struct {
|
|
Room *livekit.Room
|
|
config WebRTCConfig
|
|
iceServers []*livekit.ICEServer
|
|
lock sync.RWMutex
|
|
// map of identity -> Participant
|
|
participants map[string]types.Participant
|
|
participantOpts map[string]*ParticipantOptions
|
|
bufferFactory *buffer.Factory
|
|
|
|
// time the first participant joined the room
|
|
joinedAt atomic.Value
|
|
// time that the last participant left the room
|
|
leftAt atomic.Value
|
|
isClosed utils.AtomicFlag
|
|
|
|
// for active speaker updates
|
|
audioConfig *config.AudioConfig
|
|
|
|
statsReporter *RoomStatsReporter
|
|
|
|
onParticipantChanged func(p types.Participant)
|
|
onClose func()
|
|
}
|
|
|
|
type ParticipantOptions struct {
|
|
AutoSubscribe bool
|
|
}
|
|
|
|
func NewRoom(room *livekit.Room, config WebRTCConfig, iceServers []*livekit.ICEServer, audioConfig *config.AudioConfig) *Room {
|
|
r := &Room{
|
|
Room: proto.Clone(room).(*livekit.Room),
|
|
config: config,
|
|
iceServers: iceServers,
|
|
audioConfig: audioConfig,
|
|
statsReporter: NewRoomStatsReporter(room.Name),
|
|
participants: make(map[string]types.Participant),
|
|
participantOpts: make(map[string]*ParticipantOptions),
|
|
bufferFactory: buffer.NewBufferFactory(config.Receiver.packetBufferSize, zapr.NewLogger(logger.GetLogger())),
|
|
}
|
|
if r.Room.EmptyTimeout == 0 {
|
|
r.Room.EmptyTimeout = DefaultEmptyTimeout
|
|
}
|
|
if r.Room.CreationTime == 0 {
|
|
r.Room.CreationTime = time.Now().Unix()
|
|
}
|
|
r.statsReporter.RoomStarted()
|
|
go r.audioUpdateWorker()
|
|
return r
|
|
}
|
|
|
|
func (r *Room) GetParticipant(identity string) types.Participant {
|
|
r.lock.RLock()
|
|
defer r.lock.RUnlock()
|
|
return r.participants[identity]
|
|
}
|
|
|
|
func (r *Room) GetParticipants() []types.Participant {
|
|
r.lock.RLock()
|
|
defer r.lock.RUnlock()
|
|
participants := make([]types.Participant, 0, len(r.participants))
|
|
for _, p := range r.participants {
|
|
participants = append(participants, p)
|
|
}
|
|
return participants
|
|
}
|
|
|
|
func (r *Room) GetActiveSpeakers() []*livekit.SpeakerInfo {
|
|
participants := r.GetParticipants()
|
|
speakers := make([]*livekit.SpeakerInfo, 0, len(participants))
|
|
for _, p := range participants {
|
|
level, active := p.GetAudioLevel()
|
|
if !active {
|
|
continue
|
|
}
|
|
speakers = append(speakers, &livekit.SpeakerInfo{
|
|
Sid: p.ID(),
|
|
Level: ConvertAudioLevel(level),
|
|
Active: active,
|
|
})
|
|
}
|
|
|
|
sort.Slice(speakers, func(i, j int) bool {
|
|
return speakers[i].Level > speakers[j].Level
|
|
})
|
|
return speakers
|
|
}
|
|
|
|
func (r *Room) GetStatsReporter() *RoomStatsReporter {
|
|
return r.statsReporter
|
|
}
|
|
|
|
func (r *Room) GetBufferFactor() *buffer.Factory {
|
|
return r.bufferFactory
|
|
}
|
|
|
|
func (r *Room) FirstJoinedAt() int64 {
|
|
j := r.joinedAt.Load()
|
|
if t, ok := j.(int64); ok {
|
|
return t
|
|
}
|
|
return 0
|
|
}
|
|
|
|
func (r *Room) LastLeftAt() int64 {
|
|
l := r.leftAt.Load()
|
|
if t, ok := l.(int64); ok {
|
|
return t
|
|
}
|
|
return 0
|
|
}
|
|
|
|
func (r *Room) Join(participant types.Participant, opts *ParticipantOptions) error {
|
|
if r.isClosed.Get() {
|
|
return ErrRoomClosed
|
|
}
|
|
|
|
r.lock.Lock()
|
|
defer r.lock.Unlock()
|
|
|
|
if r.participants[participant.Identity()] != nil {
|
|
return ErrAlreadyJoined
|
|
}
|
|
|
|
if r.Room.MaxParticipants > 0 && int(r.Room.MaxParticipants) == len(r.participants) {
|
|
return ErrMaxParticipantsExceeded
|
|
}
|
|
|
|
if r.FirstJoinedAt() == 0 {
|
|
r.joinedAt.Store(time.Now().Unix())
|
|
}
|
|
|
|
r.statsReporter.AddParticipant()
|
|
|
|
// it's important to set this before connection, we don't want to miss out on any publishedTracks
|
|
participant.OnTrackPublished(r.onTrackPublished)
|
|
participant.OnStateChange(func(p types.Participant, oldState livekit.ParticipantInfo_State) {
|
|
logger.Debugw("participant state changed", "state", p.State(), "participant", p.Identity(),
|
|
"oldState", oldState)
|
|
if r.onParticipantChanged != nil {
|
|
r.onParticipantChanged(participant)
|
|
}
|
|
r.broadcastParticipantState(p, true)
|
|
|
|
state := p.State()
|
|
if state == livekit.ParticipantInfo_ACTIVE {
|
|
if p.UpdateAfterActive() {
|
|
_ = p.SendParticipantUpdate(ToProtoParticipants(r.GetParticipants()))
|
|
}
|
|
|
|
// subscribe participant to existing publishedTracks
|
|
r.subscribeToExistingTracks(p)
|
|
|
|
// start the workers once connectivity is established
|
|
p.Start()
|
|
|
|
} else if state == livekit.ParticipantInfo_DISCONNECTED {
|
|
// remove participant from room
|
|
go r.RemoveParticipant(p.Identity())
|
|
}
|
|
})
|
|
participant.OnTrackUpdated(r.onTrackUpdated)
|
|
participant.OnMetadataUpdate(r.onParticipantMetadataUpdate)
|
|
participant.OnDataPacket(r.onDataPacket)
|
|
logger.Infow("new participant joined",
|
|
"id", participant.ID(),
|
|
"participant", participant.Identity(),
|
|
"roomId", r.Room.Sid)
|
|
|
|
r.participants[participant.Identity()] = participant
|
|
r.participantOpts[participant.Identity()] = opts
|
|
|
|
// gather other participants and send join response
|
|
otherParticipants := make([]types.Participant, 0, len(r.participants))
|
|
for _, p := range r.participants {
|
|
if p.ID() != participant.ID() {
|
|
otherParticipants = append(otherParticipants, p)
|
|
}
|
|
}
|
|
|
|
if r.onParticipantChanged != nil {
|
|
r.onParticipantChanged(participant)
|
|
}
|
|
|
|
time.AfterFunc(time.Minute, func() {
|
|
state := participant.State()
|
|
if state == livekit.ParticipantInfo_JOINING || state == livekit.ParticipantInfo_JOINED {
|
|
r.RemoveParticipant(participant.Identity())
|
|
}
|
|
})
|
|
|
|
return participant.SendJoinResponse(r.Room, otherParticipants, r.iceServers)
|
|
}
|
|
|
|
func (r *Room) RemoveParticipant(identity string) {
|
|
r.lock.Lock()
|
|
p, ok := r.participants[identity]
|
|
if ok {
|
|
delete(r.participants, identity)
|
|
delete(r.participantOpts, identity)
|
|
}
|
|
r.lock.Unlock()
|
|
if !ok {
|
|
return
|
|
}
|
|
r.statsReporter.SubParticipant()
|
|
|
|
// send broadcast only if it's not already closed
|
|
sendUpdates := p.State() != livekit.ParticipantInfo_DISCONNECTED
|
|
|
|
p.OnTrackUpdated(nil)
|
|
p.OnTrackPublished(nil)
|
|
p.OnStateChange(nil)
|
|
p.OnMetadataUpdate(nil)
|
|
p.OnDataPacket(nil)
|
|
|
|
// close participant as well
|
|
_ = p.Close()
|
|
|
|
r.lock.RLock()
|
|
if len(r.participants) == 0 {
|
|
r.leftAt.Store(time.Now().Unix())
|
|
}
|
|
r.lock.RUnlock()
|
|
|
|
if sendUpdates {
|
|
if r.onParticipantChanged != nil {
|
|
r.onParticipantChanged(p)
|
|
}
|
|
r.broadcastParticipantState(p, true)
|
|
}
|
|
}
|
|
|
|
func (r *Room) UpdateSubscriptions(participant types.Participant, trackIds []string, subscribe bool) error {
|
|
if !participant.CanSubscribe() {
|
|
return ErrCannotSubscribe
|
|
}
|
|
|
|
// find all matching tracks
|
|
var tracks []types.PublishedTrack
|
|
participants := r.GetParticipants()
|
|
for _, p := range participants {
|
|
for _, sid := range trackIds {
|
|
for _, track := range p.GetPublishedTracks() {
|
|
if sid == track.ID() {
|
|
tracks = append(tracks, track)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// handle subscription changes
|
|
for _, track := range tracks {
|
|
if subscribe {
|
|
if err := track.AddSubscriber(participant); err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
track.RemoveSubscriber(participant.ID())
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// CloseIfEmpty closes the room if all participants had left, or it's still empty past timeout
|
|
func (r *Room) CloseIfEmpty() {
|
|
if r.isClosed.Get() {
|
|
return
|
|
}
|
|
|
|
r.lock.RLock()
|
|
numParticipants := len(r.participants)
|
|
r.lock.RUnlock()
|
|
|
|
if numParticipants > 0 {
|
|
return
|
|
}
|
|
|
|
timeout := r.Room.EmptyTimeout
|
|
var elapsed int64
|
|
if r.FirstJoinedAt() > 0 {
|
|
// exit 20s after
|
|
elapsed = time.Now().Unix() - r.LastLeftAt()
|
|
if timeout > DefaultRoomDepartureGrace {
|
|
timeout = DefaultRoomDepartureGrace
|
|
}
|
|
} else {
|
|
elapsed = time.Now().Unix() - r.Room.CreationTime
|
|
}
|
|
|
|
if elapsed >= int64(timeout) {
|
|
r.Close()
|
|
}
|
|
}
|
|
|
|
func (r *Room) Close() {
|
|
if !r.isClosed.TrySet(true) {
|
|
return
|
|
}
|
|
logger.Infow("closing room", "room", r.Room.Sid, "name", r.Room.Name)
|
|
|
|
r.statsReporter.RoomEnded()
|
|
if r.onClose != nil {
|
|
r.onClose()
|
|
}
|
|
}
|
|
|
|
func (r *Room) GetIncomingStats() PacketStats {
|
|
return *r.statsReporter.incoming
|
|
}
|
|
|
|
func (r *Room) GetOutgoingStats() PacketStats {
|
|
return *r.statsReporter.outgoing
|
|
}
|
|
|
|
func (r *Room) OnClose(f func()) {
|
|
r.onClose = f
|
|
}
|
|
|
|
func (r *Room) OnParticipantChanged(f func(participant types.Participant)) {
|
|
r.onParticipantChanged = f
|
|
}
|
|
|
|
// checks if participant should be autosubscribed to new tracks, assumes lock is already acquired
|
|
func (r *Room) autoSubscribe(participant types.Participant) bool {
|
|
if !participant.CanSubscribe() {
|
|
return false
|
|
}
|
|
|
|
opts := r.participantOpts[participant.Identity()]
|
|
// default to true if no options are set
|
|
if opts != nil && !opts.AutoSubscribe {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
// a ParticipantImpl in the room added a new remoteTrack, subscribe other participants to it
|
|
func (r *Room) onTrackPublished(participant types.Participant, track types.PublishedTrack) {
|
|
// publish participant update, since track state is changed
|
|
r.broadcastParticipantState(participant, true)
|
|
|
|
r.lock.RLock()
|
|
defer r.lock.RUnlock()
|
|
|
|
// subscribe all existing participants to this PublishedTrack
|
|
// this is the default behavior. in the future this could be more selective
|
|
for _, existingParticipant := range r.participants {
|
|
if existingParticipant == participant {
|
|
// skip publishing participant
|
|
continue
|
|
}
|
|
if existingParticipant.State() != livekit.ParticipantInfo_ACTIVE {
|
|
// not fully joined. don't subscribe yet
|
|
continue
|
|
}
|
|
if !r.autoSubscribe(existingParticipant) {
|
|
continue
|
|
}
|
|
|
|
logger.Debugw("subscribing to new track",
|
|
"source", participant.Identity(),
|
|
"remoteTrack", track.ID(),
|
|
"dest", existingParticipant.Identity())
|
|
if err := track.AddSubscriber(existingParticipant); err != nil {
|
|
logger.Errorw("could not subscribe to remoteTrack", err,
|
|
"source", participant.Identity(),
|
|
"remoteTrack", track.ID(),
|
|
"dest", existingParticipant.Identity())
|
|
}
|
|
}
|
|
|
|
if r.onParticipantChanged != nil {
|
|
r.onParticipantChanged(participant)
|
|
}
|
|
}
|
|
|
|
func (r *Room) onTrackUpdated(p types.Participant, _ types.PublishedTrack) {
|
|
// send track updates to everyone, especially if track was updated by admin
|
|
r.broadcastParticipantState(p, false)
|
|
if r.onParticipantChanged != nil {
|
|
r.onParticipantChanged(p)
|
|
}
|
|
}
|
|
|
|
func (r *Room) onParticipantMetadataUpdate(p types.Participant) {
|
|
r.broadcastParticipantState(p, false)
|
|
if r.onParticipantChanged != nil {
|
|
r.onParticipantChanged(p)
|
|
}
|
|
}
|
|
|
|
func (r *Room) onDataPacket(source types.Participant, dp *livekit.DataPacket) {
|
|
dest := dp.GetUser().GetDestinationSids()
|
|
|
|
for _, op := range r.GetParticipants() {
|
|
if op.State() != livekit.ParticipantInfo_ACTIVE {
|
|
continue
|
|
}
|
|
if op.ID() == source.ID() {
|
|
continue
|
|
}
|
|
if len(dest) > 0 {
|
|
found := false
|
|
for _, dSid := range dest {
|
|
if op.ID() == dSid {
|
|
found = true
|
|
break
|
|
}
|
|
}
|
|
if !found {
|
|
continue
|
|
}
|
|
}
|
|
_ = op.SendDataPacket(dp)
|
|
}
|
|
}
|
|
|
|
func (r *Room) subscribeToExistingTracks(p types.Participant) {
|
|
r.lock.RLock()
|
|
shouldSubscribe := r.autoSubscribe(p)
|
|
r.lock.RUnlock()
|
|
if !shouldSubscribe {
|
|
return
|
|
}
|
|
|
|
tracksAdded := 0
|
|
for _, op := range r.GetParticipants() {
|
|
if p.ID() == op.ID() {
|
|
// don't send to itself
|
|
continue
|
|
}
|
|
if n, err := op.AddSubscriber(p); err != nil {
|
|
// TODO: log error? or disconnect?
|
|
logger.Errorw("could not subscribe to participant", err,
|
|
"dest", p.Identity(),
|
|
"source", op.Identity())
|
|
} else {
|
|
tracksAdded += n
|
|
}
|
|
}
|
|
if tracksAdded > 0 {
|
|
logger.Debugw("subscribed participants to existing tracks", "tracks", tracksAdded)
|
|
}
|
|
}
|
|
|
|
// broadcast an update about participant p
|
|
func (r *Room) broadcastParticipantState(p types.Participant, skipSource bool) {
|
|
updates := ToProtoParticipants([]types.Participant{p})
|
|
participants := r.GetParticipants()
|
|
for _, op := range participants {
|
|
// skip itself && closed participants
|
|
if (skipSource && p.ID() == op.ID()) || op.State() == livekit.ParticipantInfo_DISCONNECTED {
|
|
continue
|
|
}
|
|
|
|
err := op.SendParticipantUpdate(updates)
|
|
if err != nil {
|
|
logger.Errorw("could not send update to participant", err,
|
|
"participant", p.Identity())
|
|
}
|
|
}
|
|
}
|
|
|
|
func (r *Room) sendSpeakerUpdates(speakers []*livekit.SpeakerInfo) {
|
|
dp := &livekit.DataPacket{
|
|
Kind: livekit.DataPacket_LOSSY,
|
|
Value: &livekit.DataPacket_Speaker{
|
|
Speaker: &livekit.ActiveSpeakerUpdate{
|
|
Speakers: speakers,
|
|
},
|
|
},
|
|
}
|
|
|
|
for _, p := range r.GetParticipants() {
|
|
if p.ProtocolVersion().HandlesDataPackets() {
|
|
_ = p.SendDataPacket(dp)
|
|
} else {
|
|
_ = p.SendActiveSpeakers(speakers)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (r *Room) audioUpdateWorker() {
|
|
var smoothValues map[string]float32
|
|
var smoothFactor float32
|
|
var activeThreshold float32
|
|
if ss := r.audioConfig.SmoothIntervals; ss > 1 {
|
|
smoothValues = make(map[string]float32)
|
|
// exponential moving average (EMA), same center of mass with simple moving average (SMA)
|
|
smoothFactor = 2 / float32(ss+1)
|
|
activeThreshold = ConvertAudioLevel(r.audioConfig.ActiveLevel)
|
|
}
|
|
|
|
var lastActiveSpeakers []*livekit.SpeakerInfo
|
|
for {
|
|
if r.isClosed.Get() {
|
|
return
|
|
}
|
|
|
|
speakers := r.GetActiveSpeakers()
|
|
if smoothValues != nil {
|
|
for _, speaker := range speakers {
|
|
sid := speaker.Sid
|
|
level := smoothValues[sid]
|
|
delete(smoothValues, sid)
|
|
// exponential moving average (EMA)
|
|
level += (speaker.Level - level) * smoothFactor
|
|
speaker.Level = level
|
|
}
|
|
|
|
// ensure that previous active speakers are also included
|
|
for sid, level := range smoothValues {
|
|
delete(smoothValues, sid)
|
|
level += -level * smoothFactor
|
|
if level > activeThreshold {
|
|
speakers = append(speakers, &livekit.SpeakerInfo{
|
|
Sid: sid,
|
|
Level: level,
|
|
Active: true,
|
|
})
|
|
}
|
|
}
|
|
|
|
// smoothValues map is drained, now repopulate it back
|
|
for _, speaker := range speakers {
|
|
smoothValues[speaker.Sid] = speaker.Level
|
|
}
|
|
|
|
sort.Slice(speakers, func(i, j int) bool {
|
|
return speakers[i].Level > speakers[j].Level
|
|
})
|
|
}
|
|
|
|
const invAudioLevelQuantization = 1.0 / AudioLevelQuantization
|
|
for _, speaker := range speakers {
|
|
speaker.Level = float32(math.Ceil(float64(speaker.Level*AudioLevelQuantization)) * invAudioLevelQuantization)
|
|
}
|
|
|
|
// see if an update is needed
|
|
if len(speakers) == len(lastActiveSpeakers) {
|
|
for i, speaker := range speakers {
|
|
if speaker.Level != lastActiveSpeakers[i].Level || speaker.Sid != lastActiveSpeakers[i].Sid {
|
|
r.sendSpeakerUpdates(speakers)
|
|
break
|
|
}
|
|
}
|
|
} else {
|
|
r.sendSpeakerUpdates(speakers)
|
|
}
|
|
|
|
lastActiveSpeakers = speakers
|
|
|
|
time.Sleep(time.Duration(r.audioConfig.UpdateInterval) * time.Millisecond)
|
|
}
|
|
}
|
|
|
|
func (r *Room) DebugInfo() map[string]interface{} {
|
|
info := map[string]interface{}{
|
|
"Name": r.Room.Name,
|
|
"Sid": r.Room.Sid,
|
|
"CreatedAt": r.Room.CreationTime,
|
|
}
|
|
|
|
participants := r.GetParticipants()
|
|
participantInfo := make(map[string]interface{})
|
|
for _, p := range participants {
|
|
participantInfo[p.Identity()] = p.DebugInfo()
|
|
}
|
|
info["Participants"] = participantInfo
|
|
|
|
return info
|
|
}
|