feat: add whip client

This commit is contained in:
langhuihui
2025-09-23 17:40:11 +08:00
parent 29e2142787
commit f3a7503323
5 changed files with 175 additions and 98 deletions

View File

@@ -2,15 +2,11 @@ package webrtc
import (
"errors"
"io"
"net/http"
"strings"
. "github.com/pion/webrtc/v4"
"m7s.live/v5"
"m7s.live/v5/pkg"
"m7s.live/v5/pkg/config"
"m7s.live/v5/pkg/util"
)
const (
@@ -43,92 +39,6 @@ func (c *Client) Start() (err error) {
return c.MultipleConnection.Start()
}
// WHIPClient is a client that pushes media to the server
type WHIPClient struct {
Client
pushCtx m7s.PushJob
}
func (c *WHIPClient) GetPushJob() *m7s.PushJob {
return &c.pushCtx
}
// WHEPClient is a client that pulls media from the server
type WHEPClient struct {
Client
pullCtx m7s.PullJob
}
func (c *WHEPClient) GetPullJob() *m7s.PullJob {
return &c.pullCtx
}
func (c *WHEPClient) Start() (err error) {
err = c.pullCtx.Publish()
if err != nil {
return
}
c.Publisher = c.pullCtx.Publisher
c.pullCtx.GoToStepConst(StepWebRTCInit)
err = c.Client.Start()
if err != nil {
return
}
// u, _ := url.Parse(c.pullCtx.RemoteURL)
// c.ApiBase, _, _ = strings.Cut(c.pullCtx.RemoteURL, "?")
c.Receive()
if c.pullCtx.PublishConfig.PubVideo {
var transeiver *RTPTransceiver
transeiver, err = c.AddTransceiverFromKind(RTPCodecTypeVideo, RTPTransceiverInit{
Direction: RTPTransceiverDirectionRecvonly,
})
if err != nil {
return
}
c.Info("webrtc add video transceiver", "transceiver", transeiver.Mid())
}
if c.pullCtx.PublishConfig.PubAudio {
var transeiver *RTPTransceiver
transeiver, err = c.AddTransceiverFromKind(RTPCodecTypeAudio, RTPTransceiverInit{
Direction: RTPTransceiverDirectionRecvonly,
})
if err != nil {
return
}
c.Info("webrtc add audio transceiver", "transceiver", transeiver.Mid())
}
c.pullCtx.GoToStepConst(StepOfferCreate)
var sdpBody SDPBody
sdpBody.SessionDescription, err = c.GetOffer()
if err != nil {
return
}
c.pullCtx.GoToStepConst(StepSessionCreate)
var res *http.Response
res, err = http.DefaultClient.Post(c.pullCtx.RemoteURL, "application/sdp", strings.NewReader(sdpBody.SessionDescription.SDP))
if err != nil {
return
}
c.pullCtx.GoToStepConst(StepNegotiation)
if res.StatusCode != http.StatusCreated && res.StatusCode != http.StatusOK {
err = errors.New(res.Status)
return
}
var sd SessionDescription
sd.Type = SDPTypeAnswer
var body util.Buffer
io.Copy(&body, res.Body)
sd.SDP = string(body)
if err = c.SetRemoteDescription(sd); err != nil {
return
}
c.pullCtx.GoToStepConst(pkg.StepStreaming)
return
}
func NewPuller(conf config.Pull) m7s.IPuller {
if strings.HasPrefix(conf.URL, "https://rtc.live.cloudflare.com") {
return NewCFClient(DIRECTION_PULL)

View File

@@ -0,0 +1,88 @@
package webrtc
import (
"errors"
"io"
"net/http"
"strings"
. "github.com/pion/webrtc/v4"
"m7s.live/v5"
"m7s.live/v5/pkg"
"m7s.live/v5/pkg/util"
)
// WHEPClient is a client that pulls media from the server
type WHEPClient struct {
Client
pullCtx m7s.PullJob
}
func (c *WHEPClient) GetPullJob() *m7s.PullJob {
return &c.pullCtx
}
func (c *WHEPClient) Start() (err error) {
err = c.pullCtx.Publish()
if err != nil {
return
}
c.Publisher = c.pullCtx.Publisher
c.pullCtx.GoToStepConst(StepWebRTCInit)
err = c.Client.Start()
if err != nil {
return
}
// u, _ := url.Parse(c.pullCtx.RemoteURL)
// c.ApiBase, _, _ = strings.Cut(c.pullCtx.RemoteURL, "?")
if c.pullCtx.PublishConfig.PubVideo {
var transeiver *RTPTransceiver
transeiver, err = c.AddTransceiverFromKind(RTPCodecTypeVideo, RTPTransceiverInit{
Direction: RTPTransceiverDirectionRecvonly,
})
if err != nil {
return
}
c.Info("webrtc add video transceiver", "transceiver", transeiver.Mid())
}
if c.pullCtx.PublishConfig.PubAudio {
var transeiver *RTPTransceiver
transeiver, err = c.AddTransceiverFromKind(RTPCodecTypeAudio, RTPTransceiverInit{
Direction: RTPTransceiverDirectionRecvonly,
})
if err != nil {
return
}
c.Info("webrtc add audio transceiver", "transceiver", transeiver.Mid())
}
c.pullCtx.GoToStepConst(StepOfferCreate)
var sdpBody SDPBody
sdpBody.SessionDescription, err = c.GetOffer()
if err != nil {
return
}
c.pullCtx.GoToStepConst(StepSessionCreate)
var res *http.Response
res, err = http.DefaultClient.Post(c.pullCtx.RemoteURL, "application/sdp", strings.NewReader(sdpBody.SessionDescription.SDP))
if err != nil {
return
}
c.pullCtx.GoToStepConst(StepNegotiation)
if res.StatusCode != http.StatusCreated && res.StatusCode != http.StatusOK {
err = errors.New(res.Status)
return
}
var sd SessionDescription
sd.Type = SDPTypeAnswer
var body util.Buffer
io.Copy(&body, res.Body)
sd.SDP = string(body)
if err = c.SetRemoteDescription(sd); err != nil {
return
}
c.pullCtx.GoToStepConst(pkg.StepStreaming)
return
}

View File

@@ -0,0 +1,83 @@
package webrtc
import (
"errors"
"io"
"net/http"
"strings"
. "github.com/pion/webrtc/v4"
"m7s.live/v5"
"m7s.live/v5/pkg"
"m7s.live/v5/pkg/util"
)
// WHIP push steps definition
var webrtcPushSteps = []pkg.StepDef{
{Name: pkg.StepPublish, Description: "Publishing stream"},
{Name: StepWebRTCInit, Description: "Initializing WebRTC connection"},
{Name: StepOfferCreate, Description: "Creating WebRTC offer"},
{Name: StepSessionCreate, Description: "Creating session with server"},
{Name: StepTrackSetup, Description: "Setting up media tracks"},
{Name: StepNegotiation, Description: "Completing WebRTC negotiation"},
{Name: pkg.StepStreaming, Description: "Pushing media stream"},
}
// WHIPClient is a client that pushes media to the server
type WHIPClient struct {
Client
pushCtx m7s.PushJob
}
func (c *WHIPClient) GetPushJob() *m7s.PushJob {
return &c.pushCtx
}
func (c *WHIPClient) Start() (err error) {
err = c.pushCtx.Subscribe()
if err != nil {
return
}
c.Subscriber = c.pushCtx.Subscriber
c.Info("Initializing WHIP WebRTC connection")
err = c.Client.Start()
if err != nil {
return
}
c.Info("Creating WebRTC offer")
var sdpBody SDPBody
sdpBody.SessionDescription, err = c.GetOffer()
if err != nil {
return
}
// Send offer to WHIP endpoint
c.Info("Sending offer to WHIP endpoint", "url", c.pushCtx.RemoteURL)
c.Debug("sdp", sdpBody.SessionDescription.SDP)
var res *http.Response
res, err = http.DefaultClient.Post(c.pushCtx.RemoteURL, "application/sdp", strings.NewReader(sdpBody.SessionDescription.SDP))
if err != nil {
return
}
var body util.Buffer
io.Copy(&body, res.Body)
if res.StatusCode != http.StatusCreated && res.StatusCode != http.StatusOK {
err = errors.New(res.Status + string(body))
return
}
// Parse answer from server
c.Info("Processing WHIP answer from server")
var sd SessionDescription
sd.Type = SDPTypeAnswer
sd.SDP = string(body)
if err = c.SetRemoteDescription(sd); err != nil {
return
}
c.Info("WHIP negotiation completed, ready to push media")
return
}

View File

@@ -314,10 +314,8 @@ func (s *Server) AddPullProxy(ctx context.Context, req *pb.PullProxyInfo) (res *
return
}
switch u.Scheme {
case "srt", "rtsp", "rtmp":
case "srt", "rtsp", "rtmp", "webrtc":
pullProxyConfig.Type = u.Scheme
case "whep":
pullProxyConfig.Type = "webrtc"
default:
ext := filepath.Ext(u.Path)
switch ext {
@@ -422,10 +420,8 @@ func (s *Server) UpdatePullProxy(ctx context.Context, req *pb.UpdatePullProxyReq
return
}
switch u.Scheme {
case "srt", "rtsp", "rtmp":
case "srt", "rtsp", "rtmp", "webrtc":
target.Type = u.Scheme
case "whep":
target.Type = "webrtc"
default:
ext := filepath.Ext(u.Path)
switch ext {

View File

@@ -196,7 +196,7 @@ func (d *PushProxyConfig) InitializeWithServer(s *Server) {
return
}
switch u.Scheme {
case "srt", "rtsp", "rtmp":
case "srt", "rtsp", "rtmp", "webrtc":
d.Type = u.Scheme
default:
ext := filepath.Ext(u.Path)
@@ -272,7 +272,7 @@ func (s *Server) AddPushProxy(ctx context.Context, req *pb.PushProxyInfo) (res *
return
}
switch u.Scheme {
case "srt", "rtsp", "rtmp":
case "srt", "rtsp", "rtmp", "webrtc":
device.Type = u.Scheme
default:
ext := filepath.Ext(u.Path)