diff --git a/cmd/cli/client/client.go b/cmd/cli/client/client.go index a49c67f58..234bd1e4c 100644 --- a/cmd/cli/client/client.go +++ b/cmd/cli/client/client.go @@ -3,11 +3,14 @@ package client import ( "container/ring" "context" + "fmt" "io" + "path/filepath" "sync" "time" "github.com/gorilla/websocket" + "github.com/pion/randutil" "github.com/pion/webrtc/v3" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" @@ -18,14 +21,19 @@ import ( ) type RTCClient struct { - conn *websocket.Conn - PeerConn *webrtc.PeerConnection - pendingCandidates []*webrtc.ICECandidate - lock sync.Mutex - ctx context.Context - cancel context.CancelFunc - connected bool - paused bool + conn *websocket.Conn + PeerConn *webrtc.PeerConnection + localTracks []*webrtc.Track + lock sync.Mutex + ctx context.Context + cancel context.CancelFunc + connected bool + iceConnected bool + paused bool + + // pending actions to start after connected to peer + pendingCandidates []*webrtc.ICECandidate + pendingTrackWriters []*TrackWriter // navigate log ring buffer. saving the last N entries writer *ring.Ring @@ -41,7 +49,17 @@ var ( }, }, } - maxLogs = 256 + maxLogs = 256 + extFormatMapping = map[string]string{ + ".ivf": webrtc.VP8, + ".h264": webrtc.H264, + ".ogg": webrtc.Opus, + } + payloadTypes = map[string]int{ + webrtc.VP8: webrtc.DefaultPayloadTypeVP8, + webrtc.H264: webrtc.DefaultPayloadTypeH264, + webrtc.Opus: webrtc.DefaultPayloadTypeOpus, + } ) func NewRTCClient(conn *websocket.Conn) (*RTCClient, error) { @@ -56,6 +74,7 @@ func NewRTCClient(conn *websocket.Conn) (*RTCClient, error) { conn: conn, lock: sync.Mutex{}, pendingCandidates: make([]*webrtc.ICECandidate, 0), + localTracks: make([]*webrtc.Track, 0), reader: logRing, writer: logRing, PeerConn: peerConn, @@ -88,11 +107,29 @@ func NewRTCClient(conn *websocket.Conn) (*RTCClient, error) { }) peerConn.OnNegotiationNeeded(func() { + c.AppendLog("negotiate needed") + // TODO: negotiate }) peerConn.OnDataChannel(func(channel *webrtc.DataChannel) { }) + peerConn.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { + c.AppendLog("ICE state has changed", "state", connectionState.String()) + if connectionState == webrtc.ICEConnectionStateConnected { + // flush peers + c.lock.Lock() + defer c.lock.Unlock() + for _, tw := range c.pendingTrackWriters { + if err := tw.Start(); err != nil { + c.AppendLog("track writer error", "err", err) + } + } + + c.iceConnected = true + } + }) + return c, nil } @@ -169,7 +206,7 @@ func (c *RTCClient) Run() error { c.pendingCandidates = nil c.lock.Unlock() case *livekit.SignalResponse_Negotiate: - // TBD + c.AppendLog("remote wants to negotiate") case *livekit.SignalResponse_Trickle: candidateInit := service.FromProtoTrickle(msg.Trickle) c.AppendLog("adding remote candidate", "candidate", candidateInit.Candidate) @@ -217,6 +254,7 @@ func (c *RTCClient) ReadResponse() (*livekit.SignalResponse, error) { } func (c *RTCClient) Stop() { + c.conn.Close() c.cancel() } @@ -247,6 +285,41 @@ func (c *RTCClient) SendIceCandidate(ic *webrtc.ICECandidate) error { }) } +func (c *RTCClient) AddTrack(path string, codecType webrtc.RTPCodecType, id string, label string) error { + // determine file type + format, ok := extFormatMapping[filepath.Ext(path)] + if !ok { + return fmt.Errorf("%s has an unsupported extension", filepath.Base(path)) + } + payloadType := uint8(payloadTypes[format]) + + logger.GetLogger().Infow("adding track", + "format", format, + ) + track, err := c.PeerConn.NewTrack(payloadType, randutil.NewMathRandomGenerator().Uint32(), id, label) + if err != nil { + return err + } + + c.lock.Lock() + defer c.lock.Unlock() + c.localTracks = append(c.localTracks, track) + + if _, err := c.PeerConn.AddTrack(track); err != nil { + return err + } + + tw := NewTrackWriter(c.ctx, track, path, format) + // write tracks only after ICE connectivity + if c.iceConnected { + return tw.Start() + } + + c.pendingTrackWriters = append(c.pendingTrackWriters, tw) + + return nil +} + type logEntry struct { msg string args []interface{} diff --git a/cmd/cli/client/trackwriter.go b/cmd/cli/client/trackwriter.go new file mode 100644 index 000000000..68cbdd85d --- /dev/null +++ b/cmd/cli/client/trackwriter.go @@ -0,0 +1,130 @@ +package client + +import ( + "context" + "io" + "os" + "time" + + "github.com/pion/webrtc/v3" + "github.com/pion/webrtc/v3/pkg/media" + "github.com/pion/webrtc/v3/pkg/media/h264reader" + "github.com/pion/webrtc/v3/pkg/media/ivfreader" + "github.com/pion/webrtc/v3/pkg/media/oggreader" + + "github.com/livekit/livekit-server/pkg/logger" +) + +type TrackWriter struct { + ctx context.Context + track *webrtc.Track + filePath string + format string + + ogg *oggreader.OggReader + ivfheader *ivfreader.IVFFileHeader + ivf *ivfreader.IVFReader + h264 *h264reader.H264Reader +} + +func NewTrackWriter(ctx context.Context, track *webrtc.Track, filePath string, format string) *TrackWriter { + return &TrackWriter{ + ctx: ctx, + track: track, + filePath: filePath, + format: format, + } +} + +func (w *TrackWriter) Start() error { + file, err := os.Open(w.filePath) + if err != nil { + return err + } + + logger.GetLogger().Infow("starting track writer", + "track", w.track.ID(), + "format", w.format) + switch w.format { + case webrtc.Opus: + w.ogg, _, err = oggreader.NewWith(file) + if err != nil { + return err + } + go w.writeOgg() + case webrtc.VP8: + w.ivf, w.ivfheader, err = ivfreader.NewWith(file) + if err != nil { + return err + } + go w.writeVP8() + case webrtc.H264: + w.h264, err = h264reader.NewReader(file) + if err != nil { + return err + } + go w.writeH264() + } + return nil +} + +func (w *TrackWriter) writeOgg() { + // Keep track of last granule, the difference is the amount of samples in the buffer + var lastGranule uint64 + for { + pageData, pageHeader, err := w.ogg.ParseNextPage() + if err == io.EOF { + logger.GetLogger().Infow("all audio samples parsed and sent") + w.onWriteComplete() + } + + if err != nil { + logger.GetLogger().Errorw("could not parse ogg page", "err", err) + } + + // The amount of samples is the difference between the last and current timestamp + sampleCount := float64(pageHeader.GranulePosition - lastGranule) + lastGranule = pageHeader.GranulePosition + + if err = w.track.WriteSample(media.Sample{Data: pageData, Samples: uint32(sampleCount)}); err != nil { + logger.GetLogger().Errorw("could not write sample", "err", err) + return + } + + // Convert seconds to Milliseconds, Sleep doesn't accept floats + time.Sleep(time.Duration((sampleCount/48000)*1000) * time.Millisecond) + } +} + +func (w *TrackWriter) writeVP8() { + // Send our video file frame at a time. Pace our sending so we send it at the same speed it should be played back as. + // This isn't required since the video is timestamped, but we will such much higher loss if we send all at once. + sleepTime := time.Millisecond * time.Duration((float32(w.ivfheader.TimebaseNumerator)/float32(w.ivfheader.TimebaseDenominator))*1000) + for { + frame, _, err := w.ivf.ParseNextFrame() + if err == io.EOF { + logger.GetLogger().Infow("all video frames parsed and sent") + w.onWriteComplete() + return + } + + if err != nil { + logger.GetLogger().Errorw("could not parse VP8 frame", "err", err) + return + } + + time.Sleep(sleepTime) + if err = w.track.WriteSample(media.Sample{Data: frame, Samples: 90000}); err != nil { + logger.GetLogger().Errorw("could not write sample", "err", err) + return + } + } +} + +func (w *TrackWriter) writeH264() { + // TODO: this is harder +} + +func (w *TrackWriter) onWriteComplete() { + +} diff --git a/cmd/cli/commands/rtc.go b/cmd/cli/commands/rtc.go index eaf64e442..7f91a1f5f 100644 --- a/cmd/cli/commands/rtc.go +++ b/cmd/cli/commands/rtc.go @@ -1,9 +1,18 @@ package commands import ( + "bufio" + "errors" + "fmt" "net/url" + "os" + "os/signal" + "path/filepath" + "syscall" "github.com/gorilla/websocket" + "github.com/manifoldco/promptui" + "github.com/pion/webrtc/v3" "github.com/urfave/cli/v2" "github.com/livekit/livekit-server/cmd/cli/client" @@ -42,7 +51,9 @@ func joinRoom(c *cli.Context) error { v.Set("peer_id", c.String("peer-id")) u.RawQuery = v.Encode() - logger.GetLogger().Infow("connecting to Websocket signal", "url", u.String()) + log := logger.GetLogger() + + log.Infow("connecting to Websocket signal", "url", u.String()) conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil) if err != nil { return err @@ -54,7 +65,122 @@ func joinRoom(c *cli.Context) error { return err } - // TODO: input loop to detect user commands + // signal to stop client + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) + + go func() { + sig := <-sigChan + log.Infow("exit requested, shutting down", "signal", sig) + rc.Stop() + }() + + // start loop to detect input + go func() { + for { + r := bufio.NewReader(os.Stdin) + _, _, err := r.ReadLine() + if err != nil { + return + } + + // pause client output and wait + rc.PauseLogs() + + err = handleCommand(rc) + if err != nil { + log.Errorw("could not handle command", "err", err) + } + + rc.ResumeLogs() + } + }() return rc.Run() } + +func handleCommand(rc *client.RTCClient) error { + cmdPrompt := promptui.Select{ + Label: "select command", + Items: []string{ + "add video", + "add audio", + //"add data", + //"send data", + "remove track", + }, + } + + idx, _, err := cmdPrompt.Run() + if err != nil { + return err + } + + switch idx { + case 0: + return handleAddMedia(rc, false) + case 1: + return handleAddMedia(rc, true) + default: + return errors.New("unimplemented command") + } + return nil +} + +func handleAddMedia(rc *client.RTCClient, isAudio bool) error { + fileType := "vp8, h264" + codecType := webrtc.RTPCodecTypeVideo + if isAudio { + fileType = "opus" + codecType = webrtc.RTPCodecTypeAudio + } + // get media location + p := promptui.Prompt{ + Label: fmt.Sprintf("media path (%s)", fileType), + Validate: func(s string) error { + s = ExpandUser(s) + // ensure it exists + st, err := os.Stat(s) + if err != nil { + return err + } + if st.IsDir() { + return errors.New("cannot be a directory") + } + return nil + }, + } + + mediaPath, err := p.Run() + if err != nil { + return err + } + + mediaPath = ExpandUser(mediaPath) + + // TODO: see what the ID should be + err = rc.AddTrack(mediaPath, codecType, filepath.Base(mediaPath), "livekit") + if err != nil { + return err + } + fmt.Printf("added %s track: %s\n", codecType.String(), mediaPath) + + if isAudio { + return nil + } + + // for video files, also look for the .ogg at the same path + videoExt := filepath.Ext(mediaPath) + if len(videoExt) == 0 { + return nil + } + + audioPath := mediaPath[0:len(mediaPath)-len(videoExt)] + ".ogg" + if _, err = os.Stat(audioPath); err == nil { + err = rc.AddTrack(audioPath, webrtc.RTPCodecTypeAudio, filepath.Base(audioPath), "livekit") + if err != nil { + fmt.Printf("added audio track: %s\n", audioPath) + } + } + return err +} diff --git a/cmd/cli/commands/utils.go b/cmd/cli/commands/utils.go index 73a02172d..0fae82411 100644 --- a/cmd/cli/commands/utils.go +++ b/cmd/cli/commands/utils.go @@ -3,6 +3,9 @@ package commands import ( "encoding/json" "fmt" + "os" + "path/filepath" + "strings" "github.com/urfave/cli/v2" ) @@ -26,3 +29,12 @@ func PrintJSON(obj interface{}) { txt, _ := json.Marshal(obj) fmt.Println(string(txt)) } + +func ExpandUser(p string) string { + if strings.HasPrefix(p, "~") { + home, _ := os.UserHomeDir() + return filepath.Join(home, p[1:]) + } + + return p +} diff --git a/go.mod b/go.mod index c53c9d308..f59e2ad15 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,9 @@ require ( github.com/google/uuid v1.1.2 github.com/google/wire v0.4.0 github.com/gorilla/websocket v1.4.2 + github.com/manifoldco/promptui v0.8.0 github.com/pion/ion-sfu v1.0.27 + github.com/pion/randutil v0.1.0 github.com/pion/rtcp v1.2.4 github.com/pion/rtp v1.6.1 github.com/pion/sdp/v3 v3.0.2 diff --git a/go.sum b/go.sum index 93ca5d18e..0ea16ee7a 100644 --- a/go.sum +++ b/go.sum @@ -38,6 +38,12 @@ github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cheekybits/genny v1.0.0 h1:uGGa4nei+j20rOSeDeP5Of12XVm7TGUd4dJA9RDitfE= github.com/cheekybits/genny v1.0.0/go.mod h1:+tQajlRqAUrPI7DOSpB0XAqZYtQakVtB7wXkRAgjxjQ= +github.com/chzyer/logex v1.1.10 h1:Swpa1K6QvQznwJRcfTfQJmTE72DqScAa40E+fbHEXEE= +github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= +github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e h1:fY5BOSpyZCqRo5OhCuC+XN+r/bBCmeuuJtjz+bCNIf8= +github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= +github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1 h1:q763qf9huN11kDQavWsoZXJNW3xEE4JJyHa5Q25/sd8= +github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= @@ -159,6 +165,8 @@ github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22 github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/juju/ansiterm v0.0.0-20180109212912-720a0952cc2a h1:FaWFmfWdAUKbSCtOU2QjDaorUexogfaMgbipgYATUMU= +github.com/juju/ansiterm v0.0.0-20180109212912-720a0952cc2a/go.mod h1:UJSiEoRfvx3hP73CvoARgeLjaIOjybY9vj8PUPPFGeU= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= @@ -176,9 +184,13 @@ github.com/lucas-clemente/quic-go v0.18.1 h1:DMR7guC0NtVS8zNZR3IO7NARZvZygkSC56G github.com/lucas-clemente/quic-go v0.18.1/go.mod h1:yXttHsSNxQi8AWijC/vLP+OJczXqzHSOcJrM5ITUlCg= github.com/lucsky/cuid v1.0.2 h1:z4XlExeoderxoPj2/dxKOyPxe9RCOu7yNq9/XWxIUMQ= github.com/lucsky/cuid v1.0.2/go.mod h1:QaaJqckboimOmhRSJXSx/+IT+VTfxfPGSo/6mfgUfmE= +github.com/lunixbochs/vtclean v0.0.0-20180621232353-2d01aacdc34a/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI= +github.com/lunixbochs/vtclean v1.0.0 h1:xu2sLAri4lGiovBDQKxl5mrXyESr3gUr5m5SM5+LVb8= github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI= github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= +github.com/manifoldco/promptui v0.8.0 h1:R95mMF+McvXZQ7j1g8ucVZE1gLP3Sv6j9vlF9kyRqQo= +github.com/manifoldco/promptui v0.8.0/go.mod h1:n4zTdgP0vr0S3w7/O/g98U+e0gwLScEXGwov2nIKuGQ= github.com/marten-seemann/qpack v0.2.0/go.mod h1:F7Gl5L1jIgN1D11ucXefiuJS9UMVP2opoCp2jDKb7wc= github.com/marten-seemann/qtls v0.10.0 h1:ECsuYUKalRL240rRD4Ri33ISb7kAQ3qGDlrrl55b2pc= github.com/marten-seemann/qtls v0.10.0/go.mod h1:UvMd1oaYDACI99/oZUYLzMCkBXQVT0aGm99sJhbT8hs= @@ -186,8 +198,11 @@ github.com/marten-seemann/qtls-go1-15 v0.1.0 h1:i/YPXVxz8q9umso/5y474CNcHmTpA+5D github.com/marten-seemann/qtls-go1-15 v0.1.0/go.mod h1:GyFwywLKkRt+6mfU99csTEY1joMZz5vmB1WNZH3P81I= github.com/marten-seemann/qtls-go1-15 v0.1.1 h1:LIH6K34bPVttyXnUWixk0bzH6/N07VxbSabxn5A5gZQ= github.com/marten-seemann/qtls-go1-15 v0.1.1/go.mod h1:GyFwywLKkRt+6mfU99csTEY1joMZz5vmB1WNZH3P81I= +github.com/mattn/go-colorable v0.0.9 h1:UVL0vNpWh04HeJXV0KLcaT7r06gOH2l4OW6ddYRUIY4= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= +github.com/mattn/go-isatty v0.0.4 h1:bnP0vzxcAdeI1zdubAl5PjU6zsERjGZb7raWodagDYs= +github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/microcosm-cc/bluemonday v1.0.1/go.mod h1:hsXNsILzKxV+sX77C5b8FSuKF00vh2OMYv+xgHpAMF4= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= @@ -450,6 +465,7 @@ golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20181029174526-d69651ed3497/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190316082340-a2f829d7f35f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/pkg/rtc/peer.go b/pkg/rtc/peer.go index eee9238ee..f494ab770 100644 --- a/pkg/rtc/peer.go +++ b/pkg/rtc/peer.go @@ -83,6 +83,10 @@ func NewWebRTCPeer(id string, me *MediaEngine, conf WebRTCConfig) (*WebRTCPeer, } }) + pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) { + logger.GetLogger().Debugw("ICE connection state changed", "state", state.String()) + }) + // TODO: handle data channel return peer, nil @@ -169,6 +173,7 @@ func (p *WebRTCPeer) RemoveSubscriber(peerId string) { // when a new track is created, creates a PeerTrack and adds it to room func (p *WebRTCPeer) onTrack(track *webrtc.Track, rtpReceiver *webrtc.RTPReceiver) { + logger.GetLogger().Debugw("track added", "peerId", p.ID(), "track", track.Label()) // create Receiver receiver := NewReceiver(p.ctx, p.id, rtpReceiver, p.receiverConfig, p.mediaEngine) @@ -178,6 +183,7 @@ func (p *WebRTCPeer) onTrack(track *webrtc.Track, rtpReceiver *webrtc.RTPReceive p.tracks = append(p.tracks, pt) p.lock.Unlock() + pt.Start() if p.OnPeerTrack != nil { // caller should hook up what happens when the peer track is available go p.OnPeerTrack(p, pt) diff --git a/pkg/rtc/track.go b/pkg/rtc/track.go index efa0f4281..fe24da47d 100644 --- a/pkg/rtc/track.go +++ b/pkg/rtc/track.go @@ -43,6 +43,11 @@ func NewPeerTrack(ctx context.Context, peerId string, rtcpWriter RTCPWriter, tra } } +func (t *PeerTrack) Start() { + // start worker + go t.forwardWorker() +} + // subscribes peer to current track // creates and add necessary forwarders and starts them func (t *PeerTrack) AddSubscriber(peer *WebRTCPeer) error { @@ -100,7 +105,14 @@ func (t *PeerTrack) RemoveSubscriber(peerId string) { // forwardWorker reads from the receiver and writes to each sender func (t *PeerTrack) forwardWorker() { for pkt := range t.receiver.RTPChan() { + if t.ctx.Err() != nil { + return + } now := time.Now() + + logger.GetLogger().Debugw("read packet from track", + "peerId", t.peerId, + "track", t.track.ID()) t.lock.RLock() for dstPeerId, forwarder := range t.forwarders { // There exists a bug in chrome where setLocalDescription diff --git a/pkg/service/rtc.go b/pkg/service/rtc.go index 0aad5210c..c1bff9e8f 100644 --- a/pkg/service/rtc.go +++ b/pkg/service/rtc.go @@ -144,6 +144,7 @@ func (s *RTCService) handleJoin(sc SignalConnection, room *rtc.Room, sdp string) // TODO: it might be better to return error instead of nil peer.OnICECandidate = func(c *webrtc.ICECandidateInit) { + log.Debugw("sending ICE candidate", "peerId", peer.ID()) err = sc.WriteResponse(&livekit.SignalResponse{ Message: &livekit.SignalResponse_Trickle{ Trickle: &livekit.Trickle{ @@ -153,7 +154,7 @@ func (s *RTCService) handleJoin(sc SignalConnection, room *rtc.Room, sdp string) }, }) if err != nil { - logger.GetLogger().Errorw("could not send trickle", "err", err) + log.Errorw("could not send trickle", "err", err) } } @@ -222,6 +223,7 @@ func (s *RTCService) handleNegotiate(sc SignalConnection, peer *rtc.WebRTCPeer, func (s *RTCService) handleTrickle(peer *rtc.WebRTCPeer, trickle *livekit.Trickle) error { candidateInit := FromProtoTrickle(trickle) + logger.GetLogger().Debugw("adding peer candidate", "peerId", peer.ID()) if err := peer.AddICECandidate(*candidateInit); err != nil { return err }