updated cli to v3

This commit is contained in:
David Zhao
2020-12-12 22:07:06 -08:00
parent 7d7787590c
commit 4c9623bb49
4 changed files with 54 additions and 60 deletions
+34 -44
View File
@@ -10,7 +10,6 @@ import (
"time"
"github.com/gorilla/websocket"
"github.com/pion/randutil"
"github.com/pion/webrtc/v3"
"github.com/pkg/errors"
"google.golang.org/protobuf/encoding/protojson"
@@ -24,14 +23,14 @@ import (
type RTCClient struct {
conn *websocket.Conn
PeerConn *webrtc.PeerConnection
localTracks []*webrtc.Track
localTracks []webrtc.TrackLocal
lock sync.Mutex
ctx context.Context
cancel context.CancelFunc
connected bool
iceConnected bool
paused bool
me *rtc.MediaEngine // optional, populated only when receiving tracks
me *webrtc.MediaEngine // optional, populated only when receiving tracks
receivers []*rtc.Receiver
localParticipant *livekit.ParticipantInfo
@@ -53,16 +52,11 @@ var (
},
},
}
maxLogs = 256
extFormatMapping = map[string]string{
".ivf": webrtc.VP8,
".h264": webrtc.H264,
".ogg": webrtc.Opus,
}
payloadTypes = map[string]int{
webrtc.VP8: webrtc.DefaultPayloadTypeVP8,
webrtc.H264: webrtc.DefaultPayloadTypeH264,
webrtc.Opus: webrtc.DefaultPayloadTypeOpus,
maxLogs = 256
extMimeMapping = map[string]string{
".ivf": webrtc.MimeTypeVP8,
".h264": webrtc.MimeTypeH264,
".ogg": webrtc.MimeTypeOpus,
}
)
@@ -78,7 +72,7 @@ func NewRTCClient(conn *websocket.Conn) (*RTCClient, error) {
conn: conn,
lock: sync.Mutex{},
pendingCandidates: make([]*webrtc.ICECandidate, 0),
localTracks: make([]*webrtc.Track, 0),
localTracks: make([]webrtc.TrackLocal, 0),
reader: logRing,
writer: logRing,
PeerConn: peerConn,
@@ -104,14 +98,10 @@ func NewRTCClient(conn *websocket.Conn) (*RTCClient, error) {
}
})
peerConn.OnTrack(func(track *webrtc.Track, rtpReceiver *webrtc.RTPReceiver) {
c.AppendLog("track received", "label", track.Label(), "id", track.ID())
peerConn.OnTrack(func(track *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver) {
c.AppendLog("track received", "label", track.StreamID(), "id", track.ID())
peerId, _ := rtc.UnpackTrackId(track.ID())
tccExt := 0
if c.me != nil {
tccExt = c.me.TCCExt
}
r := rtc.NewReceiver(c.ctx, peerId, rtpReceiver, rtc.ReceiverConfig{}, tccExt)
r := rtc.NewReceiver(c.ctx, peerId, rtpReceiver, nil)
c.lock.Lock()
c.receivers = append(c.receivers, r)
r.Start()
@@ -356,8 +346,8 @@ func (c *RTCClient) handleNegotiate(desc webrtc.SessionDescription) error {
// if we received an offer, we'd have to answer
if desc.Type == webrtc.SDPTypeOffer {
// create media engine
c.me = &rtc.MediaEngine{}
if err := c.me.PopulateFromSDP(desc); err != nil {
c.me = &webrtc.MediaEngine{}
if err := c.me.RegisterDefaultCodecs(); err != nil {
return errors.Wrapf(err, "could not parse SDP")
}
@@ -380,18 +370,22 @@ func (c *RTCClient) handleNegotiate(desc webrtc.SessionDescription) error {
return nil
}
func (c *RTCClient) AddTrack(path string, codecType webrtc.RTPCodecType, id string, label string) error {
// determine file type
format, ok := extFormatMapping[filepath.Ext(path)]
func (c *RTCClient) AddTrack(path string, id string, label string) error {
// determine file mime
mime, ok := extMimeMapping[filepath.Ext(path)]
if !ok {
return fmt.Errorf("%s has an unsupported extension", filepath.Base(path))
}
payloadType := uint8(payloadTypes[format])
logger.GetLogger().Infow("adding track",
"format", format,
"mime", mime,
)
track, err := webrtc.NewTrackLocalStaticSample(
webrtc.RTPCodecCapability{MimeType: mime},
id,
label,
)
track, err := c.PeerConn.NewTrack(payloadType, randutil.NewMathRandomGenerator().Uint32(), id, label)
if err != nil {
return err
}
@@ -404,7 +398,7 @@ func (c *RTCClient) AddTrack(path string, codecType webrtc.RTPCodecType, id stri
return err
}
tw := NewTrackWriter(c.ctx, track, path, format)
tw := NewTrackWriter(c.ctx, track, path)
// write tracks only after ICE connectivity
if c.iceConnected {
@@ -457,22 +451,18 @@ func (c *RTCClient) consumeReceiver(r *rtc.Receiver) {
peerId, trackId := rtc.UnpackTrackId(r.TrackId())
numBytes := 0
for {
select {
case packet, ok := <-r.RTPChan():
if !ok {
// channel closed, we are done
return
}
numBytes += packet.MarshalSize()
if time.Now().Sub(lastUpdate) > 30*time.Second {
c.AppendLog("consumed from peer",
"track", trackId, "peer", peerId,
"size", numBytes)
lastUpdate = time.Now()
}
case <-c.ctx.Done():
pkt, err := r.ReadRTP()
if err == io.EOF || err == io.ErrClosedPipe {
// all done
return
}
numBytes += pkt.MarshalSize()
if time.Now().Sub(lastUpdate) > 30*time.Second {
c.AppendLog("consumed from peer",
"track", trackId, "peer", peerId,
"size", numBytes)
lastUpdate = time.Now()
}
if c.ctx.Err() != nil {
return
+13 -12
View File
@@ -19,9 +19,9 @@ import (
// makes it easier to debug and create RTP streams
type TrackWriter struct {
ctx context.Context
track *webrtc.Track
track *webrtc.TrackLocalStaticSample
filePath string
format string
mime string
ogg *oggreader.OggReader
ivfheader *ivfreader.IVFFileHeader
@@ -29,12 +29,12 @@ type TrackWriter struct {
h264 *h264reader.H264Reader
}
func NewTrackWriter(ctx context.Context, track *webrtc.Track, filePath string, format string) *TrackWriter {
func NewTrackWriter(ctx context.Context, track *webrtc.TrackLocalStaticSample, filePath string) *TrackWriter {
return &TrackWriter{
ctx: ctx,
track: track,
filePath: filePath,
format: format,
mime: track.Codec().MimeType,
}
}
@@ -46,21 +46,21 @@ func (w *TrackWriter) Start() error {
logger.GetLogger().Infow("starting track writer",
"track", w.track.ID(),
"format", w.format)
switch w.format {
case webrtc.Opus:
"mime", w.mime)
switch w.mime {
case webrtc.MimeTypeOpus:
w.ogg, _, err = oggreader.NewWith(file)
if err != nil {
return err
}
go w.writeOgg()
case webrtc.VP8:
case webrtc.MimeTypeVP8:
w.ivf, w.ivfheader, err = ivfreader.NewWith(file)
if err != nil {
return err
}
go w.writeVP8()
case webrtc.H264:
case webrtc.MimeTypeH264:
w.h264, err = h264reader.NewReader(file)
if err != nil {
return err
@@ -92,14 +92,15 @@ func (w *TrackWriter) writeOgg() {
// The amount of samples is the difference between the last and current timestamp
sampleCount := float64(pageHeader.GranulePosition - lastGranule)
lastGranule = pageHeader.GranulePosition
sampleDuration := time.Duration((sampleCount/48000)*1000) * time.Millisecond
if err = w.track.WriteSample(media.Sample{Data: pageData, Samples: uint32(sampleCount)}); err != nil {
if err = w.track.WriteSample(media.Sample{Data: pageData, Duration: sampleDuration}); err != nil {
logger.GetLogger().Errorw("could not write sample", "err", err)
return
}
// Convert seconds to Milliseconds, Sleep doesn't accept floats
time.Sleep(time.Duration((sampleCount/48000)*1000) * time.Millisecond)
time.Sleep(sampleDuration)
}
}
@@ -124,7 +125,7 @@ func (w *TrackWriter) writeVP8() {
}
time.Sleep(sleepTime)
if err = w.track.WriteSample(media.Sample{Data: frame, Samples: 90000}); err != nil {
if err = w.track.WriteSample(media.Sample{Data: frame, Duration: time.Second}); err != nil {
logger.GetLogger().Errorw("could not write sample", "err", err)
return
}
+4 -4
View File
@@ -81,10 +81,10 @@ func joinRoom(c *cli.Context) error {
audioFile := c.String("audio")
videoFile := c.String("video")
if audioFile != "" {
rc.AddTrack(audioFile, webrtc.RTPCodecTypeAudio, "audio", filepath.Base(audioFile))
rc.AddTrack(audioFile, "audio", filepath.Base(audioFile))
}
if videoFile != "" {
rc.AddTrack(videoFile, webrtc.RTPCodecTypeVideo, "video", filepath.Base(videoFile))
rc.AddTrack(videoFile, "video", filepath.Base(videoFile))
}
// start loop to detect input
@@ -171,7 +171,7 @@ func handleAddMedia(rc *client.RTCClient, isAudio bool) error {
mediaPath = ExpandUser(mediaPath)
// TODO: see what the ID should be
err = rc.AddTrack(mediaPath, codecType, codecType.String(), filepath.Base(mediaPath))
err = rc.AddTrack(mediaPath, codecType.String(), filepath.Base(mediaPath))
if err != nil {
return err
}
@@ -189,7 +189,7 @@ func handleAddMedia(rc *client.RTCClient, isAudio bool) error {
audioPath := mediaPath[0:len(mediaPath)-len(videoExt)] + ".ogg"
if _, err = os.Stat(audioPath); err == nil {
err = rc.AddTrack(audioPath, webrtc.RTPCodecTypeAudio, codecType.String(), filepath.Base(audioPath))
err = rc.AddTrack(audioPath, codecType.String(), filepath.Base(audioPath))
if err != nil {
fmt.Printf("added audio track: %s\n", audioPath)
}
+3
View File
@@ -68,6 +68,9 @@ func (r *Receiver) Close() {
// PacketBuffer interface, to provide forwarders packets from the buffer
func (r *Receiver) GetBufferedPackets(mediaSSRC uint32, snOffset uint16, tsOffset uint32, sn []uint16) []rtp.Packet {
if r.bi == nil {
return nil
}
return r.bi.GetBufferedPackets(uint32(r.track.SSRC()), mediaSSRC, snOffset, tsOffset, sn)
}