From b7f32dfffd98a2ea85c268868a25942a127877d8 Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Wed, 17 Nov 2021 21:18:43 +0800 Subject: [PATCH] Handle multiple codecs in renegotiation (#191) * Handle multiple codecs in renegotiation update pion to v3.1.9 so the answer uses the same codec order as the publisher. register enabled codecs when the subscriber peerconnection is created. add codec parameter to buffer.bind. buffer should use the codec of TrackRemote as its codec mime. send h264 blank frame when DownTrack is closing --- go.mod | 2 +- go.sum | 4 +- pkg/config/config.go | 2 +- pkg/rtc/mediaengine.go | 19 +++++--- pkg/rtc/mediatrack.go | 6 +-- pkg/rtc/participant.go | 1 + pkg/rtc/transport.go | 2 +- pkg/sfu/buffer/buffer.go | 3 +- pkg/sfu/buffer/buffer_test.go | 72 ++++++++++++------------------ pkg/sfu/downtrack.go | 83 ++++++++++++++++++++++++----------- test/client/client.go | 9 +++- test/client/trackwriter.go | 8 +++- test/singlenode_test.go | 77 ++++++++++++++++++++++++++++++++ 13 files changed, 198 insertions(+), 90 deletions(-) diff --git a/go.mod b/go.mod index da2a3248c..cdaf24e90 100644 --- a/go.mod +++ b/go.mod @@ -31,7 +31,7 @@ require ( github.com/pion/stun v0.3.5 github.com/pion/transport v0.12.3 github.com/pion/turn/v2 v2.0.5 - github.com/pion/webrtc/v3 v3.1.9-0.20211111210619-5f6baf732555 + github.com/pion/webrtc/v3 v3.1.9 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.11.0 github.com/rs/zerolog v1.26.0 diff --git a/go.sum b/go.sum index 971c243aa..f14e542ea 100644 --- a/go.sum +++ b/go.sum @@ -206,8 +206,8 @@ github.com/pion/turn/v2 v2.0.5 h1:iwMHqDfPEDEOFzwWKT56eFmh6DYC6o/+xnLAEzgISbA= github.com/pion/turn/v2 v2.0.5/go.mod h1:APg43CFyt/14Uy7heYUOGWdkem/Wu4PhCO/bjyrTqMw= github.com/pion/udp v0.1.1 h1:8UAPvyqmsxK8oOjloDk4wUt63TzFe9WEJkg5lChlj7o= github.com/pion/udp v0.1.1/go.mod h1:6AFo+CMdKQm7UiA0eUPA8/eVCTx8jBIITLZHc9DWX5M= -github.com/pion/webrtc/v3 v3.1.9-0.20211111210619-5f6baf732555 h1:8BgeM6iVasgZxId6/f6ZPQ7tFJOXrln4zN+of5PzDOI= 
-github.com/pion/webrtc/v3 v3.1.9-0.20211111210619-5f6baf732555/go.mod h1:2b1+xQu7GP0vaMGPLbmEX+uB+Fvn2F4sDBp90ZrPBdI= +github.com/pion/webrtc/v3 v3.1.9 h1:Nyimo16me180ZBT35nV2YcrZvjOs1i4ktUi/UmXVQGg= +github.com/pion/webrtc/v3 v3.1.9/go.mod h1:2b1+xQu7GP0vaMGPLbmEX+uB+Fvn2F4sDBp90ZrPBdI= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= diff --git a/pkg/config/config.go b/pkg/config/config.go index ce0c0ebf8..fcb641028 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -154,7 +154,7 @@ func NewConfig(confString string, c *cli.Context) (*Config, error) { EnabledCodecs: []CodecSpec{ {Mime: webrtc.MimeTypeOpus}, {Mime: webrtc.MimeTypeVP8}, - // {Mime: webrtc.MimeTypeH264}, + {Mime: webrtc.MimeTypeH264}, // {Mime: webrtc.MimeTypeVP9}, }, EmptyTimeout: 5 * 60, diff --git a/pkg/rtc/mediaengine.go b/pkg/rtc/mediaengine.go index fe3ed80bb..35168b586 100644 --- a/pkg/rtc/mediaengine.go +++ b/pkg/rtc/mediaengine.go @@ -12,15 +12,14 @@ const ( frameMarking = "urn:ietf:params:rtp-hdrext:framemarking" ) -func createPubMediaEngine(codecs []*livekit.Codec) (*webrtc.MediaEngine, error) { - me := &webrtc.MediaEngine{} +func registerCodecs(me *webrtc.MediaEngine, codecs []*livekit.Codec) error { opusCodec := webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus, ClockRate: 48000, Channels: 2, SDPFmtpLine: "minptime=10;useinbandfec=1", RTCPFeedback: nil} if isCodecEnabled(codecs, opusCodec) { if err := me.RegisterCodec(webrtc.RTPCodecParameters{ RTPCodecCapability: opusCodec, PayloadType: 111, }, webrtc.RTPCodecTypeAudio); err != nil { - return nil, err + return err } } @@ -57,11 +56,18 @@ func createPubMediaEngine(codecs []*livekit.Codec) (*webrtc.MediaEngine, error) } { if isCodecEnabled(codecs, codec.RTPCodecCapability) { if err := me.RegisterCodec(codec, 
webrtc.RTPCodecTypeVideo); err != nil { - return nil, err + return err } } } + return nil +} +func createPubMediaEngine(codecs []*livekit.Codec) (*webrtc.MediaEngine, error) { + me := &webrtc.MediaEngine{} + if err := registerCodecs(me, codecs); err != nil { + return nil, err + } for _, extension := range []string{ sdp.SDESMidURI, sdp.SDESRTPStreamIDURI, @@ -85,8 +91,11 @@ func createPubMediaEngine(codecs []*livekit.Codec) (*webrtc.MediaEngine, error) return me, nil } -func createSubMediaEngine() (*webrtc.MediaEngine, error) { +func createSubMediaEngine(codecs []*livekit.Codec) (*webrtc.MediaEngine, error) { me := &webrtc.MediaEngine{} + if err := registerCodecs(me, codecs); err != nil { + return nil, err + } for _, extension := range []string{ sdp.ABSSendTimeURI, diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index d27df1e40..2762c819f 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -169,10 +169,6 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error { } codec := t.receiver.Codec() - if err := sub.SubscriberMediaEngine().RegisterCodec(codec, t.receiver.Kind()); err != nil { - return err - } - // using DownTrack from ion-sfu streamId := t.params.ParticipantID if sub.ProtocolVersion().SupportsPackedStreamId() { @@ -403,7 +399,7 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra t.simulcasted.TrySet(true) } - buff.Bind(receiver.GetParameters(), buffer.Options{ + buff.Bind(receiver.GetParameters(), track.Codec().RTPCodecCapability, buffer.Options{ MaxBitRate: t.params.ReceiverConfig.maxBitrate, }) } diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index f37a954f7..2ee957812 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -133,6 +133,7 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { Target: livekit.SignalTarget_SUBSCRIBER, Config: params.Config, Telemetry: p.params.Telemetry, + EnabledCodecs: p.params.EnabledCodecs, Logger: 
params.Logger, }) if err != nil { diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index 3f0fa5eb9..7018d4b66 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -56,7 +56,7 @@ func newPeerConnection(params TransportParams) (*webrtc.PeerConnection, *webrtc. if params.Target == livekit.SignalTarget_PUBLISHER { me, err = createPubMediaEngine(params.EnabledCodecs) } else { - me, err = createSubMediaEngine() + me, err = createSubMediaEngine(params.EnabledCodecs) } if err != nil { return nil, nil, err diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index f2fa2ba2f..5f23b0b59 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -123,11 +123,10 @@ func NewBuffer(ssrc uint32, vp, ap *sync.Pool, logger logr.Logger) *Buffer { return b } -func (b *Buffer) Bind(params webrtc.RTPParameters, o Options) { +func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapability, o Options) { b.Lock() defer b.Unlock() - codec := params.Codecs[0] b.clockRate = codec.ClockRate b.maxBitrate = o.MaxBitRate b.mime = strings.ToLower(codec.MimeType) diff --git a/pkg/sfu/buffer/buffer_test.go b/pkg/sfu/buffer/buffer_test.go index 5942b74a9..ea23a90a9 100644 --- a/pkg/sfu/buffer/buffer_test.go +++ b/pkg/sfu/buffer/buffer_test.go @@ -43,6 +43,25 @@ func CreateTestListPackets(snsAndTSs []SequenceNumberAndTimeStamp) (packetList [ return packetList } +var vp8Codec webrtc.RTPCodecParameters = webrtc.RTPCodecParameters{ + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: "video/vp8", + ClockRate: 90000, + RTCPFeedback: []webrtc.RTCPFeedback{{ + Type: "nack", + }}, + }, + PayloadType: 96, +} + +var opusCodec webrtc.RTPCodecParameters = webrtc.RTPCodecParameters{ + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: "audio/opus", + ClockRate: 48000, + }, + PayloadType: 96, +} + func TestNack(t *testing.T) { pool := &sync.Pool{ New: func() interface{} { @@ -75,19 +94,8 @@ func TestNack(t *testing.T) { }) 
buff.Bind(webrtc.RTPParameters{ HeaderExtensions: nil, - Codecs: []webrtc.RTPCodecParameters{ - { - RTPCodecCapability: webrtc.RTPCodecCapability{ - MimeType: "video/vp8", - ClockRate: 90000, - RTCPFeedback: []webrtc.RTCPFeedback{{ - Type: "nack", - }}, - }, - PayloadType: 96, - }, - }, - }, Options{}) + Codecs: []webrtc.RTPCodecParameters{vp8Codec}, + }, vp8Codec.RTPCodecCapability, Options{}) for i := 0; i < 15; i++ { if i == 1 { continue @@ -142,19 +150,8 @@ func TestNack(t *testing.T) { }) buff.Bind(webrtc.RTPParameters{ HeaderExtensions: nil, - Codecs: []webrtc.RTPCodecParameters{ - { - RTPCodecCapability: webrtc.RTPCodecCapability{ - MimeType: "video/vp8", - ClockRate: 90000, - RTCPFeedback: []webrtc.RTCPFeedback{{ - Type: "nack", - }}, - }, - PayloadType: 96, - }, - }, - }, Options{}) + Codecs: []webrtc.RTPCodecParameters{vp8Codec}, + }, vp8Codec.RTPCodecCapability, Options{}) for i := 0; i < 15; i++ { if i > 0 && i < 5 { continue @@ -231,15 +228,8 @@ func TestNewBuffer(t *testing.T) { }) buff.Bind(webrtc.RTPParameters{ HeaderExtensions: nil, - Codecs: []webrtc.RTPCodecParameters{{ - RTPCodecCapability: webrtc.RTPCodecCapability{ - MimeType: "video/vp8", - ClockRate: 9600, - RTCPFeedback: nil, - }, - PayloadType: 0, - }}, - }, Options{}) + Codecs: []webrtc.RTPCodecParameters{vp8Codec}, + }, vp8Codec.RTPCodecCapability, Options{}) for _, p := range TestPackets { buf, _ := p.Marshal() @@ -278,16 +268,8 @@ func TestFractionLostReport(t *testing.T) { }) buff.Bind(webrtc.RTPParameters{ HeaderExtensions: nil, - Codecs: []webrtc.RTPCodecParameters{ - { - RTPCodecCapability: webrtc.RTPCodecCapability{ - MimeType: "audio/opus", - ClockRate: 48000, - }, - PayloadType: 96, - }, - }, - }, Options{}) + Codecs: []webrtc.RTPCodecParameters{opusCodec}, + }, opusCodec.RTPCodecCapability, Options{}) for i := 0; i < 15; i++ { pkt := rtp.Packet{ Header: rtp.Header{SequenceNumber: uint16(i), Timestamp: uint32(i)}, diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 
128b04972..5f47277d7 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -1,6 +1,7 @@ package sfu import ( + "encoding/binary" "errors" "fmt" "strings" @@ -49,10 +50,17 @@ var ( ErrNotVP8 = errors.New("not VP8") ErrOutOfOrderVP8PictureIdCacheMiss = errors.New("out-of-order VP8 picture id not found in cache") ErrFilteredVP8TemporalLayer = errors.New("filtered VP8 temporal layer") + ErrNoRequiredBuff = errors.New("buff size if less than required") ) var ( VP8KeyFrame1x1 = []byte{0x10, 0x02, 0x00, 0x9d, 0x01, 0x2a, 0x01, 0x00, 0x01, 0x00, 0x0b, 0xc7, 0x08, 0x85, 0x85, 0x88, 0x85, 0x84, 0x88, 0x3f, 0x82, 0x00, 0x0c, 0x0d, 0x60, 0x00, 0xfe, 0xe6, 0xb5, 0x00} + + H264KeyFrame2x2SPS = []byte{0x67, 0x42, 0xc0, 0x1f, 0x0f, 0xd9, 0x1f, 0x88, 0x88, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04, 0x00, 0x00, 0x03, 0x00, 0xc8, 0x3c, 0x60, 0xc9, 0x20} + H264KeyFrame2x2PPS = []byte{0x68, 0x87, 0xcb, 0x83, 0xcb, 0x20} + H264KeyFrame2x2IDR = []byte{0x65, 0x88, 0x84, 0x0a, 0xf2, 0x62, 0x80, 0x00, 0xa7, 0xbe} + + H264KeyFrame2x2 = [][]byte{H264KeyFrame2x2SPS, H264KeyFrame2x2PPS, H264KeyFrame2x2IDR} ) type simulcastTrackHelpers struct { @@ -1074,15 +1082,14 @@ func (d *DownTrack) writeBlankFrameRTP() error { } // LK-TODO: Support other video codecs - if d.Kind() == webrtc.RTPCodecTypeAudio || d.mime != "video/vp8" { + if d.Kind() == webrtc.RTPCodecTypeAudio || (d.mime != "video/vp8" && d.mime != "video/h264") { return nil } // send a number of blank frames just in case there is loss. // Intentionally ignoring check for mute or bandwidth constrained mute // as this is used to clear client side buffer. 
- i := 0 - for { + for i := 0; i < RTPBlankFramesMax; { frameEndNeeded := false if !d.munger.IsOnFrameBoundary() { frameEndNeeded = true @@ -1113,40 +1120,64 @@ func (d *DownTrack) writeBlankFrameRTP() error { return err } - blankVP8, err := d.vp8Munger.UpdateAndGetPadding(!frameEndNeeded) - if err != nil { - return err + switch d.mime { + case "video/vp8": + err = d.writeVP8BlankFrame(&hdr, frameEndNeeded) + case "video/h264": + err = d.writeH264BlankFrame(&hdr, frameEndNeeded) + default: + return nil } - // 1x1 key frame - // Used even when closing out a previous frame. Looks like receivers - // do not care about content (it will probably end up being an undecodable - // frame, but that should be okay as there are key frames following) - payload := make([]byte, blankVP8.HeaderSize+len(VP8KeyFrame1x1)) - vp8Header := payload[:blankVP8.HeaderSize] - err = blankVP8.MarshalTo(vp8Header) - if err != nil { - return err - } - - copy(payload[blankVP8.HeaderSize:], VP8KeyFrame1x1) - - _, err = d.writeStream.WriteRTP(&hdr, payload) - if err != nil { - return err - } if !frameEndNeeded { i++ } - if i >= RTPBlankFramesMax { - break - } } return nil } +func (d *DownTrack) writeVP8BlankFrame(hdr *rtp.Header, frameEndNeeded bool) error { + blankVP8, err := d.vp8Munger.UpdateAndGetPadding(!frameEndNeeded) + if err != nil { + return err + } + + // 1x1 key frame + // Used even when closing out a previous frame. 
Looks like receivers + // do not care about content (it will probably end up being an undecodable + // frame, but that should be okay as there are key frames following) + payload := make([]byte, blankVP8.HeaderSize+len(VP8KeyFrame1x1)) + vp8Header := payload[:blankVP8.HeaderSize] + err = blankVP8.MarshalTo(vp8Header) + if err != nil { + return err + } + + copy(payload[blankVP8.HeaderSize:], VP8KeyFrame1x1) + + _, err = d.writeStream.WriteRTP(hdr, payload) + return err +} + +func (d *DownTrack) writeH264BlankFrame(hdr *rtp.Header, frameEndNeeded bool) error { + // TODO - Jie Zeng + // now use STAP-A to compose sps, pps, idr together, most decoder support packetization-mode 1. + // if client only support packetization-mode 0, use single nalu unit packet + buf := make([]byte, 1462) + offset := 0 + buf[0] = 0x18 // STAP-A + offset++ + for _, payload := range H264KeyFrame2x2 { + binary.BigEndian.PutUint16(buf[offset:], uint16(len(payload))) + offset += 2 + copy(buf[offset:offset+len(payload)], payload) + offset += len(payload) + } + _, err := d.writeStream.WriteRTP(hdr, buf[:offset]) + return err +} func (d *DownTrack) handleRTCP(bytes []byte) { // LK-TODO - should probably handle RTCP even if muted diff --git a/test/client/client.go b/test/client/client.go index a61471221..b04f1555f 100644 --- a/test/client/client.go +++ b/test/client/client.go @@ -131,6 +131,9 @@ func NewRTCClient(conn *websocket.Conn) (*RTCClient, error) { { Mime: "video/vp8", }, + { + Mime: "video/h264", + }, } c.publisher, err = rtc.NewPCTransport(rtc.TransportParams{ Target: livekit.SignalTarget_PUBLISHER, @@ -462,7 +465,11 @@ func (c *RTCClient) AddTrack(track *webrtc.TrackLocalStaticSample, path string) } func (c *RTCClient) AddStaticTrack(mime string, id string, label string) (writer *TrackWriter, err error) { - track, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: mime}, id, label) + return c.AddStaticTrackWithCodec(webrtc.RTPCodecCapability{MimeType: mime}, id, 
label) +} + +func (c *RTCClient) AddStaticTrackWithCodec(codec webrtc.RTPCodecCapability, id string, label string) (writer *TrackWriter, err error) { + track, err := webrtc.NewTrackLocalStaticSample(codec, id, label) if err != nil { return } diff --git a/test/client/trackwriter.go b/test/client/trackwriter.go index 9694b7b49..df9865779 100644 --- a/test/client/trackwriter.go +++ b/test/client/trackwriter.go @@ -4,6 +4,7 @@ import ( "context" "io" "os" + "strings" "time" "github.com/pion/webrtc/v3" @@ -85,10 +86,15 @@ func (w *TrackWriter) Stop() { func (w *TrackWriter) writeNull() { defer w.onWriteComplete() sample := media.Sample{Data: []byte{0x0, 0xff, 0xff, 0xff, 0xff}, Duration: 30 * time.Millisecond} + h264Sample := media.Sample{Data: []byte{0x5, 0xff, 0xff, 0xff, 0xff}, Duration: 30 * time.Millisecond} for { select { case <-time.After(20 * time.Millisecond): - w.track.WriteSample(sample) + if strings.EqualFold(w.mime, webrtc.MimeTypeH264) { + w.track.WriteSample(h264Sample) + } else { + w.track.WriteSample(sample) + } case <-w.ctx.Done(): break } diff --git a/test/singlenode_test.go b/test/singlenode_test.go index a50cbee84..3121b49c4 100644 --- a/test/singlenode_test.go +++ b/test/singlenode_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/livekit/livekit-server/pkg/rtc" + "github.com/pion/webrtc/v3" "github.com/stretchr/testify/require" "github.com/livekit/livekit-server/pkg/testutils" @@ -139,3 +140,79 @@ func Test_WhenAutoSubscriptionDisabled_ClientShouldNotReceiveAnyPublishedTracks( require.Empty(t, client.SubscribedTracks()[publisher.ID()]) } + +func Test_RenegotiationWithDifferentCodecs(t *testing.T) { + if testing.Short() { + t.SkipNow() + return + } + + _, finish := setupSingleNodeTest("TestRenegotiationWithDifferentCodecs", testRoom) + defer finish() + + c1 := createRTCClient("c1", defaultServerPort, nil) + c2 := createRTCClient("c2", defaultServerPort, nil) + waitUntilConnected(t, c1, c2) + + // publish a vp8 video track and ensure clients 
receive it ok + t1, err := c1.AddStaticTrack("audio/opus", "audio", "webcam") + require.NoError(t, err) + defer t1.Stop() + t2, err := c1.AddStaticTrack("video/vp8", "video", "webcam") + require.NoError(t, err) + defer t2.Stop() + + success := testutils.WithTimeout(t, "c2 should receive two tracks, video is vp8", func() bool { + if len(c2.SubscribedTracks()) == 0 { + return false + } + // should have received two tracks + if len(c2.SubscribedTracks()[c1.ID()]) != 2 { + return false + } + + tracks := c2.SubscribedTracks()[c1.ID()] + for _, t := range tracks { + if strings.EqualFold(t.Codec().MimeType, "video/vp8") { + return true + } + } + return false + }) + if !success { + t.FailNow() + } + + t3, err := c1.AddStaticTrackWithCodec(webrtc.RTPCodecCapability{ + MimeType: "video/h264", + ClockRate: 90000, + SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f", + }, "videoscreen", "screen") + defer t3.Stop() + require.NoError(t, err) + + success = testutils.WithTimeout(t, "c2 should receive two video tracks, one vp8 one h264", func() bool { + if len(c2.SubscribedTracks()) == 0 { + return false + } + // should have received three tracks + if len(c2.SubscribedTracks()[c1.ID()]) != 3 { + return false + } + + var vp8Found, h264Found bool + tracks := c2.SubscribedTracks()[c1.ID()] + for _, t := range tracks { + if strings.EqualFold(t.Codec().MimeType, "video/vp8") { + vp8Found = true + } else if strings.EqualFold(t.Codec().MimeType, "video/h264") { + h264Found = true + } + } + return vp8Found && h264Found + }) + if !success { + t.FailNow() + } + +}