mirror of
https://github.com/livekit/livekit.git
synced 2026-03-29 09:19:53 +00:00
trackwriter, progress with tester client
This commit is contained in:
@@ -3,11 +3,14 @@ package client
|
||||
import (
|
||||
"container/ring"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/pion/randutil"
|
||||
"github.com/pion/webrtc/v3"
|
||||
"google.golang.org/protobuf/encoding/protojson"
|
||||
"google.golang.org/protobuf/proto"
|
||||
@@ -18,14 +21,19 @@ import (
|
||||
)
|
||||
|
||||
type RTCClient struct {
|
||||
conn *websocket.Conn
|
||||
PeerConn *webrtc.PeerConnection
|
||||
pendingCandidates []*webrtc.ICECandidate
|
||||
lock sync.Mutex
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
connected bool
|
||||
paused bool
|
||||
conn *websocket.Conn
|
||||
PeerConn *webrtc.PeerConnection
|
||||
localTracks []*webrtc.Track
|
||||
lock sync.Mutex
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
connected bool
|
||||
iceConnected bool
|
||||
paused bool
|
||||
|
||||
// pending actions to start after connected to peer
|
||||
pendingCandidates []*webrtc.ICECandidate
|
||||
pendingTrackWriters []*TrackWriter
|
||||
|
||||
// navigate log ring buffer. saving the last N entries
|
||||
writer *ring.Ring
|
||||
@@ -41,7 +49,17 @@ var (
|
||||
},
|
||||
},
|
||||
}
|
||||
maxLogs = 256
|
||||
maxLogs = 256
|
||||
extFormatMapping = map[string]string{
|
||||
".ivf": webrtc.VP8,
|
||||
".h264": webrtc.H264,
|
||||
".ogg": webrtc.Opus,
|
||||
}
|
||||
payloadTypes = map[string]int{
|
||||
webrtc.VP8: webrtc.DefaultPayloadTypeVP8,
|
||||
webrtc.H264: webrtc.DefaultPayloadTypeH264,
|
||||
webrtc.Opus: webrtc.DefaultPayloadTypeOpus,
|
||||
}
|
||||
)
|
||||
|
||||
func NewRTCClient(conn *websocket.Conn) (*RTCClient, error) {
|
||||
@@ -56,6 +74,7 @@ func NewRTCClient(conn *websocket.Conn) (*RTCClient, error) {
|
||||
conn: conn,
|
||||
lock: sync.Mutex{},
|
||||
pendingCandidates: make([]*webrtc.ICECandidate, 0),
|
||||
localTracks: make([]*webrtc.Track, 0),
|
||||
reader: logRing,
|
||||
writer: logRing,
|
||||
PeerConn: peerConn,
|
||||
@@ -88,11 +107,29 @@ func NewRTCClient(conn *websocket.Conn) (*RTCClient, error) {
|
||||
})
|
||||
|
||||
peerConn.OnNegotiationNeeded(func() {
|
||||
c.AppendLog("negotiate needed")
|
||||
// TODO: negotiate
|
||||
})
|
||||
|
||||
peerConn.OnDataChannel(func(channel *webrtc.DataChannel) {
|
||||
})
|
||||
|
||||
peerConn.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) {
|
||||
c.AppendLog("ICE state has changed", "state", connectionState.String())
|
||||
if connectionState == webrtc.ICEConnectionStateConnected {
|
||||
// flush peers
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
for _, tw := range c.pendingTrackWriters {
|
||||
if err := tw.Start(); err != nil {
|
||||
c.AppendLog("track writer error", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
c.iceConnected = true
|
||||
}
|
||||
})
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
@@ -169,7 +206,7 @@ func (c *RTCClient) Run() error {
|
||||
c.pendingCandidates = nil
|
||||
c.lock.Unlock()
|
||||
case *livekit.SignalResponse_Negotiate:
|
||||
// TBD
|
||||
c.AppendLog("remote wants to negotiate")
|
||||
case *livekit.SignalResponse_Trickle:
|
||||
candidateInit := service.FromProtoTrickle(msg.Trickle)
|
||||
c.AppendLog("adding remote candidate", "candidate", candidateInit.Candidate)
|
||||
@@ -217,6 +254,7 @@ func (c *RTCClient) ReadResponse() (*livekit.SignalResponse, error) {
|
||||
}
|
||||
|
||||
func (c *RTCClient) Stop() {
|
||||
c.conn.Close()
|
||||
c.cancel()
|
||||
}
|
||||
|
||||
@@ -247,6 +285,41 @@ func (c *RTCClient) SendIceCandidate(ic *webrtc.ICECandidate) error {
|
||||
})
|
||||
}
|
||||
|
||||
func (c *RTCClient) AddTrack(path string, codecType webrtc.RTPCodecType, id string, label string) error {
|
||||
// determine file type
|
||||
format, ok := extFormatMapping[filepath.Ext(path)]
|
||||
if !ok {
|
||||
return fmt.Errorf("%s has an unsupported extension", filepath.Base(path))
|
||||
}
|
||||
payloadType := uint8(payloadTypes[format])
|
||||
|
||||
logger.GetLogger().Infow("adding track",
|
||||
"format", format,
|
||||
)
|
||||
track, err := c.PeerConn.NewTrack(payloadType, randutil.NewMathRandomGenerator().Uint32(), id, label)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
c.localTracks = append(c.localTracks, track)
|
||||
|
||||
if _, err := c.PeerConn.AddTrack(track); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tw := NewTrackWriter(c.ctx, track, path, format)
|
||||
// write tracks only after ICE connectivity
|
||||
if c.iceConnected {
|
||||
return tw.Start()
|
||||
}
|
||||
|
||||
c.pendingTrackWriters = append(c.pendingTrackWriters, tw)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type logEntry struct {
|
||||
msg string
|
||||
args []interface{}
|
||||
|
||||
130
cmd/cli/client/trackwriter.go
Normal file
130
cmd/cli/client/trackwriter.go
Normal file
@@ -0,0 +1,130 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/pion/webrtc/v3"
|
||||
"github.com/pion/webrtc/v3/pkg/media"
|
||||
"github.com/pion/webrtc/v3/pkg/media/h264reader"
|
||||
"github.com/pion/webrtc/v3/pkg/media/ivfreader"
|
||||
"github.com/pion/webrtc/v3/pkg/media/oggreader"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/logger"
|
||||
)
|
||||
|
||||
type TrackWriter struct {
|
||||
ctx context.Context
|
||||
track *webrtc.Track
|
||||
filePath string
|
||||
format string
|
||||
|
||||
ogg *oggreader.OggReader
|
||||
ivfheader *ivfreader.IVFFileHeader
|
||||
ivf *ivfreader.IVFReader
|
||||
h264 *h264reader.H264Reader
|
||||
}
|
||||
|
||||
func NewTrackWriter(ctx context.Context, track *webrtc.Track, filePath string, format string) *TrackWriter {
|
||||
return &TrackWriter{
|
||||
ctx: ctx,
|
||||
track: track,
|
||||
filePath: filePath,
|
||||
format: format,
|
||||
}
|
||||
}
|
||||
|
||||
func (w *TrackWriter) Start() error {
|
||||
file, err := os.Open(w.filePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
logger.GetLogger().Infow("starting track writer",
|
||||
"track", w.track.ID(),
|
||||
"format", w.format)
|
||||
switch w.format {
|
||||
case webrtc.Opus:
|
||||
w.ogg, _, err = oggreader.NewWith(file)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
go w.writeOgg()
|
||||
case webrtc.VP8:
|
||||
w.ivf, w.ivfheader, err = ivfreader.NewWith(file)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
go w.writeVP8()
|
||||
case webrtc.H264:
|
||||
w.h264, err = h264reader.NewReader(file)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
go w.writeH264()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *TrackWriter) writeOgg() {
|
||||
// Keep track of last granule, the difference is the amount of samples in the buffer
|
||||
var lastGranule uint64
|
||||
for {
|
||||
pageData, pageHeader, err := w.ogg.ParseNextPage()
|
||||
if err == io.EOF {
|
||||
logger.GetLogger().Infow("all audio samples parsed and sent")
|
||||
w.onWriteComplete()
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
logger.GetLogger().Errorw("could not parse ogg page", "err", err)
|
||||
}
|
||||
|
||||
// The amount of samples is the difference between the last and current timestamp
|
||||
sampleCount := float64(pageHeader.GranulePosition - lastGranule)
|
||||
lastGranule = pageHeader.GranulePosition
|
||||
|
||||
if err = w.track.WriteSample(media.Sample{Data: pageData, Samples: uint32(sampleCount)}); err != nil {
|
||||
logger.GetLogger().Errorw("could not write sample", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Convert seconds to Milliseconds, Sleep doesn't accept floats
|
||||
time.Sleep(time.Duration((sampleCount/48000)*1000) * time.Millisecond)
|
||||
}
|
||||
}
|
||||
|
||||
func (w *TrackWriter) writeVP8() {
|
||||
// Send our video file frame at a time. Pace our sending so we send it at the same speed it should be played back as.
|
||||
// This isn't required since the video is timestamped, but we will such much higher loss if we send all at once.
|
||||
sleepTime := time.Millisecond * time.Duration((float32(w.ivfheader.TimebaseNumerator)/float32(w.ivfheader.TimebaseDenominator))*1000)
|
||||
for {
|
||||
frame, _, err := w.ivf.ParseNextFrame()
|
||||
if err == io.EOF {
|
||||
logger.GetLogger().Infow("all video frames parsed and sent")
|
||||
w.onWriteComplete()
|
||||
return
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
logger.GetLogger().Errorw("could not parse VP8 frame", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
time.Sleep(sleepTime)
|
||||
if err = w.track.WriteSample(media.Sample{Data: frame, Samples: 90000}); err != nil {
|
||||
logger.GetLogger().Errorw("could not write sample", "err", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (w *TrackWriter) writeH264() {
|
||||
// TODO: this is harder
|
||||
}
|
||||
|
||||
func (w *TrackWriter) onWriteComplete() {
|
||||
|
||||
}
|
||||
@@ -1,9 +1,18 @@
|
||||
package commands
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"os"
|
||||
"os/signal"
|
||||
"path/filepath"
|
||||
"syscall"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/manifoldco/promptui"
|
||||
"github.com/pion/webrtc/v3"
|
||||
"github.com/urfave/cli/v2"
|
||||
|
||||
"github.com/livekit/livekit-server/cmd/cli/client"
|
||||
@@ -42,7 +51,9 @@ func joinRoom(c *cli.Context) error {
|
||||
v.Set("peer_id", c.String("peer-id"))
|
||||
u.RawQuery = v.Encode()
|
||||
|
||||
logger.GetLogger().Infow("connecting to Websocket signal", "url", u.String())
|
||||
log := logger.GetLogger()
|
||||
|
||||
log.Infow("connecting to Websocket signal", "url", u.String())
|
||||
conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -54,7 +65,122 @@ func joinRoom(c *cli.Context) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO: input loop to detect user commands
|
||||
// signal to stop client
|
||||
sigChan := make(chan os.Signal, 1)
|
||||
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
|
||||
|
||||
go func() {
|
||||
sig := <-sigChan
|
||||
log.Infow("exit requested, shutting down", "signal", sig)
|
||||
rc.Stop()
|
||||
}()
|
||||
|
||||
// start loop to detect input
|
||||
go func() {
|
||||
for {
|
||||
r := bufio.NewReader(os.Stdin)
|
||||
_, _, err := r.ReadLine()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// pause client output and wait
|
||||
rc.PauseLogs()
|
||||
|
||||
err = handleCommand(rc)
|
||||
if err != nil {
|
||||
log.Errorw("could not handle command", "err", err)
|
||||
}
|
||||
|
||||
rc.ResumeLogs()
|
||||
}
|
||||
}()
|
||||
|
||||
return rc.Run()
|
||||
}
|
||||
|
||||
func handleCommand(rc *client.RTCClient) error {
|
||||
cmdPrompt := promptui.Select{
|
||||
Label: "select command",
|
||||
Items: []string{
|
||||
"add video",
|
||||
"add audio",
|
||||
//"add data",
|
||||
//"send data",
|
||||
"remove track",
|
||||
},
|
||||
}
|
||||
|
||||
idx, _, err := cmdPrompt.Run()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
switch idx {
|
||||
case 0:
|
||||
return handleAddMedia(rc, false)
|
||||
case 1:
|
||||
return handleAddMedia(rc, true)
|
||||
default:
|
||||
return errors.New("unimplemented command")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func handleAddMedia(rc *client.RTCClient, isAudio bool) error {
|
||||
fileType := "vp8, h264"
|
||||
codecType := webrtc.RTPCodecTypeVideo
|
||||
if isAudio {
|
||||
fileType = "opus"
|
||||
codecType = webrtc.RTPCodecTypeAudio
|
||||
}
|
||||
// get media location
|
||||
p := promptui.Prompt{
|
||||
Label: fmt.Sprintf("media path (%s)", fileType),
|
||||
Validate: func(s string) error {
|
||||
s = ExpandUser(s)
|
||||
// ensure it exists
|
||||
st, err := os.Stat(s)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if st.IsDir() {
|
||||
return errors.New("cannot be a directory")
|
||||
}
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
mediaPath, err := p.Run()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
mediaPath = ExpandUser(mediaPath)
|
||||
|
||||
// TODO: see what the ID should be
|
||||
err = rc.AddTrack(mediaPath, codecType, filepath.Base(mediaPath), "livekit")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fmt.Printf("added %s track: %s\n", codecType.String(), mediaPath)
|
||||
|
||||
if isAudio {
|
||||
return nil
|
||||
}
|
||||
|
||||
// for video files, also look for the .ogg at the same path
|
||||
videoExt := filepath.Ext(mediaPath)
|
||||
if len(videoExt) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
audioPath := mediaPath[0:len(mediaPath)-len(videoExt)] + ".ogg"
|
||||
if _, err = os.Stat(audioPath); err == nil {
|
||||
err = rc.AddTrack(audioPath, webrtc.RTPCodecTypeAudio, filepath.Base(audioPath), "livekit")
|
||||
if err != nil {
|
||||
fmt.Printf("added audio track: %s\n", audioPath)
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -3,6 +3,9 @@ package commands
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"github.com/urfave/cli/v2"
|
||||
)
|
||||
@@ -26,3 +29,12 @@ func PrintJSON(obj interface{}) {
|
||||
txt, _ := json.Marshal(obj)
|
||||
fmt.Println(string(txt))
|
||||
}
|
||||
|
||||
func ExpandUser(p string) string {
|
||||
if strings.HasPrefix(p, "~") {
|
||||
home, _ := os.UserHomeDir()
|
||||
return filepath.Join(home, p[1:])
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
2
go.mod
2
go.mod
@@ -8,7 +8,9 @@ require (
|
||||
github.com/google/uuid v1.1.2
|
||||
github.com/google/wire v0.4.0
|
||||
github.com/gorilla/websocket v1.4.2
|
||||
github.com/manifoldco/promptui v0.8.0
|
||||
github.com/pion/ion-sfu v1.0.27
|
||||
github.com/pion/randutil v0.1.0
|
||||
github.com/pion/rtcp v1.2.4
|
||||
github.com/pion/rtp v1.6.1
|
||||
github.com/pion/sdp/v3 v3.0.2
|
||||
|
||||
16
go.sum
16
go.sum
@@ -38,6 +38,12 @@ github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA
|
||||
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
|
||||
github.com/cheekybits/genny v1.0.0 h1:uGGa4nei+j20rOSeDeP5Of12XVm7TGUd4dJA9RDitfE=
|
||||
github.com/cheekybits/genny v1.0.0/go.mod h1:+tQajlRqAUrPI7DOSpB0XAqZYtQakVtB7wXkRAgjxjQ=
|
||||
github.com/chzyer/logex v1.1.10 h1:Swpa1K6QvQznwJRcfTfQJmTE72DqScAa40E+fbHEXEE=
|
||||
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
|
||||
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e h1:fY5BOSpyZCqRo5OhCuC+XN+r/bBCmeuuJtjz+bCNIf8=
|
||||
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
|
||||
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1 h1:q763qf9huN11kDQavWsoZXJNW3xEE4JJyHa5Q25/sd8=
|
||||
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
|
||||
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
|
||||
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
|
||||
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
|
||||
@@ -159,6 +165,8 @@ github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22
|
||||
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
|
||||
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
|
||||
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
|
||||
github.com/juju/ansiterm v0.0.0-20180109212912-720a0952cc2a h1:FaWFmfWdAUKbSCtOU2QjDaorUexogfaMgbipgYATUMU=
|
||||
github.com/juju/ansiterm v0.0.0-20180109212912-720a0952cc2a/go.mod h1:UJSiEoRfvx3hP73CvoARgeLjaIOjybY9vj8PUPPFGeU=
|
||||
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
|
||||
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
|
||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||
@@ -176,9 +184,13 @@ github.com/lucas-clemente/quic-go v0.18.1 h1:DMR7guC0NtVS8zNZR3IO7NARZvZygkSC56G
|
||||
github.com/lucas-clemente/quic-go v0.18.1/go.mod h1:yXttHsSNxQi8AWijC/vLP+OJczXqzHSOcJrM5ITUlCg=
|
||||
github.com/lucsky/cuid v1.0.2 h1:z4XlExeoderxoPj2/dxKOyPxe9RCOu7yNq9/XWxIUMQ=
|
||||
github.com/lucsky/cuid v1.0.2/go.mod h1:QaaJqckboimOmhRSJXSx/+IT+VTfxfPGSo/6mfgUfmE=
|
||||
github.com/lunixbochs/vtclean v0.0.0-20180621232353-2d01aacdc34a/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI=
|
||||
github.com/lunixbochs/vtclean v1.0.0 h1:xu2sLAri4lGiovBDQKxl5mrXyESr3gUr5m5SM5+LVb8=
|
||||
github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI=
|
||||
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
|
||||
github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
|
||||
github.com/manifoldco/promptui v0.8.0 h1:R95mMF+McvXZQ7j1g8ucVZE1gLP3Sv6j9vlF9kyRqQo=
|
||||
github.com/manifoldco/promptui v0.8.0/go.mod h1:n4zTdgP0vr0S3w7/O/g98U+e0gwLScEXGwov2nIKuGQ=
|
||||
github.com/marten-seemann/qpack v0.2.0/go.mod h1:F7Gl5L1jIgN1D11ucXefiuJS9UMVP2opoCp2jDKb7wc=
|
||||
github.com/marten-seemann/qtls v0.10.0 h1:ECsuYUKalRL240rRD4Ri33ISb7kAQ3qGDlrrl55b2pc=
|
||||
github.com/marten-seemann/qtls v0.10.0/go.mod h1:UvMd1oaYDACI99/oZUYLzMCkBXQVT0aGm99sJhbT8hs=
|
||||
@@ -186,8 +198,11 @@ github.com/marten-seemann/qtls-go1-15 v0.1.0 h1:i/YPXVxz8q9umso/5y474CNcHmTpA+5D
|
||||
github.com/marten-seemann/qtls-go1-15 v0.1.0/go.mod h1:GyFwywLKkRt+6mfU99csTEY1joMZz5vmB1WNZH3P81I=
|
||||
github.com/marten-seemann/qtls-go1-15 v0.1.1 h1:LIH6K34bPVttyXnUWixk0bzH6/N07VxbSabxn5A5gZQ=
|
||||
github.com/marten-seemann/qtls-go1-15 v0.1.1/go.mod h1:GyFwywLKkRt+6mfU99csTEY1joMZz5vmB1WNZH3P81I=
|
||||
github.com/mattn/go-colorable v0.0.9 h1:UVL0vNpWh04HeJXV0KLcaT7r06gOH2l4OW6ddYRUIY4=
|
||||
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
|
||||
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
|
||||
github.com/mattn/go-isatty v0.0.4 h1:bnP0vzxcAdeI1zdubAl5PjU6zsERjGZb7raWodagDYs=
|
||||
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
|
||||
github.com/microcosm-cc/bluemonday v1.0.1/go.mod h1:hsXNsILzKxV+sX77C5b8FSuKF00vh2OMYv+xgHpAMF4=
|
||||
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
|
||||
@@ -450,6 +465,7 @@ golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5h
|
||||
golang.org/x/sys v0.0.0-20181029174526-d69651ed3497/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20190316082340-a2f829d7f35f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
|
||||
@@ -83,6 +83,10 @@ func NewWebRTCPeer(id string, me *MediaEngine, conf WebRTCConfig) (*WebRTCPeer,
|
||||
}
|
||||
})
|
||||
|
||||
pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
|
||||
logger.GetLogger().Debugw("ICE connection state changed", "state", state.String())
|
||||
})
|
||||
|
||||
// TODO: handle data channel
|
||||
|
||||
return peer, nil
|
||||
@@ -169,6 +173,7 @@ func (p *WebRTCPeer) RemoveSubscriber(peerId string) {
|
||||
|
||||
// when a new track is created, creates a PeerTrack and adds it to room
|
||||
func (p *WebRTCPeer) onTrack(track *webrtc.Track, rtpReceiver *webrtc.RTPReceiver) {
|
||||
logger.GetLogger().Debugw("track added", "peerId", p.ID(), "track", track.Label())
|
||||
|
||||
// create Receiver
|
||||
receiver := NewReceiver(p.ctx, p.id, rtpReceiver, p.receiverConfig, p.mediaEngine)
|
||||
@@ -178,6 +183,7 @@ func (p *WebRTCPeer) onTrack(track *webrtc.Track, rtpReceiver *webrtc.RTPReceive
|
||||
p.tracks = append(p.tracks, pt)
|
||||
p.lock.Unlock()
|
||||
|
||||
pt.Start()
|
||||
if p.OnPeerTrack != nil {
|
||||
// caller should hook up what happens when the peer track is available
|
||||
go p.OnPeerTrack(p, pt)
|
||||
|
||||
@@ -43,6 +43,11 @@ func NewPeerTrack(ctx context.Context, peerId string, rtcpWriter RTCPWriter, tra
|
||||
}
|
||||
}
|
||||
|
||||
func (t *PeerTrack) Start() {
|
||||
// start worker
|
||||
go t.forwardWorker()
|
||||
}
|
||||
|
||||
// subscribes peer to current track
|
||||
// creates and add necessary forwarders and starts them
|
||||
func (t *PeerTrack) AddSubscriber(peer *WebRTCPeer) error {
|
||||
@@ -100,7 +105,14 @@ func (t *PeerTrack) RemoveSubscriber(peerId string) {
|
||||
// forwardWorker reads from the receiver and writes to each sender
|
||||
func (t *PeerTrack) forwardWorker() {
|
||||
for pkt := range t.receiver.RTPChan() {
|
||||
if t.ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
now := time.Now()
|
||||
|
||||
logger.GetLogger().Debugw("read packet from track",
|
||||
"peerId", t.peerId,
|
||||
"track", t.track.ID())
|
||||
t.lock.RLock()
|
||||
for dstPeerId, forwarder := range t.forwarders {
|
||||
// There exists a bug in chrome where setLocalDescription
|
||||
|
||||
@@ -144,6 +144,7 @@ func (s *RTCService) handleJoin(sc SignalConnection, room *rtc.Room, sdp string)
|
||||
|
||||
// TODO: it might be better to return error instead of nil
|
||||
peer.OnICECandidate = func(c *webrtc.ICECandidateInit) {
|
||||
log.Debugw("sending ICE candidate", "peerId", peer.ID())
|
||||
err = sc.WriteResponse(&livekit.SignalResponse{
|
||||
Message: &livekit.SignalResponse_Trickle{
|
||||
Trickle: &livekit.Trickle{
|
||||
@@ -153,7 +154,7 @@ func (s *RTCService) handleJoin(sc SignalConnection, room *rtc.Room, sdp string)
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
logger.GetLogger().Errorw("could not send trickle", "err", err)
|
||||
log.Errorw("could not send trickle", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -222,6 +223,7 @@ func (s *RTCService) handleNegotiate(sc SignalConnection, peer *rtc.WebRTCPeer,
|
||||
|
||||
func (s *RTCService) handleTrickle(peer *rtc.WebRTCPeer, trickle *livekit.Trickle) error {
|
||||
candidateInit := FromProtoTrickle(trickle)
|
||||
logger.GetLogger().Debugw("adding peer candidate", "peerId", peer.ID())
|
||||
if err := peer.AddICECandidate(*candidateInit); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user