diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 781007a64..45b7de0e8 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -1327,7 +1327,7 @@ func (p *ParticipantImpl) AddTrack(req *livekit.AddTrackRequest) { p.sendTrackPublished(req.Cid, ti) - // RAJA-TODO: is this needed for single peer connection case + // SINGLE-PEER-CONNECTION-TODO: is this needed? p.handlePendingRemoteTracks() } @@ -2989,7 +2989,7 @@ func (p *ParticipantImpl) mediaTrackReceived(track sfu.TrackRemote, rtpReceiver p.pendingTracksLock.Lock() newTrack := false - mid := p.TransportManager.GetPublisherMid(rtpReceiver) + mid := p.TransportManager.GetMid(rtpReceiver) p.pubLogger.Debugw( "media track received", "kind", track.Kind().String(), diff --git a/pkg/rtc/participant_sdp.go b/pkg/rtc/participant_sdp.go index f88402919..91f3acc37 100644 --- a/pkg/rtc/participant_sdp.go +++ b/pkg/rtc/participant_sdp.go @@ -141,6 +141,7 @@ func (p *ParticipantImpl) populateSdpCid(parsedOffer *sdp.SessionDescription) ([ p.pubLogger.Warnw("could not get unmatch audios", err) return nil, nil } + p.pubLogger.Infow("RAJA unmatch", "audios", unmatchAudios, "videos", unmatchVideos) // REMOVE processUnmatch(unmatchAudios, livekit.TrackType_AUDIO) processUnmatch(unmatchVideos, livekit.TrackType_VIDEO) @@ -348,12 +349,8 @@ func (p *ParticipantImpl) setCodecPreferencesForPublisherMedia( if trackType == livekit.TrackType_VIDEO { // if the client don't comply with codec order in SDP answer, only keep preferred codecs to force client to use it if p.params.ClientInfo.ComplyWithCodecOrderInSDPAnswer() { - p.pubLogger.Infow("complying with codec order") // REMOVE unmatch.MediaName.Formats = append(unmatch.MediaName.Formats, leftCodecs...) - } else { - p.pubLogger.Infow("not complying with codec order") // REMOVE } - p.pubLogger.Infow("formats", "formats", unmatch.MediaName.Formats) // REMOVE } else { // ensure nack enabled for audio in publisher offer var nackFound bool diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index ff55b4c93..570dca384 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -245,9 +245,10 @@ type PCTransport struct { remoteOfferId atomic.Uint32 localAnswerId atomic.Uint32 - // RAJA-TODO: check if this can be gotten from pc + /* RAJA-REMOVE lastLocalDescription atomic.Value lastRemoteDescription atomic.Value + */ eventsQueue *utils.TypedOpsQueue[event] @@ -980,61 +981,6 @@ func (t *PCTransport) AddRemoteTrackAndNegotiate(ti *livekit.TrackInfo) error { return nil } - /* RAJA-REMOVE - if codecIdx >= len(ti.Codecs) { - t.params.Logger.Warnw( - "invalid codec index", ErrInvalidCodecIndex, - "trackID", ti.Sid, - "track", logger.Proto(ti), - ) - return ErrInvalidCodecIndex - } - - sci := ti.Codecs[codecIdx] - var mimeType mime.MimeType - if sci.MimeType == "" { - if ti.Type == livekit.TrackType_AUDIO { - mimeType = mime.MimeTypeOpus - } else { - mimeType = mime.MimeTypeVP8 - } - } else { - mimeType = mime.MimeTypeFromString(sci.MimeType) - } - track, err := webrtc.NewTrackLocalStaticRTP( - webrtc.RTPCodecCapability{ - MimeType: mimeType.String(), - }, - sci.Cid, - sci.Cid, - ) - if err != nil { - t.params.Logger.Warnw( - "could not create local track", err, - "trackID", ti.Sid, - "track", logger.Proto(ti), - "sci", logger.Proto(sci), - ) - return err - } - - _, err = t.pc.AddTransceiverFromTrack( - track, - webrtc.RTPTransceiverInit{ - Direction: webrtc.RTPTransceiverDirectionSendrecv, - }, - ) - if err != nil { - t.params.Logger.Warnw( - "could not add transceiver from track", err, - "trackID", ti.Sid, - "track", logger.Proto(ti), - "sci", logger.Proto(sci), - ) - return err - } - */ - rtpCodecType := webrtc.RTPCodecTypeVideo if ti.Type == livekit.TrackType_AUDIO { rtpCodecType = webrtc.RTPCodecTypeAudio @@ -1063,6 +1009,10 @@ func (t *PCTransport) RemoveTrack(sender *webrtc.RTPSender) error { return t.pc.RemoveTrack(sender) } +func (t *PCTransport) CurrentLocalDescription() *webrtc.SessionDescription { + return t.pc.CurrentLocalDescription() +} + func (t *PCTransport) CurrentRemoteDescription() *webrtc.SessionDescription { return t.pc.CurrentRemoteDescription() } @@ -2376,7 +2326,7 @@ func (t *PCTransport) createAndSendOffer(options *webrtc.OfferOptions) error { ) } - t.lastLocalDescription.Store(offer) + // RAJA-REMOVE t.lastLocalDescription.Store(offer) if err := t.params.Handler.OnOffer(offer, t.localOfferId.Inc()); err != nil { prometheus.ServiceOperationCounter.WithLabelValues("offer", "error", "write_message").Add(1) return errors.Wrap(err, "could not send offer") @@ -2397,7 +2347,7 @@ type remoteDescriptionData struct { func (t *PCTransport) handleRemoteDescriptionReceived(e event) error { rdd := e.data.(remoteDescriptionData) - t.lastRemoteDescription.Store(*rdd.sessionDescription) + // RAJA-REMOVE t.lastRemoteDescription.Store(*rdd.sessionDescription) if rdd.sessionDescription.Type == webrtc.SDPTypeOffer { return t.handleRemoteOfferReceived(rdd.sessionDescription, rdd.remoteId) } else { @@ -2504,7 +2454,7 @@ func (t *PCTransport) createAndSendAnswer() error { ) } - t.lastLocalDescription.Store(answer) + // RAJA-REMOVE t.lastLocalDescription.Store(answer) answerId := t.remoteOfferId.Load() if err := t.params.Handler.OnAnswer(answer, answerId); err != nil { prometheus.ServiceOperationCounter.WithLabelValues("answer", "error", "write_message").Add(1) @@ -2670,7 +2620,7 @@ func (t *PCTransport) doICERestart() error { ) } - t.lastLocalDescription.Store(*offer) + // RAJA-REMOVE t.lastLocalDescription.Store(*offer) err := t.params.Handler.OnOffer(*offer, t.localOfferId.Inc()) if err != nil { prometheus.ServiceOperationCounter.WithLabelValues("offer", "error", "write_message").Add(1) diff --git a/pkg/rtc/transportmanager.go b/pkg/rtc/transportmanager.go index 92f6085c0..fce86c0bd 100644 --- a/pkg/rtc/transportmanager.go +++ b/pkg/rtc/transportmanager.go @@ -71,18 +71,6 @@ func (h TransportManagerTransportHandler) OnFailed(isShortLived bool, iceConnect // ------------------------------- -/* RAJA-REMOVE -type TransportManagerPublisherTransportHandler struct { - TransportManagerTransportHandler -} - -func (h TransportManagerPublisherTransportHandler) OnAnswer(sd webrtc.SessionDescription, answerId uint32) error { - return h.Handler.OnAnswer(sd, answerId) -} -*/ - -// ------------------------------- - type TransportManagerParams struct { SubscriberAsPrimary bool SinglePeerConnection bool @@ -153,17 +141,16 @@ func NewTransportManager(params TransportManagerParams) (*TransportManager, erro if t.params.UseOneShotSignallingMode || !t.params.SinglePeerConnection { lgr := LoggerWithPCTarget(params.Logger, livekit.SignalTarget_PUBLISHER) publisher, err := NewPCTransport(TransportParams{ - ProtocolVersion: params.ProtocolVersion, - Config: params.Config, - Twcc: params.Twcc, - DirectionConfig: params.Config.Publisher, - CongestionControlConfig: params.CongestionControlConfig, - EnabledCodecs: params.EnabledPublishCodecs, - Logger: lgr, - SimTracks: params.SimTracks, - ClientInfo: params.ClientInfo, - Transport: livekit.SignalTarget_PUBLISHER, - // RAJA-REMOVE Handler: TransportManagerPublisherTransportHandler{TransportManagerTransportHandler{params.PublisherHandler, t, lgr}}, + ProtocolVersion: params.ProtocolVersion, + Config: params.Config, + Twcc: params.Twcc, + DirectionConfig: params.Config.Publisher, + CongestionControlConfig: params.CongestionControlConfig, + EnabledCodecs: params.EnabledPublishCodecs, + Logger: lgr, + SimTracks: params.SimTracks, + ClientInfo: params.ClientInfo, + Transport: livekit.SignalTarget_PUBLISHER, Handler: params.PublisherHandler, UseOneShotSignallingMode: params.UseOneShotSignallingMode, DataChannelMaxBufferedAmount: params.DataChannelMaxBufferedAmount, @@ -202,9 +189,11 @@ func NewTransportManager(params TransportManagerParams) (*TransportManager, erro } else { t.subscriber = t.publisher } + /* RAJA-REMOVE if t.params.SinglePeerConnection { t.publisher = t.subscriber } + */ if !t.params.Migration { if err := t.createDataChannelsForSubscriber(nil); err != nil { return nil, err @@ -216,7 +205,9 @@ func NewTransportManager(params TransportManagerParams) (*TransportManager, erro } func (t *TransportManager) Close() { - t.publisher.Close() + if t.publisher != nil { + t.publisher.Close() + } t.subscriber.Close() } @@ -290,6 +281,14 @@ func (t *TransportManager) RemoveTrackLocal(sender *webrtc.RTPSender) error { } } +func (t *TransportManager) GetMid(rtpReceiver *webrtc.RTPReceiver) string { + if t.params.SinglePeerConnection { + return t.subscriber.GetMid(rtpReceiver) + } else { + return t.publisher.GetMid(rtpReceiver) + } +} + func (t *TransportManager) WriteSubscriberRTCP(pkts []rtcp.Packet) error { if t.params.UseOneShotSignallingMode { return t.publisher.WriteRTCP(pkts) @@ -411,10 +410,15 @@ func (t *TransportManager) createDataChannelsForSubscriber(pendingDataChannels [ } func (t *TransportManager) GetUnmatchMediaForOffer(parsedOffer *sdp.SessionDescription, mediaType string) (unmatched []*sdp.MediaDescription, err error) { - // prefer codec from offer for clients that don't support setCodecPreferences var lastMatchedMid string - lastAnswer := t.publisher.CurrentRemoteDescription() + var lastAnswer *webrtc.SessionDescription + if t.params.SinglePeerConnection { + lastAnswer = t.subscriber.CurrentLocalDescription() + } else { + lastAnswer = t.publisher.CurrentLocalDescription() + } if lastAnswer != nil { + t.params.Logger.Infow("RAJA lastAnswer", "lastAnswer", lastAnswer) // REMOVE parsedAnswer, err1 := lastAnswer.Unmarshal() if err1 != nil { // should not happen @@ -445,11 +449,14 @@ func (t *TransportManager) GetUnmatchMediaForOffer(parsedOffer *sdp.SessionDescr return } -func (t *TransportManager) LastPublisherOffer() webrtc.SessionDescription { +func (t *TransportManager) LastPublisherOffer() *webrtc.SessionDescription { + /* RAJA-REMOVE if sd := t.lastPublisherOffer.Load(); sd != nil { return sd.(webrtc.SessionDescription) } return webrtc.SessionDescription{} + */ + return t.publisher.CurrentRemoteDescription() } func (t *TransportManager) HandleOffer(offer webrtc.SessionDescription, offerId uint32, shouldPend bool) error { @@ -617,6 +624,10 @@ func (t *TransportManager) SubscriberAsPrimary() bool { func (t *TransportManager) GetICEConnectionInfo() []*types.ICEConnectionInfo { infos := make([]*types.ICEConnectionInfo, 0, 2) for _, pc := range []*PCTransport{t.publisher, t.subscriber} { + if pc == nil { + continue + } + info := pc.GetICEConnectionInfo() if info.HasCandidates() { infos = append(infos, info) @@ -848,7 +859,9 @@ func (t *TransportManager) UpdateSignalingRTT(rtt uint32) { t.lock.Lock() t.signalingRTT = rtt t.lock.Unlock() - t.publisher.SetSignalingRTT(rtt) + if t.publisher != nil { + t.publisher.SetSignalingRTT(rtt) + } t.subscriber.SetSignalingRTT(rtt) // TODO: considering using tcp rtt to calculate ice connection cost, if ice connection can't be established diff --git a/test/client/client.go b/test/client/client.go index b4c6ccdc6..d4f05e964 100644 --- a/test/client/client.go +++ b/test/client/client.go @@ -747,30 +747,6 @@ func (c *RTCClient) AddTrack(track *webrtc.TrackLocalStaticSample, path string, c.lock.Lock() defer c.lock.Unlock() - /* RAJA-TODO - var sender *webrtc.RTPSender - if types.ProtocolVersion(types.CurrentProtocol).SupportsSinglePeerConnection() { - sender, _, err = c.subscriber.AddTrack(track, types.AddTrackParams{}) - } else { - sender, _, err = c.publisher.AddTrack(track, types.AddTrackParams{}) - } - if err != nil { - logger.Errorw( - "add track failed", err, - "participant", c.localParticipant.Identity, - "pID", c.localParticipant.Sid, - "trackID", ti.Sid, - ) - return - } - logger.Infow( - "RAJA add track sender", - "participant", c.localParticipant.Identity, - "pID", c.localParticipant.Sid, - "trackID", ti.Sid, - "sender", sender.GetParameters(), - ) // REMOVE - */ c.localTracks[ti.Sid] = track c.trackSenders[ti.Sid] = sender if !c.protocolVersion.SupportsSinglePeerConnection() { diff --git a/test/client/trackwriter.go b/test/client/trackwriter.go index 07adbdd86..803bf3348 100644 --- a/test/client/trackwriter.go +++ b/test/client/trackwriter.go @@ -146,7 +146,6 @@ func (w *TrackWriter) writeOgg() { return } - logger.Infow("RAJA wrote sample") // REMOVE time.Sleep(sampleDuration) } }