Use wrapped join request to be able to support compressed and uncompressed. (#3838)

* Do no gzip join request

* wrapped join request

* deps
This commit is contained in:
Raja Subramanian
2025-08-07 22:37:08 +05:30
committed by GitHub
parent 5ca1626439
commit 5d44cf6d57
4 changed files with 42 additions and 31 deletions

4
go.mod
View File

@@ -23,7 +23,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731
github.com/livekit/mediatransportutil v0.0.0-20250519131108-fb90f5acfded
github.com/livekit/protocol v1.39.4-0.20250807053007-7f6468a6a059
github.com/livekit/protocol v1.39.4-0.20250807105828-ccbae8154e54
github.com/livekit/psrpc v0.6.1-0.20250726180611-3915e005e741
github.com/mackerelio/go-osstat v0.2.5
github.com/magefile/mage v1.15.0
@@ -58,7 +58,7 @@ require (
golang.org/x/exp v0.0.0-20250718183923-645b1fa84792
golang.org/x/mod v0.26.0
golang.org/x/sync v0.16.0
google.golang.org/protobuf v1.36.6
google.golang.org/protobuf v1.36.7
gopkg.in/yaml.v3 v3.0.1
)

8
go.sum
View File

@@ -167,8 +167,8 @@ github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731 h1:9x+U2HGLrSw5AT
github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20250519131108-fb90f5acfded h1:ylZPdnlX1RW9Z15SD4mp87vT2D2shsk0hpLJwSPcq3g=
github.com/livekit/mediatransportutil v0.0.0-20250519131108-fb90f5acfded/go.mod h1:mSNtYzSf6iY9xM3UX42VEI+STHvMgHmrYzEHPcdhB8A=
github.com/livekit/protocol v1.39.4-0.20250807053007-7f6468a6a059 h1:z9J/0wbTfbJ5QmFhPoYNR+VR3vaOaifQfNYELJHjTZs=
github.com/livekit/protocol v1.39.4-0.20250807053007-7f6468a6a059/go.mod h1:YlgUxAegtU8jZ0tVXoIV/4fHeHqqLvS+6JnPKDbpFPU=
github.com/livekit/protocol v1.39.4-0.20250807105828-ccbae8154e54 h1:Bh4IzI1UUqyxUnAJScR2rT/7V8mJqkzFSoEuMfm/T5k=
github.com/livekit/protocol v1.39.4-0.20250807105828-ccbae8154e54/go.mod h1:YlgUxAegtU8jZ0tVXoIV/4fHeHqqLvS+6JnPKDbpFPU=
github.com/livekit/psrpc v0.6.1-0.20250726180611-3915e005e741 h1:KKL1u94l6dF9u4cBwnnfozk27GH1txWy2SlvkfgmzoY=
github.com/livekit/psrpc v0.6.1-0.20250726180611-3915e005e741/go.mod h1:AuDC5uOoEjQJEc69v4Li3t77Ocz0e0NdjQEuFfO+vfk=
github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o=
@@ -477,8 +477,8 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20250804133106-a7a43d27e69b h1:
google.golang.org/genproto/googleapis/rpc v0.0.0-20250804133106-a7a43d27e69b/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
google.golang.org/grpc v1.74.2 h1:WoosgB65DlWVC9FqI82dGsZhWFNBSLjQ84bjROOpMu4=
google.golang.org/grpc v1.74.2/go.mod h1:CtQ+BGjaAIXHs/5YS3i473GqwBBa1zGQNevxdeBEXrM=
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
google.golang.org/protobuf v1.36.7 h1:IgrO7UwFQGJdRNXH/sQux4R1Dj1WAKcLElzeeRaXV2A=
google.golang.org/protobuf v1.36.7/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=

View File

@@ -120,8 +120,8 @@ func (s *RTCService) validateInternal(lgr logger.Logger, r *http.Request, strict
var params ValidateConnectRequestParams
joinRequest := &livekit.JoinRequest{}
joinRequestBase64 := r.FormValue("join_request")
if joinRequestBase64 == "" {
wrappedJoinRequestBase64 := r.FormValue("join_request")
if wrappedJoinRequestBase64 == "" {
params.publish = r.FormValue("publish")
attributesStrParam := r.FormValue("attributes")
@@ -137,26 +137,39 @@ func (s *RTCService) validateInternal(lgr logger.Logger, r *http.Request, strict
params.attributes = attrs
}
} else {
if compressedBytes, err := base64.URLEncoding.DecodeString(joinRequestBase64); err != nil {
return "", routing.ParticipantInit{}, http.StatusBadRequest, errors.New("cannot base64 decode join request")
if wrappedProtoBytes, err := base64.URLEncoding.DecodeString(wrappedJoinRequestBase64); err != nil {
return "", routing.ParticipantInit{}, http.StatusBadRequest, errors.New("cannot base64 decode wrapped join request")
} else {
b := bytes.NewReader(compressedBytes)
if reader, err := gzip.NewReader(b); err != nil {
return "", routing.ParticipantInit{}, http.StatusBadRequest, errors.New("cannot decompress join request")
} else {
protoBytes, err := io.ReadAll(reader)
reader.Close()
if err != nil {
return "", routing.ParticipantInit{}, http.StatusBadRequest, errors.New("cannot read decompressed join request")
}
wrappedJoinRequest := &livekit.WrappedJoinRequest{}
if err := proto.Unmarshal(wrappedProtoBytes, wrappedJoinRequest); err != nil {
return "", routing.ParticipantInit{}, http.StatusBadRequest, errors.New("cannot unmarshal wrapped join request")
}
if err := proto.Unmarshal(protoBytes, joinRequest); err != nil {
switch wrappedJoinRequest.Compression {
case livekit.WrappedJoinRequest_NONE:
if err := proto.Unmarshal(wrappedJoinRequest.JoinRequest, joinRequest); err != nil {
return "", routing.ParticipantInit{}, http.StatusBadRequest, errors.New("cannot unmarshal join request")
}
params.metadata = joinRequest.Metadata
params.attributes = joinRequest.ParticipantAttributes
case livekit.WrappedJoinRequest_GZIP:
b := bytes.NewReader(wrappedJoinRequest.JoinRequest)
if reader, err := gzip.NewReader(b); err != nil {
return "", routing.ParticipantInit{}, http.StatusBadRequest, errors.New("cannot decompress join request")
} else {
protoBytes, err := io.ReadAll(reader)
reader.Close()
if err != nil {
return "", routing.ParticipantInit{}, http.StatusBadRequest, errors.New("cannot read decompressed join request")
}
if err := proto.Unmarshal(protoBytes, joinRequest); err != nil {
return "", routing.ParticipantInit{}, http.StatusBadRequest, errors.New("cannot unmarshal join request")
}
}
}
params.metadata = joinRequest.Metadata
params.attributes = joinRequest.ParticipantAttributes
}
}
@@ -180,7 +193,7 @@ func (s *RTCService) validateInternal(lgr logger.Logger, r *http.Request, strict
CreateRoom: res.createRoomRequest,
}
if joinRequestBase64 == "" {
if wrappedJoinRequestBase64 == "" {
pi.Reconnect = boolValue(r.FormValue("reconnect"))
pi.Client = ParseClientInfo(r)
pi.AutoSubscribe = true

View File

@@ -15,8 +15,6 @@
package client
import (
"bytes"
"compress/gzip"
"context"
"encoding/base64"
"encoding/json"
@@ -158,12 +156,12 @@ func NewWebSocketConn(host, token string, opts *Options) (*websocket.Conn, error
}
if marshalled, err := proto.Marshal(joinRequest); err == nil {
var buf bytes.Buffer
writer := gzip.NewWriter(&buf)
writer.Write(marshalled)
writer.Close()
connectUrl += fmt.Sprintf("?join_request=%s", base64.URLEncoding.EncodeToString(buf.Bytes()))
wrapped := &livekit.WrappedJoinRequest{
JoinRequest: marshalled,
}
if marshalled, err := proto.Marshal(wrapped); err == nil {
connectUrl += fmt.Sprintf("?join_request=%s", base64.URLEncoding.EncodeToString(marshalled))
}
}
} else {
connectUrl += fmt.Sprintf("?protocol=%d", types.CurrentProtocol)