fixed message channel deadlock

This commit is contained in:
David Zhao
2021-02-07 21:58:20 -08:00
parent 33787fd4f2
commit d13a962afd
4 changed files with 29 additions and 10 deletions

View File

@@ -9,4 +9,5 @@ var (
ErrIncorrectRTCNode = errors.New("current node isn't the RTC node for the room")
errInvalidRouterMessage = errors.New("invalid router message")
ErrChannelClosed = errors.New("channel closed")
ErrChannelFull = errors.New("channel is full")
)

View File

@@ -15,7 +15,7 @@ type MessageChannel struct {
func NewMessageChannel() *MessageChannel {
return &MessageChannel{
// allow some buffer to avoid blocked writes
msgChan: make(chan proto.Message, 2),
msgChan: make(chan proto.Message, 10),
}
}
@@ -27,8 +27,15 @@ func (m *MessageChannel) WriteMessage(msg proto.Message) error {
if m.isClosed.Get() {
return ErrChannelClosed
}
m.msgChan <- msg
return nil
select {
case m.msgChan <- msg:
// published
return nil
default:
// channel is full
return ErrChannelFull
}
}
func (m *MessageChannel) ReadChan() <-chan proto.Message {

View File

@@ -59,7 +59,7 @@ func publishRTCMessage(rc *redis.Client, nodeId string, participantKey string, m
return err
}
//logger.Debugw("publishing to", "rtcChannel", rtcNodeChannel(nodeId),
//logger.Debugw("publishing to rtc", "rtcChannel", rtcNodeChannel(nodeId),
// "message", rm.Message)
return rc.Publish(redisCtx, rtcNodeChannel(nodeId), data).Err()
}
@@ -84,6 +84,9 @@ func publishSignalMessage(rc *redis.Client, nodeId string, connectionId string,
if err != nil {
return err
}
//logger.Debugw("publishing to signal", "signalChannel", signalNodeChannel(nodeId),
// "message", rm.Message)
return rc.Publish(redisCtx, signalNodeChannel(nodeId), data).Err()
}

View File

@@ -351,21 +351,27 @@ func (r *RedisRouter) redisWorker() {
func (r *RedisRouter) handleSignalMessage(sm *livekit.SignalNodeMessage) error {
connectionId := sm.ConnectionId
r.lock.Lock()
resSink := r.responseChannels[connectionId]
r.lock.Unlock()
// if a client closed the channel, then sent more messages after that,
if resSink == nil {
return nil
}
switch rmb := sm.Message.(type) {
case *livekit.SignalNodeMessage_Response:
//logger.Debugw("forwarding signal message",
// "connectionId", connectionId,
// "type", fmt.Sprintf("%T", rmb.Response.Message))
// in the event the current node is an Signal node, push to response channels
resSink := r.getOrCreateMessageChannel(r.responseChannels, connectionId)
if err := resSink.WriteMessage(rmb.Response); err != nil {
return err
}
case *livekit.SignalNodeMessage_EndSession:
logger.Debugw("received EndSession, closing signal connection",
"connectionId", connectionId)
resSink := r.getOrCreateMessageChannel(r.responseChannels, connectionId)
//logger.Debugw("received EndSession, closing signal connection",
// "connectionId", connectionId)
resSink.Close()
}
return nil
@@ -382,7 +388,9 @@ func (r *RedisRouter) handleRTCMessage(rm *livekit.RTCNodeMessage) error {
}
case *livekit.RTCNodeMessage_Request:
requestChan := r.getOrCreateMessageChannel(r.requestChannels, pKey)
r.lock.Lock()
requestChan := r.requestChannels[pKey]
r.lock.Unlock()
if err := requestChan.WriteMessage(rmb.Request); err != nil {
return err
}