From f9f89cd7cf366f41979dacbbad36b5c69abf72c1 Mon Sep 17 00:00:00 2001 From: Paul Wells Date: Wed, 26 Apr 2023 22:59:39 -0700 Subject: [PATCH] close signal with reliable message (#1658) * close signal with reliable message * update protocol --- go.mod | 8 ++--- go.sum | 16 +++++----- pkg/routing/signal.go | 68 +++++++++++++++++----------------------- pkg/rtc/signalhandler.go | 2 +- pkg/service/signal.go | 15 ++------- 5 files changed, 45 insertions(+), 64 deletions(-) diff --git a/go.mod b/go.mod index 63b40cf5a..ccbf1201a 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20230326055817-ed569ca13d26 - github.com/livekit/protocol v1.5.6-0.20230424073901-c54f5f7f4182 + github.com/livekit/protocol v1.5.6-0.20230427055046-79477e28a150 github.com/livekit/psrpc v0.3.1-0.20230425025640-5390915734c3 github.com/mackerelio/go-osstat v0.2.4 github.com/magefile/mage v1.14.0 @@ -27,7 +27,7 @@ require ( github.com/olekukonko/tablewriter v0.0.5 github.com/pion/dtls/v2 v2.2.6 github.com/pion/ice/v2 v2.3.2 - github.com/pion/interceptor v0.1.12 + github.com/pion/interceptor v0.1.13 github.com/pion/logging v0.2.2 github.com/pion/rtcp v1.2.10 github.com/pion/rtp v1.7.13 @@ -35,7 +35,7 @@ require ( github.com/pion/stun v0.4.0 github.com/pion/transport/v2 v2.2.0 github.com/pion/turn/v2 v2.1.0 - github.com/pion/webrtc/v3 v3.1.61 + github.com/pion/webrtc/v3 v3.1.62 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.15.0 github.com/redis/go-redis/v9 v9.0.3 @@ -92,7 +92,7 @@ require ( github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect go.uber.org/multierr v1.6.0 // indirect golang.org/x/crypto v0.8.0 // indirect - golang.org/x/exp v0.0.0-20230420155640-133eef4313cb // indirect + golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53 // indirect golang.org/x/mod v0.8.0 // indirect golang.org/x/net v0.9.0 // indirect golang.org/x/sys v0.7.0 // indirect diff --git a/go.sum b/go.sum index 144995ed6..f3769ea35 100644 --- a/go.sum +++ b/go.sum @@ -121,8 +121,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20230326055817-ed569ca13d26 h1:QlQFyMwCDgjyySsrgmrMcVbEBA6KZcyTzvK+z346tUA= github.com/livekit/mediatransportutil v0.0.0-20230326055817-ed569ca13d26/go.mod h1:eDA41kiySZoG+wy4Etsjb3w0jjLx69i/vAmSjG4bteA= -github.com/livekit/protocol v1.5.6-0.20230424073901-c54f5f7f4182 h1:rpaYN8Jy5F1ZhxH4Q61+6Cc42I/evI63SlSXUMZ+VdU= -github.com/livekit/protocol v1.5.6-0.20230424073901-c54f5f7f4182/go.mod h1:B7Ns8diIKB3y39oRHm7ZluU9ZGCxCWQT+uKcbY3MCG4= +github.com/livekit/protocol v1.5.6-0.20230427055046-79477e28a150 h1:jN0fW8H8Qgi5xsmmbk1s2qsXg1Y873zeWghE4mRV1Cc= +github.com/livekit/protocol v1.5.6-0.20230427055046-79477e28a150/go.mod h1:MBW05GWdhbl+o6u2gLLCQtDvr9EvcV4VWckpIYtoM2c= github.com/livekit/psrpc v0.3.1-0.20230425025640-5390915734c3 h1:NXcxrluYLng7LTHcYNOj/MdR4SHWrKQAG2G+U930mTA= github.com/livekit/psrpc v0.3.1-0.20230425025640-5390915734c3/go.mod h1:n6JntEg+zT6Ji8InoyTpV7wusPNwGqqtxmHlkNhDN0U= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= @@ -183,8 +183,8 @@ github.com/pion/dtls/v2 v2.2.6 h1:yXMxKr0Skd+Ub6A8UqXTRLSywskx93ooMRHsQUtd+Z4= github.com/pion/dtls/v2 v2.2.6/go.mod h1:t8fWJCIquY5rlQZwA2yWxUS1+OCrAdXrhVKXB5oD/wY= github.com/pion/ice/v2 v2.3.2 h1:vh+fi4RkZ8H5fB4brZ/jm3j4BqFgMmNs+aB3X52Hu7M= github.com/pion/ice/v2 v2.3.2/go.mod h1:AMIpuJqcpe+UwloocNebmTSWhCZM1TUCo9v7nW50jX0= -github.com/pion/interceptor v0.1.12 h1:CslaNriCFUItiXS5o+hh5lpL0t0ytQkFnUcbbCs2Zq8= -github.com/pion/interceptor v0.1.12/go.mod h1:bDtgAD9dRkBZpWHGKaoKb42FhDHTG2rX8Ii9LRALLVA= +github.com/pion/interceptor v0.1.13 h1:tfJdEqPxnQrlstjd7SCL7B97WdjPkJtg5EpRMgJ61Ms= +github.com/pion/interceptor v0.1.13/go.mod h1:SY8kpmfVBvrbUzvj2bsXz7OJt5JvmVNZ+4Kjq7FcwrI= github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= github.com/pion/mdns v0.0.7 h1:P0UB4Sr6xDWEox0kTVxF0LmQihtCbSAdW0H2nEgkA3U= @@ -214,8 +214,8 @@ github.com/pion/turn/v2 v2.1.0 h1:5wGHSgGhJhP/RpabkUb/T9PdsAjkGLS6toYz5HNzoSI= github.com/pion/turn/v2 v2.1.0/go.mod h1:yrT5XbXSGX1VFSF31A3c1kCNB5bBZgk/uu5LET162qs= github.com/pion/udp/v2 v2.0.1 h1:xP0z6WNux1zWEjhC7onRA3EwwSliXqu1ElUZAQhUP54= github.com/pion/udp/v2 v2.0.1/go.mod h1:B7uvTMP00lzWdyMr/1PVZXtV3wpPIxBRd4Wl6AksXn8= -github.com/pion/webrtc/v3 v3.1.61 h1:WG6p786t7jxXO/3miw6HmAQmO3p/n+QLRa2xLaovcr8= -github.com/pion/webrtc/v3 v3.1.61/go.mod h1:uk/4AJmgEUpSExaP7aexCyODwfbHap8hAnQzRV7zKcE= +github.com/pion/webrtc/v3 v3.1.62 h1:B+QYCs+ajtRMJtC3nphzFWWjVoCorugOABu/JD0pJ3c= +github.com/pion/webrtc/v3 v3.1.62/go.mod h1:PaPsj1aigBfWK1jJRZPkWvdiPaAiJwAEMgDKXVO7NjI= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -282,8 +282,8 @@ golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= golang.org/x/crypto v0.8.0 h1:pd9TJtTueMTVQXzk8E2XESSMQDj/U7OUu0PqJqPXQjQ= golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE= -golang.org/x/exp v0.0.0-20230420155640-133eef4313cb h1:rhjz/8Mbfa8xROFiH+MQphmAmgqRM0bOMnytznhWEXk= -golang.org/x/exp v0.0.0-20230420155640-133eef4313cb/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w= +golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53 h1:5llv2sWeaMSnA3w2kS57ouQQ4pudlXrR0dCgw51QK9o= +golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8= diff --git a/pkg/routing/signal.go b/pkg/routing/signal.go index dfef77dec..9a83c6054 100644 --- a/pkg/routing/signal.go +++ b/pkg/routing/signal.go @@ -127,17 +127,11 @@ func (r *signalClient) StartParticipantSignal( type signalRequestMessageWriter struct{} -func (e signalRequestMessageWriter) WriteOne(seq uint64, msg proto.Message) *rpc.RelaySignalRequest { - return &rpc.RelaySignalRequest{ - Seq: seq, - Request: msg.(*livekit.SignalRequest), - } -} - -func (e signalRequestMessageWriter) WriteMany(seq uint64, msgs []proto.Message) *rpc.RelaySignalRequest { +func (e signalRequestMessageWriter) Write(seq uint64, close bool, msgs []proto.Message) *rpc.RelaySignalRequest { r := &rpc.RelaySignalRequest{ Seq: seq, Requests: make([]*livekit.SignalRequest, 0, len(msgs)), + Close: close, } for _, m := range msgs { r.Requests = append(r.Requests, m.(*livekit.SignalRequest)) @@ -148,10 +142,7 @@ func (e signalRequestMessageWriter) WriteMany(seq uint64, msgs []proto.Message) type signalResponseMessageReader struct{} func (e signalResponseMessageReader) Read(rm *rpc.RelaySignalResponse) ([]proto.Message, error) { - msgs := make([]proto.Message, 0, len(rm.Responses)+1) - if rm.Response != nil { - msgs = append(msgs, rm.Response) - } + msgs := make([]proto.Message, 0, len(rm.Responses)) for _, m := range rm.Responses { msgs = append(msgs, m) } @@ -161,11 +152,11 @@ func (e signalResponseMessageReader) Read(rm *rpc.RelaySignalResponse) ([]proto. type RelaySignalMessage interface { proto.Message GetSeq() uint64 + GetClose() bool } type SignalMessageWriter[SendType RelaySignalMessage] interface { - WriteOne(seq uint64, msg proto.Message) SendType - WriteMany(seq uint64, msgs []proto.Message) SendType + Write(seq uint64, close bool, msgs []proto.Message) SendType } type SignalMessageReader[RecvType RelaySignalMessage] interface { @@ -196,6 +187,10 @@ func CopySignalStreamToMessageChannel[SendType, RecvType RelaySignalMessage]( } prometheus.MessageCounter.WithLabelValues("signal", "success").Add(1) } + + if msg.GetClose() { + return psrpc.ErrStreamClosed + } } return stream.Err() } @@ -212,19 +207,18 @@ func (r *signalMessageReader[SendType, RecvType]) Read(msg RecvType) ([]proto.Me return nil, err } - if r.config.MinVersion >= 1 { - if r.seq < msg.GetSeq() { - return nil, ErrSignalMessageDropped - } - if r.seq > msg.GetSeq() { - n := int(r.seq - msg.GetSeq()) - if n > len(res) { - n = len(res) - } - res = res[n:] - } - r.seq += uint64(len(res)) + if r.seq < msg.GetSeq() { + return nil, ErrSignalMessageDropped } + if r.seq > msg.GetSeq() { + n := int(r.seq - msg.GetSeq()) + if n > len(res) { + n = len(res) + } + res = res[n:] + } + r.seq += uint64(len(res)) + return res, nil } @@ -256,7 +250,8 @@ func (s *signalMessageSink[SendType, RecvType]) Close() { s.mu.Lock() s.draining = true if !s.writing { - s.Stream.Close(nil) + s.writing = true + go s.write() } s.mu.Unlock() @@ -267,16 +262,6 @@ func (s *signalMessageSink[SendType, RecvType]) IsClosed() bool { return s.Stream.Err() != nil } -func (s *signalMessageSink[SendType, RecvType]) nextMessage() (msg SendType, n int) { - if len(s.queue) == 0 { - return - } - if s.Config.MinVersion >= 1 { - return s.Writer.WriteMany(s.seq, s.queue), len(s.queue) - } - return s.Writer.WriteOne(s.seq, s.queue[0]), 1 -} - func (s *signalMessageSink[SendType, RecvType]) write() { interval := s.Config.MinRetryInterval deadline := time.Now().Add(s.Config.RetryTimeout) @@ -284,10 +269,11 @@ func (s *signalMessageSink[SendType, RecvType]) write() { s.mu.Lock() for { - msg, n := s.nextMessage() - if n == 0 || s.IsClosed() { + close := s.draining + if (!close && len(s.queue) == 0) || s.IsClosed() { break } + msg, n := s.Writer.Write(s.seq, close, s.queue), len(s.queue) s.mu.Unlock() err = s.Stream.Send(msg, psrpc.WithTimeout(interval)) @@ -314,6 +300,10 @@ func (s *signalMessageSink[SendType, RecvType]) write() { s.seq += uint64(n) s.queue = s.queue[n:] + + if close { + break + } } } diff --git a/pkg/rtc/signalhandler.go b/pkg/rtc/signalhandler.go index e8749aa64..ee349201f 100644 --- a/pkg/rtc/signalhandler.go +++ b/pkg/rtc/signalhandler.go @@ -10,7 +10,7 @@ import ( func HandleParticipantSignal(room types.Room, participant types.LocalParticipant, req *livekit.SignalRequest, pLogger logger.Logger) error { participant.UpdateLastSeenSignal() - switch msg := req.Message.(type) { + switch msg := req.GetMessage().(type) { case *livekit.SignalRequest_Offer: participant.HandleOffer(FromProtoSessionDescription(msg.Offer)) case *livekit.SignalRequest_Answer: diff --git a/pkg/service/signal.go b/pkg/service/signal.go index 87b29d527..405ae79cb 100644 --- a/pkg/service/signal.go +++ b/pkg/service/signal.go @@ -162,17 +162,11 @@ func (r *signalService) RelaySignal(stream psrpc.ServerStream[*rpc.RelaySignalRe type signalResponseMessageWriter struct{} -func (e signalResponseMessageWriter) WriteOne(seq uint64, msg proto.Message) *rpc.RelaySignalResponse { - return &rpc.RelaySignalResponse{ - Seq: seq, - Response: msg.(*livekit.SignalResponse), - } -} - -func (e signalResponseMessageWriter) WriteMany(seq uint64, msgs []proto.Message) *rpc.RelaySignalResponse { +func (e signalResponseMessageWriter) Write(seq uint64, close bool, msgs []proto.Message) *rpc.RelaySignalResponse { r := &rpc.RelaySignalResponse{ Seq: seq, Responses: make([]*livekit.SignalResponse, 0, len(msgs)), + Close: close, } for _, m := range msgs { r.Responses = append(r.Responses, m.(*livekit.SignalResponse)) @@ -183,10 +177,7 @@ func (e signalResponseMessageWriter) WriteMany(seq uint64, msgs []proto.Message) type signalRequestMessageReader struct{} func (e signalRequestMessageReader) Read(rm *rpc.RelaySignalRequest) ([]proto.Message, error) { - msgs := make([]proto.Message, 0, len(rm.Requests)+1) - if rm.Request != nil { - msgs = append(msgs, rm.Request) - } + msgs := make([]proto.Message, 0, len(rm.Requests)) for _, m := range rm.Requests { msgs = append(msgs, m) }