diff --git a/go.mod b/go.mod index da2a3248c..cdaf24e90 100644 --- a/go.mod +++ b/go.mod @@ -31,7 +31,7 @@ require ( github.com/pion/stun v0.3.5 github.com/pion/transport v0.12.3 github.com/pion/turn/v2 v2.0.5 - github.com/pion/webrtc/v3 v3.1.9-0.20211111210619-5f6baf732555 + github.com/pion/webrtc/v3 v3.1.9 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.11.0 github.com/rs/zerolog v1.26.0 diff --git a/go.sum b/go.sum index 971c243aa..f14e542ea 100644 --- a/go.sum +++ b/go.sum @@ -206,8 +206,8 @@ github.com/pion/turn/v2 v2.0.5 h1:iwMHqDfPEDEOFzwWKT56eFmh6DYC6o/+xnLAEzgISbA= github.com/pion/turn/v2 v2.0.5/go.mod h1:APg43CFyt/14Uy7heYUOGWdkem/Wu4PhCO/bjyrTqMw= github.com/pion/udp v0.1.1 h1:8UAPvyqmsxK8oOjloDk4wUt63TzFe9WEJkg5lChlj7o= github.com/pion/udp v0.1.1/go.mod h1:6AFo+CMdKQm7UiA0eUPA8/eVCTx8jBIITLZHc9DWX5M= -github.com/pion/webrtc/v3 v3.1.9-0.20211111210619-5f6baf732555 h1:8BgeM6iVasgZxId6/f6ZPQ7tFJOXrln4zN+of5PzDOI= -github.com/pion/webrtc/v3 v3.1.9-0.20211111210619-5f6baf732555/go.mod h1:2b1+xQu7GP0vaMGPLbmEX+uB+Fvn2F4sDBp90ZrPBdI= +github.com/pion/webrtc/v3 v3.1.9 h1:Nyimo16me180ZBT35nV2YcrZvjOs1i4ktUi/UmXVQGg= +github.com/pion/webrtc/v3 v3.1.9/go.mod h1:2b1+xQu7GP0vaMGPLbmEX+uB+Fvn2F4sDBp90ZrPBdI= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= diff --git a/pkg/config/config.go b/pkg/config/config.go index ce0c0ebf8..fcb641028 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -154,7 +154,7 @@ func NewConfig(confString string, c *cli.Context) (*Config, error) { EnabledCodecs: []CodecSpec{ {Mime: webrtc.MimeTypeOpus}, {Mime: webrtc.MimeTypeVP8}, - // {Mime: webrtc.MimeTypeH264}, + {Mime: webrtc.MimeTypeH264}, // {Mime: webrtc.MimeTypeVP9}, }, EmptyTimeout: 5 * 60, diff --git a/pkg/rtc/mediaengine.go b/pkg/rtc/mediaengine.go index fe3ed80bb..35168b586 100644 --- a/pkg/rtc/mediaengine.go +++ b/pkg/rtc/mediaengine.go @@ -12,15 +12,14 @@ const ( frameMarking = "urn:ietf:params:rtp-hdrext:framemarking" ) -func createPubMediaEngine(codecs []*livekit.Codec) (*webrtc.MediaEngine, error) { - me := &webrtc.MediaEngine{} +func registerCodecs(me *webrtc.MediaEngine, codecs []*livekit.Codec) error { opusCodec := webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus, ClockRate: 48000, Channels: 2, SDPFmtpLine: "minptime=10;useinbandfec=1", RTCPFeedback: nil} if isCodecEnabled(codecs, opusCodec) { if err := me.RegisterCodec(webrtc.RTPCodecParameters{ RTPCodecCapability: opusCodec, PayloadType: 111, }, webrtc.RTPCodecTypeAudio); err != nil { - return nil, err + return err } } @@ -57,11 +56,18 @@ func createPubMediaEngine(codecs []*livekit.Codec) (*webrtc.MediaEngine, error) } { if isCodecEnabled(codecs, codec.RTPCodecCapability) { if err := me.RegisterCodec(codec, webrtc.RTPCodecTypeVideo); err != nil { - return nil, err + return err } } } + return nil +} +func createPubMediaEngine(codecs []*livekit.Codec) (*webrtc.MediaEngine, error) { + me := &webrtc.MediaEngine{} + if err := registerCodecs(me, codecs); err != nil { + return nil, err + } for _, extension := range []string{ sdp.SDESMidURI, sdp.SDESRTPStreamIDURI, @@ -85,8 +91,11 @@ func createPubMediaEngine(codecs []*livekit.Codec) (*webrtc.MediaEngine, error) return me, nil } -func createSubMediaEngine() (*webrtc.MediaEngine, error) { +func createSubMediaEngine(codecs []*livekit.Codec) (*webrtc.MediaEngine, error) { me := &webrtc.MediaEngine{} + if err := registerCodecs(me, codecs); err != nil { + return nil, err + } for _, extension := range []string{ sdp.ABSSendTimeURI, diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index d27df1e40..2762c819f 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -169,10 +169,6 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error { } codec := t.receiver.Codec() - if err := sub.SubscriberMediaEngine().RegisterCodec(codec, t.receiver.Kind()); err != nil { - return err - } - // using DownTrack from ion-sfu streamId := t.params.ParticipantID if sub.ProtocolVersion().SupportsPackedStreamId() { @@ -403,7 +399,7 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra t.simulcasted.TrySet(true) } - buff.Bind(receiver.GetParameters(), buffer.Options{ + buff.Bind(receiver.GetParameters(), track.Codec().RTPCodecCapability, buffer.Options{ MaxBitRate: t.params.ReceiverConfig.maxBitrate, }) } diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index f37a954f7..2ee957812 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -133,6 +133,7 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { Target: livekit.SignalTarget_SUBSCRIBER, Config: params.Config, Telemetry: p.params.Telemetry, + EnabledCodecs: p.params.EnabledCodecs, Logger: params.Logger, }) if err != nil { diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index 3f0fa5eb9..7018d4b66 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -56,7 +56,7 @@ func newPeerConnection(params TransportParams) (*webrtc.PeerConnection, *webrtc. if params.Target == livekit.SignalTarget_PUBLISHER { me, err = createPubMediaEngine(params.EnabledCodecs) } else { - me, err = createSubMediaEngine() + me, err = createSubMediaEngine(params.EnabledCodecs) } if err != nil { return nil, nil, err diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index f2fa2ba2f..5f23b0b59 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -123,11 +123,10 @@ func NewBuffer(ssrc uint32, vp, ap *sync.Pool, logger logr.Logger) *Buffer { return b } -func (b *Buffer) Bind(params webrtc.RTPParameters, o Options) { +func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapability, o Options) { b.Lock() defer b.Unlock() - codec := params.Codecs[0] b.clockRate = codec.ClockRate b.maxBitrate = o.MaxBitRate b.mime = strings.ToLower(codec.MimeType) diff --git a/pkg/sfu/buffer/buffer_test.go b/pkg/sfu/buffer/buffer_test.go index 5942b74a9..ea23a90a9 100644 --- a/pkg/sfu/buffer/buffer_test.go +++ b/pkg/sfu/buffer/buffer_test.go @@ -43,6 +43,25 @@ func CreateTestListPackets(snsAndTSs []SequenceNumberAndTimeStamp) (packetList [ return packetList } +var vp8Codec webrtc.RTPCodecParameters = webrtc.RTPCodecParameters{ + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: "video/vp8", + ClockRate: 90000, + RTCPFeedback: []webrtc.RTCPFeedback{{ + Type: "nack", + }}, + }, + PayloadType: 96, +} + +var opusCodec webrtc.RTPCodecParameters = webrtc.RTPCodecParameters{ + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: "audio/opus", + ClockRate: 48000, + }, + PayloadType: 96, +} + func TestNack(t *testing.T) { pool := &sync.Pool{ New: func() interface{} { @@ -75,19 +94,8 @@ func TestNack(t *testing.T) { }) buff.Bind(webrtc.RTPParameters{ HeaderExtensions: nil, - Codecs: []webrtc.RTPCodecParameters{ - { - RTPCodecCapability: webrtc.RTPCodecCapability{ - MimeType: "video/vp8", - ClockRate: 90000, - RTCPFeedback: []webrtc.RTCPFeedback{{ - Type: "nack", - }}, - }, - PayloadType: 96, - }, - }, - }, Options{}) + Codecs: []webrtc.RTPCodecParameters{vp8Codec}, + }, vp8Codec.RTPCodecCapability, Options{}) for i := 0; i < 15; i++ { if i == 1 { continue @@ -142,19 +150,8 @@ func TestNack(t *testing.T) { }) buff.Bind(webrtc.RTPParameters{ HeaderExtensions: nil, - Codecs: []webrtc.RTPCodecParameters{ - { - RTPCodecCapability: webrtc.RTPCodecCapability{ - MimeType: "video/vp8", - ClockRate: 90000, - RTCPFeedback: []webrtc.RTCPFeedback{{ - Type: "nack", - }}, - }, - PayloadType: 96, - }, - }, - }, Options{}) + Codecs: []webrtc.RTPCodecParameters{vp8Codec}, + }, vp8Codec.RTPCodecCapability, Options{}) for i := 0; i < 15; i++ { if i > 0 && i < 5 { continue @@ -231,15 +228,8 @@ func TestNewBuffer(t *testing.T) { }) buff.Bind(webrtc.RTPParameters{ HeaderExtensions: nil, - Codecs: []webrtc.RTPCodecParameters{{ - RTPCodecCapability: webrtc.RTPCodecCapability{ - MimeType: "video/vp8", - ClockRate: 9600, - RTCPFeedback: nil, - }, - PayloadType: 0, - }}, - }, Options{}) + Codecs: []webrtc.RTPCodecParameters{vp8Codec}, + }, vp8Codec.RTPCodecCapability, Options{}) for _, p := range TestPackets { buf, _ := p.Marshal() @@ -278,16 +268,8 @@ func TestFractionLostReport(t *testing.T) { }) buff.Bind(webrtc.RTPParameters{ HeaderExtensions: nil, - Codecs: []webrtc.RTPCodecParameters{ - { - RTPCodecCapability: webrtc.RTPCodecCapability{ - MimeType: "audio/opus", - ClockRate: 48000, - }, - PayloadType: 96, - }, - }, - }, Options{}) + Codecs: []webrtc.RTPCodecParameters{opusCodec}, + }, opusCodec.RTPCodecCapability, Options{}) for i := 0; i < 15; i++ { pkt := rtp.Packet{ Header: rtp.Header{SequenceNumber: uint16(i), Timestamp: uint32(i)}, diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 128b04972..5f47277d7 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -1,6 +1,7 @@ package sfu import ( + "encoding/binary" "errors" "fmt" "strings" @@ -49,10 +50,17 @@ var ( ErrNotVP8 = errors.New("not VP8") ErrOutOfOrderVP8PictureIdCacheMiss = errors.New("out-of-order VP8 picture id not found in cache") ErrFilteredVP8TemporalLayer = errors.New("filtered VP8 temporal layer") + ErrNoRequiredBuff = errors.New("buff size if less than required") ) var ( VP8KeyFrame1x1 = []byte{0x10, 0x02, 0x00, 0x9d, 0x01, 0x2a, 0x01, 0x00, 0x01, 0x00, 0x0b, 0xc7, 0x08, 0x85, 0x85, 0x88, 0x85, 0x84, 0x88, 0x3f, 0x82, 0x00, 0x0c, 0x0d, 0x60, 0x00, 0xfe, 0xe6, 0xb5, 0x00} + + H264KeyFrame2x2SPS = []byte{0x67, 0x42, 0xc0, 0x1f, 0x0f, 0xd9, 0x1f, 0x88, 0x88, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04, 0x00, 0x00, 0x03, 0x00, 0xc8, 0x3c, 0x60, 0xc9, 0x20} + H264KeyFrame2x2PPS = []byte{0x68, 0x87, 0xcb, 0x83, 0xcb, 0x20} + H264KeyFrame2x2IDR = []byte{0x65, 0x88, 0x84, 0x0a, 0xf2, 0x62, 0x80, 0x00, 0xa7, 0xbe} + + H264KeyFrame2x2 = [][]byte{H264KeyFrame2x2SPS, H264KeyFrame2x2PPS, H264KeyFrame2x2IDR} ) type simulcastTrackHelpers struct { @@ -1074,15 +1082,14 @@ func (d *DownTrack) writeBlankFrameRTP() error { } // LK-TODO: Support other video codecs - if d.Kind() == webrtc.RTPCodecTypeAudio || d.mime != "video/vp8" { + if d.Kind() == webrtc.RTPCodecTypeAudio || (d.mime != "video/vp8" && d.mime != "video/h264") { return nil } // send a number of blank frames just in case there is loss. // Intentionally ignoring check for mute or bandwidth constrained mute // as this is used to clear client side buffer. - i := 0 - for { + for i := 0; i < RTPBlankFramesMax; { frameEndNeeded := false if !d.munger.IsOnFrameBoundary() { frameEndNeeded = true @@ -1113,40 +1120,64 @@ func (d *DownTrack) writeBlankFrameRTP() error { return err } - blankVP8, err := d.vp8Munger.UpdateAndGetPadding(!frameEndNeeded) - if err != nil { - return err + switch d.mime { + case "video/vp8": + err = d.writeVP8BlankFrame(&hdr, frameEndNeeded) + case "video/h264": + err = d.writeH264BlankFrame(&hdr, frameEndNeeded) + default: + return nil } - // 1x1 key frame - // Used even when closing out a previous frame. Looks like receivers - // do not care about content (it will probably end up being an undecodable - // frame, but that should be okay as there are key frames following) - payload := make([]byte, blankVP8.HeaderSize+len(VP8KeyFrame1x1)) - vp8Header := payload[:blankVP8.HeaderSize] - err = blankVP8.MarshalTo(vp8Header) - if err != nil { - return err - } - - copy(payload[blankVP8.HeaderSize:], VP8KeyFrame1x1) - - _, err = d.writeStream.WriteRTP(&hdr, payload) - if err != nil { - return err - } if !frameEndNeeded { i++ } - if i >= RTPBlankFramesMax { - break - } } return nil } +func (d *DownTrack) writeVP8BlankFrame(hdr *rtp.Header, frameEndNeeded bool) error { + blankVP8, err := d.vp8Munger.UpdateAndGetPadding(!frameEndNeeded) + if err != nil { + return err + } + + // 1x1 key frame + // Used even when closing out a previous frame. Looks like receivers + // do not care about content (it will probably end up being an undecodable + // frame, but that should be okay as there are key frames following) + payload := make([]byte, blankVP8.HeaderSize+len(VP8KeyFrame1x1)) + vp8Header := payload[:blankVP8.HeaderSize] + err = blankVP8.MarshalTo(vp8Header) + if err != nil { + return err + } + + copy(payload[blankVP8.HeaderSize:], VP8KeyFrame1x1) + + _, err = d.writeStream.WriteRTP(hdr, payload) + return err +} + +func (d *DownTrack) writeH264BlankFrame(hdr *rtp.Header, frameEndNeeded bool) error { + // TODO - Jie Zeng + // now use STAP-A to compose sps, pps, idr together, most decoder support packetization-mode 1. + // if client only support packetization-mode 0, use single nalu unit packet + buf := make([]byte, 1462) + offset := 0 + buf[0] = 0x18 // STAP-A + offset++ + for _, payload := range H264KeyFrame2x2 { + binary.BigEndian.PutUint16(buf[offset:], uint16(len(payload))) + offset += 2 + copy(buf[offset:offset+len(payload)], payload) + offset += len(payload) + } + _, err := d.writeStream.WriteRTP(hdr, buf[:offset]) + return err +} func (d *DownTrack) handleRTCP(bytes []byte) { // LK-TODO - should probably handle RTCP even if muted diff --git a/test/client/client.go b/test/client/client.go index a61471221..b04f1555f 100644 --- a/test/client/client.go +++ b/test/client/client.go @@ -131,6 +131,9 @@ func NewRTCClient(conn *websocket.Conn) (*RTCClient, error) { { Mime: "video/vp8", }, + { + Mime: "video/h264", + }, } c.publisher, err = rtc.NewPCTransport(rtc.TransportParams{ Target: livekit.SignalTarget_PUBLISHER, @@ -462,7 +465,11 @@ func (c *RTCClient) AddTrack(track *webrtc.TrackLocalStaticSample, path string) } func (c *RTCClient) AddStaticTrack(mime string, id string, label string) (writer *TrackWriter, err error) { - track, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: mime}, id, label) + return c.AddStaticTrackWithCodec(webrtc.RTPCodecCapability{MimeType: mime}, id, label) +} + +func (c *RTCClient) AddStaticTrackWithCodec(codec webrtc.RTPCodecCapability, id string, label string) (writer *TrackWriter, err error) { + track, err := webrtc.NewTrackLocalStaticSample(codec, id, label) if err != nil { return } diff --git a/test/client/trackwriter.go b/test/client/trackwriter.go index 9694b7b49..df9865779 100644 --- a/test/client/trackwriter.go +++ b/test/client/trackwriter.go @@ -4,6 +4,7 @@ import ( "context" "io" "os" + "strings" "time" "github.com/pion/webrtc/v3" @@ -85,10 +86,15 @@ func (w *TrackWriter) Stop() { func (w *TrackWriter) writeNull() { defer w.onWriteComplete() sample := media.Sample{Data: []byte{0x0, 0xff, 0xff, 0xff, 0xff}, Duration: 30 * time.Millisecond} + h264Sample := media.Sample{Data: []byte{0x5, 0xff, 0xff, 0xff, 0xff}, Duration: 30 * time.Millisecond} for { select { case <-time.After(20 * time.Millisecond): - w.track.WriteSample(sample) + if strings.EqualFold(w.mime, webrtc.MimeTypeH264) { + w.track.WriteSample(h264Sample) + } else { + w.track.WriteSample(sample) + } case <-w.ctx.Done(): break } diff --git a/test/singlenode_test.go b/test/singlenode_test.go index a50cbee84..3121b49c4 100644 --- a/test/singlenode_test.go +++ b/test/singlenode_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/livekit/livekit-server/pkg/rtc" + "github.com/pion/webrtc/v3" "github.com/stretchr/testify/require" "github.com/livekit/livekit-server/pkg/testutils" @@ -139,3 +140,79 @@ func Test_WhenAutoSubscriptionDisabled_ClientShouldNotReceiveAnyPublishedTracks( require.Empty(t, client.SubscribedTracks()[publisher.ID()]) } + +func Test_RenegotiationWithDifferentCodecs(t *testing.T) { + if testing.Short() { + t.SkipNow() + return + } + + _, finish := setupSingleNodeTest("TestRenegotiationWithDifferentCodecs", testRoom) + defer finish() + + c1 := createRTCClient("c1", defaultServerPort, nil) + c2 := createRTCClient("c2", defaultServerPort, nil) + waitUntilConnected(t, c1, c2) + + // publish a vp8 video track and ensure clients receive it ok + t1, err := c1.AddStaticTrack("audio/opus", "audio", "webcam") + require.NoError(t, err) + defer t1.Stop() + t2, err := c1.AddStaticTrack("video/vp8", "video", "webcam") + require.NoError(t, err) + defer t2.Stop() + + success := testutils.WithTimeout(t, "c2 should receive two tracks, video is vp8", func() bool { + if len(c2.SubscribedTracks()) == 0 { + return false + } + // should have received two tracks + if len(c2.SubscribedTracks()[c1.ID()]) != 2 { + return false + } + + tracks := c2.SubscribedTracks()[c1.ID()] + for _, t := range tracks { + if strings.EqualFold(t.Codec().MimeType, "video/vp8") { + return true + } + } + return false + }) + if !success { + t.FailNow() + } + + t3, err := c1.AddStaticTrackWithCodec(webrtc.RTPCodecCapability{ + MimeType: "video/h264", + ClockRate: 90000, + SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f", + }, "videoscreen", "screen") + defer t3.Stop() + require.NoError(t, err) + + success = testutils.WithTimeout(t, "c2 should receive two video tracks, one vp8 one h264", func() bool { + if len(c2.SubscribedTracks()) == 0 { + return false + } + // should have received three tracks + if len(c2.SubscribedTracks()[c1.ID()]) != 3 { + return false + } + + var vp8Found, h264Found bool + tracks := c2.SubscribedTracks()[c1.ID()] + for _, t := range tracks { + if strings.EqualFold(t.Codec().MimeType, "video/vp8") { + vp8Found = true + } else if strings.EqualFold(t.Codec().MimeType, "video/h264") { + h264Found = true + } + } + return vp8Found && h264Found + }) + if !success { + t.FailNow() + } + +}