Files
livekit/test/client/client.go
T
Raja Subramanian 9551c52c85 Try 2 to consolidate mime type (#3407)
* Normalize mime type and add utilities.

An attempt to normalize mime type and avoid string compares remembering
to do case insensitive search.

Not the best solution. Open to ideas. But, define our own mime types
(just in case Pion changes things and Pion also does not have red mime
type defined which should be easy to add though) and tried to use it everywhere.
But, as we get a bunch of callbacks and info from Pion, needed conversion in
more places than I anticipated. And also makes it necessary to carry
that cognitive load of what comes from Pion and needing to process it
properly.

* more locations

* test

* Paul feedback

* MimeType type

* more consolidation

* Remove unused

* test

* test

* mime type as int

* use string method

* Pass error details and timeouts. (#3402)

* go mod tidy (#3408)

* Rename CHANGELOG to CHANGELOG.md (#3391)

Enables markdown features in this otherwise already markdown'ish formatted document

* Update config.go to properly process bool env vars (#3382)

Fixes issue https://github.com/livekit/livekit/issues/3381

* fix(deps): update go deps (#3341)

Generated by renovateBot

Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>

* Use a Twirp server hook to send API call details to telemetry. (#3401)

* Use a Twirp server hook to send API call details to telemetry.

* mage generate and clean up

* Add project_id

* deps

* - Redact requests
- Do not store responses
- Extract top level fields room_name, room_id, participant_identity,
  participant_id, track_id as appropriate
- Store status as int

* deps

* Update pkg/sfu/mime/mimetype.go

* Fix prefer codec test

* handle down track mime changes

---------

Co-authored-by: Denys Smirnov <dennwc@pm.me>
Co-authored-by: Philzen <Philzen@users.noreply.github.com>
Co-authored-by: Pablo Fuente Pérez <pablofuenteperez@gmail.com>
Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
Co-authored-by: Paul Wells <paulwe@gmail.com>
Co-authored-by: cnderrauber <zengjie9004@gmail.com>
2025-02-10 10:44:15 +05:30

896 lines
25 KiB
Go

// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package client
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"path/filepath"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/pion/webrtc/v4"
"github.com/thoas/go-funk"
"go.uber.org/atomic"
"google.golang.org/protobuf/proto"
"github.com/livekit/mediatransportutil/pkg/rtcconfig"
"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/livekit-server/pkg/rtc"
"github.com/livekit/livekit-server/pkg/rtc/transport/transportfakes"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/livekit-server/pkg/sfu/mime"
)
type SignalRequestHandler func(msg *livekit.SignalRequest) error
type SignalRequestInterceptor func(msg *livekit.SignalRequest, next SignalRequestHandler) error
type SignalResponseHandler func(msg *livekit.SignalResponse) error
type SignalResponseInterceptor func(msg *livekit.SignalResponse, next SignalResponseHandler) error
type RTCClient struct {
id livekit.ParticipantID
conn *websocket.Conn
publisher *rtc.PCTransport
subscriber *rtc.PCTransport
// sid => track
localTracks map[string]webrtc.TrackLocal
trackSenders map[string]*webrtc.RTPSender
lock sync.Mutex
wsLock sync.Mutex
ctx context.Context
cancel context.CancelFunc
me *webrtc.MediaEngine // optional, populated only when receiving tracks
subscribedTracks map[livekit.ParticipantID][]*webrtc.TrackRemote
localParticipant *livekit.ParticipantInfo
remoteParticipants map[livekit.ParticipantID]*livekit.ParticipantInfo
signalRequestInterceptor SignalRequestInterceptor
signalResponseInterceptor SignalResponseInterceptor
icQueue [2]atomic.Pointer[webrtc.ICECandidate]
subscriberAsPrimary atomic.Bool
publisherFullyEstablished atomic.Bool
subscriberFullyEstablished atomic.Bool
pongReceivedAt atomic.Int64
lastAnswer atomic.Pointer[webrtc.SessionDescription]
// tracks waiting to be acked, cid => trackInfo
pendingPublishedTracks map[string]*livekit.TrackInfo
pendingTrackWriters []*TrackWriter
OnConnected func()
OnDataReceived func(data []byte, sid string)
refreshToken string
// map of livekit.ParticipantID and last packet
lastPackets map[livekit.ParticipantID]*rtp.Packet
bytesReceived map[livekit.ParticipantID]uint64
subscriptionResponse atomic.Pointer[livekit.SubscriptionResponse]
}
var (
// minimal settings only with stun server
rtcConf = webrtc.Configuration{
ICEServers: []webrtc.ICEServer{
{
URLs: []string{"stun:stun.l.google.com:19302"},
},
},
}
extMimeMapping = map[string]string{
".ivf": mime.MimeTypeVP8.String(),
".h264": mime.MimeTypeH264.String(),
".ogg": mime.MimeTypeOpus.String(),
}
)
type Options struct {
AutoSubscribe bool
Publish string
ClientInfo *livekit.ClientInfo
DisabledCodecs []webrtc.RTPCodecCapability
TokenCustomizer func(token *auth.AccessToken, grants *auth.VideoGrant)
SignalRequestInterceptor SignalRequestInterceptor
SignalResponseInterceptor SignalResponseInterceptor
}
func NewWebSocketConn(host, token string, opts *Options) (*websocket.Conn, error) {
u, err := url.Parse(host + fmt.Sprintf("/rtc?protocol=%d", types.CurrentProtocol))
if err != nil {
return nil, err
}
requestHeader := make(http.Header)
SetAuthorizationToken(requestHeader, token)
connectUrl := u.String()
sdk := "go"
if opts != nil {
connectUrl = fmt.Sprintf("%s&auto_subscribe=%t", connectUrl, opts.AutoSubscribe)
if opts.Publish != "" {
connectUrl += encodeQueryParam("publish", opts.Publish)
}
if opts.ClientInfo != nil {
if opts.ClientInfo.DeviceModel != "" {
connectUrl += encodeQueryParam("device_model", opts.ClientInfo.DeviceModel)
}
if opts.ClientInfo.Os != "" {
connectUrl += encodeQueryParam("os", opts.ClientInfo.Os)
}
if opts.ClientInfo.Sdk != livekit.ClientInfo_UNKNOWN {
sdk = opts.ClientInfo.Sdk.String()
}
}
}
connectUrl += encodeQueryParam("sdk", sdk)
conn, _, err := websocket.DefaultDialer.Dial(connectUrl, requestHeader)
return conn, err
}
func SetAuthorizationToken(header http.Header, token string) {
header.Set("Authorization", "Bearer "+token)
}
func NewRTCClient(conn *websocket.Conn, opts *Options) (*RTCClient, error) {
var err error
c := &RTCClient{
conn: conn,
localTracks: make(map[string]webrtc.TrackLocal),
trackSenders: make(map[string]*webrtc.RTPSender),
pendingPublishedTracks: make(map[string]*livekit.TrackInfo),
subscribedTracks: make(map[livekit.ParticipantID][]*webrtc.TrackRemote),
remoteParticipants: make(map[livekit.ParticipantID]*livekit.ParticipantInfo),
me: &webrtc.MediaEngine{},
lastPackets: make(map[livekit.ParticipantID]*rtp.Packet),
bytesReceived: make(map[livekit.ParticipantID]uint64),
}
c.ctx, c.cancel = context.WithCancel(context.Background())
conf := rtc.WebRTCConfig{
WebRTCConfig: rtcconfig.WebRTCConfig{
Configuration: rtcConf,
},
}
conf.SettingEngine.SetLite(false)
conf.SettingEngine.SetAnsweringDTLSRole(webrtc.DTLSRoleClient)
ff := buffer.NewFactoryOfBufferFactory(500, 200)
conf.SetBufferFactory(ff.CreateBufferFactory())
var codecs []*livekit.Codec
for _, codec := range []*livekit.Codec{
{
Mime: "audio/opus",
},
{
Mime: "video/vp8",
},
{
Mime: "video/h264",
},
} {
var disabled bool
if opts != nil {
for _, dc := range opts.DisabledCodecs {
if mime.IsMimeTypeStringEqual(dc.MimeType, codec.Mime) && (dc.SDPFmtpLine == "" || dc.SDPFmtpLine == codec.FmtpLine) {
disabled = true
break
}
}
}
if !disabled {
codecs = append(codecs, codec)
}
}
//
// The signal targets are from point of view of server.
// From client side, they are flipped,
// i. e. the publisher transport on client side has SUBSCRIBER signal target (i. e. publisher is offerer).
// Same applies for subscriber transport also
//
publisherHandler := &transportfakes.FakeHandler{}
c.publisher, err = rtc.NewPCTransport(rtc.TransportParams{
Config: &conf,
DirectionConfig: conf.Subscriber,
EnabledCodecs: codecs,
IsOfferer: true,
IsSendSide: true,
Handler: publisherHandler,
DatachannelSlowThreshold: 1024 * 1024 * 1024,
})
if err != nil {
return nil, err
}
subscriberHandler := &transportfakes.FakeHandler{}
c.subscriber, err = rtc.NewPCTransport(rtc.TransportParams{
Config: &conf,
DirectionConfig: conf.Publisher,
EnabledCodecs: codecs,
Handler: subscriberHandler,
DatachannelMaxReceiverBufferSize: 1500,
FireOnTrackBySdp: true,
})
if err != nil {
return nil, err
}
publisherHandler.OnICECandidateCalls(func(ic *webrtc.ICECandidate, t livekit.SignalTarget) error {
return c.SendIceCandidate(ic, livekit.SignalTarget_PUBLISHER)
})
publisherHandler.OnOfferCalls(c.onOffer)
publisherHandler.OnFullyEstablishedCalls(func() {
logger.Debugw("publisher fully established", "participant", c.localParticipant.Identity, "pID", c.localParticipant.Sid)
c.publisherFullyEstablished.Store(true)
})
ordered := true
if err := c.publisher.CreateDataChannel(rtc.ReliableDataChannel, &webrtc.DataChannelInit{
Ordered: &ordered,
}); err != nil {
return nil, err
}
maxRetransmits := uint16(0)
if err := c.publisher.CreateDataChannel(rtc.LossyDataChannel, &webrtc.DataChannelInit{
Ordered: &ordered,
MaxRetransmits: &maxRetransmits,
}); err != nil {
return nil, err
}
subscriberHandler.OnICECandidateCalls(func(ic *webrtc.ICECandidate, t livekit.SignalTarget) error {
if ic == nil {
return nil
}
return c.SendIceCandidate(ic, livekit.SignalTarget_SUBSCRIBER)
})
subscriberHandler.OnTrackCalls(func(track *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver) {
fmt.Println("ontrack", track.Codec(), track.PayloadType())
go c.processTrack(track)
})
subscriberHandler.OnDataPacketCalls(c.handleDataMessage)
subscriberHandler.OnInitialConnectedCalls(func() {
logger.Debugw("subscriber initial connected", "participant", c.localParticipant.Identity)
c.lock.Lock()
defer c.lock.Unlock()
for _, tw := range c.pendingTrackWriters {
if err := tw.Start(); err != nil {
logger.Errorw("track writer error", err)
}
}
c.pendingTrackWriters = nil
if c.OnConnected != nil {
go c.OnConnected()
}
})
subscriberHandler.OnFullyEstablishedCalls(func() {
logger.Debugw("subscriber fully established", "participant", c.localParticipant.Identity, "pID", c.localParticipant.Sid)
c.subscriberFullyEstablished.Store(true)
})
subscriberHandler.OnAnswerCalls(func(answer webrtc.SessionDescription) error {
// send remote an answer
logger.Infow("sending subscriber answer",
"participant", c.localParticipant.Identity,
// "sdp", answer,
)
return c.SendRequest(&livekit.SignalRequest{
Message: &livekit.SignalRequest_Answer{
Answer: rtc.ToProtoSessionDescription(answer),
},
})
})
if opts != nil {
c.signalRequestInterceptor = opts.SignalRequestInterceptor
c.signalResponseInterceptor = opts.SignalResponseInterceptor
}
return c, nil
}
func (c *RTCClient) ID() livekit.ParticipantID {
return c.id
}
// create an offer for the server
func (c *RTCClient) Run() error {
c.conn.SetCloseHandler(func(code int, text string) error {
// when closed, stop connection
logger.Infow("connection closed", "code", code, "text", text)
c.Stop()
return nil
})
// run the session
for {
res, err := c.ReadResponse()
if errors.Is(io.EOF, err) {
return nil
} else if err != nil {
logger.Errorw("error while reading", err)
return err
}
if c.signalResponseInterceptor != nil {
err = c.signalResponseInterceptor(res, c.handleSignalResponse)
} else {
err = c.handleSignalResponse(res)
}
if err != nil {
return err
}
}
}
func (c *RTCClient) handleSignalResponse(res *livekit.SignalResponse) error {
switch msg := res.Message.(type) {
case *livekit.SignalResponse_Join:
c.localParticipant = msg.Join.Participant
c.id = livekit.ParticipantID(msg.Join.Participant.Sid)
c.lock.Lock()
for _, p := range msg.Join.OtherParticipants {
c.remoteParticipants[livekit.ParticipantID(p.Sid)] = p
}
c.lock.Unlock()
// if publish only, negotiate
if !msg.Join.SubscriberPrimary {
c.subscriberAsPrimary.Store(false)
c.publisher.Negotiate(false)
} else {
c.subscriberAsPrimary.Store(true)
}
logger.Infow("join accepted, awaiting offer", "participant", msg.Join.Participant.Identity)
case *livekit.SignalResponse_Answer:
// logger.Debugw("received server answer",
// "participant", c.localParticipant.Identity,
// "answer", msg.Answer.Sdp)
c.handleAnswer(rtc.FromProtoSessionDescription(msg.Answer))
case *livekit.SignalResponse_Offer:
logger.Infow("received server offer",
"participant", c.localParticipant.Identity,
)
desc := rtc.FromProtoSessionDescription(msg.Offer)
c.handleOffer(desc)
case *livekit.SignalResponse_Trickle:
candidateInit, err := rtc.FromProtoTrickle(msg.Trickle)
if err != nil {
return err
}
if msg.Trickle.Target == livekit.SignalTarget_PUBLISHER {
c.publisher.AddICECandidate(candidateInit)
} else {
c.subscriber.AddICECandidate(candidateInit)
}
case *livekit.SignalResponse_Update:
c.lock.Lock()
for _, p := range msg.Update.Participants {
if livekit.ParticipantID(p.Sid) != c.id {
if p.State != livekit.ParticipantInfo_DISCONNECTED {
c.remoteParticipants[livekit.ParticipantID(p.Sid)] = p
} else {
delete(c.remoteParticipants, livekit.ParticipantID(p.Sid))
}
}
}
c.lock.Unlock()
case *livekit.SignalResponse_TrackPublished:
logger.Debugw("track published", "trackID", msg.TrackPublished.Track.Name, "participant", c.localParticipant.Sid,
"cid", msg.TrackPublished.Cid, "trackSid", msg.TrackPublished.Track.Sid)
c.lock.Lock()
c.pendingPublishedTracks[msg.TrackPublished.Cid] = msg.TrackPublished.Track
c.lock.Unlock()
case *livekit.SignalResponse_RefreshToken:
c.lock.Lock()
c.refreshToken = msg.RefreshToken
c.lock.Unlock()
case *livekit.SignalResponse_TrackUnpublished:
sid := msg.TrackUnpublished.TrackSid
c.lock.Lock()
sender := c.trackSenders[sid]
if sender != nil {
if err := c.publisher.RemoveTrack(sender); err != nil {
logger.Errorw("Could not unpublish track", err)
}
c.publisher.Negotiate(false)
}
delete(c.trackSenders, sid)
delete(c.localTracks, sid)
c.lock.Unlock()
case *livekit.SignalResponse_Pong:
c.pongReceivedAt.Store(msg.Pong)
case *livekit.SignalResponse_SubscriptionResponse:
c.subscriptionResponse.Store(msg.SubscriptionResponse)
}
return nil
}
func (c *RTCClient) WaitUntilConnected() error {
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
for {
select {
case <-ctx.Done():
id := string(c.ID())
if c.localParticipant != nil {
id = c.localParticipant.Identity
}
return fmt.Errorf("%s could not connect after timeout", id)
case <-time.After(10 * time.Millisecond):
if c.subscriberAsPrimary.Load() {
if c.subscriberFullyEstablished.Load() {
return nil
}
} else {
if c.publisherFullyEstablished.Load() {
return nil
}
}
}
}
}
func (c *RTCClient) ReadResponse() (*livekit.SignalResponse, error) {
for {
// handle special messages and pass on the rest
messageType, payload, err := c.conn.ReadMessage()
if err != nil {
return nil, err
}
if c.ctx.Err() != nil {
return nil, c.ctx.Err()
}
msg := &livekit.SignalResponse{}
switch messageType {
case websocket.PingMessage:
_ = c.conn.WriteMessage(websocket.PongMessage, nil)
continue
case websocket.BinaryMessage:
// protobuf encoded
err := proto.Unmarshal(payload, msg)
return msg, err
default:
return nil, fmt.Errorf("unexpected message received: %v", messageType)
}
}
}
func (c *RTCClient) SubscribedTracks() map[livekit.ParticipantID][]*webrtc.TrackRemote {
// create a copy of this
c.lock.Lock()
defer c.lock.Unlock()
tracks := make(map[livekit.ParticipantID][]*webrtc.TrackRemote, len(c.subscribedTracks))
for key, val := range c.subscribedTracks {
tracks[key] = val
}
return tracks
}
func (c *RTCClient) RemoteParticipants() []*livekit.ParticipantInfo {
c.lock.Lock()
defer c.lock.Unlock()
return funk.Values(c.remoteParticipants).([]*livekit.ParticipantInfo)
}
func (c *RTCClient) GetRemoteParticipant(sid livekit.ParticipantID) *livekit.ParticipantInfo {
c.lock.Lock()
defer c.lock.Unlock()
return c.remoteParticipants[sid]
}
func (c *RTCClient) Stop() {
logger.Infow("stopping client", "ID", c.ID())
_ = c.SendRequest(&livekit.SignalRequest{
Message: &livekit.SignalRequest_Leave{
Leave: &livekit.LeaveRequest{
Reason: livekit.DisconnectReason_CLIENT_INITIATED,
Action: livekit.LeaveRequest_DISCONNECT,
},
},
})
c.publisherFullyEstablished.Store(false)
c.subscriberFullyEstablished.Store(false)
_ = c.conn.Close()
c.publisher.Close()
c.subscriber.Close()
c.cancel()
}
func (c *RTCClient) RefreshToken() string {
c.lock.Lock()
defer c.lock.Unlock()
return c.refreshToken
}
func (c *RTCClient) PongReceivedAt() int64 {
return c.pongReceivedAt.Load()
}
func (c *RTCClient) GetSubscriptionResponseAndClear() *livekit.SubscriptionResponse {
return c.subscriptionResponse.Swap(nil)
}
func (c *RTCClient) SendPing() error {
return c.SendRequest(&livekit.SignalRequest{
Message: &livekit.SignalRequest_Ping{
Ping: time.Now().UnixNano(),
},
})
}
func (c *RTCClient) SendRequest(msg *livekit.SignalRequest) error {
if c.signalRequestInterceptor != nil {
return c.signalRequestInterceptor(msg, c.sendRequest)
} else {
return c.sendRequest(msg)
}
}
func (c *RTCClient) sendRequest(msg *livekit.SignalRequest) error {
payload, err := proto.Marshal(msg)
if err != nil {
return err
}
c.wsLock.Lock()
defer c.wsLock.Unlock()
return c.conn.WriteMessage(websocket.BinaryMessage, payload)
}
func (c *RTCClient) SendIceCandidate(ic *webrtc.ICECandidate, target livekit.SignalTarget) error {
prevIC := c.icQueue[target].Swap(ic)
if prevIC == nil {
return nil
}
return c.SendRequest(&livekit.SignalRequest{
Message: &livekit.SignalRequest_Trickle{
Trickle: rtc.ToProtoTrickle(prevIC.ToJSON(), target, ic == nil),
},
})
}
func (c *RTCClient) SetAttributes(attrs map[string]string) error {
return c.SendRequest(&livekit.SignalRequest{
Message: &livekit.SignalRequest_UpdateMetadata{
UpdateMetadata: &livekit.UpdateParticipantMetadata{
Attributes: attrs,
},
},
})
}
func (c *RTCClient) hasPrimaryEverConnected() bool {
if c.subscriberAsPrimary.Load() {
return c.subscriber.HasEverConnected()
} else {
return c.publisher.HasEverConnected()
}
}
type AddTrackParams struct {
NoWriter bool
}
type AddTrackOption func(params *AddTrackParams)
func AddTrackNoWriter() AddTrackOption {
return func(params *AddTrackParams) {
params.NoWriter = true
}
}
func (c *RTCClient) AddTrack(track *webrtc.TrackLocalStaticSample, path string, opts ...AddTrackOption) (writer *TrackWriter, err error) {
var params AddTrackParams
for _, opt := range opts {
opt(&params)
}
trackType := livekit.TrackType_AUDIO
if track.Kind() == webrtc.RTPCodecTypeVideo {
trackType = livekit.TrackType_VIDEO
}
if err = c.SendAddTrack(track.ID(), track.StreamID(), trackType); err != nil {
return
}
// wait till track published message is received
timeout := time.After(5 * time.Second)
var ti *livekit.TrackInfo
for {
select {
case <-timeout:
return nil, errors.New("could not publish track after timeout")
default:
c.lock.Lock()
ti = c.pendingPublishedTracks[track.ID()]
c.lock.Unlock()
if ti != nil {
break
}
time.Sleep(50 * time.Millisecond)
}
if ti != nil {
break
}
}
c.lock.Lock()
defer c.lock.Unlock()
sender, _, err := c.publisher.AddTrack(track, types.AddTrackParams{})
if err != nil {
logger.Errorw("add track failed", err, "trackID", ti.Sid, "participant", c.localParticipant.Identity, "pID", c.localParticipant.Sid)
return
}
c.localTracks[ti.Sid] = track
c.trackSenders[ti.Sid] = sender
c.publisher.Negotiate(false)
if !params.NoWriter {
writer = NewTrackWriter(c.ctx, track, path)
// write tracks only after connection established
if c.hasPrimaryEverConnected() {
err = writer.Start()
} else {
c.pendingTrackWriters = append(c.pendingTrackWriters, writer)
}
}
return
}
func (c *RTCClient) AddStaticTrack(mime string, id string, label string, opts ...AddTrackOption) (writer *TrackWriter, err error) {
return c.AddStaticTrackWithCodec(webrtc.RTPCodecCapability{MimeType: mime}, id, label, opts...)
}
func (c *RTCClient) AddStaticTrackWithCodec(codec webrtc.RTPCodecCapability, id string, label string, opts ...AddTrackOption) (writer *TrackWriter, err error) {
track, err := webrtc.NewTrackLocalStaticSample(codec, id, label)
if err != nil {
return
}
return c.AddTrack(track, "", opts...)
}
func (c *RTCClient) AddFileTrack(path string, id string, label string) (writer *TrackWriter, err error) {
// determine file mime
mime, ok := extMimeMapping[filepath.Ext(path)]
if !ok {
return nil, fmt.Errorf("%s has an unsupported extension", filepath.Base(path))
}
logger.Debugw("adding file track",
"mime", mime,
)
track, err := webrtc.NewTrackLocalStaticSample(
webrtc.RTPCodecCapability{MimeType: mime},
id,
label,
)
if err != nil {
return
}
return c.AddTrack(track, path)
}
// send AddTrack command to server to initiate server-side negotiation
func (c *RTCClient) SendAddTrack(cid string, name string, trackType livekit.TrackType) error {
return c.SendRequest(&livekit.SignalRequest{
Message: &livekit.SignalRequest_AddTrack{
AddTrack: &livekit.AddTrackRequest{
Cid: cid,
Name: name,
Type: trackType,
},
},
})
}
func (c *RTCClient) PublishData(data []byte, kind livekit.DataPacket_Kind) error {
if err := c.ensurePublisherConnected(); err != nil {
return err
}
dpData, err := proto.Marshal(&livekit.DataPacket{
Value: &livekit.DataPacket_User{
User: &livekit.UserPacket{Payload: data},
},
})
if err != nil {
return err
}
return c.publisher.SendDataPacket(kind, dpData)
}
func (c *RTCClient) GetPublishedTrackIDs() []string {
c.lock.Lock()
defer c.lock.Unlock()
var trackIDs []string
for key := range c.localTracks {
trackIDs = append(trackIDs, key)
}
return trackIDs
}
// LastAnswer return SDP of the last answer for the publisher connection
func (c *RTCClient) LastAnswer() *webrtc.SessionDescription {
return c.lastAnswer.Load()
}
func (c *RTCClient) ensurePublisherConnected() error {
if c.publisher.HasEverConnected() {
return nil
}
// start negotiating
c.publisher.Negotiate(false)
// wait until connected, increase wait time since it takes more than 10s sometimes on GH
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
for {
select {
case <-ctx.Done():
return fmt.Errorf("could not connect publisher after timeout")
case <-time.After(10 * time.Millisecond):
if c.publisherFullyEstablished.Load() {
return nil
}
}
}
}
func (c *RTCClient) handleDataMessage(kind livekit.DataPacket_Kind, data []byte) {
dp := &livekit.DataPacket{}
err := proto.Unmarshal(data, dp)
if err != nil {
return
}
dp.Kind = kind
if val, ok := dp.Value.(*livekit.DataPacket_User); ok {
if c.OnDataReceived != nil {
c.OnDataReceived(val.User.Payload, val.User.ParticipantSid)
}
}
}
// handles a server initiated offer, handle on subscriber PC
func (c *RTCClient) handleOffer(desc webrtc.SessionDescription) {
c.subscriber.HandleRemoteDescription(desc)
}
// the client handles answer on the publisher PC
func (c *RTCClient) handleAnswer(desc webrtc.SessionDescription) {
logger.Infow("handling server answer", "participant", c.localParticipant.Identity)
c.lastAnswer.Store(&desc)
// remote answered the offer, establish connection
c.publisher.HandleRemoteDescription(desc)
}
func (c *RTCClient) onOffer(offer webrtc.SessionDescription) error {
if c.localParticipant != nil {
logger.Infow("starting negotiation", "participant", c.localParticipant.Identity)
}
return c.SendRequest(&livekit.SignalRequest{
Message: &livekit.SignalRequest_Offer{
Offer: rtc.ToProtoSessionDescription(offer),
},
})
}
func (c *RTCClient) processTrack(track *webrtc.TrackRemote) {
lastUpdate := time.Time{}
pId, trackId := rtc.UnpackStreamID(track.StreamID())
if trackId == "" {
trackId = livekit.TrackID(track.ID())
}
c.lock.Lock()
c.subscribedTracks[pId] = append(c.subscribedTracks[pId], track)
c.lock.Unlock()
logger.Infow("client added track", "participant", c.localParticipant.Identity,
"pID", pId,
"trackID", trackId,
"codec", track.Codec(),
)
defer func() {
c.lock.Lock()
c.subscribedTracks[pId] = funk.Without(c.subscribedTracks[pId], track).([]*webrtc.TrackRemote)
c.lock.Unlock()
}()
numBytes := 0
for {
pkt, _, err := track.ReadRTP()
if c.ctx.Err() != nil {
break
}
if rtc.IsEOF(err) {
break
}
if err != nil {
logger.Warnw("error reading RTP", err)
continue
}
c.lock.Lock()
c.lastPackets[pId] = pkt
c.bytesReceived[pId] += uint64(pkt.MarshalSize())
c.lock.Unlock()
numBytes += pkt.MarshalSize()
if time.Since(lastUpdate) > 30*time.Second {
logger.Infow("consumed from participant",
"trackID", trackId, "pID", pId,
"size", numBytes)
lastUpdate = time.Now()
}
}
}
func (c *RTCClient) BytesReceived() uint64 {
var total uint64
c.lock.Lock()
for _, size := range c.bytesReceived {
total += size
}
c.lock.Unlock()
return total
}
func (c *RTCClient) SendNacks(count int) {
var packets []rtcp.Packet
c.lock.Lock()
for _, pkt := range c.lastPackets {
seqs := make([]uint16, 0, count)
for i := 0; i < count; i++ {
seqs = append(seqs, pkt.SequenceNumber-uint16(i))
}
packets = append(packets, &rtcp.TransportLayerNack{
MediaSSRC: pkt.SSRC,
Nacks: rtcp.NackPairsFromSequenceNumbers(seqs),
})
}
c.lock.Unlock()
_ = c.subscriber.WriteRTCP(packets)
}
func encodeQueryParam(key, value string) string {
return fmt.Sprintf("&%s=%s", url.QueryEscape(key), url.QueryEscape(value))
}