Files
livekit/pkg/service/roommanager_service.go
2025-12-31 15:10:15 +08:00

338 lines
8.8 KiB
Go

package service
import (
"context"
"errors"
"fmt"
"time"
"github.com/pion/webrtc/v4"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/types/known/emptypb"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
"github.com/livekit/psrpc"
)
const (
whipSessionNotifyInterval = 10 * time.Second
)
type whipService struct {
*RoomManager
ingressRpcCli rpc.IngressHandlerClient
rpc.UnimplementedWHIPServer
}
func newWhipService(rm *RoomManager) (*whipService, error) {
cli, err := rpc.NewIngressHandlerClient(rm.bus, rpc.WithDefaultClientOptions(logger.GetLogger()))
if err != nil {
return nil, err
}
return &whipService{
RoomManager: rm,
ingressRpcCli: cli,
}, nil
}
func (s whipService) Create(ctx context.Context, req *rpc.WHIPCreateRequest) (*rpc.WHIPCreateResponse, error) {
pi, err := routing.ParticipantInitFromStartSession(req.StartSession, s.RoomManager.currentNode.Region())
if err != nil {
logger.Errorw("whip service: could not create participant init", err)
return nil, err
}
prometheus.IncrementParticipantRtcInit(1)
if err = s.RoomManager.StartSession(
ctx,
*pi,
routing.NewNullMessageSource(livekit.ConnectionID(req.StartSession.ConnectionId)), // no requestSource
routing.NewNullMessageSink(livekit.ConnectionID(req.StartSession.ConnectionId)), // no responseSink
true, // useOneShotSignallingMode
); err != nil {
logger.Errorw("whip service: could not start session", err)
return nil, err
}
room := s.RoomManager.GetRoom(ctx, livekit.RoomName(req.StartSession.RoomName))
if room == nil {
logger.Errorw("whip service: could not find room", nil, "room", req.StartSession.RoomName)
return nil, ErrRoomNotFound
}
lp := room.GetParticipant(pi.Identity)
if lp == nil {
room.Logger().Errorw("whip service: could not find local participant", nil, "participant", pi.Identity)
return nil, ErrParticipantNotFound
}
if err := lp.HandleOffer(&livekit.SessionDescription{
Type: webrtc.SDPTypeOffer.String(),
Sdp: req.OfferSdp,
Id: 0,
}); err != nil {
lp.GetLogger().Errorw("whip service: could not handle offer", err)
return nil, err
}
// wait for subscriptions to resolve
// NOTE: this is outside the WHIP spec, but added as a convenience for clients doing
// one-shot signalling (i. e. send an offer and get an answer once) to publish and subscribe to
// well-known tracks (i. e. remote participant identity and track names are well known)
eg, _ := errgroup.WithContext(ctx)
for publisherIdentity, trackList := range req.SubscribedParticipantTracks {
for _, trackName := range trackList.TrackNames {
eg.Go(func() error {
for {
if lp.IsTrackNameSubscribed(livekit.ParticipantIdentity(publisherIdentity), trackName) {
return nil
}
time.Sleep(50 * time.Millisecond)
}
})
}
}
err = eg.Wait()
if err != nil {
lp.GetLogger().Errorw("whip service: could not subscribe to tracks", err)
return nil, err
}
answer, _, err := lp.GetAnswer()
if err != nil {
lp.GetLogger().Errorw("whip service: could not get answer", err)
return nil, err
}
iceSessionID, err := lp.GetPublisherICESessionUfrag()
if err != nil {
lp.GetLogger().Errorw("whip service: could not get ICE session ID", err)
return nil, err
}
if req.FromIngress {
aliveCtx, cancel := context.WithCancel(context.Background())
lp.AddOnClose(types.ParticipantCloseKeyWHIP, func(lp types.LocalParticipant) {
cancel()
go func() {
lp.GetLogger().Debugw("whip service: notify participant closed")
video, audio := getMediaStateForParticipant(lp)
_, err := s.ingressRpcCli.WHIPRTCConnectionNotify(context.Background(), string(lp.ID()), &rpc.WHIPRTCConnectionNotifyRequest{
ParticipantId: string(lp.ID()),
Closed: true,
Audio: audio,
Video: video,
}, psrpc.WithRequestTimeout(rpc.DefaultPSRPCConfig.Timeout))
if err != nil {
lp.GetLogger().Warnw("whip service: could not notify ingress of participant closed", err)
}
}()
})
go func() {
if err := s.notifySession(aliveCtx, lp); err != nil {
cancel()
}
}()
}
var iceServers []*livekit.ICEServer
apiKey, _, err := s.RoomManager.getFirstKeyPair()
if err == nil {
iceServers = s.RoomManager.iceServersForParticipant(
apiKey,
lp,
false,
)
}
return &rpc.WHIPCreateResponse{
AnswerSdp: answer.SDP,
ParticipantId: string(lp.ID()),
IceServers: iceServers,
IceSessionId: iceSessionID,
}, nil
}
func (s whipService) notifySession(ctx context.Context, participant types.Participant) error {
ticker := time.NewTicker(whipSessionNotifyInterval)
defer ticker.Stop()
err := s.sendConnectionNotify(ctx, participant)
if err != nil {
if errors.Is(err, context.Canceled) {
return nil
}
}
for {
select {
case <-ticker.C:
err := s.sendConnectionNotify(ctx, participant)
if err != nil {
if errors.Is(err, context.Canceled) {
return nil
}
}
case <-ctx.Done():
return nil
}
}
}
func (s whipService) sendConnectionNotify(ctx context.Context, participant types.Participant) error {
video, audio := getMediaStateForParticipant(participant)
_, err := s.ingressRpcCli.WHIPRTCConnectionNotify(ctx, string(participant.ID()), &rpc.WHIPRTCConnectionNotifyRequest{
ParticipantId: string(participant.ID()),
Video: video,
Audio: audio,
}, psrpc.WithRequestTimeout(rpc.DefaultPSRPCConfig.Timeout))
return err
}
func getMediaStateForParticipant(participant types.Participant) (*livekit.InputVideoState, *livekit.InputAudioState) {
pParticipant := participant.ToProto()
var video *livekit.InputVideoState
var audio *livekit.InputAudioState
for _, v := range pParticipant.Tracks {
if v == nil {
continue
}
if v.Type != livekit.TrackType_VIDEO {
continue
}
video = &livekit.InputVideoState{}
video.MimeType = v.MimeType
video.Height = v.Height
video.Width = v.Width
break
}
for _, a := range pParticipant.Tracks {
if a == nil {
continue
}
if a.Type != livekit.TrackType_AUDIO {
continue
}
audio = &livekit.InputAudioState{}
audio.MimeType = a.MimeType
audio.Channels = 1
if a.Stereo {
audio.Channels = 2
}
break
}
return video, audio
}
// -------------------------------------------
type whipParticipantService struct {
*RoomManager
}
func (r whipParticipantService) ICETrickle(ctx context.Context, req *rpc.WHIPParticipantICETrickleRequest) (*emptypb.Empty, error) {
room := r.RoomManager.GetRoom(ctx, livekit.RoomName(req.Room))
if room == nil {
return nil, ErrRoomNotFound
}
lp := room.GetParticipantByID(livekit.ParticipantID(req.ParticipantId))
if lp == nil {
return nil, ErrParticipantNotFound
}
iceSessionID, err := lp.GetPublisherICESessionUfrag()
if err != nil {
lp.GetLogger().Warnw("whipParticipant service ice-trickle: could not get ICE session ufrag", err)
return nil, psrpc.NewError(psrpc.Internal, err)
}
if req.IceSessionId != iceSessionID {
return nil, psrpc.NewError(
psrpc.FailedPrecondition,
fmt.Errorf("ice session mismatch, expected: %s, got: %s", iceSessionID, req.IceSessionId),
)
}
if err := lp.HandleICETrickleSDPFragment(req.SdpFragment); err != nil {
return nil, psrpc.NewError(psrpc.InvalidArgument, err)
}
return &emptypb.Empty{}, nil
}
func (r whipParticipantService) ICERestart(ctx context.Context, req *rpc.WHIPParticipantICERestartRequest) (*rpc.WHIPParticipantICERestartResponse, error) {
room := r.RoomManager.GetRoom(ctx, livekit.RoomName(req.Room))
if room == nil {
return nil, ErrRoomNotFound
}
lp := room.GetParticipantByID(livekit.ParticipantID(req.ParticipantId))
if lp == nil {
return nil, ErrParticipantNotFound
}
sdpFragment, err := lp.HandleICERestartSDPFragment(req.SdpFragment)
if err != nil {
return nil, psrpc.NewError(psrpc.InvalidArgument, err)
}
iceSessionID, err := lp.GetPublisherICESessionUfrag()
if err != nil {
lp.GetLogger().Warnw("whipParticipant service ice-restart: could not get ICE session ufrag", err)
return nil, psrpc.NewError(psrpc.Internal, err)
}
return &rpc.WHIPParticipantICERestartResponse{
IceSessionId: iceSessionID,
SdpFragment: sdpFragment,
}, nil
}
func (r whipParticipantService) DeleteSession(ctx context.Context, req *rpc.WHIPParticipantDeleteSessionRequest) (*emptypb.Empty, error) {
room := r.RoomManager.GetRoom(ctx, livekit.RoomName(req.Room))
if room == nil {
return nil, ErrRoomNotFound
}
lp := room.GetParticipantByID(livekit.ParticipantID(req.ParticipantId))
if lp != nil {
lp.AddOnClose(types.ParticipantCloseKeyWHIP, nil)
room.RemoveParticipant(
lp.Identity(),
lp.ID(),
types.ParticipantCloseReasonClientRequestLeave,
)
}
return &emptypb.Empty{}, nil
}
// --------------------------------