example implementation of moq output

This commit is contained in:
David Chen
2026-06-17 12:41:56 -07:00
parent 12a023ae45
commit e8c3156b2d
20 changed files with 3978 additions and 14 deletions
+14
View File
@@ -16,6 +16,20 @@
# for production setups, this port should be placed behind a load balancer with TLS
port: 7880
# Experimental receive-only Media over QUIC/WebTransport downstream. Supports
# the LiveKit H264 probe protocol and moq-lite-04 for @moq/net receive testing.
# WebTransport uses HTTP/3 over UDP and requires TLS. In production, expose this
# UDP port directly or through an HTTP/3-capable load balancer.
# moq:
# enabled: false
# port: 7883
# path: /moq/v1
# cert_file: /path/to/cert.pem
# key_file: /path/to/key.pem
# track_queue_size: 256
# cache_max_bytes: 2097152
# write_timeout: 2s
# when redis is set, LiveKit will automatically operate in a fully distributed fashion
# clients could connect to any node and be routed to the same room
redis:
+4
View File
@@ -43,6 +43,8 @@ require (
github.com/pion/webrtc/v4 v4.2.11
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.23.2
github.com/quic-go/quic-go v0.60.0
github.com/quic-go/webtransport-go v0.11.0
github.com/redis/go-redis/v9 v9.20.0
github.com/rs/cors v1.11.1
github.com/stretchr/testify v1.11.1
@@ -68,6 +70,7 @@ require (
github.com/containerd/errdefs v1.0.0 // indirect
github.com/containerd/errdefs/pkg v0.3.0 // indirect
github.com/distribution/reference v0.6.0 // indirect
github.com/dunglas/httpsfv v1.1.0 // indirect
github.com/fatih/color v1.19.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
@@ -82,6 +85,7 @@ require (
github.com/olekukonko/errors v1.3.0 // indirect
github.com/olekukonko/ll v0.1.8 // indirect
github.com/puzpuzpuz/xsync/v4 v4.5.0 // indirect
github.com/quic-go/qpack v0.6.0 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.69.0 // indirect
go.opentelemetry.io/otel v1.44.0 // indirect
+8
View File
@@ -54,6 +54,8 @@ github.com/docker/go-connections v0.7.0 h1:6SsRfJddP22WMrCkj19x9WKjEDTB+ahsdiGYf
github.com/docker/go-connections v0.7.0/go.mod h1:no1qkHdjq7kLMGUXYAduOhYPSJxxvgWBh7ogVvptn3Q=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/dunglas/httpsfv v1.1.0 h1:Jw76nAyKWKZKFrpMMcL76y35tOpYHqQPzHQiwDvpe54=
github.com/dunglas/httpsfv v1.1.0/go.mod h1:zID2mqw9mFsnt7YC3vYQ9/cjq30q41W+1AnDwH8TiMg=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/elliotchance/orderedmap/v3 v3.1.0 h1:j4DJ5ObEmMBt/lcwIecKcoRxIQUEnw0L804lXYDt/pg=
@@ -280,6 +282,12 @@ github.com/prometheus/procfs v0.20.1 h1:XwbrGOIplXW/AU3YhIhLODXMJYyC1isLFfYCsTEy
github.com/prometheus/procfs v0.20.1/go.mod h1:o9EMBZGRyvDrSPH1RqdxhojkuXstoe4UlK79eF5TGGo=
github.com/puzpuzpuz/xsync/v4 v4.5.0 h1:vOSWu6b57/emh+L/Cw0BeQfvxa/cogFywXHeGUxQxAg=
github.com/puzpuzpuz/xsync/v4 v4.5.0/go.mod h1:VJDmTCJMBt8igNxnkQd86r+8KUeN1quSfNKu5bLYFQo=
github.com/quic-go/qpack v0.6.0 h1:g7W+BMYynC1LbYLSqRt8PBg5Tgwxn214ZZR34VIOjz8=
github.com/quic-go/qpack v0.6.0/go.mod h1:lUpLKChi8njB4ty2bFLX2x4gzDqXwUpaO1DP9qMDZII=
github.com/quic-go/quic-go v0.60.0 h1:xcQioE8OM66UQLeUMHltK1CCcOu3JbVB4JAQdDQSB+0=
github.com/quic-go/quic-go v0.60.0/go.mod h1:wpKpjmPpftl30sL6pFh7REVpjbcCVy4zt2vDyK1TuJk=
github.com/quic-go/webtransport-go v0.11.0 h1:3afiZq7MHv3gmKCbMwZ8D5M1u0y/1RdONN9KlWp32J0=
github.com/quic-go/webtransport-go v0.11.0/go.mod h1:SHgEzUFVyj+9WUSuGB1P6Zd351Pww2leWV3SwlTovkA=
github.com/redis/go-redis/v9 v9.20.0 h1:WnQYxLkgO2xiXTCJY0ldIiI8dNqCDlQAG+AtaH7a2a0=
github.com/redis/go-redis/v9 v9.20.0/go.mod h1:v/M13XI1PVCDcm01VtPFOADfZtHf8YW3baQf57KlIkA=
github.com/rodaine/protogofakeit v0.1.1 h1:ZKouljuRM3A+TArppfBqnH8tGZHOwM/pjvtXe9DaXH8=
+71
View File
@@ -92,6 +92,7 @@ type Config struct {
EnableDataTracks bool `yaml:"enable_data_tracks,omitempty"`
API APIConfig `yaml:"api,omitempty"`
MoQ MoQConfig `yaml:"moq,omitempty"`
}
type RTCConfig struct {
@@ -170,6 +171,55 @@ type TURNServer struct {
TTL int `yaml:"ttl,omitempty"`
}
type MoQConfig struct {
Enabled bool `yaml:"enabled,omitempty"`
// UDP port for the HTTP/3 WebTransport listener. This is intentionally
// separate from the existing TCP HTTP port.
Port uint32 `yaml:"port,omitempty"`
// If unset, bind_addresses from the top-level config are used. If those are
// unset, the listener binds all interfaces.
BindAddresses []string `yaml:"bind_addresses,omitempty"`
Path string `yaml:"path,omitempty"`
// WebTransport requires TLS. In production, configure cert_file and key_file.
// In development mode only, LiveKit can generate an ephemeral self-signed
// certificate when these are unset.
CertFile string `yaml:"cert_file,omitempty"`
KeyFile string `yaml:"key_file,omitempty"`
// Number of access units queued per subscriber before non-keyframes are
// dropped to keep latency bounded.
TrackQueueSize int `yaml:"track_queue_size,omitempty"`
// Max bytes retained for the cached keyframe access unit per track/layer.
CacheMaxBytes int `yaml:"cache_max_bytes,omitempty"`
WriteTimeout time.Duration `yaml:"write_timeout,omitempty"`
}
func (c MoQConfig) WithDefaults(development bool, bindAddresses []string) MoQConfig {
if c.Path == "" {
c.Path = "/moq/v1"
}
if c.TrackQueueSize <= 0 {
c.TrackQueueSize = 256
}
if c.CacheMaxBytes <= 0 {
c.CacheMaxBytes = 2 * 1024 * 1024
}
if c.WriteTimeout <= 0 {
c.WriteTimeout = 2 * time.Second
}
if len(c.BindAddresses) == 0 {
c.BindAddresses = bindAddresses
}
if len(c.BindAddresses) == 0 {
c.BindAddresses = []string{""}
}
if development && c.Port == 0 {
c.Port = 7883
}
return c
}
type CongestionControlConfig struct {
Enabled bool `yaml:"enabled,omitempty"`
AllowPause bool `yaml:"allow_pause,omitempty"`
@@ -478,6 +528,12 @@ var DefaultConfig = Config{
NodeStats: DefaultNodeStatsConfig,
API: DefaultAPIConfig(),
EnableDataTracks: true,
MoQ: MoQConfig{
Path: "/moq/v1",
TrackQueueSize: 256,
CacheMaxBytes: 2 * 1024 * 1024,
WriteTimeout: 2 * time.Second,
},
}
func NewConfig(confString string, strictMode bool, c *cli.Command, baseFlags []cli.Flag) (*Config, error) {
@@ -517,6 +573,20 @@ func NewConfig(confString string, strictMode bool, c *cli.Command, baseFlags []c
return nil, err
}
conf.KeyFile = file
if conf.MoQ.CertFile != "" {
file, err = homedir.Expand(os.ExpandEnv(conf.MoQ.CertFile))
if err != nil {
return nil, err
}
conf.MoQ.CertFile = file
}
if conf.MoQ.KeyFile != "" {
file, err = homedir.Expand(os.ExpandEnv(conf.MoQ.KeyFile))
if err != nil {
return nil, err
}
conf.MoQ.KeyFile = file
}
// set defaults for Turn relay if none are set
if conf.TURN.RelayPortRangeStart == 0 || conf.TURN.RelayPortRangeEnd == 0 {
@@ -533,6 +603,7 @@ func NewConfig(confString string, strictMode bool, c *cli.Command, baseFlags []c
if conf.LogLevel != "" {
conf.Logging.Level = conf.LogLevel
}
conf.MoQ = conf.MoQ.WithDefaults(conf.Development, conf.BindAddresses)
if conf.Logging.Level == "" && conf.Development {
conf.Logging.Level = "debug"
}
+21 -3
View File
@@ -136,9 +136,10 @@ type Room struct {
trailer []byte
onParticipantChanged func(p types.Participant)
onRoomUpdated func()
onClose func()
onParticipantChanged func(p types.Participant)
onRoomUpdated func()
onClose func()
onTrackPublishedCallbacks []func(types.Participant, types.MediaTrack)
simulationLock sync.Mutex
disconnectSignalOnResumeParticipants map[livekit.ParticipantIdentity]time.Time
@@ -837,6 +838,16 @@ func (r *Room) OnParticipantChanged(f func(participant types.Participant)) {
r.onParticipantChanged = f
}
func (r *Room) AddOnTrackPublished(f func(types.Participant, types.MediaTrack)) {
if f == nil {
return
}
r.lock.Lock()
r.onTrackPublishedCallbacks = append(r.onTrackPublishedCallbacks, f)
r.lock.Unlock()
}
func (r *Room) SendDataPacket(dp *livekit.DataPacket, kind livekit.DataPacket_Kind) {
r.onDataMessage(nil, kind, dp)
}
@@ -1073,6 +1084,13 @@ func (r *Room) createJoinResponseLocked(
func (r *Room) onTrackPublished(participant types.Participant, track types.MediaTrack) {
r.trackManager.AddTrack(track, participant.Identity(), participant.ID())
r.lock.RLock()
onTrackPublished := slices.Clone(r.onTrackPublishedCallbacks)
r.lock.RUnlock()
for _, f := range onTrackPublished {
f(participant, track)
}
// publish participant update, since track state is changed
r.broadcastParticipantState(participant, broadcastOptions{skipSource: true})
+147
View File
@@ -0,0 +1,147 @@
// Copyright 2026 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package service
import (
"bytes"
"github.com/pion/rtp/codecs"
)
var annexBStartCode = []byte{0x00, 0x00, 0x00, 0x01}
type h264AccessUnit struct {
Payload []byte
HasIDR bool
HasSPS bool
HasPPS bool
RTPTime uint32
CompletedByMarker bool
}
type h264AccessUnitAssembler struct {
depacketizer codecs.H264Packet
current []byte
currentTime uint32
started bool
sps []byte
pps []byte
}
func newH264AccessUnitAssembler() *h264AccessUnitAssembler {
return &h264AccessUnitAssembler{}
}
func (a *h264AccessUnitAssembler) Push(payload []byte, rtpTime uint32, marker bool) (*h264AccessUnit, error) {
if a.started && a.currentTime != rtpTime && len(a.current) != 0 {
a.current = nil
a.started = false
}
chunk, err := a.depacketizer.Unmarshal(payload)
if err != nil {
return nil, err
}
if len(chunk) != 0 {
if !a.started {
a.currentTime = rtpTime
a.started = true
}
a.current = append(a.current, chunk...)
}
if !marker || len(a.current) == 0 {
return nil, nil
}
auPayload := a.current
a.current = nil
a.started = false
au := &h264AccessUnit{
Payload: auPayload,
RTPTime: rtpTime,
CompletedByMarker: true,
}
for _, nalu := range splitAnnexBNALUs(auPayload) {
if len(nalu) == 0 {
continue
}
switch nalu[0] & 0x1f {
case 5:
au.HasIDR = true
case 7:
au.HasSPS = true
a.sps = cloneBytes(nalu)
case 8:
au.HasPPS = true
a.pps = cloneBytes(nalu)
}
}
if au.HasIDR && (!au.HasSPS || !au.HasPPS) {
au.Payload = a.prependParameterSets(au.Payload, au.HasSPS, au.HasPPS)
au.HasSPS = au.HasSPS || len(a.sps) != 0
au.HasPPS = au.HasPPS || len(a.pps) != 0
}
return au, nil
}
func (a *h264AccessUnitAssembler) prependParameterSets(payload []byte, hasSPS bool, hasPPS bool) []byte {
if (hasSPS || len(a.sps) == 0) && (hasPPS || len(a.pps) == 0) {
return payload
}
prepended := make([]byte, 0, len(payload)+len(a.sps)+len(a.pps)+2*len(annexBStartCode))
if !hasSPS && len(a.sps) != 0 {
prepended = append(prepended, annexBStartCode...)
prepended = append(prepended, a.sps...)
}
if !hasPPS && len(a.pps) != 0 {
prepended = append(prepended, annexBStartCode...)
prepended = append(prepended, a.pps...)
}
prepended = append(prepended, payload...)
return prepended
}
func splitAnnexBNALUs(payload []byte) [][]byte {
var nalus [][]byte
for {
start := bytes.Index(payload, annexBStartCode)
if start == -1 {
break
}
payload = payload[start+len(annexBStartCode):]
next := bytes.Index(payload, annexBStartCode)
if next == -1 {
nalus = append(nalus, payload)
break
}
nalus = append(nalus, payload[:next])
payload = payload[next:]
}
return nalus
}
func cloneBytes(in []byte) []byte {
if len(in) == 0 {
return nil
}
out := make([]byte, len(in))
copy(out, in)
return out
}
+90
View File
@@ -0,0 +1,90 @@
// Copyright 2026 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package service
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestH264AccessUnitAssemblerCachesParameterSets(t *testing.T) {
a := newH264AccessUnitAssembler()
au, err := a.Push([]byte{0x67, 0x42, 0x00, 0x1f}, 10, false)
require.NoError(t, err)
require.Nil(t, au)
au, err = a.Push([]byte{0x68, 0xce, 0x06, 0xe2}, 10, false)
require.NoError(t, err)
require.Nil(t, au)
au, err = a.Push([]byte{0x65, 0x88, 0x84}, 10, true)
require.NoError(t, err)
require.NotNil(t, au)
require.True(t, au.HasIDR)
require.True(t, au.HasSPS)
require.True(t, au.HasPPS)
require.Equal(t, appendAnnexB(
[]byte{0x67, 0x42, 0x00, 0x1f},
[]byte{0x68, 0xce, 0x06, 0xe2},
[]byte{0x65, 0x88, 0x84},
), au.Payload)
}
func TestH264AccessUnitAssemblerPrependsCachedParameterSetsToIDR(t *testing.T) {
a := newH264AccessUnitAssembler()
_, err := a.Push([]byte{0x67, 0x42, 0x00, 0x1f}, 10, false)
require.NoError(t, err)
_, err = a.Push([]byte{0x68, 0xce, 0x06, 0xe2}, 10, false)
require.NoError(t, err)
_, err = a.Push([]byte{0x65, 0x88, 0x84}, 10, true)
require.NoError(t, err)
au, err := a.Push([]byte{0x65, 0xaa, 0xbb}, 20, true)
require.NoError(t, err)
require.NotNil(t, au)
require.True(t, au.HasIDR)
require.True(t, au.HasSPS)
require.True(t, au.HasPPS)
require.Equal(t, appendAnnexB(
[]byte{0x67, 0x42, 0x00, 0x1f},
[]byte{0x68, 0xce, 0x06, 0xe2},
[]byte{0x65, 0xaa, 0xbb},
), au.Payload)
}
func TestH264AccessUnitAssemblerReconstructsFUA(t *testing.T) {
a := newH264AccessUnitAssembler()
au, err := a.Push([]byte{0x7c, 0x85, 0x01, 0x02}, 10, false)
require.NoError(t, err)
require.Nil(t, au)
au, err = a.Push([]byte{0x7c, 0x05, 0x03, 0x04}, 10, false)
require.NoError(t, err)
require.Nil(t, au)
au, err = a.Push([]byte{0x7c, 0x45, 0x05, 0x06}, 10, true)
require.NoError(t, err)
require.NotNil(t, au)
require.True(t, au.HasIDR)
require.Equal(t, appendAnnexB([]byte{0x65, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06}), au.Payload)
}
func appendAnnexB(nalus ...[]byte) []byte {
var out []byte
for _, nalu := range nalus {
out = append(out, annexBStartCode...)
out = append(out, nalu...)
}
return out
}
+482
View File
@@ -0,0 +1,482 @@
// Copyright 2026 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package service
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strings"
"time"
"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/livekit"
"github.com/quic-go/quic-go/quicvarint"
webtransport "github.com/quic-go/webtransport-go"
)
const (
moqLiteProtocol = "moq-lite-04"
moqLiteStreamGroup byte = 0
moqLiteStreamSubscribe uint64 = 2
moqLiteStreamProbe uint64 = 4
moqLiteSubscribeOK uint64 = 0
moqLiteSubscribeDrop uint64 = 1
moqLiteMaxMessageSize = 1024 * 1024
)
type moqLiteSubscribe struct {
ID uint64
Broadcast string
Track string
Priority uint8
Ordered bool
MaxLatency uint64
StartGroup uint64
EndGroup uint64
HasStart bool
HasEnd bool
}
type moqLiteResolvedTrack struct {
moqResolvedTrack
Timing bool
}
type moqLiteTimingSample struct {
Sequence uint64 `json:"sequence"`
TrackID string `json:"trackId"`
TrackName string `json:"trackName"`
Layer int32 `json:"layer"`
RTPTime uint32 `json:"rtpTime"`
KeyFrame bool `json:"keyFrame"`
Cached bool `json:"cached"`
SentAtUnixNs int64 `json:"sentAtUnixNs"`
UserTimestampUs *int64 `json:"userTimestampUs,omitempty"`
FrameID *uint32 `json:"frameId,omitempty"`
}
func (s *MoQService) serveMoQLiteSession(
sess *webtransport.Session,
roomName livekit.RoomName,
claims *auth.ClaimGrants,
layer int32,
) {
ctx := sess.Context()
for {
stream, err := sess.AcceptStream(ctx)
if err != nil {
if ctx.Err() == nil {
s.logger.Debugw("could not accept moq-lite stream", "error", err)
}
return
}
go func() {
defer func() {
_ = stream.Close()
}()
if err := s.serveMoQLiteStream(ctx, sess, stream, roomName, claims, layer); err != nil {
s.logger.Debugw("moq-lite stream failed", "error", err)
}
}()
}
}
func (s *MoQService) serveMoQLiteStream(
ctx context.Context,
sess *webtransport.Session,
stream *webtransport.Stream,
roomName livekit.RoomName,
claims *auth.ClaimGrants,
layer int32,
) error {
reader := quicvarint.NewReader(stream)
streamType, err := quicvarint.Read(reader)
if err != nil {
return err
}
switch streamType {
case moqLiteStreamSubscribe:
return s.serveMoQLiteSubscribe(ctx, sess, stream, reader, roomName, claims, layer)
case moqLiteStreamProbe:
// Probe is optional telemetry in @moq/net. Closing cleanly tells the client
// no probe data is available without failing the media subscription.
return nil
default:
return fmt.Errorf("unsupported moq-lite stream type: %d", streamType)
}
}
func (s *MoQService) serveMoQLiteSubscribe(
ctx context.Context,
sess *webtransport.Session,
stream *webtransport.Stream,
reader quicvarint.Reader,
roomName livekit.RoomName,
claims *auth.ClaimGrants,
layer int32,
) error {
subscribe, err := readMoQLiteSubscribe(reader)
if err != nil {
return err
}
resolved, status, err := s.resolveMoQLiteTrack(ctx, roomName, subscribe, claims)
if err != nil {
_ = writeMoQLiteSubscribeDrop(stream, s.config.WriteTimeout)
return fmt.Errorf("could not resolve moq-lite subscription: status=%d, error=%w", status, err)
}
if err := writeMoQLiteSubscribeOK(stream, s.config.WriteTimeout, subscribe); err != nil {
return err
}
go discardMoQLiteSubscribeUpdates(ctx, reader)
trackSub, cached, unsubscribe := resolved.tap.Subscribe(layer)
defer unsubscribe()
if cached != nil {
if err := s.writeMoQLiteSample(ctx, sess, subscribe.ID, cached, resolved.Timing); err != nil {
return err
}
}
for {
select {
case <-ctx.Done():
return nil
case <-trackSub.done:
return nil
case sample := <-trackSub.queue:
if sample == nil {
continue
}
if err := s.writeMoQLiteSample(ctx, sess, subscribe.ID, sample, resolved.Timing); err != nil {
return err
}
}
}
}
func (s *MoQService) resolveMoQLiteTrack(
ctx context.Context,
defaultRoomName livekit.RoomName,
subscribe moqLiteSubscribe,
claims *auth.ClaimGrants,
) (moqLiteResolvedTrack, int, error) {
var resolved moqLiteResolvedTrack
roomName := defaultRoomName
trackName, timingTrack := splitMoQLiteTrackName(subscribe.Track)
if subscribe.Broadcast != "" {
broadcastRoom := livekit.RoomName(subscribe.Broadcast)
if defaultRoomName != "" && broadcastRoom != defaultRoomName {
return resolved, http.StatusForbidden, fmt.Errorf("broadcast %q does not match token room %q", subscribe.Broadcast, defaultRoomName)
}
roomName = broadcastRoom
}
if roomName == "" {
return resolved, http.StatusBadRequest, ErrNoRoomName
}
room := s.roomManager.GetRoom(ctx, roomName)
if room == nil {
return resolved, http.StatusNotFound, ErrRoomNotFound
}
var candidates []moqLiteResolvedTrack
for _, participant := range room.GetParticipants() {
for _, track := range participant.GetPublishedTracks() {
if !isMoQSupportedTrack(track) {
continue
}
if !participant.HasPermission(track.ID(), livekit.ParticipantIdentity(claims.Identity)) {
continue
}
tap := s.tracks.AttachTrack(room, participant, track)
if tap == nil {
continue
}
if trackName == "" || string(track.ID()) == trackName || track.Name() == trackName {
candidates = append(candidates, moqLiteResolvedTrack{
moqResolvedTrack: moqResolvedTrack{room: room, participant: participant, track: track, tap: tap},
Timing: timingTrack,
})
}
}
}
if subscribe.Track != "" && len(candidates) == 0 {
return resolved, http.StatusNotFound, fmt.Errorf("track not found or not available for moq-lite: %s", subscribe.Track)
}
if subscribe.Track == "" && len(candidates) != 1 {
return resolved, http.StatusBadRequest, fmt.Errorf("track name is required when %d moq-compatible tracks are available", len(candidates))
}
if len(candidates) > 1 {
return resolved, http.StatusBadRequest, fmt.Errorf("track name %q matched %d moq-compatible tracks", subscribe.Track, len(candidates))
}
return candidates[0], http.StatusOK, nil
}
func splitMoQLiteTrackName(track string) (string, bool) {
for _, suffix := range []string{".timing", ".metadata"} {
if strings.HasSuffix(track, suffix) {
return strings.TrimSuffix(track, suffix), true
}
}
return track, false
}
func (s *MoQService) writeMoQLiteSample(
ctx context.Context,
sess *webtransport.Session,
subscribeID uint64,
sample *moqSample,
timing bool,
) error {
stream, err := sess.OpenUniStreamSync(ctx)
if err != nil {
return err
}
defer func() {
_ = stream.Close()
}()
payload := sample.Payload
if timing {
var err error
payload, err = json.Marshal(moqLiteTimingSample{
Sequence: sample.Sequence,
TrackID: string(sample.TrackID),
TrackName: sample.TrackName,
Layer: sample.Layer,
RTPTime: sample.RTPTime,
KeyFrame: sample.KeyFrame,
Cached: sample.Cached,
SentAtUnixNs: sample.SentAtUnixNs,
UserTimestampUs: sample.UserTimestampUs,
FrameID: sample.FrameID,
})
if err != nil {
return err
}
}
return writeMoQLiteGroup(stream, s.config.WriteTimeout, subscribeID, sample.Sequence, payload)
}
func readMoQLiteSubscribe(r quicvarint.Reader) (moqLiteSubscribe, error) {
var subscribe moqLiteSubscribe
data, err := readMoQLiteMessage(r)
if err != nil {
return subscribe, err
}
br := bytes.NewReader(data)
reader := quicvarint.NewReader(br)
subscribe.ID, err = quicvarint.Read(reader)
if err != nil {
return subscribe, err
}
subscribe.Broadcast, err = readMoQLiteString(reader)
if err != nil {
return subscribe, err
}
subscribe.Track, err = readMoQLiteString(reader)
if err != nil {
return subscribe, err
}
subscribe.Priority, err = readMoQLiteU8(reader)
if err != nil {
return subscribe, err
}
subscribe.Ordered, err = readMoQLiteBool(reader)
if err != nil {
return subscribe, err
}
subscribe.MaxLatency, err = quicvarint.Read(reader)
if err != nil {
return subscribe, err
}
startGroup, err := quicvarint.Read(reader)
if err != nil {
return subscribe, err
}
if startGroup > 0 {
subscribe.StartGroup = startGroup - 1
subscribe.HasStart = true
}
endGroup, err := quicvarint.Read(reader)
if err != nil {
return subscribe, err
}
if endGroup > 0 {
subscribe.EndGroup = endGroup - 1
subscribe.HasEnd = true
}
if br.Len() != 0 {
return subscribe, fmt.Errorf("moq-lite subscribe had %d trailing bytes", br.Len())
}
return subscribe, nil
}
func readMoQLiteMessage(r quicvarint.Reader) ([]byte, error) {
size, err := quicvarint.Read(r)
if err != nil {
return nil, err
}
if size > moqLiteMaxMessageSize {
return nil, fmt.Errorf("moq-lite message too large: %d", size)
}
data := make([]byte, int(size))
if _, err := io.ReadFull(r, data); err != nil {
return nil, err
}
return data, nil
}
func readMoQLiteString(r quicvarint.Reader) (string, error) {
data, err := readMoQLiteMessage(r)
if err != nil {
return "", err
}
return string(data), nil
}
func readMoQLiteU8(r quicvarint.Reader) (uint8, error) {
b, err := r.ReadByte()
return b, err
}
func readMoQLiteBool(r quicvarint.Reader) (bool, error) {
value, err := readMoQLiteU8(r)
if err != nil {
return false, err
}
switch value {
case 0:
return false, nil
case 1:
return true, nil
default:
return false, fmt.Errorf("invalid moq-lite bool: %d", value)
}
}
func discardMoQLiteSubscribeUpdates(ctx context.Context, r quicvarint.Reader) {
for {
if ctx.Err() != nil {
return
}
size, err := quicvarint.Read(r)
if err != nil {
return
}
if size > moqLiteMaxMessageSize {
return
}
if _, err = io.CopyN(io.Discard, r, int64(size)); err != nil {
return
}
}
}
func writeMoQLiteSubscribeOK(w interface {
SetWriteDeadline(time.Time) error
Write([]byte) (int, error)
}, timeout time.Duration, subscribe moqLiteSubscribe) error {
var payload []byte
payload = append(payload, subscribe.Priority)
payload = append(payload, boolByte(subscribe.Ordered))
payload = quicvarint.Append(payload, subscribe.MaxLatency)
payload = appendMoQLiteOptionalGroup(payload, subscribe.HasStart, subscribe.StartGroup)
payload = appendMoQLiteOptionalGroup(payload, subscribe.HasEnd, subscribe.EndGroup)
var response []byte
response = quicvarint.Append(response, moqLiteSubscribeOK)
response = appendMoQLiteMessage(response, payload)
return writeMoQLiteBytes(w, timeout, response)
}
func writeMoQLiteSubscribeDrop(w interface {
SetWriteDeadline(time.Time) error
Write([]byte) (int, error)
}, timeout time.Duration) error {
var payload []byte
payload = quicvarint.Append(payload, 0)
payload = quicvarint.Append(payload, 0)
payload = quicvarint.Append(payload, 1)
var response []byte
response = quicvarint.Append(response, moqLiteSubscribeDrop)
response = appendMoQLiteMessage(response, payload)
return writeMoQLiteBytes(w, timeout, response)
}
func writeMoQLiteGroup(w interface {
SetWriteDeadline(time.Time) error
Write([]byte) (int, error)
}, timeout time.Duration, subscribeID uint64, sequence uint64, payload []byte) error {
var group []byte
group = quicvarint.Append(group, subscribeID)
group = quicvarint.Append(group, sequence)
data := make([]byte, 0, 1+len(group)+len(payload)+16)
data = append(data, moqLiteStreamGroup)
data = appendMoQLiteMessage(data, group)
data = quicvarint.Append(data, uint64(len(payload)))
data = append(data, payload...)
return writeMoQLiteBytes(w, timeout, data)
}
func appendMoQLiteOptionalGroup(dst []byte, ok bool, group uint64) []byte {
if !ok {
return quicvarint.Append(dst, 0)
}
return quicvarint.Append(dst, group+1)
}
func appendMoQLiteMessage(dst []byte, payload []byte) []byte {
dst = quicvarint.Append(dst, uint64(len(payload)))
return append(dst, payload...)
}
func writeMoQLiteBytes(w interface {
SetWriteDeadline(time.Time) error
Write([]byte) (int, error)
}, timeout time.Duration, data []byte) error {
if err := w.SetWriteDeadline(time.Now().Add(timeout)); err != nil {
return err
}
_, err := w.Write(data)
if errors.Is(err, io.EOF) {
return nil
}
return err
}
func boolByte(v bool) byte {
if v {
return 1
}
return 0
}
+83
View File
@@ -0,0 +1,83 @@
// Copyright 2026 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package service
import (
"bytes"
"testing"
"time"
"github.com/quic-go/quic-go/quicvarint"
"github.com/stretchr/testify/require"
)
func TestMoQLiteReadSubscribe(t *testing.T) {
var payload []byte
payload = quicvarint.Append(payload, 7)
payload = appendMoQLiteMessage(payload, []byte("moq-local-video"))
payload = appendMoQLiteMessage(payload, []byte("camera"))
payload = append(payload, 3)
payload = append(payload, 1)
payload = quicvarint.Append(payload, 250)
payload = quicvarint.Append(payload, 10)
payload = quicvarint.Append(payload, 20)
data := appendMoQLiteMessage(nil, payload)
subscribe, err := readMoQLiteSubscribe(quicvarint.NewReader(bytes.NewReader(data)))
require.NoError(t, err)
require.Equal(t, uint64(7), subscribe.ID)
require.Equal(t, "moq-local-video", subscribe.Broadcast)
require.Equal(t, "camera", subscribe.Track)
require.Equal(t, uint8(3), subscribe.Priority)
require.True(t, subscribe.Ordered)
require.Equal(t, uint64(250), subscribe.MaxLatency)
require.True(t, subscribe.HasStart)
require.Equal(t, uint64(9), subscribe.StartGroup)
require.True(t, subscribe.HasEnd)
require.Equal(t, uint64(19), subscribe.EndGroup)
}
func TestMoQLiteWriteGroup(t *testing.T) {
w := &moqLiteTestWriter{}
err := writeMoQLiteGroup(w, time.Second, 7, 42, []byte{0x00, 0x00, 0x01, 0x65})
require.NoError(t, err)
reader := quicvarint.NewReader(bytes.NewReader(w.Bytes()))
streamType, err := reader.ReadByte()
require.NoError(t, err)
require.Equal(t, moqLiteStreamGroup, streamType)
groupData, err := readMoQLiteMessage(reader)
require.NoError(t, err)
groupReader := quicvarint.NewReader(bytes.NewReader(groupData))
subscribeID, err := quicvarint.Read(groupReader)
require.NoError(t, err)
require.Equal(t, uint64(7), subscribeID)
sequence, err := quicvarint.Read(groupReader)
require.NoError(t, err)
require.Equal(t, uint64(42), sequence)
frame, err := readMoQLiteMessage(reader)
require.NoError(t, err)
require.Equal(t, []byte{0x00, 0x00, 0x01, 0x65}, frame)
}
type moqLiteTestWriter struct {
bytes.Buffer
}
func (w *moqLiteTestWriter) SetWriteDeadline(time.Time) error {
return nil
}
+485
View File
@@ -0,0 +1,485 @@
// Copyright 2026 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package service
import (
"context"
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"errors"
"fmt"
"math/big"
"net"
"net/http"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/quic-go/quic-go"
"github.com/quic-go/quic-go/http3"
webtransport "github.com/quic-go/webtransport-go"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/rtc"
"github.com/livekit/livekit-server/pkg/rtc/types"
)
type MoQService struct {
config config.MoQConfig
development bool
keyProvider auth.KeyProvider
roomManager *RoomManager
tracks *moqTrackRegistry
logger logger.Logger
lock sync.Mutex
servers []*webtransport.Server
wg sync.WaitGroup
stopped atomic.Bool
}
type moqResolvedTrack struct {
room *rtc.Room
participant types.Participant
track types.MediaTrack
tap *moqTrackTap
}
func NewMoQService(conf *config.Config, keyProvider auth.KeyProvider, roomManager *RoomManager) (*MoQService, error) {
if keyProvider == nil {
return nil, errors.New("moq requires an API key provider")
}
if conf.MoQ.Port == 0 {
return nil, errors.New("moq.port must be configured")
}
lgr := logger.GetLogger().WithComponent("moq")
s := &MoQService{
config: conf.MoQ,
development: conf.Development,
keyProvider: keyProvider,
roomManager: roomManager,
logger: lgr,
}
s.tracks = newMoQTrackRegistry(moqTrackRegistryParams{
Config: s.config,
Logger: lgr,
})
roomManager.AddOnRoomCreated(s.tracks.AttachRoom)
return s, nil
}
func (s *MoQService) Start() error {
tlsConf, err := s.tlsConfig()
if err != nil {
return err
}
type listenTarget struct {
server *webtransport.Server
conn *net.UDPConn
addr string
}
targets := make([]listenTarget, 0, len(s.config.BindAddresses))
for _, addr := range s.config.BindAddresses {
bindAddr := net.JoinHostPort(addr, strconv.Itoa(int(s.config.Port)))
udpAddr, err := net.ResolveUDPAddr("udp", bindAddr)
if err != nil {
for _, target := range targets {
_ = target.conn.Close()
}
return err
}
conn, err := net.ListenUDP("udp", udpAddr)
if err != nil {
for _, target := range targets {
_ = target.conn.Close()
}
return err
}
h3Server := &http3.Server{
Addr: bindAddr,
TLSConfig: http3.ConfigureTLSConfig(tlsConf.Clone()),
QUICConfig: &quic.Config{
EnableDatagrams: true,
EnableStreamResetPartialDelivery: true,
},
}
webtransport.ConfigureHTTP3Server(h3Server)
wtServer := &webtransport.Server{
H3: h3Server,
ApplicationProtocols: []string{moqLiteProtocol, moqWireProtocol},
CheckOrigin: func(*http.Request) bool { return true },
}
mux := http.NewServeMux()
mux.HandleFunc(s.config.Path, func(w http.ResponseWriter, r *http.Request) {
s.handleWebTransport(w, r, wtServer)
})
h3Server.Handler = mux
targets = append(targets, listenTarget{
server: wtServer,
conn: conn,
addr: bindAddr,
})
}
s.lock.Lock()
defer s.lock.Unlock()
for _, target := range targets {
s.servers = append(s.servers, target.server)
s.wg.Add(1)
go func(target listenTarget) {
defer s.wg.Done()
defer target.conn.Close()
s.logger.Infow("starting moq webtransport listener", "addr", target.addr, "path", s.config.Path)
if err := target.server.Serve(target.conn); err != nil && !s.stopped.Load() {
s.logger.Errorw("moq webtransport listener stopped", err, "addr", target.addr)
}
}(target)
}
return nil
}
func (s *MoQService) Stop(ctx context.Context) error {
s.stopped.Store(true)
s.lock.Lock()
servers := append([]*webtransport.Server(nil), s.servers...)
s.lock.Unlock()
var closeErr error
for _, server := range servers {
if err := server.Close(); err != nil && closeErr == nil {
closeErr = err
}
}
done := make(chan struct{})
go func() {
s.wg.Wait()
close(done)
}()
select {
case <-ctx.Done():
return ctx.Err()
case <-done:
return closeErr
}
}
func (s *MoQService) handleWebTransport(w http.ResponseWriter, r *http.Request, wtServer *webtransport.Server) {
claims, err := s.authenticate(r)
if err != nil {
http.Error(w, err.Error(), http.StatusUnauthorized)
return
}
if claims.Video == nil || !claims.Video.RoomJoin || !claims.Video.GetCanSubscribe() {
http.Error(w, ErrPermissionDenied.Error(), http.StatusForbidden)
return
}
roomName := livekit.RoomName(claims.Video.Room)
if roomName == "" {
roomName = livekit.RoomName(r.URL.Query().Get("room"))
}
if roomName == "" {
http.Error(w, ErrNoRoomName.Error(), http.StatusBadRequest)
return
}
layer := int32(0)
if layerValue := r.URL.Query().Get("layer"); layerValue != "" {
parsed, err := strconv.ParseInt(layerValue, 10, 32)
if err != nil || parsed < 0 {
http.Error(w, "invalid layer", http.StatusBadRequest)
return
}
layer = int32(parsed)
}
sess, err := wtServer.Upgrade(w, r)
if err != nil {
s.logger.Warnw("could not upgrade moq webtransport session", err)
return
}
defer func() {
_ = sess.CloseWithError(0, "")
}()
switch protocol := sess.SessionState().ApplicationProtocol; protocol {
case moqLiteProtocol:
s.serveMoQLiteSession(sess, roomName, claims, layer)
case "", moqWireProtocol:
resolved, catalog, _, err := s.resolveTrack(sess.Context(), roomName, livekit.TrackID(r.URL.Query().Get("track_id")), claims)
if err != nil {
_ = sess.CloseWithError(1, err.Error())
return
}
s.serveSubscription(sess, resolved, catalog, layer)
default:
s.logger.Warnw("unsupported moq webtransport protocol", nil, "protocol", protocol)
_ = sess.CloseWithError(1, "unsupported moq protocol")
}
}
func (s *MoQService) authenticate(r *http.Request) (*auth.ClaimGrants, error) {
authHeader := r.Header.Get(authorizationHeader)
var authToken string
if authHeader != "" {
if !strings.HasPrefix(authHeader, bearerPrefix) {
return nil, ErrMissingAuthorization
}
authToken = authHeader[len(bearerPrefix):]
} else {
authToken = r.FormValue(accessTokenParam)
}
if authToken == "" {
return nil, ErrMissingAuthorization
}
verifier, err := auth.ParseAPIToken(authToken)
if err != nil {
return nil, ErrInvalidAuthorizationToken
}
secret := s.keyProvider.GetSecret(verifier.APIKey())
if secret == "" {
return nil, ErrInvalidAPIKey
}
_, claims, err := verifier.Verify(secret)
if err != nil {
return nil, ErrInvalidAuthorizationToken
}
return claims, nil
}
func (s *MoQService) resolveTrack(
ctx context.Context,
roomName livekit.RoomName,
trackID livekit.TrackID,
claims *auth.ClaimGrants,
) (moqResolvedTrack, []moqWireCatalogTrack, int, error) {
var resolved moqResolvedTrack
room := s.roomManager.GetRoom(ctx, roomName)
if room == nil {
return resolved, nil, http.StatusNotFound, ErrRoomNotFound
}
var candidates []moqResolvedTrack
var catalog []moqWireCatalogTrack
for _, participant := range room.GetParticipants() {
for _, track := range participant.GetPublishedTracks() {
if !isMoQSupportedTrack(track) {
continue
}
if !participant.HasPermission(track.ID(), livekit.ParticipantIdentity(claims.Identity)) {
continue
}
tap := s.tracks.AttachTrack(room, participant, track)
if tap == nil {
continue
}
catalog = append(catalog, tap.CatalogTracks()...)
candidate := moqResolvedTrack{room: room, participant: participant, track: track, tap: tap}
if trackID == "" || track.ID() == trackID {
candidates = append(candidates, candidate)
}
}
}
if trackID != "" && len(candidates) == 0 {
return resolved, catalog, http.StatusNotFound, fmt.Errorf("track not found or not available for moq: %s", trackID)
}
if trackID == "" && len(candidates) != 1 {
return resolved, catalog, http.StatusBadRequest, fmt.Errorf("track_id is required when %d moq-compatible tracks are available", len(candidates))
}
return candidates[0], catalog, http.StatusOK, nil
}
func (s *MoQService) serveSubscription(
sess *webtransport.Session,
resolved moqResolvedTrack,
catalog []moqWireCatalogTrack,
layer int32,
) {
ctx := sess.Context()
stream, err := sess.OpenUniStreamSync(ctx)
if err != nil {
s.logger.Warnw("could not open moq media stream", err, "trackID", resolved.track.ID())
return
}
defer func() {
_ = stream.Close()
}()
sub, cached, unsubscribe := resolved.tap.Subscribe(layer)
defer unsubscribe()
if err := s.writeCatalog(stream, resolved.room.Name(), resolved.track.ID(), catalog); err != nil {
s.logger.Debugw("could not write moq catalog", "error", err)
return
}
if cached != nil {
if err := s.writeSample(stream, cached); err != nil {
s.logger.Debugw("could not write cached moq keyframe", "error", err)
return
}
}
for {
select {
case <-ctx.Done():
return
case <-sub.done:
return
case sample := <-sub.queue:
if err := s.writeSample(stream, sample); err != nil {
s.logger.Debugw("could not write moq sample", "error", err)
return
}
}
}
}
func (s *MoQService) writeCatalog(w interface {
SetWriteDeadline(time.Time) error
Write([]byte) (int, error)
}, roomName livekit.RoomName, trackID livekit.TrackID, tracks []moqWireCatalogTrack) error {
if err := w.SetWriteDeadline(time.Now().Add(s.config.WriteTimeout)); err != nil {
return err
}
return writeMoQWireMessage(w, moqWireMessage{
Type: "catalog",
Protocol: moqWireProtocol,
Room: string(roomName),
TrackID: string(trackID),
Tracks: tracks,
}, nil)
}
func (s *MoQService) writeSample(w interface {
SetWriteDeadline(time.Time) error
Write([]byte) (int, error)
}, sample *moqSample) error {
if err := w.SetWriteDeadline(time.Now().Add(s.config.WriteTimeout)); err != nil {
return err
}
return writeMoQWireMessage(w, moqWireMessage{
Type: "sample",
Protocol: moqWireProtocol,
TrackID: string(sample.TrackID),
TrackName: sample.TrackName,
PublisherID: string(sample.PublisherID),
Publisher: string(sample.Publisher),
MimeType: sample.MimeType,
PayloadFormat: "annexb",
Width: sample.Width,
Height: sample.Height,
Layer: sample.Layer,
Sequence: sample.Sequence,
RTPTime: sample.RTPTime,
KeyFrame: sample.KeyFrame,
Cached: sample.Cached,
SentAtUnixNs: sample.SentAtUnixNs,
UserTimestampUs: sample.UserTimestampUs,
FrameID: sample.FrameID,
}, sample.Payload)
}
func (s *MoQService) tlsConfig() (*tls.Config, error) {
if s.config.CertFile != "" || s.config.KeyFile != "" {
if s.config.CertFile == "" || s.config.KeyFile == "" {
return nil, errors.New("moq cert_file and key_file must be configured together")
}
cert, err := tls.LoadX509KeyPair(s.config.CertFile, s.config.KeyFile)
if err != nil {
return nil, err
}
return &tls.Config{
MinVersion: tls.VersionTLS13,
Certificates: []tls.Certificate{cert},
}, nil
}
if !s.development {
return nil, errors.New("moq cert_file and key_file must be configured outside development mode")
}
cert, err := generateMoQSelfSignedCert()
if err != nil {
return nil, err
}
s.logger.Warnw("using ephemeral self-signed certificate for moq; configure moq.cert_file and moq.key_file for production", nil)
return &tls.Config{
MinVersion: tls.VersionTLS13,
Certificates: []tls.Certificate{cert},
}, nil
}
func generateMoQSelfSignedCert() (tls.Certificate, error) {
priv, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
if err != nil {
return tls.Certificate{}, err
}
serialNumber, err := rand.Int(rand.Reader, new(big.Int).Lsh(big.NewInt(1), 128))
if err != nil {
return tls.Certificate{}, err
}
template := x509.Certificate{
SerialNumber: serialNumber,
Subject: pkix.Name{
Organization: []string{"LiveKit Development"},
},
NotBefore: time.Now().Add(-time.Hour),
NotAfter: time.Now().Add(24 * time.Hour),
KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
BasicConstraintsValid: true,
DNSNames: []string{"localhost"},
IPAddresses: []net.IP{net.IPv4(127, 0, 0, 1), net.IPv6loopback},
}
derBytes, err := x509.CreateCertificate(rand.Reader, &template, &template, &priv.PublicKey, priv)
if err != nil {
return tls.Certificate{}, err
}
keyBytes, err := x509.MarshalECPrivateKey(priv)
if err != nil {
return tls.Certificate{}, err
}
certPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: derBytes})
keyPEM := pem.EncodeToMemory(&pem.Block{Type: "EC PRIVATE KEY", Bytes: keyBytes})
return tls.X509KeyPair(certPEM, keyPEM)
}
+492
View File
@@ -0,0 +1,492 @@
// Copyright 2026 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package service
import (
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/livekit/protocol/codecs/mime"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/pion/webrtc/v4"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/rtc"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/sfu"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/livekit-server/pkg/sfu/packettrailer"
)
type moqTrackKey struct {
room livekit.RoomName
trackID livekit.TrackID
}
type moqTrackRegistry struct {
params moqTrackRegistryParams
lock sync.RWMutex
tracks map[moqTrackKey]*moqTrackTap
}
type moqTrackRegistryParams struct {
Config config.MoQConfig
Logger logger.Logger
}
func newMoQTrackRegistry(params moqTrackRegistryParams) *moqTrackRegistry {
return &moqTrackRegistry{
params: params,
tracks: make(map[moqTrackKey]*moqTrackTap),
}
}
func (r *moqTrackRegistry) AttachRoom(room *rtc.Room) {
for _, participant := range room.GetParticipants() {
for _, track := range participant.GetPublishedTracks() {
r.AttachTrack(room, participant, track)
}
}
room.AddOnTrackPublished(func(participant types.Participant, track types.MediaTrack) {
r.AttachTrack(room, participant, track)
})
}
func (r *moqTrackRegistry) AttachTrack(room *rtc.Room, participant types.Participant, track types.MediaTrack) *moqTrackTap {
if !isMoQSupportedTrack(track) {
return nil
}
key := moqTrackKey{room: room.Name(), trackID: track.ID()}
r.lock.Lock()
tap := r.tracks[key]
if tap == nil {
tap = newMoQTrackTap(moqTrackTapParams{
Config: r.params.Config,
Logger: r.params.Logger,
Room: room,
Participant: participant,
Track: track,
})
r.tracks[key] = tap
track.AddOnClose(func(_ bool) {
r.lock.Lock()
if r.tracks[key] == tap {
delete(r.tracks, key)
}
r.lock.Unlock()
tap.Close()
})
if observer, ok := track.(interface{ OnSetupReceiver(func(mime.MimeType)) }); ok {
observer.OnSetupReceiver(func(mime.MimeType) {
tap.AttachReceivers()
})
}
}
r.lock.Unlock()
tap.AttachReceivers()
return tap
}
func (r *moqTrackRegistry) Get(roomName livekit.RoomName, trackID livekit.TrackID) *moqTrackTap {
r.lock.RLock()
defer r.lock.RUnlock()
return r.tracks[moqTrackKey{room: roomName, trackID: trackID}]
}
func isMoQSupportedTrack(track types.MediaTrack) bool {
if track.Kind() != livekit.TrackType_VIDEO || track.IsEncrypted() {
return false
}
ti := track.ToProto()
return ti.GetMimeType() == "" || mime.IsMimeTypeStringEqual(ti.GetMimeType(), mime.MimeTypeH264.String())
}
type moqTrackTapParams struct {
Config config.MoQConfig
Logger logger.Logger
Room *rtc.Room
Participant types.Participant
Track types.MediaTrack
}
type moqTrackTap struct {
params moqTrackTapParams
id string
subscriberID livekit.ParticipantID
qualityNodeID livekit.NodeID
lock sync.RWMutex
receivers map[sfu.TrackReceiver]struct{}
layers map[int32]*moqLayerState
subscribers map[string]*moqTrackSubscriber
closed bool
sequence atomic.Uint64
}
type moqLayerState struct {
lock sync.Mutex
assembler *h264AccessUnitAssembler
cache *moqSample
}
type moqSample struct {
TrackID livekit.TrackID
TrackName string
PublisherID livekit.ParticipantID
Publisher livekit.ParticipantIdentity
MimeType string
Width uint32
Height uint32
Layer int32
Sequence uint64
RTPTime uint32
KeyFrame bool
Cached bool
SentAtUnixNs int64
UserTimestampUs *int64
FrameID *uint32
Payload []byte
}
func (s *moqSample) Clone(cached bool) *moqSample {
if s == nil {
return nil
}
clone := *s
clone.Cached = cached
if s.UserTimestampUs != nil {
userTimestampUs := *s.UserTimestampUs
clone.UserTimestampUs = &userTimestampUs
}
if s.FrameID != nil {
frameID := *s.FrameID
clone.FrameID = &frameID
}
clone.Payload = cloneBytes(s.Payload)
return &clone
}
type moqTrackSubscriber struct {
id string
layer int32
queue chan *moqSample
done chan struct{}
}
func newMoQTrackTap(params moqTrackTapParams) *moqTrackTap {
trackID := params.Track.ID()
return &moqTrackTap{
params: params,
id: fmt.Sprintf("moq:%s:%s", params.Room.Name(), trackID),
subscriberID: livekit.ParticipantID(fmt.Sprintf("MOQ_%s", trackID)),
qualityNodeID: livekit.NodeID(fmt.Sprintf("moq:%s:%s", params.Room.Name(), trackID)),
receivers: make(map[sfu.TrackReceiver]struct{}),
layers: make(map[int32]*moqLayerState),
subscribers: make(map[string]*moqTrackSubscriber),
}
}
func (t *moqTrackTap) AttachReceivers() {
t.lock.Lock()
if t.closed || len(t.subscribers) == 0 {
t.lock.Unlock()
return
}
receivers := t.params.Track.Receivers()
for _, receiver := range receivers {
if receiver == nil || receiver.IsClosed() || !mime.IsMimeTypeStringEqual(receiver.Mime().String(), mime.MimeTypeH264.String()) {
continue
}
if _, ok := t.receivers[receiver]; ok {
continue
}
if err := receiver.AddDownTrack(t); err != nil {
t.params.Logger.Warnw("could not attach moq track tap", err, "trackID", t.params.Track.ID())
continue
}
t.receivers[receiver] = struct{}{}
receiver.SendPLI(0, true)
}
t.lock.Unlock()
}
func (t *moqTrackTap) Subscribe(layer int32) (*moqTrackSubscriber, *moqSample, func()) {
sub := &moqTrackSubscriber{
id: fmt.Sprintf("%s:%d", t.id, time.Now().UnixNano()),
layer: layer,
queue: make(chan *moqSample, t.params.Config.TrackQueueSize),
done: make(chan struct{}),
}
var cached *moqSample
var shouldEnableQuality bool
t.lock.Lock()
if t.closed {
t.lock.Unlock()
close(sub.done)
return sub, nil, func() {}
}
shouldEnableQuality = len(t.subscribers) == 0
t.subscribers[sub.id] = sub
if layerState := t.layers[layer]; layerState != nil {
layerState.lock.Lock()
cached = layerState.cache.Clone(true)
layerState.lock.Unlock()
}
t.lock.Unlock()
if shouldEnableQuality {
t.setSubscriberNodeQuality(livekit.VideoQuality_HIGH)
}
t.AttachReceivers()
t.lock.RLock()
for receiver := range t.receivers {
receiver.SendPLI(layer, false)
}
t.lock.RUnlock()
unsubscribe := func() {
var shouldDisableQuality bool
t.lock.Lock()
if t.subscribers[sub.id] == sub {
delete(t.subscribers, sub.id)
close(sub.done)
shouldDisableQuality = len(t.subscribers) == 0
}
t.lock.Unlock()
if shouldDisableQuality {
t.detachReceivers()
t.setSubscriberNodeQuality(livekit.VideoQuality_OFF)
}
}
return sub, cached, unsubscribe
}
func (t *moqTrackTap) setSubscriberNodeQuality(quality livekit.VideoQuality) {
localTrack, ok := t.params.Track.(types.LocalMediaTrack)
if !ok {
return
}
localTrack.NotifySubscriberNodeMaxQuality(t.qualityNodeID, []types.SubscribedCodecQuality{
{
CodecMime: mime.MimeTypeH264,
Quality: quality,
},
})
}
func (t *moqTrackTap) detachReceivers() {
t.lock.Lock()
for receiver := range t.receivers {
receiver.DeleteDownTrack(t.subscriberID)
}
t.receivers = make(map[sfu.TrackReceiver]struct{})
t.lock.Unlock()
}
func (t *moqTrackTap) CatalogTracks() []moqWireCatalogTrack {
ti := t.params.Track.ToProto()
return []moqWireCatalogTrack{
{
TrackID: string(t.params.Track.ID()),
TrackName: t.params.Track.Name(),
PublisherID: string(t.params.Participant.ID()),
Publisher: string(t.params.Participant.Identity()),
MimeType: mime.MimeTypeH264.String(),
Width: ti.GetWidth(),
Height: ti.GetHeight(),
},
}
}
func (t *moqTrackTap) Close() {
t.lock.Lock()
if t.closed {
t.lock.Unlock()
return
}
t.closed = true
for receiver := range t.receivers {
receiver.DeleteDownTrack(t.subscriberID)
}
for _, sub := range t.subscribers {
close(sub.done)
}
t.subscribers = make(map[string]*moqTrackSubscriber)
t.lock.Unlock()
t.setSubscriberNodeQuality(livekit.VideoQuality_OFF)
}
func (t *moqTrackTap) UpTrackLayersChange() {}
func (t *moqTrackTap) UpTrackBitrateAvailabilityChange() {}
func (t *moqTrackTap) UpTrackMaxPublishedLayerChange(int32) {}
func (t *moqTrackTap) UpTrackMaxTemporalLayerSeenChange(int32) {}
func (t *moqTrackTap) UpTrackBitrateReport([]int32, sfu.Bitrates) {}
func (t *moqTrackTap) WriteRTP(extPkt *buffer.ExtPacket, layer int32) int32 {
if extPkt == nil || extPkt.Packet == nil {
return 0
}
payload := cloneBytes(extPkt.Packet.Payload)
trailerMetadata, strip := packettrailer.ParseTrailer(payload, extPkt.Packet.Marker)
if strip > 0 {
payload = payload[:len(payload)-strip]
}
layerState := t.getLayerState(layer)
layerState.lock.Lock()
au, err := layerState.assembler.Push(payload, extPkt.Packet.Timestamp, extPkt.Packet.Marker)
layerState.lock.Unlock()
if err != nil || au == nil {
return 0
}
ti := t.params.Track.ToProto()
sample := &moqSample{
TrackID: t.params.Track.ID(),
TrackName: t.params.Track.Name(),
PublisherID: t.params.Participant.ID(),
Publisher: t.params.Participant.Identity(),
MimeType: mime.MimeTypeH264.String(),
Width: ti.GetWidth(),
Height: ti.GetHeight(),
Layer: layer,
Sequence: t.sequence.Add(1),
RTPTime: au.RTPTime,
KeyFrame: extPkt.IsKeyFrame || au.HasIDR,
SentAtUnixNs: time.Now().UnixNano(),
Payload: au.Payload,
}
if trailerMetadata.HasUserTimestampUs {
sample.UserTimestampUs = &trailerMetadata.UserTimestampUs
}
if trailerMetadata.HasFrameID {
sample.FrameID = &trailerMetadata.FrameID
}
if sample.KeyFrame && len(sample.Payload) <= t.params.Config.CacheMaxBytes {
layerState.lock.Lock()
layerState.cache = sample.Clone(false)
layerState.lock.Unlock()
}
return t.publish(sample)
}
func (t *moqTrackTap) getLayerState(layer int32) *moqLayerState {
t.lock.Lock()
defer t.lock.Unlock()
layerState := t.layers[layer]
if layerState == nil {
layerState = &moqLayerState{assembler: newH264AccessUnitAssembler()}
t.layers[layer] = layerState
}
return layerState
}
func (t *moqTrackTap) publish(sample *moqSample) int32 {
t.lock.RLock()
if t.closed {
t.lock.RUnlock()
return 0
}
subscribers := make([]*moqTrackSubscriber, 0, len(t.subscribers))
for _, sub := range t.subscribers {
if sub.layer == sample.Layer {
subscribers = append(subscribers, sub)
}
}
t.lock.RUnlock()
var written int32
for _, sub := range subscribers {
if enqueueMoQSample(sub, sample.Clone(false)) {
written++
}
}
return written
}
func enqueueMoQSample(sub *moqTrackSubscriber, sample *moqSample) bool {
select {
case <-sub.done:
return false
case sub.queue <- sample:
return true
default:
}
if !sample.KeyFrame {
return false
}
select {
case <-sub.done:
return false
case <-sub.queue:
default:
}
select {
case <-sub.done:
return false
case sub.queue <- sample:
return true
default:
return false
}
}
func (t *moqTrackTap) ID() string {
return t.id
}
func (t *moqTrackTap) SubscriberID() livekit.ParticipantID {
return t.subscriberID
}
func (t *moqTrackTap) HandleRTCPSenderReportData(webrtc.PayloadType, int32, *livekit.RTCPSenderReportState) error {
return nil
}
func (t *moqTrackTap) Resync() {}
func (t *moqTrackTap) SetReceiver(sfu.TrackReceiver) {}
func (t *moqTrackTap) ReceiverRestart(sfu.TrackReceiver) {
t.AttachReceivers()
}
func (t *moqTrackTap) IsClosed() bool {
t.lock.RLock()
defer t.lock.RUnlock()
return t.closed
}
+93
View File
@@ -0,0 +1,93 @@
// Copyright 2026 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package service
import (
"encoding/binary"
"encoding/json"
"fmt"
"io"
"math"
)
const (
moqWireProtocol = "livekit-moq-h264/1"
moqWireVersion = 1
)
var moqWireMagic = [4]byte{'L', 'K', 'M', 'Q'}
type moqWireMessage struct {
Type string `json:"type"`
Protocol string `json:"protocol,omitempty"`
Room string `json:"room,omitempty"`
TrackID string `json:"trackId,omitempty"`
TrackName string `json:"trackName,omitempty"`
PublisherID string `json:"publisherId,omitempty"`
Publisher string `json:"publisher,omitempty"`
MimeType string `json:"mimeType,omitempty"`
PayloadFormat string `json:"payloadFormat,omitempty"`
Width uint32 `json:"width,omitempty"`
Height uint32 `json:"height,omitempty"`
Layer int32 `json:"layer,omitempty"`
Sequence uint64 `json:"sequence,omitempty"`
RTPTime uint32 `json:"rtpTime,omitempty"`
KeyFrame bool `json:"keyFrame,omitempty"`
Cached bool `json:"cached,omitempty"`
SentAtUnixNs int64 `json:"sentAtUnixNs,omitempty"`
UserTimestampUs *int64 `json:"userTimestampUs,omitempty"`
FrameID *uint32 `json:"frameId,omitempty"`
Tracks []moqWireCatalogTrack `json:"tracks,omitempty"`
Error string `json:"error,omitempty"`
}
type moqWireCatalogTrack struct {
TrackID string `json:"trackId"`
TrackName string `json:"trackName,omitempty"`
PublisherID string `json:"publisherId,omitempty"`
Publisher string `json:"publisher,omitempty"`
MimeType string `json:"mimeType,omitempty"`
Width uint32 `json:"width,omitempty"`
Height uint32 `json:"height,omitempty"`
}
func writeMoQWireMessage(w io.Writer, msg moqWireMessage, payload []byte) error {
meta, err := json.Marshal(msg)
if err != nil {
return err
}
if uint64(len(meta)) > uint64(math.MaxUint32) {
return fmt.Errorf("moq metadata too large: %d", len(meta))
}
if uint64(len(payload)) > uint64(math.MaxUint32) {
return fmt.Errorf("moq payload too large: %d", len(payload))
}
var header [16]byte
copy(header[0:4], moqWireMagic[:])
header[4] = moqWireVersion
binary.BigEndian.PutUint32(header[8:12], uint32(len(meta)))
binary.BigEndian.PutUint32(header[12:16], uint32(len(payload)))
if _, err := w.Write(header[:]); err != nil {
return err
}
if _, err := w.Write(meta); err != nil {
return err
}
if len(payload) != 0 {
_, err = w.Write(payload)
}
return err
}
+31 -1
View File
@@ -91,7 +91,8 @@ type RoomManager struct {
turnAuthHandler *TURNAuthHandler
bus psrpc.MessageBus
rooms map[livekit.RoomName]*rtc.Room
rooms map[livekit.RoomName]*rtc.Room
onRoomCreated []func(*rtc.Room)
roomServers utils.MultitonService[rpc.RoomTopic]
agentDispatchServers utils.MultitonService[rpc.RoomTopic]
@@ -188,6 +189,34 @@ func (r *RoomManager) GetRoom(_ context.Context, roomName livekit.RoomName) *rtc
return r.rooms[roomName]
}
func (r *RoomManager) AddOnRoomCreated(f func(*rtc.Room)) {
if f == nil {
return
}
r.lock.Lock()
r.onRoomCreated = append(r.onRoomCreated, f)
rooms := make([]*rtc.Room, 0, len(r.rooms))
for _, room := range r.rooms {
rooms = append(rooms, room)
}
r.lock.Unlock()
for _, room := range rooms {
f(room)
}
}
func (r *RoomManager) notifyRoomCreated(room *rtc.Room) {
r.lock.RLock()
onRoomCreated := slices.Clone(r.onRoomCreated)
r.lock.RUnlock()
for _, f := range onRoomCreated {
f(room)
}
}
// deleteRoom completely deletes all room information, including active sessions, room store, and routing info
func (r *RoomManager) deleteRoom(ctx context.Context, roomName livekit.RoomName) error {
logger.Infow("deleting room state", "room", roomName)
@@ -703,6 +732,7 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, createRoom *livekit.C
r.lock.Unlock()
newRoom.Hold()
r.notifyRoomCreated(newRoom)
r.telemetry.RoomStarted(ctx, newRoom.ToProto())
prometheus.RoomStarted()
+16
View File
@@ -50,6 +50,7 @@ type LivekitServer struct {
ioService *IOInfoService
rtcService *RTCService
whipService *WHIPService
moqService *MoQService
agentService *AgentService
httpServer *http.Server
promServer *http.Server
@@ -186,6 +187,12 @@ func NewLivekitServer(conf *config.Config,
}
}
if conf.MoQ.Enabled {
if s.moqService, err = NewMoQService(conf, keyProvider, roomManager); err != nil {
return nil, err
}
}
if err = router.RemoveDeadNodes(); err != nil {
return
}
@@ -305,6 +312,12 @@ func (s *LivekitServer) Start() error {
if err := s.signalServer.Start(); err != nil {
return err
}
if s.moqService != nil {
if err := s.moqService.Start(); err != nil {
s.signalServer.Stop()
return err
}
}
httpGroup := &errgroup.Group{}
for _, ln := range listeners {
@@ -340,6 +353,9 @@ func (s *LivekitServer) Start() error {
if s.turnServer != nil {
_ = s.turnServer.Close()
}
if s.moqService != nil {
_ = s.moqService.Stop(ctx)
}
s.roomManager.Stop()
s.signalServer.Stop()
+64
View File
@@ -20,8 +20,18 @@ const (
xorByte = 0xFF
envelopeSize = 5 // 1B trailer_len + 4B magic
tagUserTimestampUs = 0x01
tagFrameID = 0x02
)
type Metadata struct {
UserTimestampUs int64
HasUserTimestampUs bool
FrameID uint32
HasFrameID bool
}
// StripTrailer returns the number of bytes to strip from the end of an RTP
// payload if it contains an LKTS trailer. The trailer is located by checking
// for the "LKTS" magic suffix and then reading the XORed trailer_len byte
@@ -44,3 +54,57 @@ func StripTrailer(payload []byte, marker bool) int {
return trailerLen
}
func ParseTrailer(payload []byte, marker bool) (Metadata, int) {
var metadata Metadata
trailerLen := StripTrailer(payload, marker)
if trailerLen == 0 {
return metadata, 0
}
tlv := payload[len(payload)-trailerLen : len(payload)-envelopeSize]
for len(tlv) != 0 {
if len(tlv) < 2 {
return metadata, trailerLen
}
tag := tlv[0] ^ xorByte
valueLen := int(tlv[1] ^ xorByte)
tlv = tlv[2:]
if valueLen > len(tlv) {
return metadata, trailerLen
}
value := tlv[:valueLen]
tlv = tlv[valueLen:]
switch tag {
case tagUserTimestampUs:
if valueLen == 8 {
metadata.UserTimestampUs = int64(xorUint64(value))
metadata.HasUserTimestampUs = true
}
case tagFrameID:
if valueLen == 4 {
metadata.FrameID = xorUint32(value)
metadata.HasFrameID = true
}
}
}
return metadata, trailerLen
}
func xorUint64(value []byte) uint64 {
var out uint64
for _, b := range value {
out = (out << 8) | uint64(b^xorByte)
}
return out
}
func xorUint32(value []byte) uint32 {
var out uint32
for _, b := range value {
out = (out << 8) | uint32(b^xorByte)
}
return out
}
+36 -10
View File
@@ -19,11 +19,6 @@ import (
"testing"
)
const (
tagTimestampUs = 0x01
tagFrameID = 0x02
)
// appendTLV appends a single XORed TLV element to dst.
func appendTLV(dst []byte, tag byte, value []byte) []byte {
dst = append(dst, tag^xorByte, byte(len(value))^xorByte)
@@ -46,7 +41,7 @@ func makeTrailer(timestampUs int64, frameID uint32) []byte {
var tsBuf [8]byte
binary.BigEndian.PutUint64(tsBuf[:], uint64(timestampUs))
trailer = appendTLV(trailer, tagTimestampUs, tsBuf[:])
trailer = appendTLV(trailer, tagUserTimestampUs, tsBuf[:])
var fidBuf [4]byte
binary.BigEndian.PutUint32(fidBuf[:], frameID)
@@ -71,14 +66,14 @@ func makeTimestampOnlyTrailer(timestampUs int64) []byte {
var trailer []byte
var tsBuf [8]byte
binary.BigEndian.PutUint64(tsBuf[:], uint64(timestampUs))
trailer = appendTLV(trailer, tagTimestampUs, tsBuf[:])
trailer = appendTLV(trailer, tagUserTimestampUs, tsBuf[:])
trailerLen := byte(len(trailer) + envelopeSize)
trailer = appendEnvelope(trailer, trailerLen)
return trailer
}
func TestStripTrailer(t *testing.T) {
fullTrailerSize := 21 // (1+1+8) + (1+1+4) + 5
fullTrailerSize := 21 // (1+1+8) + (1+1+4) + 5
tsOnlyTrailerSize := 15 // (1+1+8) + 5
tests := []struct {
@@ -136,7 +131,7 @@ func TestStripTrailer(t *testing.T) {
var trailer []byte
var tsBuf [8]byte
binary.BigEndian.PutUint64(tsBuf[:], 42)
trailer = appendTLV(trailer, tagTimestampUs, tsBuf[:])
trailer = appendTLV(trailer, tagUserTimestampUs, tsBuf[:])
trailer = appendEnvelope(trailer, 200)
return trailer
}(),
@@ -150,7 +145,7 @@ func TestStripTrailer(t *testing.T) {
var trailer []byte
var tsBuf [8]byte
binary.BigEndian.PutUint64(tsBuf[:], 42)
trailer = appendTLV(trailer, tagTimestampUs, tsBuf[:])
trailer = appendTLV(trailer, tagUserTimestampUs, tsBuf[:])
trailer = appendEnvelope(trailer, 3)
return append(video, trailer...)
}(),
@@ -180,3 +175,34 @@ func TestStripTrailer(t *testing.T) {
})
}
}
func TestParseTrailer(t *testing.T) {
payload := makePayloadWithTrailer(20, 1700000000000000, 42)
metadata, strip := ParseTrailer(payload, true)
if strip != 21 {
t.Fatalf("ParseTrailer() strip = %d, want 21", strip)
}
if !metadata.HasUserTimestampUs {
t.Fatal("ParseTrailer() did not parse user timestamp")
}
if metadata.UserTimestampUs != 1700000000000000 {
t.Fatalf("ParseTrailer() user timestamp = %d", metadata.UserTimestampUs)
}
if !metadata.HasFrameID {
t.Fatal("ParseTrailer() did not parse frame id")
}
if metadata.FrameID != 42 {
t.Fatalf("ParseTrailer() frame id = %d", metadata.FrameID)
}
}
func TestParseTrailerAbsent(t *testing.T) {
metadata, strip := ParseTrailer([]byte{0x01, 0x02, 0x03}, true)
if strip != 0 {
t.Fatalf("ParseTrailer() strip = %d, want 0", strip)
}
if metadata.HasUserTimestampUs || metadata.HasFrameID {
t.Fatalf("ParseTrailer() parsed unexpected metadata: %+v", metadata)
}
}
+86
View File
@@ -0,0 +1,86 @@
# local_video MoQ Harness
This harness runs a local LiveKit server with the experimental MoQ/WebTransport
listener enabled, publishes H264 from the Rust SDK `examples/local_video`
publisher, and receives the stream in a browser with the `@moq/net` MoQ client
next to a LiveKit JS SDK WebRTC subscriber for the same room and track.
The current receiver validates transport compatibility and timing:
- WebTransport connect time
- first MoQ object time
- received frame/object count
- total payload bytes
- Annex-B H264 NAL types, including whether an IDR was seen
- WebCodecs H264 decode and canvas render count
- rendered canvas pixel validation to catch blank or flat output
- rendered-frame motion validation for the burned timestamp overlay
- packet-trailer timing metadata over a companion `<track>.timing` MoQ track
- subscriber-style timing overlay with exposure, receive, decode, paint, and
e2e latency calculations
- side-by-side LiveKit JS SDK WebRTC rendering of the same publisher video
- WebRTC connect, first-frame, render, visual, motion, and browser-exposed
video-frame timing metadata
- WebRTC packet-trailer extraction through the LiveKit JS SDK packet trailer
worker, with frame ID and user timestamp lookup via `TimeSyncUpdate`
The MoQ object payload is one Annex-B H264 access unit per MoQ group/frame.
The browser receiver configures WebCodecs from the first SPS, decodes each
access unit, draws decoded frames into a canvas, and validates that nonblank,
moving video is rendered. The WebRTC pane attaches the remote video track with
`livekit-client` and samples the `<video>` element for the same visual and
motion checks. Chromium may expose WebRTC capture and receive timestamps through
`requestVideoFrameCallback`. Packet-trailer frame IDs and user timestamps are
decoded through `Room({ packetTrailer: { worker } })` and
`RemoteVideoTrack.lookupFrameMetadata({ rtpTimestamp })`. The receiver serves a
same-origin `packet-trailer-worker.js` shim because browsers block cross-origin
worker entrypoints; it imports the SDK worker module from esm.sh. The worker URL
can be overridden with the `packetTrailerWorkerUrl` receiver query parameter.
## Run
```bash
./test/moq/local_video/run.sh
```
Then open the printed receiver URL in a Chromium-based browser. Set `OPEN=1` to
open it automatically on macOS. The receiver URL includes both the MoQ token and
a separate WebRTC viewer token, so the page connects to the same room twice and
keeps both videos playing after the initial validation frame count is reached.
The runner defaults to:
- LiveKit HTTP/WebSocket: `127.0.0.1:7880`
- LiveKit RTC TCP/UDP: `127.0.0.1:7881` / `127.0.0.1:7882`
- MoQ WebTransport: `https://127.0.0.1:7883/moq/v1`
- receiver page: `http://127.0.0.1:8899`
- WebRTC compare URL: `ws://127.0.0.1:7880`
- Rust SDK checkout: `../rust-sdks`
- room: `moq-local-video`
- track: `camera`
- publisher: `--test-pattern --attach-timestamp --attach-frame-id --burn-timestamp --disable-dynacast --min-playout-delay 0 --max-playout-delay 1 --codec h264 --encoder software --width 640 --height 480 --fps 60`
`--disable-dynacast` keeps the Rust SDK publisher encoding continuously for
this MoQ proof. This checkout does not currently handle LiveKit
`SubscribedQualityUpdate` on the publisher side, so a pure MoQ subscriber cannot
wake dynacast by itself yet.
Useful overrides:
```bash
RUST_SDKS_DIR=/path/to/rust-sdks ROOM=my-room FRAMES=120 OPEN=1 ./test/moq/local_video/run.sh
WIDTH=1280 HEIGHT=720 FPS=60 PUBLISHER_ARGS="--max-bitrate 2500000" ./test/moq/local_video/run.sh
PUBLISHER_ENCODER=auto ./test/moq/local_video/run.sh
LOW_LATENCY=0 ./test/moq/local_video/run.sh
PUBLISHER_MIN_PLAYOUT_DELAY_MS=0 PUBLISHER_MAX_PLAYOUT_DELAY_MS=50 ./test/moq/local_video/run.sh
PUBLISHER_RUST_LOG=debug ./test/moq/local_video/run.sh
SERVER_BIN=/path/to/livekit-server ./test/moq/local_video/run.sh
```
Low-latency publisher mode is enabled by default and passes
`--min-playout-delay 0 --max-playout-delay 1`. Set `LOW_LATENCY=0` or
`PUBLISHER_LOW_LATENCY=0` to omit those flags, or set
`PUBLISHER_MIN_PLAYOUT_DELAY_MS` / `PUBLISHER_MAX_PLAYOUT_DELAY_MS` to pass
custom publisher playout-delay values.
The receiver uses `@moq/net@0.1.5` from esm.sh and negotiates `moq-lite-04`.
File diff suppressed because it is too large Load Diff
@@ -0,0 +1 @@
import "https://esm.sh/livekit-client@2.19.2/packet-trailer-worker";
+337
View File
@@ -0,0 +1,337 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
LIVEKIT_DIR="$(cd "$SCRIPT_DIR/../../.." && pwd)"
WORKSPACE_DIR="$(cd "$LIVEKIT_DIR/.." && pwd)"
RUST_SDKS_DIR="${RUST_SDKS_DIR:-$WORKSPACE_DIR/rust-sdks}"
TMP_DIR="${TMP_DIR:-$(mktemp -d /private/tmp/livekit-moq-local-video.XXXXXX)}"
API_KEY="${API_KEY:-devkey}"
API_SECRET="${API_SECRET:-secret}"
ROOM="${ROOM:-moq-local-video}"
TRACK="${TRACK:-camera}"
PUBLISHER_IDENTITY="${PUBLISHER_IDENTITY:-moq-publisher}"
MOQ_IDENTITY="${MOQ_IDENTITY:-moq-client}"
WEBRTC_IDENTITY="${WEBRTC_IDENTITY:-webrtc-client}"
LIVEKIT_PORT="${LIVEKIT_PORT:-7880}"
LIVEKIT_RTC_TCP_PORT="${LIVEKIT_RTC_TCP_PORT:-7881}"
LIVEKIT_RTC_UDP_PORT="${LIVEKIT_RTC_UDP_PORT:-7882}"
MOQ_PORT="${MOQ_PORT:-7883}"
WEB_PORT="${WEB_PORT:-8899}"
WIDTH="${WIDTH:-640}"
HEIGHT="${HEIGHT:-480}"
FPS="${FPS:-60}"
FRAMES="${FRAMES:-30}"
OPEN="${OPEN:-0}"
PUBLISHER_READY_TIMEOUT="${PUBLISHER_READY_TIMEOUT:-120}"
PUBLISHER_RUST_LOG="${PUBLISHER_RUST_LOG:-info}"
PUBLISHER_ENCODER="${PUBLISHER_ENCODER:-software}"
PUBLISHER_LOW_LATENCY="${PUBLISHER_LOW_LATENCY:-${LOW_LATENCY:-1}}"
PUBLISHER_MIN_PLAYOUT_DELAY_MS="${PUBLISHER_MIN_PLAYOUT_DELAY_MS:-}"
PUBLISHER_MAX_PLAYOUT_DELAY_MS="${PUBLISHER_MAX_PLAYOUT_DELAY_MS:-}"
case "$PUBLISHER_LOW_LATENCY" in
1|true|TRUE|yes|YES|on|ON)
PUBLISHER_MIN_PLAYOUT_DELAY_MS="${PUBLISHER_MIN_PLAYOUT_DELAY_MS:-0}"
PUBLISHER_MAX_PLAYOUT_DELAY_MS="${PUBLISHER_MAX_PLAYOUT_DELAY_MS:-1}"
;;
0|false|FALSE|no|NO|off|OFF)
;;
*)
echo "invalid PUBLISHER_LOW_LATENCY=$PUBLISHER_LOW_LATENCY; expected 1/0, true/false, yes/no, or on/off" >&2
exit 1
;;
esac
SERVER_PID=""
PUBLISHER_PID=""
WEB_PID=""
kill_tree() {
local pid="$1"
local child
if [[ -z "$pid" ]] || ! kill -0 "$pid" >/dev/null 2>&1; then
return
fi
while IFS= read -r child; do
kill_tree "$child"
done < <(pgrep -P "$pid" 2>/dev/null || true)
kill "$pid" >/dev/null 2>&1
wait "$pid" >/dev/null 2>&1
}
cleanup() {
set +e
for pid in "$PUBLISHER_PID" "$WEB_PID" "$SERVER_PID"; do
kill_tree "$pid"
done
}
trap cleanup EXIT INT TERM
require_cmd() {
if ! command -v "$1" >/dev/null 2>&1; then
echo "missing required command: $1" >&2
exit 1
fi
}
wait_for_tcp() {
local host="$1"
local port="$2"
local name="$3"
local deadline=$((SECONDS + 30))
until nc -z "$host" "$port" >/dev/null 2>&1; do
if (( SECONDS >= deadline )); then
echo "timed out waiting for $name on $host:$port" >&2
exit 1
fi
sleep 0.2
done
}
wait_for_log() {
local file="$1"
local pattern="$2"
local name="$3"
local pid="$4"
local timeout="$5"
local deadline=$((SECONDS + timeout))
until grep -q "$pattern" "$file" >/dev/null 2>&1; do
if ! kill -0 "$pid" >/dev/null 2>&1; then
echo "$name exited before becoming ready; see $file" >&2
tail -80 "$file" >&2
exit 1
fi
if (( SECONDS >= deadline )); then
echo "timed out waiting for $name readiness pattern '$pattern'; see $file" >&2
tail -80 "$file" >&2
exit 1
fi
sleep 0.5
done
}
require_cmd cargo
require_cmd go
require_cmd nc
require_cmd openssl
require_cmd python3
require_cmd xxd
if [[ ! -d "$RUST_SDKS_DIR/examples/local_video" ]]; then
echo "RUST_SDKS_DIR does not point at a rust-sdks checkout: $RUST_SDKS_DIR" >&2
exit 1
fi
CERT_FILE="$TMP_DIR/moq-cert.pem"
KEY_FILE="$TMP_DIR/moq-key.pem"
OPENSSL_CONF="$TMP_DIR/openssl.cnf"
CONFIG_FILE="$TMP_DIR/livekit.yaml"
cat > "$OPENSSL_CONF" <<EOF_CONF
[req]
distinguished_name = req_distinguished_name
x509_extensions = v3_req
prompt = no
[req_distinguished_name]
CN = localhost
[v3_req]
subjectAltName = @alt_names
[alt_names]
DNS.1 = localhost
IP.1 = 127.0.0.1
EOF_CONF
openssl req \
-x509 \
-newkey ec \
-pkeyopt ec_paramgen_curve:prime256v1 \
-nodes \
-days 1 \
-keyout "$KEY_FILE" \
-out "$CERT_FILE" \
-config "$OPENSSL_CONF" >/dev/null 2>&1
CERT_HASH="$(openssl x509 -in "$CERT_FILE" -outform DER | openssl dgst -sha256 -binary | xxd -p -c 256)"
cat > "$CONFIG_FILE" <<EOF_CONFIG
development: true
port: $LIVEKIT_PORT
bind_addresses:
- 127.0.0.1
rtc:
use_external_ip: false
tcp_port: $LIVEKIT_RTC_TCP_PORT
udp_port: $LIVEKIT_RTC_UDP_PORT
keys:
"$API_KEY": "$API_SECRET"
moq:
enabled: true
port: $MOQ_PORT
bind_addresses:
- 127.0.0.1
path: /moq/v1
cert_file: "$CERT_FILE"
key_file: "$KEY_FILE"
track_queue_size: 256
cache_max_bytes: 2097152
write_timeout: 2s
EOF_CONFIG
SERVER_BIN="${SERVER_BIN:-}"
if [[ -z "$SERVER_BIN" ]]; then
SERVER_BIN="$TMP_DIR/livekit-server"
echo "building livekit-server..."
(cd "$LIVEKIT_DIR" && go build -o "$SERVER_BIN" ./cmd/server)
fi
TOKEN_OUTPUT="$("$SERVER_BIN" --config "$CONFIG_FILE" create-join-token --room "$ROOM" --identity "$MOQ_IDENTITY")"
TOKEN="$(printf "%s\n" "$TOKEN_OUTPUT" | sed -n "s/^Token: //p" | tail -1)"
if [[ -z "$TOKEN" ]]; then
echo "failed to generate receiver token" >&2
printf "%s\n" "$TOKEN_OUTPUT" >&2
exit 1
fi
WEBRTC_TOKEN_OUTPUT="$("$SERVER_BIN" --config "$CONFIG_FILE" create-join-token --room "$ROOM" --identity "$WEBRTC_IDENTITY")"
WEBRTC_TOKEN="$(printf "%s\n" "$WEBRTC_TOKEN_OUTPUT" | sed -n "s/^Token: //p" | tail -1)"
if [[ -z "$WEBRTC_TOKEN" ]]; then
echo "failed to generate WebRTC viewer token" >&2
printf "%s\n" "$WEBRTC_TOKEN_OUTPUT" >&2
exit 1
fi
echo "starting livekit-server..."
"$SERVER_BIN" --config "$CONFIG_FILE" > "$TMP_DIR/livekit.log" 2>&1 &
SERVER_PID="$!"
wait_for_tcp 127.0.0.1 "$LIVEKIT_PORT" "LiveKit"
publisher_cmd=(
cargo run -p local_video -F desktop --bin publisher --
--test-pattern
--attach-timestamp
--attach-frame-id
--burn-timestamp
--disable-dynacast
--room-name "$ROOM"
--identity "$PUBLISHER_IDENTITY"
--codec h264
--encoder "$PUBLISHER_ENCODER"
--width "$WIDTH"
--height "$HEIGHT"
--fps "$FPS"
--url "ws://127.0.0.1:$LIVEKIT_PORT"
--api-key "$API_KEY"
--api-secret "$API_SECRET"
)
if [[ -n "$PUBLISHER_MIN_PLAYOUT_DELAY_MS" ]]; then
publisher_cmd+=(--min-playout-delay "$PUBLISHER_MIN_PLAYOUT_DELAY_MS")
fi
if [[ -n "$PUBLISHER_MAX_PLAYOUT_DELAY_MS" ]]; then
publisher_cmd+=(--max-playout-delay "$PUBLISHER_MAX_PLAYOUT_DELAY_MS")
fi
if [[ -n "${PUBLISHER_ARGS:-}" ]]; then
extra_args=($PUBLISHER_ARGS)
publisher_cmd+=("${extra_args[@]}")
fi
echo "starting rust-sdks local_video publisher..."
(cd "$RUST_SDKS_DIR" && RUST_LOG="$PUBLISHER_RUST_LOG" "${publisher_cmd[@]}") > "$TMP_DIR/publisher.log" 2>&1 &
PUBLISHER_PID="$!"
wait_for_log "$TMP_DIR/livekit.log" "mediaTrack published" "publisher media track" "$PUBLISHER_PID" "$PUBLISHER_READY_TIMEOUT"
echo "starting receiver web server..."
python3 -m http.server "$WEB_PORT" --bind 127.0.0.1 --directory "$SCRIPT_DIR/receiver" > "$TMP_DIR/web.log" 2>&1 &
WEB_PID="$!"
wait_for_tcp 127.0.0.1 "$WEB_PORT" "receiver web server"
RECEIVER_URL="$(python3 - "$WEB_PORT" "$MOQ_PORT" "$LIVEKIT_PORT" "$TOKEN" "$WEBRTC_TOKEN" "$ROOM" "$TRACK" "$CERT_HASH" "$FRAMES" "$WEBRTC_IDENTITY" <<'PY'
import sys
from urllib.parse import urlencode
web_port, moq_port, livekit_port, token, webrtc_token, room, track, cert_hash, frames, webrtc_identity = sys.argv[1:]
query = urlencode({
"url": f"https://127.0.0.1:{moq_port}/moq/v1",
"token": token,
"liveKitUrl": f"ws://127.0.0.1:{livekit_port}",
"webrtcToken": webrtc_token,
"webrtcIdentity": webrtc_identity,
"room": room,
"track": track,
"certHash": cert_hash,
"frames": frames,
})
print(f"http://127.0.0.1:{web_port}/?{query}")
PY
)"
MEET_URL="$(python3 - "$LIVEKIT_PORT" "$WEBRTC_TOKEN" <<'PY'
import sys
from urllib.parse import urlencode
livekit_port, token = sys.argv[1:]
query = urlencode({
"liveKitUrl": f"ws://127.0.0.1:{livekit_port}",
"token": token,
})
print(f"https://meet.livekit.io/custom/?{query}")
PY
)"
cat <<EOF_STATUS
MoQ local_video harness is running.
Receiver URL:
$RECEIVER_URL
LiveKit Meet URL:
$MEET_URL
WebRTC compare:
URL: ws://127.0.0.1:$LIVEKIT_PORT
Room: $ROOM
Publisher identity: $PUBLISHER_IDENTITY
MoQ identity: $MOQ_IDENTITY
WebRTC identity: $WEBRTC_IDENTITY
Logs:
LiveKit: $TMP_DIR/livekit.log
Publisher: $TMP_DIR/publisher.log
Web: $TMP_DIR/web.log
Press Ctrl-C to stop.
EOF_STATUS
if [[ "$OPEN" == "1" ]]; then
if command -v open >/dev/null 2>&1; then
open "$RECEIVER_URL"
else
echo "OPEN=1 requested, but the macOS 'open' command is unavailable" >&2
fi
fi
while true; do
if ! kill -0 "$SERVER_PID" >/dev/null 2>&1; then
echo "livekit-server exited; see $TMP_DIR/livekit.log" >&2
exit 1
fi
if ! kill -0 "$PUBLISHER_PID" >/dev/null 2>&1; then
echo "publisher exited; see $TMP_DIR/publisher.log" >&2
exit 1
fi
if ! kill -0 "$WEB_PID" >/dev/null 2>&1; then
echo "receiver web server exited; see $TMP_DIR/web.log" >&2
exit 1
fi
sleep 1
done