diff --git a/pkg/routing/signal.go b/pkg/routing/signal.go index 9a83c6054..56aa25005 100644 --- a/pkg/routing/signal.go +++ b/pkg/routing/signal.go @@ -111,13 +111,13 @@ func (r *signalClient) StartParticipantSignal( r.active.Inc() defer r.active.Dec() - err = CopySignalStreamToMessageChannel[*rpc.RelaySignalRequest, *rpc.RelaySignalResponse]( + err := CopySignalStreamToMessageChannel[*rpc.RelaySignalRequest, *rpc.RelaySignalResponse]( stream, resChan, signalResponseMessageReader{}, r.config, ) - l.Debugw("participant signal stream closed", "error", err) + l.Infow("signal stream closed", "error", err) resChan.Close() }() @@ -189,7 +189,7 @@ func CopySignalStreamToMessageChannel[SendType, RecvType RelaySignalMessage]( } if msg.GetClose() { - return psrpc.ErrStreamClosed + return stream.Close(nil) } } return stream.Err() diff --git a/pkg/service/signal.go b/pkg/service/signal.go index 405ae79cb..a8ce00811 100644 --- a/pkg/service/signal.go +++ b/pkg/service/signal.go @@ -152,10 +152,11 @@ func (r *signalService) RelaySignal(stream psrpc.ServerStream[*rpc.RelaySignalRe err = r.sessionHandler(ctx, livekit.RoomName(ss.RoomName), *pi, livekit.ConnectionID(ss.ConnectionId), reqChan, sink) if err != nil { l.Errorw("could not handle new participant", err) + return } err = routing.CopySignalStreamToMessageChannel[*rpc.RelaySignalResponse, *rpc.RelaySignalRequest](stream, reqChan, signalRequestMessageReader{}, r.config) - l.Debugw("participant signal stream closed", "error", err) + l.Infow("signal stream closed", "error", err) return }