This commit is contained in:
boks1971
2025-08-15 11:34:00 +05:30
parent cdc5628d1f
commit bd621290c2
6 changed files with 54 additions and 119 deletions

View File

@@ -1327,7 +1327,7 @@ func (p *ParticipantImpl) AddTrack(req *livekit.AddTrackRequest) {
p.sendTrackPublished(req.Cid, ti)
// RAJA-TODO: is this needed for single peer connection case
// SINGLE-PEER-CONNECTION-TODO: is this needed?
p.handlePendingRemoteTracks()
}
@@ -2989,7 +2989,7 @@ func (p *ParticipantImpl) mediaTrackReceived(track sfu.TrackRemote, rtpReceiver
p.pendingTracksLock.Lock()
newTrack := false
mid := p.TransportManager.GetPublisherMid(rtpReceiver)
mid := p.TransportManager.GetMid(rtpReceiver)
p.pubLogger.Debugw(
"media track received",
"kind", track.Kind().String(),

View File

@@ -141,6 +141,7 @@ func (p *ParticipantImpl) populateSdpCid(parsedOffer *sdp.SessionDescription) ([
p.pubLogger.Warnw("could not get unmatch audios", err)
return nil, nil
}
p.pubLogger.Infow("RAJA unmatch", "audios", unmatchAudios, "videos", unmatchVideos) // REMOVE
processUnmatch(unmatchAudios, livekit.TrackType_AUDIO)
processUnmatch(unmatchVideos, livekit.TrackType_VIDEO)
@@ -348,12 +349,8 @@ func (p *ParticipantImpl) setCodecPreferencesForPublisherMedia(
if trackType == livekit.TrackType_VIDEO {
// if the client don't comply with codec order in SDP answer, only keep preferred codecs to force client to use it
if p.params.ClientInfo.ComplyWithCodecOrderInSDPAnswer() {
p.pubLogger.Infow("complying with codec order") // REMOVE
unmatch.MediaName.Formats = append(unmatch.MediaName.Formats, leftCodecs...)
} else {
p.pubLogger.Infow("not complying with codec order") // REMOVE
}
p.pubLogger.Infow("formats", "formats", unmatch.MediaName.Formats) // REMOVE
} else {
// ensure nack enabled for audio in publisher offer
var nackFound bool

View File

@@ -245,9 +245,10 @@ type PCTransport struct {
remoteOfferId atomic.Uint32
localAnswerId atomic.Uint32
// RAJA-TODO: check if this can be gotten from pc
/* RAJA-REMOVE
lastLocalDescription atomic.Value
lastRemoteDescription atomic.Value
*/
eventsQueue *utils.TypedOpsQueue[event]
@@ -980,61 +981,6 @@ func (t *PCTransport) AddRemoteTrackAndNegotiate(ti *livekit.TrackInfo) error {
return nil
}
/* RAJA-REMOVE
if codecIdx >= len(ti.Codecs) {
t.params.Logger.Warnw(
"invalid codec index", ErrInvalidCodecIndex,
"trackID", ti.Sid,
"track", logger.Proto(ti),
)
return ErrInvalidCodecIndex
}
sci := ti.Codecs[codecIdx]
var mimeType mime.MimeType
if sci.MimeType == "" {
if ti.Type == livekit.TrackType_AUDIO {
mimeType = mime.MimeTypeOpus
} else {
mimeType = mime.MimeTypeVP8
}
} else {
mimeType = mime.MimeTypeFromString(sci.MimeType)
}
track, err := webrtc.NewTrackLocalStaticRTP(
webrtc.RTPCodecCapability{
MimeType: mimeType.String(),
},
sci.Cid,
sci.Cid,
)
if err != nil {
t.params.Logger.Warnw(
"could not create local track", err,
"trackID", ti.Sid,
"track", logger.Proto(ti),
"sci", logger.Proto(sci),
)
return err
}
_, err = t.pc.AddTransceiverFromTrack(
track,
webrtc.RTPTransceiverInit{
Direction: webrtc.RTPTransceiverDirectionSendrecv,
},
)
if err != nil {
t.params.Logger.Warnw(
"could not add transceiver from track", err,
"trackID", ti.Sid,
"track", logger.Proto(ti),
"sci", logger.Proto(sci),
)
return err
}
*/
rtpCodecType := webrtc.RTPCodecTypeVideo
if ti.Type == livekit.TrackType_AUDIO {
rtpCodecType = webrtc.RTPCodecTypeAudio
@@ -1063,6 +1009,10 @@ func (t *PCTransport) RemoveTrack(sender *webrtc.RTPSender) error {
return t.pc.RemoveTrack(sender)
}
func (t *PCTransport) CurrentLocalDescription() *webrtc.SessionDescription {
return t.pc.CurrentLocalDescription()
}
func (t *PCTransport) CurrentRemoteDescription() *webrtc.SessionDescription {
return t.pc.CurrentRemoteDescription()
}
@@ -2376,7 +2326,7 @@ func (t *PCTransport) createAndSendOffer(options *webrtc.OfferOptions) error {
)
}
t.lastLocalDescription.Store(offer)
// RAJA-REMOVE t.lastLocalDescription.Store(offer)
if err := t.params.Handler.OnOffer(offer, t.localOfferId.Inc()); err != nil {
prometheus.ServiceOperationCounter.WithLabelValues("offer", "error", "write_message").Add(1)
return errors.Wrap(err, "could not send offer")
@@ -2397,7 +2347,7 @@ type remoteDescriptionData struct {
func (t *PCTransport) handleRemoteDescriptionReceived(e event) error {
rdd := e.data.(remoteDescriptionData)
t.lastRemoteDescription.Store(*rdd.sessionDescription)
// RAJA-REMOVE t.lastRemoteDescription.Store(*rdd.sessionDescription)
if rdd.sessionDescription.Type == webrtc.SDPTypeOffer {
return t.handleRemoteOfferReceived(rdd.sessionDescription, rdd.remoteId)
} else {
@@ -2504,7 +2454,7 @@ func (t *PCTransport) createAndSendAnswer() error {
)
}
t.lastLocalDescription.Store(answer)
// RAJA-REMOVE t.lastLocalDescription.Store(answer)
answerId := t.remoteOfferId.Load()
if err := t.params.Handler.OnAnswer(answer, answerId); err != nil {
prometheus.ServiceOperationCounter.WithLabelValues("answer", "error", "write_message").Add(1)
@@ -2670,7 +2620,7 @@ func (t *PCTransport) doICERestart() error {
)
}
t.lastLocalDescription.Store(*offer)
// RAJA-REMOVE t.lastLocalDescription.Store(*offer)
err := t.params.Handler.OnOffer(*offer, t.localOfferId.Inc())
if err != nil {
prometheus.ServiceOperationCounter.WithLabelValues("offer", "error", "write_message").Add(1)

View File

@@ -71,18 +71,6 @@ func (h TransportManagerTransportHandler) OnFailed(isShortLived bool, iceConnect
// -------------------------------
/* RAJA-REMOVE
type TransportManagerPublisherTransportHandler struct {
TransportManagerTransportHandler
}
func (h TransportManagerPublisherTransportHandler) OnAnswer(sd webrtc.SessionDescription, answerId uint32) error {
return h.Handler.OnAnswer(sd, answerId)
}
*/
// -------------------------------
type TransportManagerParams struct {
SubscriberAsPrimary bool
SinglePeerConnection bool
@@ -153,17 +141,16 @@ func NewTransportManager(params TransportManagerParams) (*TransportManager, erro
if t.params.UseOneShotSignallingMode || !t.params.SinglePeerConnection {
lgr := LoggerWithPCTarget(params.Logger, livekit.SignalTarget_PUBLISHER)
publisher, err := NewPCTransport(TransportParams{
ProtocolVersion: params.ProtocolVersion,
Config: params.Config,
Twcc: params.Twcc,
DirectionConfig: params.Config.Publisher,
CongestionControlConfig: params.CongestionControlConfig,
EnabledCodecs: params.EnabledPublishCodecs,
Logger: lgr,
SimTracks: params.SimTracks,
ClientInfo: params.ClientInfo,
Transport: livekit.SignalTarget_PUBLISHER,
// RAJA-REMOVE Handler: TransportManagerPublisherTransportHandler{TransportManagerTransportHandler{params.PublisherHandler, t, lgr}},
ProtocolVersion: params.ProtocolVersion,
Config: params.Config,
Twcc: params.Twcc,
DirectionConfig: params.Config.Publisher,
CongestionControlConfig: params.CongestionControlConfig,
EnabledCodecs: params.EnabledPublishCodecs,
Logger: lgr,
SimTracks: params.SimTracks,
ClientInfo: params.ClientInfo,
Transport: livekit.SignalTarget_PUBLISHER,
Handler: params.PublisherHandler,
UseOneShotSignallingMode: params.UseOneShotSignallingMode,
DataChannelMaxBufferedAmount: params.DataChannelMaxBufferedAmount,
@@ -202,9 +189,11 @@ func NewTransportManager(params TransportManagerParams) (*TransportManager, erro
} else {
t.subscriber = t.publisher
}
/* RAJA-REMOVE
if t.params.SinglePeerConnection {
t.publisher = t.subscriber
}
*/
if !t.params.Migration {
if err := t.createDataChannelsForSubscriber(nil); err != nil {
return nil, err
@@ -216,7 +205,9 @@ func NewTransportManager(params TransportManagerParams) (*TransportManager, erro
}
func (t *TransportManager) Close() {
t.publisher.Close()
if t.publisher != nil {
t.publisher.Close()
}
t.subscriber.Close()
}
@@ -290,6 +281,14 @@ func (t *TransportManager) RemoveTrackLocal(sender *webrtc.RTPSender) error {
}
}
func (t *TransportManager) GetMid(rtpReceiver *webrtc.RTPReceiver) string {
if t.params.SinglePeerConnection {
return t.subscriber.GetMid(rtpReceiver)
} else {
return t.publisher.GetMid(rtpReceiver)
}
}
func (t *TransportManager) WriteSubscriberRTCP(pkts []rtcp.Packet) error {
if t.params.UseOneShotSignallingMode {
return t.publisher.WriteRTCP(pkts)
@@ -411,10 +410,15 @@ func (t *TransportManager) createDataChannelsForSubscriber(pendingDataChannels [
}
func (t *TransportManager) GetUnmatchMediaForOffer(parsedOffer *sdp.SessionDescription, mediaType string) (unmatched []*sdp.MediaDescription, err error) {
// prefer codec from offer for clients that don't support setCodecPreferences
var lastMatchedMid string
lastAnswer := t.publisher.CurrentRemoteDescription()
var lastAnswer *webrtc.SessionDescription
if t.params.SinglePeerConnection {
lastAnswer = t.subscriber.CurrentLocalDescription()
} else {
lastAnswer = t.publisher.CurrentLocalDescription()
}
if lastAnswer != nil {
t.params.Logger.Infow("RAJA lastAnswer", "lastAnswer", lastAnswer) // REMOVE
parsedAnswer, err1 := lastAnswer.Unmarshal()
if err1 != nil {
// should not happen
@@ -445,11 +449,14 @@ func (t *TransportManager) GetUnmatchMediaForOffer(parsedOffer *sdp.SessionDescr
return
}
func (t *TransportManager) LastPublisherOffer() webrtc.SessionDescription {
func (t *TransportManager) LastPublisherOffer() *webrtc.SessionDescription {
/* RAJA-REMOVE
if sd := t.lastPublisherOffer.Load(); sd != nil {
return sd.(webrtc.SessionDescription)
}
return webrtc.SessionDescription{}
*/
return t.publisher.CurrentRemoteDescription()
}
func (t *TransportManager) HandleOffer(offer webrtc.SessionDescription, offerId uint32, shouldPend bool) error {
@@ -617,6 +624,10 @@ func (t *TransportManager) SubscriberAsPrimary() bool {
func (t *TransportManager) GetICEConnectionInfo() []*types.ICEConnectionInfo {
infos := make([]*types.ICEConnectionInfo, 0, 2)
for _, pc := range []*PCTransport{t.publisher, t.subscriber} {
if pc == nil {
continue
}
info := pc.GetICEConnectionInfo()
if info.HasCandidates() {
infos = append(infos, info)
@@ -848,7 +859,9 @@ func (t *TransportManager) UpdateSignalingRTT(rtt uint32) {
t.lock.Lock()
t.signalingRTT = rtt
t.lock.Unlock()
t.publisher.SetSignalingRTT(rtt)
if t.publisher != nil {
t.publisher.SetSignalingRTT(rtt)
}
t.subscriber.SetSignalingRTT(rtt)
// TODO: considering using tcp rtt to calculate ice connection cost, if ice connection can't be established

View File

@@ -747,30 +747,6 @@ func (c *RTCClient) AddTrack(track *webrtc.TrackLocalStaticSample, path string,
c.lock.Lock()
defer c.lock.Unlock()
/* RAJA-TODO
var sender *webrtc.RTPSender
if types.ProtocolVersion(types.CurrentProtocol).SupportsSinglePeerConnection() {
sender, _, err = c.subscriber.AddTrack(track, types.AddTrackParams{})
} else {
sender, _, err = c.publisher.AddTrack(track, types.AddTrackParams{})
}
if err != nil {
logger.Errorw(
"add track failed", err,
"participant", c.localParticipant.Identity,
"pID", c.localParticipant.Sid,
"trackID", ti.Sid,
)
return
}
logger.Infow(
"RAJA add track sender",
"participant", c.localParticipant.Identity,
"pID", c.localParticipant.Sid,
"trackID", ti.Sid,
"sender", sender.GetParameters(),
) // REMOVE
*/
c.localTracks[ti.Sid] = track
c.trackSenders[ti.Sid] = sender
if !c.protocolVersion.SupportsSinglePeerConnection() {

View File

@@ -146,7 +146,6 @@ func (w *TrackWriter) writeOgg() {
return
}
logger.Infow("RAJA wrote sample") // REMOVE
time.Sleep(sampleDuration)
}
}