diff --git a/go.mod b/go.mod index 75b6c9839..472c179ae 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,9 @@ module github.com/livekit/livekit-server +replace github.com/livekit/mediatransportutil => ../mediatransportutil + +replace github.com/livekit/protocol => ../protocol + go 1.22 require ( diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 7cb7db17f..d58092f90 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -33,6 +33,7 @@ import ( "go.uber.org/atomic" "google.golang.org/protobuf/proto" + "github.com/livekit/mediatransportutil" "github.com/livekit/mediatransportutil/pkg/twcc" "github.com/livekit/protocol/auth" "github.com/livekit/protocol/livekit" @@ -1499,7 +1500,7 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w } func (p *ParticipantImpl) onDataMessage(kind livekit.DataPacket_Kind, data []byte) { - if p.IsDisconnected() || !p.CanPublishData() { + if p.IsDisconnected() { return } @@ -1521,6 +1522,7 @@ func (p *ParticipantImpl) onDataMessage(kind livekit.DataPacket_Kind, data []byt } shouldForward := false + isPublisher := true // only forward on user payloads switch payload := dp.Value.(type) { case *livekit.DataPacket_User: @@ -1551,10 +1553,32 @@ func (p *ParticipantImpl) onDataMessage(kind livekit.DataPacket_Kind, data []byt if p.Kind() == livekit.ParticipantInfo_AGENT { shouldForward = true } + case *livekit.DataPacket_TimeSyncRequest: + isPublisher = false + resp := mediatransportutil.HandleTimeSyncRequest(*payload.TimeSyncRequest) + + rdp := &livekit.DataPacket{ + Value: &livekit.DataPacket_TimeSyncResponse{ + TimeSyncResponse: &resp, + }, + } + + rdpData, err := proto.Marshal(rdp) + if err != nil { + p.pubLogger.Warnw("failed marshalling time sync response", err) + return + } + + err = p.SendDataPacket(kind, rdpData) + if err != nil { + p.pubLogger.Infow("failed sending time sync response", "error", err) + return + } + default: p.pubLogger.Warnw("received unsupported data packet", nil, "payload", payload) } - if shouldForward { + if shouldForward && p.CanPublishData() { p.lock.RLock() onDataPacket := p.onDataPacket p.lock.RUnlock() @@ -1563,7 +1587,9 @@ func (p *ParticipantImpl) onDataMessage(kind livekit.DataPacket_Kind, data []byt } } - p.setIsPublisher(true) + if isPublisher && p.CanPublishData() { + p.setIsPublisher(true) + } } func (p *ParticipantImpl) onICECandidate(c *webrtc.ICECandidate, target livekit.SignalTarget) error {