Move OnDataTrackMessage to participant listener to support replay. (#4178)

This commit is contained in:
Raja Subramanian
2025-12-19 13:08:44 +05:30
committed by GitHub
parent e7601251bc
commit 1df1316b85
4 changed files with 65 additions and 19 deletions

View File

@@ -1910,6 +1910,9 @@ func (l *localParticipantListener) OnDataTrackUnpublished(p types.Participant, t
l.room.onDataTrackUnpublished(p, track)
}
func (l *localParticipantListener) OnDataTrackMessage(_p types.Participant, _data []byte, _packet *datatrack.Packet) {
}
func (l *localParticipantListener) OnMetrics(p types.Participant, dp *livekit.DataPacket) {
l.room.onMetrics(p, dp)
}
@@ -1933,9 +1936,6 @@ func (l *localParticipantListener) OnDataMessageUnlabeled(p types.LocalParticipa
l.room.onDataMessageUnlabeled(p, data)
}
func (l *localParticipantListener) OnDataTrackMessage(_p types.LocalParticipant, _data []byte, _packet *datatrack.Packet) {
}
func (l *localParticipantListener) OnSubscribeStatusChanged(p types.LocalParticipant, publisherID livekit.ParticipantID, subscribed bool) {
l.room.onSubscribeStatusChanged(p, publisherID, subscribed)
}

View File

@@ -557,6 +557,7 @@ type ParticipantListener interface {
OnTrackUnpublished(Participant, MediaTrack)
OnDataTrackPublished(Participant, DataTrack)
OnDataTrackUnpublished(Participant, DataTrack)
OnDataTrackMessage(Participant, []byte, *datatrack.Packet)
OnMetrics(Participant, *livekit.DataPacket)
}
@@ -564,13 +565,14 @@ var _ ParticipantListener = (*NullParticipantListener)(nil)
type NullParticipantListener struct{}
func (*NullParticipantListener) OnParticipantUpdate(Participant) {}
func (*NullParticipantListener) OnTrackPublished(Participant, MediaTrack) {}
func (*NullParticipantListener) OnTrackUpdated(Participant, MediaTrack) {}
func (*NullParticipantListener) OnTrackUnpublished(Participant, MediaTrack) {}
func (*NullParticipantListener) OnDataTrackPublished(Participant, DataTrack) {}
func (*NullParticipantListener) OnDataTrackUnpublished(Participant, DataTrack) {}
func (*NullParticipantListener) OnMetrics(Participant, *livekit.DataPacket) {}
func (*NullParticipantListener) OnParticipantUpdate(Participant) {}
func (*NullParticipantListener) OnTrackPublished(Participant, MediaTrack) {}
func (*NullParticipantListener) OnTrackUpdated(Participant, MediaTrack) {}
func (*NullParticipantListener) OnTrackUnpublished(Participant, MediaTrack) {}
func (*NullParticipantListener) OnDataTrackPublished(Participant, DataTrack) {}
func (*NullParticipantListener) OnDataTrackUnpublished(Participant, DataTrack) {}
func (*NullParticipantListener) OnDataTrackMessage(Participant, []byte, *datatrack.Packet) {}
func (*NullParticipantListener) OnMetrics(Participant, *livekit.DataPacket) {}
// ---------------------------------------------
@@ -583,7 +585,6 @@ type LocalParticipantListener interface {
OnMigrateStateChange(LocalParticipant, MigrateState)
OnDataMessage(LocalParticipant, livekit.DataPacket_Kind, *livekit.DataPacket)
OnDataMessageUnlabeled(LocalParticipant, []byte)
OnDataTrackMessage(LocalParticipant, []byte, *datatrack.Packet)
OnSubscribeStatusChanged(LocalParticipant, livekit.ParticipantID, bool)
OnUpdateSubscriptions(
LocalParticipant,
@@ -610,8 +611,6 @@ func (*NullLocalParticipantListener) OnMigrateStateChange(LocalParticipant, Migr
func (*NullLocalParticipantListener) OnDataMessage(LocalParticipant, livekit.DataPacket_Kind, *livekit.DataPacket) {
}
func (*NullLocalParticipantListener) OnDataMessageUnlabeled(LocalParticipant, []byte) {}
func (*NullLocalParticipantListener) OnDataTrackMessage(LocalParticipant, []byte, *datatrack.Packet) {
}
func (*NullLocalParticipantListener) OnSubscribeStatusChanged(LocalParticipant, livekit.ParticipantID, bool) {
}
func (*NullLocalParticipantListener) OnUpdateSubscriptions(

View File

@@ -23,10 +23,10 @@ type FakeLocalParticipantListener struct {
arg1 types.LocalParticipant
arg2 []byte
}
OnDataTrackMessageStub func(types.LocalParticipant, []byte, *datatrack.Packet)
OnDataTrackMessageStub func(types.Participant, []byte, *datatrack.Packet)
onDataTrackMessageMutex sync.RWMutex
onDataTrackMessageArgsForCall []struct {
arg1 types.LocalParticipant
arg1 types.Participant
arg2 []byte
arg3 *datatrack.Packet
}
@@ -226,7 +226,7 @@ func (fake *FakeLocalParticipantListener) OnDataMessageUnlabeledArgsForCall(i in
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeLocalParticipantListener) OnDataTrackMessage(arg1 types.LocalParticipant, arg2 []byte, arg3 *datatrack.Packet) {
func (fake *FakeLocalParticipantListener) OnDataTrackMessage(arg1 types.Participant, arg2 []byte, arg3 *datatrack.Packet) {
var arg2Copy []byte
if arg2 != nil {
arg2Copy = make([]byte, len(arg2))
@@ -234,7 +234,7 @@ func (fake *FakeLocalParticipantListener) OnDataTrackMessage(arg1 types.LocalPar
}
fake.onDataTrackMessageMutex.Lock()
fake.onDataTrackMessageArgsForCall = append(fake.onDataTrackMessageArgsForCall, struct {
arg1 types.LocalParticipant
arg1 types.Participant
arg2 []byte
arg3 *datatrack.Packet
}{arg1, arg2Copy, arg3})
@@ -252,13 +252,13 @@ func (fake *FakeLocalParticipantListener) OnDataTrackMessageCallCount() int {
return len(fake.onDataTrackMessageArgsForCall)
}
func (fake *FakeLocalParticipantListener) OnDataTrackMessageCalls(stub func(types.LocalParticipant, []byte, *datatrack.Packet)) {
func (fake *FakeLocalParticipantListener) OnDataTrackMessageCalls(stub func(types.Participant, []byte, *datatrack.Packet)) {
fake.onDataTrackMessageMutex.Lock()
defer fake.onDataTrackMessageMutex.Unlock()
fake.OnDataTrackMessageStub = stub
}
func (fake *FakeLocalParticipantListener) OnDataTrackMessageArgsForCall(i int) (types.LocalParticipant, []byte, *datatrack.Packet) {
func (fake *FakeLocalParticipantListener) OnDataTrackMessageArgsForCall(i int) (types.Participant, []byte, *datatrack.Packet) {
fake.onDataTrackMessageMutex.RLock()
defer fake.onDataTrackMessageMutex.RUnlock()
argsForCall := fake.onDataTrackMessageArgsForCall[i]

View File

@@ -4,11 +4,19 @@ package typesfakes
import (
"sync"
"github.com/livekit/livekit-server/pkg/rtc/datatrack"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/protocol/livekit"
)
type FakeParticipantListener struct {
OnDataTrackMessageStub func(types.Participant, []byte, *datatrack.Packet)
onDataTrackMessageMutex sync.RWMutex
onDataTrackMessageArgsForCall []struct {
arg1 types.Participant
arg2 []byte
arg3 *datatrack.Packet
}
OnDataTrackPublishedStub func(types.Participant, types.DataTrack)
onDataTrackPublishedMutex sync.RWMutex
onDataTrackPublishedArgsForCall []struct {
@@ -54,6 +62,45 @@ type FakeParticipantListener struct {
invocationsMutex sync.RWMutex
}
func (fake *FakeParticipantListener) OnDataTrackMessage(arg1 types.Participant, arg2 []byte, arg3 *datatrack.Packet) {
var arg2Copy []byte
if arg2 != nil {
arg2Copy = make([]byte, len(arg2))
copy(arg2Copy, arg2)
}
fake.onDataTrackMessageMutex.Lock()
fake.onDataTrackMessageArgsForCall = append(fake.onDataTrackMessageArgsForCall, struct {
arg1 types.Participant
arg2 []byte
arg3 *datatrack.Packet
}{arg1, arg2Copy, arg3})
stub := fake.OnDataTrackMessageStub
fake.recordInvocation("OnDataTrackMessage", []interface{}{arg1, arg2Copy, arg3})
fake.onDataTrackMessageMutex.Unlock()
if stub != nil {
fake.OnDataTrackMessageStub(arg1, arg2, arg3)
}
}
func (fake *FakeParticipantListener) OnDataTrackMessageCallCount() int {
fake.onDataTrackMessageMutex.RLock()
defer fake.onDataTrackMessageMutex.RUnlock()
return len(fake.onDataTrackMessageArgsForCall)
}
func (fake *FakeParticipantListener) OnDataTrackMessageCalls(stub func(types.Participant, []byte, *datatrack.Packet)) {
fake.onDataTrackMessageMutex.Lock()
defer fake.onDataTrackMessageMutex.Unlock()
fake.OnDataTrackMessageStub = stub
}
func (fake *FakeParticipantListener) OnDataTrackMessageArgsForCall(i int) (types.Participant, []byte, *datatrack.Packet) {
fake.onDataTrackMessageMutex.RLock()
defer fake.onDataTrackMessageMutex.RUnlock()
argsForCall := fake.onDataTrackMessageArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
}
func (fake *FakeParticipantListener) OnDataTrackPublished(arg1 types.Participant, arg2 types.DataTrack) {
fake.onDataTrackPublishedMutex.Lock()
fake.onDataTrackPublishedArgsForCall = append(fake.onDataTrackPublishedArgsForCall, struct {