diff --git a/config-sample.yaml b/config-sample.yaml index e4fd969c2..eba55b100 100644 --- a/config-sample.yaml +++ b/config-sample.yaml @@ -16,6 +16,20 @@ # for production setups, this port should be placed behind a load balancer with TLS port: 7880 +# Experimental receive-only Media over QUIC/WebTransport downstream. Supports +# the LiveKit H264 probe protocol and moq-lite-04 for @moq/net receive testing. +# WebTransport uses HTTP/3 over UDP and requires TLS. In production, expose this +# UDP port directly or through an HTTP/3-capable load balancer. +# moq: +# enabled: false +# port: 7883 +# path: /moq/v1 +# cert_file: /path/to/cert.pem +# key_file: /path/to/key.pem +# track_queue_size: 256 +# cache_max_bytes: 2097152 +# write_timeout: 2s + # when redis is set, LiveKit will automatically operate in a fully distributed fashion # clients could connect to any node and be routed to the same room redis: diff --git a/go.mod b/go.mod index 1f3f4ead3..877ca061f 100644 --- a/go.mod +++ b/go.mod @@ -43,6 +43,8 @@ require ( github.com/pion/webrtc/v4 v4.2.11 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.23.2 + github.com/quic-go/quic-go v0.60.0 + github.com/quic-go/webtransport-go v0.11.0 github.com/redis/go-redis/v9 v9.20.0 github.com/rs/cors v1.11.1 github.com/stretchr/testify v1.11.1 @@ -68,6 +70,7 @@ require ( github.com/containerd/errdefs v1.0.0 // indirect github.com/containerd/errdefs/pkg v0.3.0 // indirect github.com/distribution/reference v0.6.0 // indirect + github.com/dunglas/httpsfv v1.1.0 // indirect github.com/fatih/color v1.19.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/go-logr/stdr v1.2.2 // indirect @@ -82,6 +85,7 @@ require ( github.com/olekukonko/errors v1.3.0 // indirect github.com/olekukonko/ll v0.1.8 // indirect github.com/puzpuzpuz/xsync/v4 v4.5.0 // indirect + github.com/quic-go/qpack v0.6.0 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.69.0 // indirect go.opentelemetry.io/otel v1.44.0 // indirect diff --git a/go.sum b/go.sum index f5753898a..c4f4d0b48 100644 --- a/go.sum +++ b/go.sum @@ -54,6 +54,8 @@ github.com/docker/go-connections v0.7.0 h1:6SsRfJddP22WMrCkj19x9WKjEDTB+ahsdiGYf github.com/docker/go-connections v0.7.0/go.mod h1:no1qkHdjq7kLMGUXYAduOhYPSJxxvgWBh7ogVvptn3Q= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= +github.com/dunglas/httpsfv v1.1.0 h1:Jw76nAyKWKZKFrpMMcL76y35tOpYHqQPzHQiwDvpe54= +github.com/dunglas/httpsfv v1.1.0/go.mod h1:zID2mqw9mFsnt7YC3vYQ9/cjq30q41W+1AnDwH8TiMg= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/elliotchance/orderedmap/v3 v3.1.0 h1:j4DJ5ObEmMBt/lcwIecKcoRxIQUEnw0L804lXYDt/pg= @@ -280,6 +282,12 @@ github.com/prometheus/procfs v0.20.1 h1:XwbrGOIplXW/AU3YhIhLODXMJYyC1isLFfYCsTEy github.com/prometheus/procfs v0.20.1/go.mod h1:o9EMBZGRyvDrSPH1RqdxhojkuXstoe4UlK79eF5TGGo= github.com/puzpuzpuz/xsync/v4 v4.5.0 h1:vOSWu6b57/emh+L/Cw0BeQfvxa/cogFywXHeGUxQxAg= github.com/puzpuzpuz/xsync/v4 v4.5.0/go.mod h1:VJDmTCJMBt8igNxnkQd86r+8KUeN1quSfNKu5bLYFQo= +github.com/quic-go/qpack v0.6.0 h1:g7W+BMYynC1LbYLSqRt8PBg5Tgwxn214ZZR34VIOjz8= +github.com/quic-go/qpack v0.6.0/go.mod h1:lUpLKChi8njB4ty2bFLX2x4gzDqXwUpaO1DP9qMDZII= +github.com/quic-go/quic-go v0.60.0 h1:xcQioE8OM66UQLeUMHltK1CCcOu3JbVB4JAQdDQSB+0= +github.com/quic-go/quic-go v0.60.0/go.mod h1:wpKpjmPpftl30sL6pFh7REVpjbcCVy4zt2vDyK1TuJk= +github.com/quic-go/webtransport-go v0.11.0 h1:3afiZq7MHv3gmKCbMwZ8D5M1u0y/1RdONN9KlWp32J0= +github.com/quic-go/webtransport-go v0.11.0/go.mod h1:SHgEzUFVyj+9WUSuGB1P6Zd351Pww2leWV3SwlTovkA= github.com/redis/go-redis/v9 v9.20.0 h1:WnQYxLkgO2xiXTCJY0ldIiI8dNqCDlQAG+AtaH7a2a0= github.com/redis/go-redis/v9 v9.20.0/go.mod h1:v/M13XI1PVCDcm01VtPFOADfZtHf8YW3baQf57KlIkA= github.com/rodaine/protogofakeit v0.1.1 h1:ZKouljuRM3A+TArppfBqnH8tGZHOwM/pjvtXe9DaXH8= diff --git a/pkg/config/config.go b/pkg/config/config.go index 692c0af38..a49740da9 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -92,6 +92,7 @@ type Config struct { EnableDataTracks bool `yaml:"enable_data_tracks,omitempty"` API APIConfig `yaml:"api,omitempty"` + MoQ MoQConfig `yaml:"moq,omitempty"` } type RTCConfig struct { @@ -170,6 +171,55 @@ type TURNServer struct { TTL int `yaml:"ttl,omitempty"` } +type MoQConfig struct { + Enabled bool `yaml:"enabled,omitempty"` + // UDP port for the HTTP/3 WebTransport listener. This is intentionally + // separate from the existing TCP HTTP port. + Port uint32 `yaml:"port,omitempty"` + // If unset, bind_addresses from the top-level config are used. If those are + // unset, the listener binds all interfaces. + BindAddresses []string `yaml:"bind_addresses,omitempty"` + Path string `yaml:"path,omitempty"` + + // WebTransport requires TLS. In production, configure cert_file and key_file. + // In development mode only, LiveKit can generate an ephemeral self-signed + // certificate when these are unset. + CertFile string `yaml:"cert_file,omitempty"` + KeyFile string `yaml:"key_file,omitempty"` + + // Number of access units queued per subscriber before non-keyframes are + // dropped to keep latency bounded. + TrackQueueSize int `yaml:"track_queue_size,omitempty"` + // Max bytes retained for the cached keyframe access unit per track/layer. + CacheMaxBytes int `yaml:"cache_max_bytes,omitempty"` + WriteTimeout time.Duration `yaml:"write_timeout,omitempty"` +} + +func (c MoQConfig) WithDefaults(development bool, bindAddresses []string) MoQConfig { + if c.Path == "" { + c.Path = "/moq/v1" + } + if c.TrackQueueSize <= 0 { + c.TrackQueueSize = 256 + } + if c.CacheMaxBytes <= 0 { + c.CacheMaxBytes = 2 * 1024 * 1024 + } + if c.WriteTimeout <= 0 { + c.WriteTimeout = 2 * time.Second + } + if len(c.BindAddresses) == 0 { + c.BindAddresses = bindAddresses + } + if len(c.BindAddresses) == 0 { + c.BindAddresses = []string{""} + } + if development && c.Port == 0 { + c.Port = 7883 + } + return c +} + type CongestionControlConfig struct { Enabled bool `yaml:"enabled,omitempty"` AllowPause bool `yaml:"allow_pause,omitempty"` @@ -478,6 +528,12 @@ var DefaultConfig = Config{ NodeStats: DefaultNodeStatsConfig, API: DefaultAPIConfig(), EnableDataTracks: true, + MoQ: MoQConfig{ + Path: "/moq/v1", + TrackQueueSize: 256, + CacheMaxBytes: 2 * 1024 * 1024, + WriteTimeout: 2 * time.Second, + }, } func NewConfig(confString string, strictMode bool, c *cli.Command, baseFlags []cli.Flag) (*Config, error) { @@ -517,6 +573,20 @@ func NewConfig(confString string, strictMode bool, c *cli.Command, baseFlags []c return nil, err } conf.KeyFile = file + if conf.MoQ.CertFile != "" { + file, err = homedir.Expand(os.ExpandEnv(conf.MoQ.CertFile)) + if err != nil { + return nil, err + } + conf.MoQ.CertFile = file + } + if conf.MoQ.KeyFile != "" { + file, err = homedir.Expand(os.ExpandEnv(conf.MoQ.KeyFile)) + if err != nil { + return nil, err + } + conf.MoQ.KeyFile = file + } // set defaults for Turn relay if none are set if conf.TURN.RelayPortRangeStart == 0 || conf.TURN.RelayPortRangeEnd == 0 { @@ -533,6 +603,7 @@ func NewConfig(confString string, strictMode bool, c *cli.Command, baseFlags []c if conf.LogLevel != "" { conf.Logging.Level = conf.LogLevel } + conf.MoQ = conf.MoQ.WithDefaults(conf.Development, conf.BindAddresses) if conf.Logging.Level == "" && conf.Development { conf.Logging.Level = "debug" } diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 86ddcd05e..4c9d632ce 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -136,9 +136,10 @@ type Room struct { trailer []byte - onParticipantChanged func(p types.Participant) - onRoomUpdated func() - onClose func() + onParticipantChanged func(p types.Participant) + onRoomUpdated func() + onClose func() + onTrackPublishedCallbacks []func(types.Participant, types.MediaTrack) simulationLock sync.Mutex disconnectSignalOnResumeParticipants map[livekit.ParticipantIdentity]time.Time @@ -837,6 +838,16 @@ func (r *Room) OnParticipantChanged(f func(participant types.Participant)) { r.onParticipantChanged = f } +func (r *Room) AddOnTrackPublished(f func(types.Participant, types.MediaTrack)) { + if f == nil { + return + } + + r.lock.Lock() + r.onTrackPublishedCallbacks = append(r.onTrackPublishedCallbacks, f) + r.lock.Unlock() +} + func (r *Room) SendDataPacket(dp *livekit.DataPacket, kind livekit.DataPacket_Kind) { r.onDataMessage(nil, kind, dp) } @@ -1073,6 +1084,13 @@ func (r *Room) createJoinResponseLocked( func (r *Room) onTrackPublished(participant types.Participant, track types.MediaTrack) { r.trackManager.AddTrack(track, participant.Identity(), participant.ID()) + r.lock.RLock() + onTrackPublished := slices.Clone(r.onTrackPublishedCallbacks) + r.lock.RUnlock() + for _, f := range onTrackPublished { + f(participant, track) + } + // publish participant update, since track state is changed r.broadcastParticipantState(participant, broadcastOptions{skipSource: true}) diff --git a/pkg/service/moq_h264.go b/pkg/service/moq_h264.go new file mode 100644 index 000000000..41b2b0c9b --- /dev/null +++ b/pkg/service/moq_h264.go @@ -0,0 +1,147 @@ +// Copyright 2026 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package service + +import ( + "bytes" + + "github.com/pion/rtp/codecs" +) + +var annexBStartCode = []byte{0x00, 0x00, 0x00, 0x01} + +type h264AccessUnit struct { + Payload []byte + HasIDR bool + HasSPS bool + HasPPS bool + RTPTime uint32 + CompletedByMarker bool +} + +type h264AccessUnitAssembler struct { + depacketizer codecs.H264Packet + current []byte + currentTime uint32 + started bool + sps []byte + pps []byte +} + +func newH264AccessUnitAssembler() *h264AccessUnitAssembler { + return &h264AccessUnitAssembler{} +} + +func (a *h264AccessUnitAssembler) Push(payload []byte, rtpTime uint32, marker bool) (*h264AccessUnit, error) { + if a.started && a.currentTime != rtpTime && len(a.current) != 0 { + a.current = nil + a.started = false + } + + chunk, err := a.depacketizer.Unmarshal(payload) + if err != nil { + return nil, err + } + if len(chunk) != 0 { + if !a.started { + a.currentTime = rtpTime + a.started = true + } + a.current = append(a.current, chunk...) + } + + if !marker || len(a.current) == 0 { + return nil, nil + } + + auPayload := a.current + a.current = nil + a.started = false + + au := &h264AccessUnit{ + Payload: auPayload, + RTPTime: rtpTime, + CompletedByMarker: true, + } + for _, nalu := range splitAnnexBNALUs(auPayload) { + if len(nalu) == 0 { + continue + } + switch nalu[0] & 0x1f { + case 5: + au.HasIDR = true + case 7: + au.HasSPS = true + a.sps = cloneBytes(nalu) + case 8: + au.HasPPS = true + a.pps = cloneBytes(nalu) + } + } + + if au.HasIDR && (!au.HasSPS || !au.HasPPS) { + au.Payload = a.prependParameterSets(au.Payload, au.HasSPS, au.HasPPS) + au.HasSPS = au.HasSPS || len(a.sps) != 0 + au.HasPPS = au.HasPPS || len(a.pps) != 0 + } + + return au, nil +} + +func (a *h264AccessUnitAssembler) prependParameterSets(payload []byte, hasSPS bool, hasPPS bool) []byte { + if (hasSPS || len(a.sps) == 0) && (hasPPS || len(a.pps) == 0) { + return payload + } + + prepended := make([]byte, 0, len(payload)+len(a.sps)+len(a.pps)+2*len(annexBStartCode)) + if !hasSPS && len(a.sps) != 0 { + prepended = append(prepended, annexBStartCode...) + prepended = append(prepended, a.sps...) + } + if !hasPPS && len(a.pps) != 0 { + prepended = append(prepended, annexBStartCode...) + prepended = append(prepended, a.pps...) + } + prepended = append(prepended, payload...) + return prepended +} + +func splitAnnexBNALUs(payload []byte) [][]byte { + var nalus [][]byte + for { + start := bytes.Index(payload, annexBStartCode) + if start == -1 { + break + } + payload = payload[start+len(annexBStartCode):] + next := bytes.Index(payload, annexBStartCode) + if next == -1 { + nalus = append(nalus, payload) + break + } + nalus = append(nalus, payload[:next]) + payload = payload[next:] + } + return nalus +} + +func cloneBytes(in []byte) []byte { + if len(in) == 0 { + return nil + } + out := make([]byte, len(in)) + copy(out, in) + return out +} diff --git a/pkg/service/moq_h264_test.go b/pkg/service/moq_h264_test.go new file mode 100644 index 000000000..8f11a32f1 --- /dev/null +++ b/pkg/service/moq_h264_test.go @@ -0,0 +1,90 @@ +// Copyright 2026 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package service + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestH264AccessUnitAssemblerCachesParameterSets(t *testing.T) { + a := newH264AccessUnitAssembler() + + au, err := a.Push([]byte{0x67, 0x42, 0x00, 0x1f}, 10, false) + require.NoError(t, err) + require.Nil(t, au) + au, err = a.Push([]byte{0x68, 0xce, 0x06, 0xe2}, 10, false) + require.NoError(t, err) + require.Nil(t, au) + au, err = a.Push([]byte{0x65, 0x88, 0x84}, 10, true) + require.NoError(t, err) + require.NotNil(t, au) + require.True(t, au.HasIDR) + require.True(t, au.HasSPS) + require.True(t, au.HasPPS) + require.Equal(t, appendAnnexB( + []byte{0x67, 0x42, 0x00, 0x1f}, + []byte{0x68, 0xce, 0x06, 0xe2}, + []byte{0x65, 0x88, 0x84}, + ), au.Payload) +} + +func TestH264AccessUnitAssemblerPrependsCachedParameterSetsToIDR(t *testing.T) { + a := newH264AccessUnitAssembler() + _, err := a.Push([]byte{0x67, 0x42, 0x00, 0x1f}, 10, false) + require.NoError(t, err) + _, err = a.Push([]byte{0x68, 0xce, 0x06, 0xe2}, 10, false) + require.NoError(t, err) + _, err = a.Push([]byte{0x65, 0x88, 0x84}, 10, true) + require.NoError(t, err) + + au, err := a.Push([]byte{0x65, 0xaa, 0xbb}, 20, true) + require.NoError(t, err) + require.NotNil(t, au) + require.True(t, au.HasIDR) + require.True(t, au.HasSPS) + require.True(t, au.HasPPS) + require.Equal(t, appendAnnexB( + []byte{0x67, 0x42, 0x00, 0x1f}, + []byte{0x68, 0xce, 0x06, 0xe2}, + []byte{0x65, 0xaa, 0xbb}, + ), au.Payload) +} + +func TestH264AccessUnitAssemblerReconstructsFUA(t *testing.T) { + a := newH264AccessUnitAssembler() + + au, err := a.Push([]byte{0x7c, 0x85, 0x01, 0x02}, 10, false) + require.NoError(t, err) + require.Nil(t, au) + au, err = a.Push([]byte{0x7c, 0x05, 0x03, 0x04}, 10, false) + require.NoError(t, err) + require.Nil(t, au) + au, err = a.Push([]byte{0x7c, 0x45, 0x05, 0x06}, 10, true) + require.NoError(t, err) + require.NotNil(t, au) + require.True(t, au.HasIDR) + require.Equal(t, appendAnnexB([]byte{0x65, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06}), au.Payload) +} + +func appendAnnexB(nalus ...[]byte) []byte { + var out []byte + for _, nalu := range nalus { + out = append(out, annexBStartCode...) + out = append(out, nalu...) + } + return out +} diff --git a/pkg/service/moq_lite.go b/pkg/service/moq_lite.go new file mode 100644 index 000000000..2ff35c4db --- /dev/null +++ b/pkg/service/moq_lite.go @@ -0,0 +1,482 @@ +// Copyright 2026 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package service + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "strings" + "time" + + "github.com/livekit/protocol/auth" + "github.com/livekit/protocol/livekit" + "github.com/quic-go/quic-go/quicvarint" + webtransport "github.com/quic-go/webtransport-go" +) + +const ( + moqLiteProtocol = "moq-lite-04" + + moqLiteStreamGroup byte = 0 + moqLiteStreamSubscribe uint64 = 2 + moqLiteStreamProbe uint64 = 4 + + moqLiteSubscribeOK uint64 = 0 + moqLiteSubscribeDrop uint64 = 1 + + moqLiteMaxMessageSize = 1024 * 1024 +) + +type moqLiteSubscribe struct { + ID uint64 + Broadcast string + Track string + Priority uint8 + Ordered bool + MaxLatency uint64 + StartGroup uint64 + EndGroup uint64 + HasStart bool + HasEnd bool +} + +type moqLiteResolvedTrack struct { + moqResolvedTrack + Timing bool +} + +type moqLiteTimingSample struct { + Sequence uint64 `json:"sequence"` + TrackID string `json:"trackId"` + TrackName string `json:"trackName"` + Layer int32 `json:"layer"` + RTPTime uint32 `json:"rtpTime"` + KeyFrame bool `json:"keyFrame"` + Cached bool `json:"cached"` + SentAtUnixNs int64 `json:"sentAtUnixNs"` + UserTimestampUs *int64 `json:"userTimestampUs,omitempty"` + FrameID *uint32 `json:"frameId,omitempty"` +} + +func (s *MoQService) serveMoQLiteSession( + sess *webtransport.Session, + roomName livekit.RoomName, + claims *auth.ClaimGrants, + layer int32, +) { + ctx := sess.Context() + for { + stream, err := sess.AcceptStream(ctx) + if err != nil { + if ctx.Err() == nil { + s.logger.Debugw("could not accept moq-lite stream", "error", err) + } + return + } + + go func() { + defer func() { + _ = stream.Close() + }() + if err := s.serveMoQLiteStream(ctx, sess, stream, roomName, claims, layer); err != nil { + s.logger.Debugw("moq-lite stream failed", "error", err) + } + }() + } +} + +func (s *MoQService) serveMoQLiteStream( + ctx context.Context, + sess *webtransport.Session, + stream *webtransport.Stream, + roomName livekit.RoomName, + claims *auth.ClaimGrants, + layer int32, +) error { + reader := quicvarint.NewReader(stream) + streamType, err := quicvarint.Read(reader) + if err != nil { + return err + } + + switch streamType { + case moqLiteStreamSubscribe: + return s.serveMoQLiteSubscribe(ctx, sess, stream, reader, roomName, claims, layer) + case moqLiteStreamProbe: + // Probe is optional telemetry in @moq/net. Closing cleanly tells the client + // no probe data is available without failing the media subscription. + return nil + default: + return fmt.Errorf("unsupported moq-lite stream type: %d", streamType) + } +} + +func (s *MoQService) serveMoQLiteSubscribe( + ctx context.Context, + sess *webtransport.Session, + stream *webtransport.Stream, + reader quicvarint.Reader, + roomName livekit.RoomName, + claims *auth.ClaimGrants, + layer int32, +) error { + subscribe, err := readMoQLiteSubscribe(reader) + if err != nil { + return err + } + + resolved, status, err := s.resolveMoQLiteTrack(ctx, roomName, subscribe, claims) + if err != nil { + _ = writeMoQLiteSubscribeDrop(stream, s.config.WriteTimeout) + return fmt.Errorf("could not resolve moq-lite subscription: status=%d, error=%w", status, err) + } + + if err := writeMoQLiteSubscribeOK(stream, s.config.WriteTimeout, subscribe); err != nil { + return err + } + go discardMoQLiteSubscribeUpdates(ctx, reader) + + trackSub, cached, unsubscribe := resolved.tap.Subscribe(layer) + defer unsubscribe() + + if cached != nil { + if err := s.writeMoQLiteSample(ctx, sess, subscribe.ID, cached, resolved.Timing); err != nil { + return err + } + } + + for { + select { + case <-ctx.Done(): + return nil + case <-trackSub.done: + return nil + case sample := <-trackSub.queue: + if sample == nil { + continue + } + if err := s.writeMoQLiteSample(ctx, sess, subscribe.ID, sample, resolved.Timing); err != nil { + return err + } + } + } +} + +func (s *MoQService) resolveMoQLiteTrack( + ctx context.Context, + defaultRoomName livekit.RoomName, + subscribe moqLiteSubscribe, + claims *auth.ClaimGrants, +) (moqLiteResolvedTrack, int, error) { + var resolved moqLiteResolvedTrack + roomName := defaultRoomName + trackName, timingTrack := splitMoQLiteTrackName(subscribe.Track) + if subscribe.Broadcast != "" { + broadcastRoom := livekit.RoomName(subscribe.Broadcast) + if defaultRoomName != "" && broadcastRoom != defaultRoomName { + return resolved, http.StatusForbidden, fmt.Errorf("broadcast %q does not match token room %q", subscribe.Broadcast, defaultRoomName) + } + roomName = broadcastRoom + } + if roomName == "" { + return resolved, http.StatusBadRequest, ErrNoRoomName + } + + room := s.roomManager.GetRoom(ctx, roomName) + if room == nil { + return resolved, http.StatusNotFound, ErrRoomNotFound + } + + var candidates []moqLiteResolvedTrack + for _, participant := range room.GetParticipants() { + for _, track := range participant.GetPublishedTracks() { + if !isMoQSupportedTrack(track) { + continue + } + if !participant.HasPermission(track.ID(), livekit.ParticipantIdentity(claims.Identity)) { + continue + } + tap := s.tracks.AttachTrack(room, participant, track) + if tap == nil { + continue + } + if trackName == "" || string(track.ID()) == trackName || track.Name() == trackName { + candidates = append(candidates, moqLiteResolvedTrack{ + moqResolvedTrack: moqResolvedTrack{room: room, participant: participant, track: track, tap: tap}, + Timing: timingTrack, + }) + } + } + } + + if subscribe.Track != "" && len(candidates) == 0 { + return resolved, http.StatusNotFound, fmt.Errorf("track not found or not available for moq-lite: %s", subscribe.Track) + } + if subscribe.Track == "" && len(candidates) != 1 { + return resolved, http.StatusBadRequest, fmt.Errorf("track name is required when %d moq-compatible tracks are available", len(candidates)) + } + if len(candidates) > 1 { + return resolved, http.StatusBadRequest, fmt.Errorf("track name %q matched %d moq-compatible tracks", subscribe.Track, len(candidates)) + } + + return candidates[0], http.StatusOK, nil +} + +func splitMoQLiteTrackName(track string) (string, bool) { + for _, suffix := range []string{".timing", ".metadata"} { + if strings.HasSuffix(track, suffix) { + return strings.TrimSuffix(track, suffix), true + } + } + return track, false +} + +func (s *MoQService) writeMoQLiteSample( + ctx context.Context, + sess *webtransport.Session, + subscribeID uint64, + sample *moqSample, + timing bool, +) error { + stream, err := sess.OpenUniStreamSync(ctx) + if err != nil { + return err + } + defer func() { + _ = stream.Close() + }() + payload := sample.Payload + if timing { + var err error + payload, err = json.Marshal(moqLiteTimingSample{ + Sequence: sample.Sequence, + TrackID: string(sample.TrackID), + TrackName: sample.TrackName, + Layer: sample.Layer, + RTPTime: sample.RTPTime, + KeyFrame: sample.KeyFrame, + Cached: sample.Cached, + SentAtUnixNs: sample.SentAtUnixNs, + UserTimestampUs: sample.UserTimestampUs, + FrameID: sample.FrameID, + }) + if err != nil { + return err + } + } + return writeMoQLiteGroup(stream, s.config.WriteTimeout, subscribeID, sample.Sequence, payload) +} + +func readMoQLiteSubscribe(r quicvarint.Reader) (moqLiteSubscribe, error) { + var subscribe moqLiteSubscribe + data, err := readMoQLiteMessage(r) + if err != nil { + return subscribe, err + } + + br := bytes.NewReader(data) + reader := quicvarint.NewReader(br) + subscribe.ID, err = quicvarint.Read(reader) + if err != nil { + return subscribe, err + } + subscribe.Broadcast, err = readMoQLiteString(reader) + if err != nil { + return subscribe, err + } + subscribe.Track, err = readMoQLiteString(reader) + if err != nil { + return subscribe, err + } + subscribe.Priority, err = readMoQLiteU8(reader) + if err != nil { + return subscribe, err + } + subscribe.Ordered, err = readMoQLiteBool(reader) + if err != nil { + return subscribe, err + } + subscribe.MaxLatency, err = quicvarint.Read(reader) + if err != nil { + return subscribe, err + } + startGroup, err := quicvarint.Read(reader) + if err != nil { + return subscribe, err + } + if startGroup > 0 { + subscribe.StartGroup = startGroup - 1 + subscribe.HasStart = true + } + endGroup, err := quicvarint.Read(reader) + if err != nil { + return subscribe, err + } + if endGroup > 0 { + subscribe.EndGroup = endGroup - 1 + subscribe.HasEnd = true + } + if br.Len() != 0 { + return subscribe, fmt.Errorf("moq-lite subscribe had %d trailing bytes", br.Len()) + } + return subscribe, nil +} + +func readMoQLiteMessage(r quicvarint.Reader) ([]byte, error) { + size, err := quicvarint.Read(r) + if err != nil { + return nil, err + } + if size > moqLiteMaxMessageSize { + return nil, fmt.Errorf("moq-lite message too large: %d", size) + } + data := make([]byte, int(size)) + if _, err := io.ReadFull(r, data); err != nil { + return nil, err + } + return data, nil +} + +func readMoQLiteString(r quicvarint.Reader) (string, error) { + data, err := readMoQLiteMessage(r) + if err != nil { + return "", err + } + return string(data), nil +} + +func readMoQLiteU8(r quicvarint.Reader) (uint8, error) { + b, err := r.ReadByte() + return b, err +} + +func readMoQLiteBool(r quicvarint.Reader) (bool, error) { + value, err := readMoQLiteU8(r) + if err != nil { + return false, err + } + switch value { + case 0: + return false, nil + case 1: + return true, nil + default: + return false, fmt.Errorf("invalid moq-lite bool: %d", value) + } +} + +func discardMoQLiteSubscribeUpdates(ctx context.Context, r quicvarint.Reader) { + for { + if ctx.Err() != nil { + return + } + size, err := quicvarint.Read(r) + if err != nil { + return + } + if size > moqLiteMaxMessageSize { + return + } + if _, err = io.CopyN(io.Discard, r, int64(size)); err != nil { + return + } + } +} + +func writeMoQLiteSubscribeOK(w interface { + SetWriteDeadline(time.Time) error + Write([]byte) (int, error) +}, timeout time.Duration, subscribe moqLiteSubscribe) error { + var payload []byte + payload = append(payload, subscribe.Priority) + payload = append(payload, boolByte(subscribe.Ordered)) + payload = quicvarint.Append(payload, subscribe.MaxLatency) + payload = appendMoQLiteOptionalGroup(payload, subscribe.HasStart, subscribe.StartGroup) + payload = appendMoQLiteOptionalGroup(payload, subscribe.HasEnd, subscribe.EndGroup) + + var response []byte + response = quicvarint.Append(response, moqLiteSubscribeOK) + response = appendMoQLiteMessage(response, payload) + return writeMoQLiteBytes(w, timeout, response) +} + +func writeMoQLiteSubscribeDrop(w interface { + SetWriteDeadline(time.Time) error + Write([]byte) (int, error) +}, timeout time.Duration) error { + var payload []byte + payload = quicvarint.Append(payload, 0) + payload = quicvarint.Append(payload, 0) + payload = quicvarint.Append(payload, 1) + + var response []byte + response = quicvarint.Append(response, moqLiteSubscribeDrop) + response = appendMoQLiteMessage(response, payload) + return writeMoQLiteBytes(w, timeout, response) +} + +func writeMoQLiteGroup(w interface { + SetWriteDeadline(time.Time) error + Write([]byte) (int, error) +}, timeout time.Duration, subscribeID uint64, sequence uint64, payload []byte) error { + var group []byte + group = quicvarint.Append(group, subscribeID) + group = quicvarint.Append(group, sequence) + + data := make([]byte, 0, 1+len(group)+len(payload)+16) + data = append(data, moqLiteStreamGroup) + data = appendMoQLiteMessage(data, group) + data = quicvarint.Append(data, uint64(len(payload))) + data = append(data, payload...) + return writeMoQLiteBytes(w, timeout, data) +} + +func appendMoQLiteOptionalGroup(dst []byte, ok bool, group uint64) []byte { + if !ok { + return quicvarint.Append(dst, 0) + } + return quicvarint.Append(dst, group+1) +} + +func appendMoQLiteMessage(dst []byte, payload []byte) []byte { + dst = quicvarint.Append(dst, uint64(len(payload))) + return append(dst, payload...) +} + +func writeMoQLiteBytes(w interface { + SetWriteDeadline(time.Time) error + Write([]byte) (int, error) +}, timeout time.Duration, data []byte) error { + if err := w.SetWriteDeadline(time.Now().Add(timeout)); err != nil { + return err + } + _, err := w.Write(data) + if errors.Is(err, io.EOF) { + return nil + } + return err +} + +func boolByte(v bool) byte { + if v { + return 1 + } + return 0 +} diff --git a/pkg/service/moq_lite_test.go b/pkg/service/moq_lite_test.go new file mode 100644 index 000000000..58c507648 --- /dev/null +++ b/pkg/service/moq_lite_test.go @@ -0,0 +1,83 @@ +// Copyright 2026 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package service + +import ( + "bytes" + "testing" + "time" + + "github.com/quic-go/quic-go/quicvarint" + "github.com/stretchr/testify/require" +) + +func TestMoQLiteReadSubscribe(t *testing.T) { + var payload []byte + payload = quicvarint.Append(payload, 7) + payload = appendMoQLiteMessage(payload, []byte("moq-local-video")) + payload = appendMoQLiteMessage(payload, []byte("camera")) + payload = append(payload, 3) + payload = append(payload, 1) + payload = quicvarint.Append(payload, 250) + payload = quicvarint.Append(payload, 10) + payload = quicvarint.Append(payload, 20) + + data := appendMoQLiteMessage(nil, payload) + subscribe, err := readMoQLiteSubscribe(quicvarint.NewReader(bytes.NewReader(data))) + require.NoError(t, err) + require.Equal(t, uint64(7), subscribe.ID) + require.Equal(t, "moq-local-video", subscribe.Broadcast) + require.Equal(t, "camera", subscribe.Track) + require.Equal(t, uint8(3), subscribe.Priority) + require.True(t, subscribe.Ordered) + require.Equal(t, uint64(250), subscribe.MaxLatency) + require.True(t, subscribe.HasStart) + require.Equal(t, uint64(9), subscribe.StartGroup) + require.True(t, subscribe.HasEnd) + require.Equal(t, uint64(19), subscribe.EndGroup) +} + +func TestMoQLiteWriteGroup(t *testing.T) { + w := &moqLiteTestWriter{} + err := writeMoQLiteGroup(w, time.Second, 7, 42, []byte{0x00, 0x00, 0x01, 0x65}) + require.NoError(t, err) + + reader := quicvarint.NewReader(bytes.NewReader(w.Bytes())) + streamType, err := reader.ReadByte() + require.NoError(t, err) + require.Equal(t, moqLiteStreamGroup, streamType) + + groupData, err := readMoQLiteMessage(reader) + require.NoError(t, err) + groupReader := quicvarint.NewReader(bytes.NewReader(groupData)) + subscribeID, err := quicvarint.Read(groupReader) + require.NoError(t, err) + require.Equal(t, uint64(7), subscribeID) + sequence, err := quicvarint.Read(groupReader) + require.NoError(t, err) + require.Equal(t, uint64(42), sequence) + + frame, err := readMoQLiteMessage(reader) + require.NoError(t, err) + require.Equal(t, []byte{0x00, 0x00, 0x01, 0x65}, frame) +} + +type moqLiteTestWriter struct { + bytes.Buffer +} + +func (w *moqLiteTestWriter) SetWriteDeadline(time.Time) error { + return nil +} diff --git a/pkg/service/moq_service.go b/pkg/service/moq_service.go new file mode 100644 index 000000000..bcdeb7866 --- /dev/null +++ b/pkg/service/moq_service.go @@ -0,0 +1,485 @@ +// Copyright 2026 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package service + +import ( + "context" + "crypto/ecdsa" + "crypto/elliptic" + "crypto/rand" + "crypto/tls" + "crypto/x509" + "crypto/x509/pkix" + "encoding/pem" + "errors" + "fmt" + "math/big" + "net" + "net/http" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/livekit/protocol/auth" + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" + "github.com/quic-go/quic-go" + "github.com/quic-go/quic-go/http3" + webtransport "github.com/quic-go/webtransport-go" + + "github.com/livekit/livekit-server/pkg/config" + "github.com/livekit/livekit-server/pkg/rtc" + "github.com/livekit/livekit-server/pkg/rtc/types" +) + +type MoQService struct { + config config.MoQConfig + development bool + keyProvider auth.KeyProvider + roomManager *RoomManager + tracks *moqTrackRegistry + logger logger.Logger + + lock sync.Mutex + servers []*webtransport.Server + wg sync.WaitGroup + stopped atomic.Bool +} + +type moqResolvedTrack struct { + room *rtc.Room + participant types.Participant + track types.MediaTrack + tap *moqTrackTap +} + +func NewMoQService(conf *config.Config, keyProvider auth.KeyProvider, roomManager *RoomManager) (*MoQService, error) { + if keyProvider == nil { + return nil, errors.New("moq requires an API key provider") + } + if conf.MoQ.Port == 0 { + return nil, errors.New("moq.port must be configured") + } + + lgr := logger.GetLogger().WithComponent("moq") + s := &MoQService{ + config: conf.MoQ, + development: conf.Development, + keyProvider: keyProvider, + roomManager: roomManager, + logger: lgr, + } + s.tracks = newMoQTrackRegistry(moqTrackRegistryParams{ + Config: s.config, + Logger: lgr, + }) + roomManager.AddOnRoomCreated(s.tracks.AttachRoom) + return s, nil +} + +func (s *MoQService) Start() error { + tlsConf, err := s.tlsConfig() + if err != nil { + return err + } + + type listenTarget struct { + server *webtransport.Server + conn *net.UDPConn + addr string + } + targets := make([]listenTarget, 0, len(s.config.BindAddresses)) + for _, addr := range s.config.BindAddresses { + bindAddr := net.JoinHostPort(addr, strconv.Itoa(int(s.config.Port))) + udpAddr, err := net.ResolveUDPAddr("udp", bindAddr) + if err != nil { + for _, target := range targets { + _ = target.conn.Close() + } + return err + } + conn, err := net.ListenUDP("udp", udpAddr) + if err != nil { + for _, target := range targets { + _ = target.conn.Close() + } + return err + } + + h3Server := &http3.Server{ + Addr: bindAddr, + TLSConfig: http3.ConfigureTLSConfig(tlsConf.Clone()), + QUICConfig: &quic.Config{ + EnableDatagrams: true, + EnableStreamResetPartialDelivery: true, + }, + } + webtransport.ConfigureHTTP3Server(h3Server) + + wtServer := &webtransport.Server{ + H3: h3Server, + ApplicationProtocols: []string{moqLiteProtocol, moqWireProtocol}, + CheckOrigin: func(*http.Request) bool { return true }, + } + + mux := http.NewServeMux() + mux.HandleFunc(s.config.Path, func(w http.ResponseWriter, r *http.Request) { + s.handleWebTransport(w, r, wtServer) + }) + h3Server.Handler = mux + + targets = append(targets, listenTarget{ + server: wtServer, + conn: conn, + addr: bindAddr, + }) + } + + s.lock.Lock() + defer s.lock.Unlock() + + for _, target := range targets { + s.servers = append(s.servers, target.server) + s.wg.Add(1) + go func(target listenTarget) { + defer s.wg.Done() + defer target.conn.Close() + s.logger.Infow("starting moq webtransport listener", "addr", target.addr, "path", s.config.Path) + if err := target.server.Serve(target.conn); err != nil && !s.stopped.Load() { + s.logger.Errorw("moq webtransport listener stopped", err, "addr", target.addr) + } + }(target) + } + + return nil +} + +func (s *MoQService) Stop(ctx context.Context) error { + s.stopped.Store(true) + + s.lock.Lock() + servers := append([]*webtransport.Server(nil), s.servers...) + s.lock.Unlock() + + var closeErr error + for _, server := range servers { + if err := server.Close(); err != nil && closeErr == nil { + closeErr = err + } + } + + done := make(chan struct{}) + go func() { + s.wg.Wait() + close(done) + }() + + select { + case <-ctx.Done(): + return ctx.Err() + case <-done: + return closeErr + } +} + +func (s *MoQService) handleWebTransport(w http.ResponseWriter, r *http.Request, wtServer *webtransport.Server) { + claims, err := s.authenticate(r) + if err != nil { + http.Error(w, err.Error(), http.StatusUnauthorized) + return + } + if claims.Video == nil || !claims.Video.RoomJoin || !claims.Video.GetCanSubscribe() { + http.Error(w, ErrPermissionDenied.Error(), http.StatusForbidden) + return + } + + roomName := livekit.RoomName(claims.Video.Room) + if roomName == "" { + roomName = livekit.RoomName(r.URL.Query().Get("room")) + } + if roomName == "" { + http.Error(w, ErrNoRoomName.Error(), http.StatusBadRequest) + return + } + + layer := int32(0) + if layerValue := r.URL.Query().Get("layer"); layerValue != "" { + parsed, err := strconv.ParseInt(layerValue, 10, 32) + if err != nil || parsed < 0 { + http.Error(w, "invalid layer", http.StatusBadRequest) + return + } + layer = int32(parsed) + } + + sess, err := wtServer.Upgrade(w, r) + if err != nil { + s.logger.Warnw("could not upgrade moq webtransport session", err) + return + } + defer func() { + _ = sess.CloseWithError(0, "") + }() + + switch protocol := sess.SessionState().ApplicationProtocol; protocol { + case moqLiteProtocol: + s.serveMoQLiteSession(sess, roomName, claims, layer) + case "", moqWireProtocol: + resolved, catalog, _, err := s.resolveTrack(sess.Context(), roomName, livekit.TrackID(r.URL.Query().Get("track_id")), claims) + if err != nil { + _ = sess.CloseWithError(1, err.Error()) + return + } + s.serveSubscription(sess, resolved, catalog, layer) + default: + s.logger.Warnw("unsupported moq webtransport protocol", nil, "protocol", protocol) + _ = sess.CloseWithError(1, "unsupported moq protocol") + } +} + +func (s *MoQService) authenticate(r *http.Request) (*auth.ClaimGrants, error) { + authHeader := r.Header.Get(authorizationHeader) + var authToken string + if authHeader != "" { + if !strings.HasPrefix(authHeader, bearerPrefix) { + return nil, ErrMissingAuthorization + } + authToken = authHeader[len(bearerPrefix):] + } else { + authToken = r.FormValue(accessTokenParam) + } + if authToken == "" { + return nil, ErrMissingAuthorization + } + + verifier, err := auth.ParseAPIToken(authToken) + if err != nil { + return nil, ErrInvalidAuthorizationToken + } + secret := s.keyProvider.GetSecret(verifier.APIKey()) + if secret == "" { + return nil, ErrInvalidAPIKey + } + _, claims, err := verifier.Verify(secret) + if err != nil { + return nil, ErrInvalidAuthorizationToken + } + return claims, nil +} + +func (s *MoQService) resolveTrack( + ctx context.Context, + roomName livekit.RoomName, + trackID livekit.TrackID, + claims *auth.ClaimGrants, +) (moqResolvedTrack, []moqWireCatalogTrack, int, error) { + var resolved moqResolvedTrack + room := s.roomManager.GetRoom(ctx, roomName) + if room == nil { + return resolved, nil, http.StatusNotFound, ErrRoomNotFound + } + + var candidates []moqResolvedTrack + var catalog []moqWireCatalogTrack + for _, participant := range room.GetParticipants() { + for _, track := range participant.GetPublishedTracks() { + if !isMoQSupportedTrack(track) { + continue + } + if !participant.HasPermission(track.ID(), livekit.ParticipantIdentity(claims.Identity)) { + continue + } + tap := s.tracks.AttachTrack(room, participant, track) + if tap == nil { + continue + } + catalog = append(catalog, tap.CatalogTracks()...) + candidate := moqResolvedTrack{room: room, participant: participant, track: track, tap: tap} + if trackID == "" || track.ID() == trackID { + candidates = append(candidates, candidate) + } + } + } + + if trackID != "" && len(candidates) == 0 { + return resolved, catalog, http.StatusNotFound, fmt.Errorf("track not found or not available for moq: %s", trackID) + } + if trackID == "" && len(candidates) != 1 { + return resolved, catalog, http.StatusBadRequest, fmt.Errorf("track_id is required when %d moq-compatible tracks are available", len(candidates)) + } + + return candidates[0], catalog, http.StatusOK, nil +} + +func (s *MoQService) serveSubscription( + sess *webtransport.Session, + resolved moqResolvedTrack, + catalog []moqWireCatalogTrack, + layer int32, +) { + ctx := sess.Context() + stream, err := sess.OpenUniStreamSync(ctx) + if err != nil { + s.logger.Warnw("could not open moq media stream", err, "trackID", resolved.track.ID()) + return + } + defer func() { + _ = stream.Close() + }() + + sub, cached, unsubscribe := resolved.tap.Subscribe(layer) + defer unsubscribe() + + if err := s.writeCatalog(stream, resolved.room.Name(), resolved.track.ID(), catalog); err != nil { + s.logger.Debugw("could not write moq catalog", "error", err) + return + } + if cached != nil { + if err := s.writeSample(stream, cached); err != nil { + s.logger.Debugw("could not write cached moq keyframe", "error", err) + return + } + } + + for { + select { + case <-ctx.Done(): + return + case <-sub.done: + return + case sample := <-sub.queue: + if err := s.writeSample(stream, sample); err != nil { + s.logger.Debugw("could not write moq sample", "error", err) + return + } + } + } +} + +func (s *MoQService) writeCatalog(w interface { + SetWriteDeadline(time.Time) error + Write([]byte) (int, error) +}, roomName livekit.RoomName, trackID livekit.TrackID, tracks []moqWireCatalogTrack) error { + if err := w.SetWriteDeadline(time.Now().Add(s.config.WriteTimeout)); err != nil { + return err + } + return writeMoQWireMessage(w, moqWireMessage{ + Type: "catalog", + Protocol: moqWireProtocol, + Room: string(roomName), + TrackID: string(trackID), + Tracks: tracks, + }, nil) +} + +func (s *MoQService) writeSample(w interface { + SetWriteDeadline(time.Time) error + Write([]byte) (int, error) +}, sample *moqSample) error { + if err := w.SetWriteDeadline(time.Now().Add(s.config.WriteTimeout)); err != nil { + return err + } + return writeMoQWireMessage(w, moqWireMessage{ + Type: "sample", + Protocol: moqWireProtocol, + TrackID: string(sample.TrackID), + TrackName: sample.TrackName, + PublisherID: string(sample.PublisherID), + Publisher: string(sample.Publisher), + MimeType: sample.MimeType, + PayloadFormat: "annexb", + Width: sample.Width, + Height: sample.Height, + Layer: sample.Layer, + Sequence: sample.Sequence, + RTPTime: sample.RTPTime, + KeyFrame: sample.KeyFrame, + Cached: sample.Cached, + SentAtUnixNs: sample.SentAtUnixNs, + UserTimestampUs: sample.UserTimestampUs, + FrameID: sample.FrameID, + }, sample.Payload) +} + +func (s *MoQService) tlsConfig() (*tls.Config, error) { + if s.config.CertFile != "" || s.config.KeyFile != "" { + if s.config.CertFile == "" || s.config.KeyFile == "" { + return nil, errors.New("moq cert_file and key_file must be configured together") + } + cert, err := tls.LoadX509KeyPair(s.config.CertFile, s.config.KeyFile) + if err != nil { + return nil, err + } + return &tls.Config{ + MinVersion: tls.VersionTLS13, + Certificates: []tls.Certificate{cert}, + }, nil + } + + if !s.development { + return nil, errors.New("moq cert_file and key_file must be configured outside development mode") + } + + cert, err := generateMoQSelfSignedCert() + if err != nil { + return nil, err + } + s.logger.Warnw("using ephemeral self-signed certificate for moq; configure moq.cert_file and moq.key_file for production", nil) + return &tls.Config{ + MinVersion: tls.VersionTLS13, + Certificates: []tls.Certificate{cert}, + }, nil +} + +func generateMoQSelfSignedCert() (tls.Certificate, error) { + priv, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) + if err != nil { + return tls.Certificate{}, err + } + + serialNumber, err := rand.Int(rand.Reader, new(big.Int).Lsh(big.NewInt(1), 128)) + if err != nil { + return tls.Certificate{}, err + } + + template := x509.Certificate{ + SerialNumber: serialNumber, + Subject: pkix.Name{ + Organization: []string{"LiveKit Development"}, + }, + NotBefore: time.Now().Add(-time.Hour), + NotAfter: time.Now().Add(24 * time.Hour), + KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth}, + BasicConstraintsValid: true, + DNSNames: []string{"localhost"}, + IPAddresses: []net.IP{net.IPv4(127, 0, 0, 1), net.IPv6loopback}, + } + + derBytes, err := x509.CreateCertificate(rand.Reader, &template, &template, &priv.PublicKey, priv) + if err != nil { + return tls.Certificate{}, err + } + keyBytes, err := x509.MarshalECPrivateKey(priv) + if err != nil { + return tls.Certificate{}, err + } + + certPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: derBytes}) + keyPEM := pem.EncodeToMemory(&pem.Block{Type: "EC PRIVATE KEY", Bytes: keyBytes}) + return tls.X509KeyPair(certPEM, keyPEM) +} diff --git a/pkg/service/moq_track.go b/pkg/service/moq_track.go new file mode 100644 index 000000000..bb12a32e7 --- /dev/null +++ b/pkg/service/moq_track.go @@ -0,0 +1,492 @@ +// Copyright 2026 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package service + +import ( + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/livekit/protocol/codecs/mime" + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" + "github.com/pion/webrtc/v4" + + "github.com/livekit/livekit-server/pkg/config" + "github.com/livekit/livekit-server/pkg/rtc" + "github.com/livekit/livekit-server/pkg/rtc/types" + "github.com/livekit/livekit-server/pkg/sfu" + "github.com/livekit/livekit-server/pkg/sfu/buffer" + "github.com/livekit/livekit-server/pkg/sfu/packettrailer" +) + +type moqTrackKey struct { + room livekit.RoomName + trackID livekit.TrackID +} + +type moqTrackRegistry struct { + params moqTrackRegistryParams + + lock sync.RWMutex + tracks map[moqTrackKey]*moqTrackTap +} + +type moqTrackRegistryParams struct { + Config config.MoQConfig + Logger logger.Logger +} + +func newMoQTrackRegistry(params moqTrackRegistryParams) *moqTrackRegistry { + return &moqTrackRegistry{ + params: params, + tracks: make(map[moqTrackKey]*moqTrackTap), + } +} + +func (r *moqTrackRegistry) AttachRoom(room *rtc.Room) { + for _, participant := range room.GetParticipants() { + for _, track := range participant.GetPublishedTracks() { + r.AttachTrack(room, participant, track) + } + } + + room.AddOnTrackPublished(func(participant types.Participant, track types.MediaTrack) { + r.AttachTrack(room, participant, track) + }) +} + +func (r *moqTrackRegistry) AttachTrack(room *rtc.Room, participant types.Participant, track types.MediaTrack) *moqTrackTap { + if !isMoQSupportedTrack(track) { + return nil + } + + key := moqTrackKey{room: room.Name(), trackID: track.ID()} + + r.lock.Lock() + tap := r.tracks[key] + if tap == nil { + tap = newMoQTrackTap(moqTrackTapParams{ + Config: r.params.Config, + Logger: r.params.Logger, + Room: room, + Participant: participant, + Track: track, + }) + r.tracks[key] = tap + track.AddOnClose(func(_ bool) { + r.lock.Lock() + if r.tracks[key] == tap { + delete(r.tracks, key) + } + r.lock.Unlock() + tap.Close() + }) + if observer, ok := track.(interface{ OnSetupReceiver(func(mime.MimeType)) }); ok { + observer.OnSetupReceiver(func(mime.MimeType) { + tap.AttachReceivers() + }) + } + } + r.lock.Unlock() + + tap.AttachReceivers() + return tap +} + +func (r *moqTrackRegistry) Get(roomName livekit.RoomName, trackID livekit.TrackID) *moqTrackTap { + r.lock.RLock() + defer r.lock.RUnlock() + return r.tracks[moqTrackKey{room: roomName, trackID: trackID}] +} + +func isMoQSupportedTrack(track types.MediaTrack) bool { + if track.Kind() != livekit.TrackType_VIDEO || track.IsEncrypted() { + return false + } + ti := track.ToProto() + return ti.GetMimeType() == "" || mime.IsMimeTypeStringEqual(ti.GetMimeType(), mime.MimeTypeH264.String()) +} + +type moqTrackTapParams struct { + Config config.MoQConfig + Logger logger.Logger + Room *rtc.Room + Participant types.Participant + Track types.MediaTrack +} + +type moqTrackTap struct { + params moqTrackTapParams + + id string + subscriberID livekit.ParticipantID + qualityNodeID livekit.NodeID + + lock sync.RWMutex + receivers map[sfu.TrackReceiver]struct{} + layers map[int32]*moqLayerState + subscribers map[string]*moqTrackSubscriber + closed bool + + sequence atomic.Uint64 +} + +type moqLayerState struct { + lock sync.Mutex + assembler *h264AccessUnitAssembler + cache *moqSample +} + +type moqSample struct { + TrackID livekit.TrackID + TrackName string + PublisherID livekit.ParticipantID + Publisher livekit.ParticipantIdentity + MimeType string + Width uint32 + Height uint32 + Layer int32 + Sequence uint64 + RTPTime uint32 + KeyFrame bool + Cached bool + SentAtUnixNs int64 + UserTimestampUs *int64 + FrameID *uint32 + Payload []byte +} + +func (s *moqSample) Clone(cached bool) *moqSample { + if s == nil { + return nil + } + clone := *s + clone.Cached = cached + if s.UserTimestampUs != nil { + userTimestampUs := *s.UserTimestampUs + clone.UserTimestampUs = &userTimestampUs + } + if s.FrameID != nil { + frameID := *s.FrameID + clone.FrameID = &frameID + } + clone.Payload = cloneBytes(s.Payload) + return &clone +} + +type moqTrackSubscriber struct { + id string + layer int32 + queue chan *moqSample + done chan struct{} +} + +func newMoQTrackTap(params moqTrackTapParams) *moqTrackTap { + trackID := params.Track.ID() + return &moqTrackTap{ + params: params, + id: fmt.Sprintf("moq:%s:%s", params.Room.Name(), trackID), + subscriberID: livekit.ParticipantID(fmt.Sprintf("MOQ_%s", trackID)), + qualityNodeID: livekit.NodeID(fmt.Sprintf("moq:%s:%s", params.Room.Name(), trackID)), + receivers: make(map[sfu.TrackReceiver]struct{}), + layers: make(map[int32]*moqLayerState), + subscribers: make(map[string]*moqTrackSubscriber), + } +} + +func (t *moqTrackTap) AttachReceivers() { + t.lock.Lock() + if t.closed || len(t.subscribers) == 0 { + t.lock.Unlock() + return + } + receivers := t.params.Track.Receivers() + for _, receiver := range receivers { + if receiver == nil || receiver.IsClosed() || !mime.IsMimeTypeStringEqual(receiver.Mime().String(), mime.MimeTypeH264.String()) { + continue + } + if _, ok := t.receivers[receiver]; ok { + continue + } + if err := receiver.AddDownTrack(t); err != nil { + t.params.Logger.Warnw("could not attach moq track tap", err, "trackID", t.params.Track.ID()) + continue + } + t.receivers[receiver] = struct{}{} + receiver.SendPLI(0, true) + } + t.lock.Unlock() +} + +func (t *moqTrackTap) Subscribe(layer int32) (*moqTrackSubscriber, *moqSample, func()) { + sub := &moqTrackSubscriber{ + id: fmt.Sprintf("%s:%d", t.id, time.Now().UnixNano()), + layer: layer, + queue: make(chan *moqSample, t.params.Config.TrackQueueSize), + done: make(chan struct{}), + } + + var cached *moqSample + var shouldEnableQuality bool + t.lock.Lock() + if t.closed { + t.lock.Unlock() + close(sub.done) + return sub, nil, func() {} + } + shouldEnableQuality = len(t.subscribers) == 0 + t.subscribers[sub.id] = sub + if layerState := t.layers[layer]; layerState != nil { + layerState.lock.Lock() + cached = layerState.cache.Clone(true) + layerState.lock.Unlock() + } + t.lock.Unlock() + + if shouldEnableQuality { + t.setSubscriberNodeQuality(livekit.VideoQuality_HIGH) + } + t.AttachReceivers() + t.lock.RLock() + for receiver := range t.receivers { + receiver.SendPLI(layer, false) + } + t.lock.RUnlock() + + unsubscribe := func() { + var shouldDisableQuality bool + t.lock.Lock() + if t.subscribers[sub.id] == sub { + delete(t.subscribers, sub.id) + close(sub.done) + shouldDisableQuality = len(t.subscribers) == 0 + } + t.lock.Unlock() + if shouldDisableQuality { + t.detachReceivers() + t.setSubscriberNodeQuality(livekit.VideoQuality_OFF) + } + } + return sub, cached, unsubscribe +} + +func (t *moqTrackTap) setSubscriberNodeQuality(quality livekit.VideoQuality) { + localTrack, ok := t.params.Track.(types.LocalMediaTrack) + if !ok { + return + } + + localTrack.NotifySubscriberNodeMaxQuality(t.qualityNodeID, []types.SubscribedCodecQuality{ + { + CodecMime: mime.MimeTypeH264, + Quality: quality, + }, + }) +} + +func (t *moqTrackTap) detachReceivers() { + t.lock.Lock() + for receiver := range t.receivers { + receiver.DeleteDownTrack(t.subscriberID) + } + t.receivers = make(map[sfu.TrackReceiver]struct{}) + t.lock.Unlock() +} + +func (t *moqTrackTap) CatalogTracks() []moqWireCatalogTrack { + ti := t.params.Track.ToProto() + return []moqWireCatalogTrack{ + { + TrackID: string(t.params.Track.ID()), + TrackName: t.params.Track.Name(), + PublisherID: string(t.params.Participant.ID()), + Publisher: string(t.params.Participant.Identity()), + MimeType: mime.MimeTypeH264.String(), + Width: ti.GetWidth(), + Height: ti.GetHeight(), + }, + } +} + +func (t *moqTrackTap) Close() { + t.lock.Lock() + if t.closed { + t.lock.Unlock() + return + } + t.closed = true + for receiver := range t.receivers { + receiver.DeleteDownTrack(t.subscriberID) + } + for _, sub := range t.subscribers { + close(sub.done) + } + t.subscribers = make(map[string]*moqTrackSubscriber) + t.lock.Unlock() + t.setSubscriberNodeQuality(livekit.VideoQuality_OFF) +} + +func (t *moqTrackTap) UpTrackLayersChange() {} + +func (t *moqTrackTap) UpTrackBitrateAvailabilityChange() {} + +func (t *moqTrackTap) UpTrackMaxPublishedLayerChange(int32) {} + +func (t *moqTrackTap) UpTrackMaxTemporalLayerSeenChange(int32) {} + +func (t *moqTrackTap) UpTrackBitrateReport([]int32, sfu.Bitrates) {} + +func (t *moqTrackTap) WriteRTP(extPkt *buffer.ExtPacket, layer int32) int32 { + if extPkt == nil || extPkt.Packet == nil { + return 0 + } + + payload := cloneBytes(extPkt.Packet.Payload) + trailerMetadata, strip := packettrailer.ParseTrailer(payload, extPkt.Packet.Marker) + if strip > 0 { + payload = payload[:len(payload)-strip] + } + layerState := t.getLayerState(layer) + + layerState.lock.Lock() + au, err := layerState.assembler.Push(payload, extPkt.Packet.Timestamp, extPkt.Packet.Marker) + layerState.lock.Unlock() + if err != nil || au == nil { + return 0 + } + + ti := t.params.Track.ToProto() + sample := &moqSample{ + TrackID: t.params.Track.ID(), + TrackName: t.params.Track.Name(), + PublisherID: t.params.Participant.ID(), + Publisher: t.params.Participant.Identity(), + MimeType: mime.MimeTypeH264.String(), + Width: ti.GetWidth(), + Height: ti.GetHeight(), + Layer: layer, + Sequence: t.sequence.Add(1), + RTPTime: au.RTPTime, + KeyFrame: extPkt.IsKeyFrame || au.HasIDR, + SentAtUnixNs: time.Now().UnixNano(), + Payload: au.Payload, + } + if trailerMetadata.HasUserTimestampUs { + sample.UserTimestampUs = &trailerMetadata.UserTimestampUs + } + if trailerMetadata.HasFrameID { + sample.FrameID = &trailerMetadata.FrameID + } + + if sample.KeyFrame && len(sample.Payload) <= t.params.Config.CacheMaxBytes { + layerState.lock.Lock() + layerState.cache = sample.Clone(false) + layerState.lock.Unlock() + } + + return t.publish(sample) +} + +func (t *moqTrackTap) getLayerState(layer int32) *moqLayerState { + t.lock.Lock() + defer t.lock.Unlock() + + layerState := t.layers[layer] + if layerState == nil { + layerState = &moqLayerState{assembler: newH264AccessUnitAssembler()} + t.layers[layer] = layerState + } + return layerState +} + +func (t *moqTrackTap) publish(sample *moqSample) int32 { + t.lock.RLock() + if t.closed { + t.lock.RUnlock() + return 0 + } + subscribers := make([]*moqTrackSubscriber, 0, len(t.subscribers)) + for _, sub := range t.subscribers { + if sub.layer == sample.Layer { + subscribers = append(subscribers, sub) + } + } + t.lock.RUnlock() + + var written int32 + for _, sub := range subscribers { + if enqueueMoQSample(sub, sample.Clone(false)) { + written++ + } + } + return written +} + +func enqueueMoQSample(sub *moqTrackSubscriber, sample *moqSample) bool { + select { + case <-sub.done: + return false + case sub.queue <- sample: + return true + default: + } + + if !sample.KeyFrame { + return false + } + + select { + case <-sub.done: + return false + case <-sub.queue: + default: + } + select { + case <-sub.done: + return false + case sub.queue <- sample: + return true + default: + return false + } +} + +func (t *moqTrackTap) ID() string { + return t.id +} + +func (t *moqTrackTap) SubscriberID() livekit.ParticipantID { + return t.subscriberID +} + +func (t *moqTrackTap) HandleRTCPSenderReportData(webrtc.PayloadType, int32, *livekit.RTCPSenderReportState) error { + return nil +} + +func (t *moqTrackTap) Resync() {} + +func (t *moqTrackTap) SetReceiver(sfu.TrackReceiver) {} + +func (t *moqTrackTap) ReceiverRestart(sfu.TrackReceiver) { + t.AttachReceivers() +} + +func (t *moqTrackTap) IsClosed() bool { + t.lock.RLock() + defer t.lock.RUnlock() + return t.closed +} diff --git a/pkg/service/moq_wire.go b/pkg/service/moq_wire.go new file mode 100644 index 000000000..57159a0e2 --- /dev/null +++ b/pkg/service/moq_wire.go @@ -0,0 +1,93 @@ +// Copyright 2026 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package service + +import ( + "encoding/binary" + "encoding/json" + "fmt" + "io" + "math" +) + +const ( + moqWireProtocol = "livekit-moq-h264/1" + moqWireVersion = 1 +) + +var moqWireMagic = [4]byte{'L', 'K', 'M', 'Q'} + +type moqWireMessage struct { + Type string `json:"type"` + Protocol string `json:"protocol,omitempty"` + Room string `json:"room,omitempty"` + TrackID string `json:"trackId,omitempty"` + TrackName string `json:"trackName,omitempty"` + PublisherID string `json:"publisherId,omitempty"` + Publisher string `json:"publisher,omitempty"` + MimeType string `json:"mimeType,omitempty"` + PayloadFormat string `json:"payloadFormat,omitempty"` + Width uint32 `json:"width,omitempty"` + Height uint32 `json:"height,omitempty"` + Layer int32 `json:"layer,omitempty"` + Sequence uint64 `json:"sequence,omitempty"` + RTPTime uint32 `json:"rtpTime,omitempty"` + KeyFrame bool `json:"keyFrame,omitempty"` + Cached bool `json:"cached,omitempty"` + SentAtUnixNs int64 `json:"sentAtUnixNs,omitempty"` + UserTimestampUs *int64 `json:"userTimestampUs,omitempty"` + FrameID *uint32 `json:"frameId,omitempty"` + Tracks []moqWireCatalogTrack `json:"tracks,omitempty"` + Error string `json:"error,omitempty"` +} + +type moqWireCatalogTrack struct { + TrackID string `json:"trackId"` + TrackName string `json:"trackName,omitempty"` + PublisherID string `json:"publisherId,omitempty"` + Publisher string `json:"publisher,omitempty"` + MimeType string `json:"mimeType,omitempty"` + Width uint32 `json:"width,omitempty"` + Height uint32 `json:"height,omitempty"` +} + +func writeMoQWireMessage(w io.Writer, msg moqWireMessage, payload []byte) error { + meta, err := json.Marshal(msg) + if err != nil { + return err + } + if uint64(len(meta)) > uint64(math.MaxUint32) { + return fmt.Errorf("moq metadata too large: %d", len(meta)) + } + if uint64(len(payload)) > uint64(math.MaxUint32) { + return fmt.Errorf("moq payload too large: %d", len(payload)) + } + + var header [16]byte + copy(header[0:4], moqWireMagic[:]) + header[4] = moqWireVersion + binary.BigEndian.PutUint32(header[8:12], uint32(len(meta))) + binary.BigEndian.PutUint32(header[12:16], uint32(len(payload))) + if _, err := w.Write(header[:]); err != nil { + return err + } + if _, err := w.Write(meta); err != nil { + return err + } + if len(payload) != 0 { + _, err = w.Write(payload) + } + return err +} diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 172a7752e..b294c56cb 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -91,7 +91,8 @@ type RoomManager struct { turnAuthHandler *TURNAuthHandler bus psrpc.MessageBus - rooms map[livekit.RoomName]*rtc.Room + rooms map[livekit.RoomName]*rtc.Room + onRoomCreated []func(*rtc.Room) roomServers utils.MultitonService[rpc.RoomTopic] agentDispatchServers utils.MultitonService[rpc.RoomTopic] @@ -188,6 +189,34 @@ func (r *RoomManager) GetRoom(_ context.Context, roomName livekit.RoomName) *rtc return r.rooms[roomName] } +func (r *RoomManager) AddOnRoomCreated(f func(*rtc.Room)) { + if f == nil { + return + } + + r.lock.Lock() + r.onRoomCreated = append(r.onRoomCreated, f) + rooms := make([]*rtc.Room, 0, len(r.rooms)) + for _, room := range r.rooms { + rooms = append(rooms, room) + } + r.lock.Unlock() + + for _, room := range rooms { + f(room) + } +} + +func (r *RoomManager) notifyRoomCreated(room *rtc.Room) { + r.lock.RLock() + onRoomCreated := slices.Clone(r.onRoomCreated) + r.lock.RUnlock() + + for _, f := range onRoomCreated { + f(room) + } +} + // deleteRoom completely deletes all room information, including active sessions, room store, and routing info func (r *RoomManager) deleteRoom(ctx context.Context, roomName livekit.RoomName) error { logger.Infow("deleting room state", "room", roomName) @@ -703,6 +732,7 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, createRoom *livekit.C r.lock.Unlock() newRoom.Hold() + r.notifyRoomCreated(newRoom) r.telemetry.RoomStarted(ctx, newRoom.ToProto()) prometheus.RoomStarted() diff --git a/pkg/service/server.go b/pkg/service/server.go index 05a51522c..b468eab38 100644 --- a/pkg/service/server.go +++ b/pkg/service/server.go @@ -50,6 +50,7 @@ type LivekitServer struct { ioService *IOInfoService rtcService *RTCService whipService *WHIPService + moqService *MoQService agentService *AgentService httpServer *http.Server promServer *http.Server @@ -186,6 +187,12 @@ func NewLivekitServer(conf *config.Config, } } + if conf.MoQ.Enabled { + if s.moqService, err = NewMoQService(conf, keyProvider, roomManager); err != nil { + return nil, err + } + } + if err = router.RemoveDeadNodes(); err != nil { return } @@ -305,6 +312,12 @@ func (s *LivekitServer) Start() error { if err := s.signalServer.Start(); err != nil { return err } + if s.moqService != nil { + if err := s.moqService.Start(); err != nil { + s.signalServer.Stop() + return err + } + } httpGroup := &errgroup.Group{} for _, ln := range listeners { @@ -340,6 +353,9 @@ func (s *LivekitServer) Start() error { if s.turnServer != nil { _ = s.turnServer.Close() } + if s.moqService != nil { + _ = s.moqService.Stop(ctx) + } s.roomManager.Stop() s.signalServer.Stop() diff --git a/pkg/sfu/packettrailer/packet_trailer.go b/pkg/sfu/packettrailer/packet_trailer.go index 3b58f6681..a0668760e 100644 --- a/pkg/sfu/packettrailer/packet_trailer.go +++ b/pkg/sfu/packettrailer/packet_trailer.go @@ -20,8 +20,18 @@ const ( xorByte = 0xFF envelopeSize = 5 // 1B trailer_len + 4B magic + + tagUserTimestampUs = 0x01 + tagFrameID = 0x02 ) +type Metadata struct { + UserTimestampUs int64 + HasUserTimestampUs bool + FrameID uint32 + HasFrameID bool +} + // StripTrailer returns the number of bytes to strip from the end of an RTP // payload if it contains an LKTS trailer. The trailer is located by checking // for the "LKTS" magic suffix and then reading the XORed trailer_len byte @@ -44,3 +54,57 @@ func StripTrailer(payload []byte, marker bool) int { return trailerLen } + +func ParseTrailer(payload []byte, marker bool) (Metadata, int) { + var metadata Metadata + trailerLen := StripTrailer(payload, marker) + if trailerLen == 0 { + return metadata, 0 + } + + tlv := payload[len(payload)-trailerLen : len(payload)-envelopeSize] + for len(tlv) != 0 { + if len(tlv) < 2 { + return metadata, trailerLen + } + tag := tlv[0] ^ xorByte + valueLen := int(tlv[1] ^ xorByte) + tlv = tlv[2:] + if valueLen > len(tlv) { + return metadata, trailerLen + } + value := tlv[:valueLen] + tlv = tlv[valueLen:] + + switch tag { + case tagUserTimestampUs: + if valueLen == 8 { + metadata.UserTimestampUs = int64(xorUint64(value)) + metadata.HasUserTimestampUs = true + } + case tagFrameID: + if valueLen == 4 { + metadata.FrameID = xorUint32(value) + metadata.HasFrameID = true + } + } + } + + return metadata, trailerLen +} + +func xorUint64(value []byte) uint64 { + var out uint64 + for _, b := range value { + out = (out << 8) | uint64(b^xorByte) + } + return out +} + +func xorUint32(value []byte) uint32 { + var out uint32 + for _, b := range value { + out = (out << 8) | uint32(b^xorByte) + } + return out +} diff --git a/pkg/sfu/packettrailer/packet_trailer_test.go b/pkg/sfu/packettrailer/packet_trailer_test.go index 653b71fa2..e74cfde94 100644 --- a/pkg/sfu/packettrailer/packet_trailer_test.go +++ b/pkg/sfu/packettrailer/packet_trailer_test.go @@ -19,11 +19,6 @@ import ( "testing" ) -const ( - tagTimestampUs = 0x01 - tagFrameID = 0x02 -) - // appendTLV appends a single XORed TLV element to dst. func appendTLV(dst []byte, tag byte, value []byte) []byte { dst = append(dst, tag^xorByte, byte(len(value))^xorByte) @@ -46,7 +41,7 @@ func makeTrailer(timestampUs int64, frameID uint32) []byte { var tsBuf [8]byte binary.BigEndian.PutUint64(tsBuf[:], uint64(timestampUs)) - trailer = appendTLV(trailer, tagTimestampUs, tsBuf[:]) + trailer = appendTLV(trailer, tagUserTimestampUs, tsBuf[:]) var fidBuf [4]byte binary.BigEndian.PutUint32(fidBuf[:], frameID) @@ -71,14 +66,14 @@ func makeTimestampOnlyTrailer(timestampUs int64) []byte { var trailer []byte var tsBuf [8]byte binary.BigEndian.PutUint64(tsBuf[:], uint64(timestampUs)) - trailer = appendTLV(trailer, tagTimestampUs, tsBuf[:]) + trailer = appendTLV(trailer, tagUserTimestampUs, tsBuf[:]) trailerLen := byte(len(trailer) + envelopeSize) trailer = appendEnvelope(trailer, trailerLen) return trailer } func TestStripTrailer(t *testing.T) { - fullTrailerSize := 21 // (1+1+8) + (1+1+4) + 5 + fullTrailerSize := 21 // (1+1+8) + (1+1+4) + 5 tsOnlyTrailerSize := 15 // (1+1+8) + 5 tests := []struct { @@ -136,7 +131,7 @@ func TestStripTrailer(t *testing.T) { var trailer []byte var tsBuf [8]byte binary.BigEndian.PutUint64(tsBuf[:], 42) - trailer = appendTLV(trailer, tagTimestampUs, tsBuf[:]) + trailer = appendTLV(trailer, tagUserTimestampUs, tsBuf[:]) trailer = appendEnvelope(trailer, 200) return trailer }(), @@ -150,7 +145,7 @@ func TestStripTrailer(t *testing.T) { var trailer []byte var tsBuf [8]byte binary.BigEndian.PutUint64(tsBuf[:], 42) - trailer = appendTLV(trailer, tagTimestampUs, tsBuf[:]) + trailer = appendTLV(trailer, tagUserTimestampUs, tsBuf[:]) trailer = appendEnvelope(trailer, 3) return append(video, trailer...) }(), @@ -180,3 +175,34 @@ func TestStripTrailer(t *testing.T) { }) } } + +func TestParseTrailer(t *testing.T) { + payload := makePayloadWithTrailer(20, 1700000000000000, 42) + + metadata, strip := ParseTrailer(payload, true) + if strip != 21 { + t.Fatalf("ParseTrailer() strip = %d, want 21", strip) + } + if !metadata.HasUserTimestampUs { + t.Fatal("ParseTrailer() did not parse user timestamp") + } + if metadata.UserTimestampUs != 1700000000000000 { + t.Fatalf("ParseTrailer() user timestamp = %d", metadata.UserTimestampUs) + } + if !metadata.HasFrameID { + t.Fatal("ParseTrailer() did not parse frame id") + } + if metadata.FrameID != 42 { + t.Fatalf("ParseTrailer() frame id = %d", metadata.FrameID) + } +} + +func TestParseTrailerAbsent(t *testing.T) { + metadata, strip := ParseTrailer([]byte{0x01, 0x02, 0x03}, true) + if strip != 0 { + t.Fatalf("ParseTrailer() strip = %d, want 0", strip) + } + if metadata.HasUserTimestampUs || metadata.HasFrameID { + t.Fatalf("ParseTrailer() parsed unexpected metadata: %+v", metadata) + } +} diff --git a/test/moq/local_video/README.md b/test/moq/local_video/README.md new file mode 100644 index 000000000..5314f85d6 --- /dev/null +++ b/test/moq/local_video/README.md @@ -0,0 +1,86 @@ +# local_video MoQ Harness + +This harness runs a local LiveKit server with the experimental MoQ/WebTransport +listener enabled, publishes H264 from the Rust SDK `examples/local_video` +publisher, and receives the stream in a browser with the `@moq/net` MoQ client +next to a LiveKit JS SDK WebRTC subscriber for the same room and track. + +The current receiver validates transport compatibility and timing: + +- WebTransport connect time +- first MoQ object time +- received frame/object count +- total payload bytes +- Annex-B H264 NAL types, including whether an IDR was seen +- WebCodecs H264 decode and canvas render count +- rendered canvas pixel validation to catch blank or flat output +- rendered-frame motion validation for the burned timestamp overlay +- packet-trailer timing metadata over a companion `.timing` MoQ track +- subscriber-style timing overlay with exposure, receive, decode, paint, and + e2e latency calculations +- side-by-side LiveKit JS SDK WebRTC rendering of the same publisher video +- WebRTC connect, first-frame, render, visual, motion, and browser-exposed + video-frame timing metadata +- WebRTC packet-trailer extraction through the LiveKit JS SDK packet trailer + worker, with frame ID and user timestamp lookup via `TimeSyncUpdate` + +The MoQ object payload is one Annex-B H264 access unit per MoQ group/frame. +The browser receiver configures WebCodecs from the first SPS, decodes each +access unit, draws decoded frames into a canvas, and validates that nonblank, +moving video is rendered. The WebRTC pane attaches the remote video track with +`livekit-client` and samples the `