This commit is contained in:
cnderrauber
2025-08-19 11:36:45 +08:00
parent c58e5d23c5
commit ec87e81734
3 changed files with 23 additions and 4 deletions
+19 -1
View File
@@ -2260,6 +2260,12 @@ func (p *ParticipantImpl) onReceivedDataMessage(kind livekit.DataPacket_Kind, da
return
}
p.params.Logger.Debugw("received msg", "sequence", dp.Sequence)
if c, ok := dp.Value.(*livekit.DataPacket_StreamChunk); ok {
p.params.Logger.Debugw("received stream chunk", "content", c.StreamChunk.Content)
}
dp.ParticipantSid = string(p.ID())
if kind == livekit.DataPacket_RELIABLE && dp.Sequence > 0 {
if p.reliableDataInfo.stopReliableByMigrateOut.Load() {
@@ -3580,8 +3586,14 @@ func (p *ParticipantImpl) SupportsTransceiverReuse() bool {
}
func (p *ParticipantImpl) SendDataMessage(kind livekit.DataPacket_Kind, data []byte, sender livekit.ParticipantID, seq uint32) error {
p.params.Logger.Debugw("trying send data message",
"kind", kind.String(),
"sender", sender,
"seq", seq,
)
if sender == "" || kind != livekit.DataPacket_RELIABLE || seq == 0 {
if p.State() != livekit.ParticipantInfo_ACTIVE {
p.params.Logger.Warnw("send data message failed, participant not active", nil, "state", p.State().String())
return ErrDataChannelUnavailable
}
return p.TransportManager.SendDataMessage(kind, data)
@@ -3590,6 +3602,7 @@ func (p *ParticipantImpl) SendDataMessage(kind livekit.DataPacket_Kind, data []b
p.reliableDataInfo.joiningMessageLock.Lock()
if !p.reliableDataInfo.canWriteReliable {
if _, ok := p.reliableDataInfo.joiningMessageFirstSeqs[sender]; !ok {
p.params.Logger.Debugw("caching joining message", "sender", sender, "seq", seq)
p.reliableDataInfo.joiningMessageFirstSeqs[sender] = seq
}
p.reliableDataInfo.joiningMessageLock.Unlock()
@@ -3609,6 +3622,8 @@ func (p *ParticipantImpl) SendDataMessage(kind livekit.DataPacket_Kind, data []b
p.reliableDataInfo.joiningMessageLock.Unlock()
p.params.Logger.Debugw("sending joining message", "sender", sender, "seq", seq)
return p.TransportManager.SendDataMessage(kind, data)
}
@@ -3687,7 +3702,10 @@ func (p *ParticipantImpl) replayJoiningReliableMessages() {
p.reliableDataInfo.joiningMessageLastWrittenSeqs[msgCache.SenderID] = msgCache.Seq
}
p.TransportManager.SendDataMessage(livekit.DataPacket_RELIABLE, msgCache.Data)
p.params.Logger.Debugw("replaying joining message", "sender", msgCache.SenderID, "seq", msgCache.Seq, "data", msgCache.Data)
if err := p.TransportManager.SendDataMessage(livekit.DataPacket_RELIABLE, msgCache.Data); err != nil {
p.params.Logger.Errorw("failed to replay joining message", err, "sender", msgCache.SenderID, "seq", msgCache.Seq)
}
}
p.reliableDataInfo.joiningMessageFirstSeqs = make(map[livekit.ParticipantID]uint32)
+3 -2
View File
@@ -17,10 +17,11 @@ package signalling
import (
"fmt"
"google.golang.org/protobuf/proto"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
protosignalling "github.com/livekit/protocol/signalling"
"google.golang.org/protobuf/proto"
"github.com/livekit/livekit-server/pkg/rtc/types"
)
@@ -55,7 +56,7 @@ func (s *signalhandler) HandleMessage(msg proto.Message) error {
}
s.params.Participant.UpdateLastSeenSignal()
s.params.Logger.Debugw("handling signal request", "request", logger.Proto(req))
// s.params.Logger.Debugw("handling signal request", "request", logger.Proto(req))
switch msg := req.GetMessage().(type) {
case *livekit.SignalRequest_Offer:
s.params.Participant.HandleOffer(protosignalling.FromProtoSessionDescription(msg.Offer))
+1 -1
View File
@@ -536,7 +536,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
case *livekit.SignalRequest_Answer:
pLogger.Debugw("received answer", "answer", m)
default:
pLogger.Debugw("received signal request", "request", m)
// pLogger.Debugw("received signal request", "request", m)
}
if err := cr.RequestSink.WriteMessage(req); err != nil {