diff --git a/cmd/cli/client/client.go b/cmd/cli/client/client.go index 10a6ea0c4..1a37fed81 100644 --- a/cmd/cli/client/client.go +++ b/cmd/cli/client/client.go @@ -10,7 +10,6 @@ import ( "time" "github.com/gorilla/websocket" - "github.com/pion/randutil" "github.com/pion/webrtc/v3" "github.com/pkg/errors" "google.golang.org/protobuf/encoding/protojson" @@ -24,14 +23,14 @@ import ( type RTCClient struct { conn *websocket.Conn PeerConn *webrtc.PeerConnection - localTracks []*webrtc.Track + localTracks []webrtc.TrackLocal lock sync.Mutex ctx context.Context cancel context.CancelFunc connected bool iceConnected bool paused bool - me *rtc.MediaEngine // optional, populated only when receiving tracks + me *webrtc.MediaEngine // optional, populated only when receiving tracks receivers []*rtc.Receiver localParticipant *livekit.ParticipantInfo @@ -53,16 +52,11 @@ var ( }, }, } - maxLogs = 256 - extFormatMapping = map[string]string{ - ".ivf": webrtc.VP8, - ".h264": webrtc.H264, - ".ogg": webrtc.Opus, - } - payloadTypes = map[string]int{ - webrtc.VP8: webrtc.DefaultPayloadTypeVP8, - webrtc.H264: webrtc.DefaultPayloadTypeH264, - webrtc.Opus: webrtc.DefaultPayloadTypeOpus, + maxLogs = 256 + extMimeMapping = map[string]string{ + ".ivf": webrtc.MimeTypeVP8, + ".h264": webrtc.MimeTypeH264, + ".ogg": webrtc.MimeTypeOpus, } ) @@ -78,7 +72,7 @@ func NewRTCClient(conn *websocket.Conn) (*RTCClient, error) { conn: conn, lock: sync.Mutex{}, pendingCandidates: make([]*webrtc.ICECandidate, 0), - localTracks: make([]*webrtc.Track, 0), + localTracks: make([]webrtc.TrackLocal, 0), reader: logRing, writer: logRing, PeerConn: peerConn, @@ -104,14 +98,10 @@ func NewRTCClient(conn *websocket.Conn) (*RTCClient, error) { } }) - peerConn.OnTrack(func(track *webrtc.Track, rtpReceiver *webrtc.RTPReceiver) { - c.AppendLog("track received", "label", track.Label(), "id", track.ID()) + peerConn.OnTrack(func(track *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver) { + c.AppendLog("track received", "label", track.StreamID(), "id", track.ID()) peerId, _ := rtc.UnpackTrackId(track.ID()) - tccExt := 0 - if c.me != nil { - tccExt = c.me.TCCExt - } - r := rtc.NewReceiver(c.ctx, peerId, rtpReceiver, rtc.ReceiverConfig{}, tccExt) + r := rtc.NewReceiver(c.ctx, peerId, rtpReceiver, nil) c.lock.Lock() c.receivers = append(c.receivers, r) r.Start() @@ -356,8 +346,8 @@ func (c *RTCClient) handleNegotiate(desc webrtc.SessionDescription) error { // if we received an offer, we'd have to answer if desc.Type == webrtc.SDPTypeOffer { // create media engine - c.me = &rtc.MediaEngine{} - if err := c.me.PopulateFromSDP(desc); err != nil { + c.me = &webrtc.MediaEngine{} + if err := c.me.RegisterDefaultCodecs(); err != nil { return errors.Wrapf(err, "could not parse SDP") } @@ -380,18 +370,22 @@ func (c *RTCClient) handleNegotiate(desc webrtc.SessionDescription) error { return nil } -func (c *RTCClient) AddTrack(path string, codecType webrtc.RTPCodecType, id string, label string) error { - // determine file type - format, ok := extFormatMapping[filepath.Ext(path)] +func (c *RTCClient) AddTrack(path string, id string, label string) error { + // determine file mime + mime, ok := extMimeMapping[filepath.Ext(path)] if !ok { return fmt.Errorf("%s has an unsupported extension", filepath.Base(path)) } - payloadType := uint8(payloadTypes[format]) logger.GetLogger().Infow("adding track", - "format", format, + "mime", mime, + ) + + track, err := webrtc.NewTrackLocalStaticSample( + webrtc.RTPCodecCapability{MimeType: mime}, + id, + label, ) - track, err := c.PeerConn.NewTrack(payloadType, randutil.NewMathRandomGenerator().Uint32(), id, label) if err != nil { return err } @@ -404,7 +398,7 @@ func (c *RTCClient) AddTrack(path string, codecType webrtc.RTPCodecType, id stri return err } - tw := NewTrackWriter(c.ctx, track, path, format) + tw := NewTrackWriter(c.ctx, track, path) // write tracks only after ICE connectivity if c.iceConnected { @@ -457,22 +451,18 @@ func (c *RTCClient) consumeReceiver(r *rtc.Receiver) { peerId, trackId := rtc.UnpackTrackId(r.TrackId()) numBytes := 0 for { - select { - case packet, ok := <-r.RTPChan(): - if !ok { - // channel closed, we are done - return - } - numBytes += packet.MarshalSize() - if time.Now().Sub(lastUpdate) > 30*time.Second { - c.AppendLog("consumed from peer", - "track", trackId, "peer", peerId, - "size", numBytes) - lastUpdate = time.Now() - } - case <-c.ctx.Done(): + pkt, err := r.ReadRTP() + if err == io.EOF || err == io.ErrClosedPipe { + // all done return } + numBytes += pkt.MarshalSize() + if time.Now().Sub(lastUpdate) > 30*time.Second { + c.AppendLog("consumed from peer", + "track", trackId, "peer", peerId, + "size", numBytes) + lastUpdate = time.Now() + } if c.ctx.Err() != nil { return diff --git a/cmd/cli/client/trackwriter.go b/cmd/cli/client/trackwriter.go index ee0573c31..39d1a0b69 100644 --- a/cmd/cli/client/trackwriter.go +++ b/cmd/cli/client/trackwriter.go @@ -19,9 +19,9 @@ import ( // makes it easier to debug and create RTP streams type TrackWriter struct { ctx context.Context - track *webrtc.Track + track *webrtc.TrackLocalStaticSample filePath string - format string + mime string ogg *oggreader.OggReader ivfheader *ivfreader.IVFFileHeader @@ -29,12 +29,12 @@ type TrackWriter struct { h264 *h264reader.H264Reader } -func NewTrackWriter(ctx context.Context, track *webrtc.Track, filePath string, format string) *TrackWriter { +func NewTrackWriter(ctx context.Context, track *webrtc.TrackLocalStaticSample, filePath string) *TrackWriter { return &TrackWriter{ ctx: ctx, track: track, filePath: filePath, - format: format, + mime: track.Codec().MimeType, } } @@ -46,21 +46,21 @@ func (w *TrackWriter) Start() error { logger.GetLogger().Infow("starting track writer", "track", w.track.ID(), - "format", w.format) - switch w.format { - case webrtc.Opus: + "mime", w.mime) + switch w.mime { + case webrtc.MimeTypeOpus: w.ogg, _, err = oggreader.NewWith(file) if err != nil { return err } go w.writeOgg() - case webrtc.VP8: + case webrtc.MimeTypeVP8: w.ivf, w.ivfheader, err = ivfreader.NewWith(file) if err != nil { return err } go w.writeVP8() - case webrtc.H264: + case webrtc.MimeTypeH264: w.h264, err = h264reader.NewReader(file) if err != nil { return err @@ -92,14 +92,15 @@ func (w *TrackWriter) writeOgg() { // The amount of samples is the difference between the last and current timestamp sampleCount := float64(pageHeader.GranulePosition - lastGranule) lastGranule = pageHeader.GranulePosition + sampleDuration := time.Duration((sampleCount/48000)*1000) * time.Millisecond - if err = w.track.WriteSample(media.Sample{Data: pageData, Samples: uint32(sampleCount)}); err != nil { + if err = w.track.WriteSample(media.Sample{Data: pageData, Duration: sampleDuration}); err != nil { logger.GetLogger().Errorw("could not write sample", "err", err) return } // Convert seconds to Milliseconds, Sleep doesn't accept floats - time.Sleep(time.Duration((sampleCount/48000)*1000) * time.Millisecond) + time.Sleep(sampleDuration) } } @@ -124,7 +125,7 @@ func (w *TrackWriter) writeVP8() { } time.Sleep(sleepTime) - if err = w.track.WriteSample(media.Sample{Data: frame, Samples: 90000}); err != nil { + if err = w.track.WriteSample(media.Sample{Data: frame, Duration: time.Second}); err != nil { logger.GetLogger().Errorw("could not write sample", "err", err) return } diff --git a/cmd/cli/commands/rtc.go b/cmd/cli/commands/rtc.go index 99bc28158..e4a7da84b 100644 --- a/cmd/cli/commands/rtc.go +++ b/cmd/cli/commands/rtc.go @@ -81,10 +81,10 @@ func joinRoom(c *cli.Context) error { audioFile := c.String("audio") videoFile := c.String("video") if audioFile != "" { - rc.AddTrack(audioFile, webrtc.RTPCodecTypeAudio, "audio", filepath.Base(audioFile)) + rc.AddTrack(audioFile, "audio", filepath.Base(audioFile)) } if videoFile != "" { - rc.AddTrack(videoFile, webrtc.RTPCodecTypeVideo, "video", filepath.Base(videoFile)) + rc.AddTrack(videoFile, "video", filepath.Base(videoFile)) } // start loop to detect input @@ -171,7 +171,7 @@ func handleAddMedia(rc *client.RTCClient, isAudio bool) error { mediaPath = ExpandUser(mediaPath) // TODO: see what the ID should be - err = rc.AddTrack(mediaPath, codecType, codecType.String(), filepath.Base(mediaPath)) + err = rc.AddTrack(mediaPath, codecType.String(), filepath.Base(mediaPath)) if err != nil { return err } @@ -189,7 +189,7 @@ func handleAddMedia(rc *client.RTCClient, isAudio bool) error { audioPath := mediaPath[0:len(mediaPath)-len(videoExt)] + ".ogg" if _, err = os.Stat(audioPath); err == nil { - err = rc.AddTrack(audioPath, webrtc.RTPCodecTypeAudio, codecType.String(), filepath.Base(audioPath)) + err = rc.AddTrack(audioPath, codecType.String(), filepath.Base(audioPath)) if err != nil { fmt.Printf("added audio track: %s\n", audioPath) } diff --git a/pkg/rtc/receiver.go b/pkg/rtc/receiver.go index efe3fa491..d699accdc 100644 --- a/pkg/rtc/receiver.go +++ b/pkg/rtc/receiver.go @@ -68,6 +68,9 @@ func (r *Receiver) Close() { // PacketBuffer interface, to provide forwarders packets from the buffer func (r *Receiver) GetBufferedPackets(mediaSSRC uint32, snOffset uint16, tsOffset uint32, sn []uint16) []rtp.Packet { + if r.bi == nil { + return nil + } return r.bi.GetBufferedPackets(uint32(r.track.SSRC()), mediaSSRC, snOffset, tsOffset, sn) }