close signal with reliable message (#1658)

* close signal with reliable message

* update protocol
This commit is contained in:
Paul Wells
2023-04-26 22:59:39 -07:00
committed by GitHub
parent 11eedf4514
commit f9f89cd7cf
5 changed files with 45 additions and 64 deletions

8
go.mod
View File

@@ -18,7 +18,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20230326055817-ed569ca13d26
github.com/livekit/protocol v1.5.6-0.20230424073901-c54f5f7f4182
github.com/livekit/protocol v1.5.6-0.20230427055046-79477e28a150
github.com/livekit/psrpc v0.3.1-0.20230425025640-5390915734c3
github.com/mackerelio/go-osstat v0.2.4
github.com/magefile/mage v1.14.0
@@ -27,7 +27,7 @@ require (
github.com/olekukonko/tablewriter v0.0.5
github.com/pion/dtls/v2 v2.2.6
github.com/pion/ice/v2 v2.3.2
github.com/pion/interceptor v0.1.12
github.com/pion/interceptor v0.1.13
github.com/pion/logging v0.2.2
github.com/pion/rtcp v1.2.10
github.com/pion/rtp v1.7.13
@@ -35,7 +35,7 @@ require (
github.com/pion/stun v0.4.0
github.com/pion/transport/v2 v2.2.0
github.com/pion/turn/v2 v2.1.0
github.com/pion/webrtc/v3 v3.1.61
github.com/pion/webrtc/v3 v3.1.62
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.15.0
github.com/redis/go-redis/v9 v9.0.3
@@ -92,7 +92,7 @@ require (
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/crypto v0.8.0 // indirect
golang.org/x/exp v0.0.0-20230420155640-133eef4313cb // indirect
golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53 // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/net v0.9.0 // indirect
golang.org/x/sys v0.7.0 // indirect

16
go.sum
View File

@@ -121,8 +121,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20230326055817-ed569ca13d26 h1:QlQFyMwCDgjyySsrgmrMcVbEBA6KZcyTzvK+z346tUA=
github.com/livekit/mediatransportutil v0.0.0-20230326055817-ed569ca13d26/go.mod h1:eDA41kiySZoG+wy4Etsjb3w0jjLx69i/vAmSjG4bteA=
github.com/livekit/protocol v1.5.6-0.20230424073901-c54f5f7f4182 h1:rpaYN8Jy5F1ZhxH4Q61+6Cc42I/evI63SlSXUMZ+VdU=
github.com/livekit/protocol v1.5.6-0.20230424073901-c54f5f7f4182/go.mod h1:B7Ns8diIKB3y39oRHm7ZluU9ZGCxCWQT+uKcbY3MCG4=
github.com/livekit/protocol v1.5.6-0.20230427055046-79477e28a150 h1:jN0fW8H8Qgi5xsmmbk1s2qsXg1Y873zeWghE4mRV1Cc=
github.com/livekit/protocol v1.5.6-0.20230427055046-79477e28a150/go.mod h1:MBW05GWdhbl+o6u2gLLCQtDvr9EvcV4VWckpIYtoM2c=
github.com/livekit/psrpc v0.3.1-0.20230425025640-5390915734c3 h1:NXcxrluYLng7LTHcYNOj/MdR4SHWrKQAG2G+U930mTA=
github.com/livekit/psrpc v0.3.1-0.20230425025640-5390915734c3/go.mod h1:n6JntEg+zT6Ji8InoyTpV7wusPNwGqqtxmHlkNhDN0U=
github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs=
@@ -183,8 +183,8 @@ github.com/pion/dtls/v2 v2.2.6 h1:yXMxKr0Skd+Ub6A8UqXTRLSywskx93ooMRHsQUtd+Z4=
github.com/pion/dtls/v2 v2.2.6/go.mod h1:t8fWJCIquY5rlQZwA2yWxUS1+OCrAdXrhVKXB5oD/wY=
github.com/pion/ice/v2 v2.3.2 h1:vh+fi4RkZ8H5fB4brZ/jm3j4BqFgMmNs+aB3X52Hu7M=
github.com/pion/ice/v2 v2.3.2/go.mod h1:AMIpuJqcpe+UwloocNebmTSWhCZM1TUCo9v7nW50jX0=
github.com/pion/interceptor v0.1.12 h1:CslaNriCFUItiXS5o+hh5lpL0t0ytQkFnUcbbCs2Zq8=
github.com/pion/interceptor v0.1.12/go.mod h1:bDtgAD9dRkBZpWHGKaoKb42FhDHTG2rX8Ii9LRALLVA=
github.com/pion/interceptor v0.1.13 h1:tfJdEqPxnQrlstjd7SCL7B97WdjPkJtg5EpRMgJ61Ms=
github.com/pion/interceptor v0.1.13/go.mod h1:SY8kpmfVBvrbUzvj2bsXz7OJt5JvmVNZ+4Kjq7FcwrI=
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
github.com/pion/mdns v0.0.7 h1:P0UB4Sr6xDWEox0kTVxF0LmQihtCbSAdW0H2nEgkA3U=
@@ -214,8 +214,8 @@ github.com/pion/turn/v2 v2.1.0 h1:5wGHSgGhJhP/RpabkUb/T9PdsAjkGLS6toYz5HNzoSI=
github.com/pion/turn/v2 v2.1.0/go.mod h1:yrT5XbXSGX1VFSF31A3c1kCNB5bBZgk/uu5LET162qs=
github.com/pion/udp/v2 v2.0.1 h1:xP0z6WNux1zWEjhC7onRA3EwwSliXqu1ElUZAQhUP54=
github.com/pion/udp/v2 v2.0.1/go.mod h1:B7uvTMP00lzWdyMr/1PVZXtV3wpPIxBRd4Wl6AksXn8=
github.com/pion/webrtc/v3 v3.1.61 h1:WG6p786t7jxXO/3miw6HmAQmO3p/n+QLRa2xLaovcr8=
github.com/pion/webrtc/v3 v3.1.61/go.mod h1:uk/4AJmgEUpSExaP7aexCyODwfbHap8hAnQzRV7zKcE=
github.com/pion/webrtc/v3 v3.1.62 h1:B+QYCs+ajtRMJtC3nphzFWWjVoCorugOABu/JD0pJ3c=
github.com/pion/webrtc/v3 v3.1.62/go.mod h1:PaPsj1aigBfWK1jJRZPkWvdiPaAiJwAEMgDKXVO7NjI=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -282,8 +282,8 @@ golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
golang.org/x/crypto v0.8.0 h1:pd9TJtTueMTVQXzk8E2XESSMQDj/U7OUu0PqJqPXQjQ=
golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE=
golang.org/x/exp v0.0.0-20230420155640-133eef4313cb h1:rhjz/8Mbfa8xROFiH+MQphmAmgqRM0bOMnytznhWEXk=
golang.org/x/exp v0.0.0-20230420155640-133eef4313cb/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w=
golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53 h1:5llv2sWeaMSnA3w2kS57ouQQ4pudlXrR0dCgw51QK9o=
golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8=

View File

@@ -127,17 +127,11 @@ func (r *signalClient) StartParticipantSignal(
type signalRequestMessageWriter struct{}
func (e signalRequestMessageWriter) WriteOne(seq uint64, msg proto.Message) *rpc.RelaySignalRequest {
return &rpc.RelaySignalRequest{
Seq: seq,
Request: msg.(*livekit.SignalRequest),
}
}
func (e signalRequestMessageWriter) WriteMany(seq uint64, msgs []proto.Message) *rpc.RelaySignalRequest {
func (e signalRequestMessageWriter) Write(seq uint64, close bool, msgs []proto.Message) *rpc.RelaySignalRequest {
r := &rpc.RelaySignalRequest{
Seq: seq,
Requests: make([]*livekit.SignalRequest, 0, len(msgs)),
Close: close,
}
for _, m := range msgs {
r.Requests = append(r.Requests, m.(*livekit.SignalRequest))
@@ -148,10 +142,7 @@ func (e signalRequestMessageWriter) WriteMany(seq uint64, msgs []proto.Message)
type signalResponseMessageReader struct{}
func (e signalResponseMessageReader) Read(rm *rpc.RelaySignalResponse) ([]proto.Message, error) {
msgs := make([]proto.Message, 0, len(rm.Responses)+1)
if rm.Response != nil {
msgs = append(msgs, rm.Response)
}
msgs := make([]proto.Message, 0, len(rm.Responses))
for _, m := range rm.Responses {
msgs = append(msgs, m)
}
@@ -161,11 +152,11 @@ func (e signalResponseMessageReader) Read(rm *rpc.RelaySignalResponse) ([]proto.
type RelaySignalMessage interface {
proto.Message
GetSeq() uint64
GetClose() bool
}
type SignalMessageWriter[SendType RelaySignalMessage] interface {
WriteOne(seq uint64, msg proto.Message) SendType
WriteMany(seq uint64, msgs []proto.Message) SendType
Write(seq uint64, close bool, msgs []proto.Message) SendType
}
type SignalMessageReader[RecvType RelaySignalMessage] interface {
@@ -196,6 +187,10 @@ func CopySignalStreamToMessageChannel[SendType, RecvType RelaySignalMessage](
}
prometheus.MessageCounter.WithLabelValues("signal", "success").Add(1)
}
if msg.GetClose() {
return psrpc.ErrStreamClosed
}
}
return stream.Err()
}
@@ -212,19 +207,18 @@ func (r *signalMessageReader[SendType, RecvType]) Read(msg RecvType) ([]proto.Me
return nil, err
}
if r.config.MinVersion >= 1 {
if r.seq < msg.GetSeq() {
return nil, ErrSignalMessageDropped
}
if r.seq > msg.GetSeq() {
n := int(r.seq - msg.GetSeq())
if n > len(res) {
n = len(res)
}
res = res[n:]
}
r.seq += uint64(len(res))
if r.seq < msg.GetSeq() {
return nil, ErrSignalMessageDropped
}
if r.seq > msg.GetSeq() {
n := int(r.seq - msg.GetSeq())
if n > len(res) {
n = len(res)
}
res = res[n:]
}
r.seq += uint64(len(res))
return res, nil
}
@@ -256,7 +250,8 @@ func (s *signalMessageSink[SendType, RecvType]) Close() {
s.mu.Lock()
s.draining = true
if !s.writing {
s.Stream.Close(nil)
s.writing = true
go s.write()
}
s.mu.Unlock()
@@ -267,16 +262,6 @@ func (s *signalMessageSink[SendType, RecvType]) IsClosed() bool {
return s.Stream.Err() != nil
}
func (s *signalMessageSink[SendType, RecvType]) nextMessage() (msg SendType, n int) {
if len(s.queue) == 0 {
return
}
if s.Config.MinVersion >= 1 {
return s.Writer.WriteMany(s.seq, s.queue), len(s.queue)
}
return s.Writer.WriteOne(s.seq, s.queue[0]), 1
}
func (s *signalMessageSink[SendType, RecvType]) write() {
interval := s.Config.MinRetryInterval
deadline := time.Now().Add(s.Config.RetryTimeout)
@@ -284,10 +269,11 @@ func (s *signalMessageSink[SendType, RecvType]) write() {
s.mu.Lock()
for {
msg, n := s.nextMessage()
if n == 0 || s.IsClosed() {
close := s.draining
if (!close && len(s.queue) == 0) || s.IsClosed() {
break
}
msg, n := s.Writer.Write(s.seq, close, s.queue), len(s.queue)
s.mu.Unlock()
err = s.Stream.Send(msg, psrpc.WithTimeout(interval))
@@ -314,6 +300,10 @@ func (s *signalMessageSink[SendType, RecvType]) write() {
s.seq += uint64(n)
s.queue = s.queue[n:]
if close {
break
}
}
}

View File

@@ -10,7 +10,7 @@ import (
func HandleParticipantSignal(room types.Room, participant types.LocalParticipant, req *livekit.SignalRequest, pLogger logger.Logger) error {
participant.UpdateLastSeenSignal()
switch msg := req.Message.(type) {
switch msg := req.GetMessage().(type) {
case *livekit.SignalRequest_Offer:
participant.HandleOffer(FromProtoSessionDescription(msg.Offer))
case *livekit.SignalRequest_Answer:

View File

@@ -162,17 +162,11 @@ func (r *signalService) RelaySignal(stream psrpc.ServerStream[*rpc.RelaySignalRe
type signalResponseMessageWriter struct{}
func (e signalResponseMessageWriter) WriteOne(seq uint64, msg proto.Message) *rpc.RelaySignalResponse {
return &rpc.RelaySignalResponse{
Seq: seq,
Response: msg.(*livekit.SignalResponse),
}
}
func (e signalResponseMessageWriter) WriteMany(seq uint64, msgs []proto.Message) *rpc.RelaySignalResponse {
func (e signalResponseMessageWriter) Write(seq uint64, close bool, msgs []proto.Message) *rpc.RelaySignalResponse {
r := &rpc.RelaySignalResponse{
Seq: seq,
Responses: make([]*livekit.SignalResponse, 0, len(msgs)),
Close: close,
}
for _, m := range msgs {
r.Responses = append(r.Responses, m.(*livekit.SignalResponse))
@@ -183,10 +177,7 @@ func (e signalResponseMessageWriter) WriteMany(seq uint64, msgs []proto.Message)
type signalRequestMessageReader struct{}
func (e signalRequestMessageReader) Read(rm *rpc.RelaySignalRequest) ([]proto.Message, error) {
msgs := make([]proto.Message, 0, len(rm.Requests)+1)
if rm.Request != nil {
msgs = append(msgs, rm.Request)
}
msgs := make([]proto.Message, 0, len(rm.Requests))
for _, m := range rm.Requests {
msgs = append(msgs, m)
}