diff --git a/cmd/api/ws.go b/cmd/api/ws.go index c70072f3..04e7142e 100644 --- a/cmd/api/ws.go +++ b/cmd/api/ws.go @@ -11,8 +11,24 @@ import ( // Message - struct for data exchange in Web API type Message struct { - Type string `json:"type"` - Value interface{} `json:"value,omitempty"` + Type string `json:"type"` + Value any `json:"value,omitempty"` +} + +func (m *Message) String() string { + if s, ok := m.Value.(string); ok { + return s + } + return "" +} + +func (m *Message) GetString(key string) string { + if v, ok := m.Value.(map[string]any); ok { + if s, ok := v[key].(string); ok { + return s + } + } + return "" } type WSHandler func(tr *Transport, msg *Message) error diff --git a/cmd/mp4/ws.go b/cmd/mp4/ws.go index ce4acdb6..b42e8bc1 100644 --- a/cmd/mp4/ws.go +++ b/cmd/mp4/ws.go @@ -21,7 +21,7 @@ func handlerWSMSE(tr *api.Transport, msg *api.Message) error { UserAgent: tr.Request.UserAgent(), } - if codecs, ok := msg.Value.(string); ok { + if codecs := msg.String(); codecs != "" { log.Trace().Str("codecs", codecs).Msgf("[mp4] new WS/MSE consumer") cons.Medias = parseMedias(codecs, true) } @@ -69,7 +69,7 @@ func handlerWSMP4(tr *api.Transport, msg *api.Message) error { OnlyKeyframe: true, } - if codecs, ok := msg.Value.(string); ok { + if codecs := msg.String(); codecs != "" { log.Trace().Str("codecs", codecs).Msgf("[mp4] new WS/MP4 consumer") cons.Medias = parseMedias(codecs, false) } diff --git a/cmd/webrtc/candidates.go b/cmd/webrtc/candidates.go index 50f0c52f..6199be2b 100644 --- a/cmd/webrtc/candidates.go +++ b/cmd/webrtc/candidates.go @@ -87,10 +87,10 @@ func candidateHandler(tr *api.Transport, msg *api.Message) error { if tr.Consumer == nil { return nil } - if conn := tr.Consumer.(*webrtc.Conn); conn != nil { - s := msg.Value.(string) - log.Trace().Str("candidate", s).Msg("[webrtc] remote") - conn.AddCandidate(s) + if conn := tr.Consumer.(*webrtc.Server); conn != nil { + candidate := msg.String() + log.Trace().Str("candidate", candidate).Msg("[webrtc] remote") + conn.AddCandidate(candidate) } return nil } diff --git a/cmd/webrtc/webrtc.go b/cmd/webrtc/webrtc.go index c029bc56..bd6d60a8 100644 --- a/cmd/webrtc/webrtc.go +++ b/cmd/webrtc/webrtc.go @@ -34,14 +34,16 @@ func Init() { log = app.GetLogger("webrtc") address := cfg.Mod.Listen + + // create pionAPI with custom codecs list and custom network settings pionAPI, err := webrtc.NewAPI(address) if pionAPI == nil { - log.Error().Err(err).Caller().Msg("webrtc.NewAPI") + log.Error().Err(err).Caller().Send() return } if err != nil { - log.Warn().Err(err).Msg("[webrtc] listen") + log.Warn().Err(err).Caller().Send() } else if address != "" { log.Info().Str("addr", address).Msg("[webrtc] listen") _, Port, _ = net.SplitHostPort(address) @@ -52,12 +54,13 @@ func Init() { SDPSemantics: pion.SDPSemanticsUnifiedPlanWithFallback, } - NewPConn = func() (*pion.PeerConnection, error) { + newPeerConnection = func() (*pion.PeerConnection, error) { return pionAPI.NewPeerConnection(pionConf) } candidates = cfg.Mod.Candidates + api.HandleWS("webrtc", asyncHandler) api.HandleWS("webrtc/offer", asyncHandler) api.HandleWS("webrtc/candidate", candidateHandler) @@ -67,7 +70,7 @@ func Init() { var Port string var log zerolog.Logger -var NewPConn func() (*pion.PeerConnection, error) +var newPeerConnection func() (*pion.PeerConnection, error) func asyncHandler(tr *api.Transport, msg *api.Message) error { src := tr.Request.URL.Query().Get("src") @@ -78,22 +81,23 @@ func asyncHandler(tr *api.Transport, msg *api.Message) error { log.Debug().Str("url", src).Msg("[webrtc] new consumer") - var err error - - // create new webrtc instance - conn := new(webrtc.Conn) - conn.Conn, err = NewPConn() + // create new PeerConnection instance + pc, err := newPeerConnection() if err != nil { log.Error().Err(err).Caller().Send() return err } - conn.UserAgent = tr.Request.UserAgent() - conn.Listen(func(msg interface{}) { + // apiV2 - json/object exchange, V2 - raw SDP and raw Candidates exchange + apiV2 := msg.Type == "webrtc" + + cons := webrtc.NewServer(pc) + cons.UserAgent = tr.Request.UserAgent() + cons.Listen(func(msg any) { switch msg := msg.(type) { case pion.PeerConnectionState: if msg == pion.PeerConnectionStateClosed { - stream.RemoveConsumer(conn) + stream.RemoveConsumer(cons) } case *pion.ICECandidate: if msg != nil { @@ -105,25 +109,29 @@ func asyncHandler(tr *api.Transport, msg *api.Message) error { }) // 1. SetOffer, so we can get remote client codecs - offer := msg.Value.(string) + var offer string + if apiV2 { + offer = msg.GetString("sdp") + } else { + offer = msg.String() + } + log.Trace().Msgf("[webrtc] offer:\n%s", offer) - if err = conn.SetOffer(offer); err != nil { + if err = cons.SetOffer(offer); err != nil { log.Warn().Err(err).Caller().Send() return err } // 2. AddConsumer, so we get new tracks - if err = stream.AddConsumer(conn); err != nil { + if err = stream.AddConsumer(cons); err != nil { log.Debug().Err(err).Msg("[webrtc] add consumer") - _ = conn.Conn.Close() + _ = cons.Close() return err } - conn.Init() - // 3. Exchange SDP without waiting all candidates - answer, err := conn.GetAnswer() + answer, err := cons.GetAnswer() log.Trace().Msgf("[webrtc] answer\n%s", answer) if err != nil { @@ -131,7 +139,7 @@ func asyncHandler(tr *api.Transport, msg *api.Message) error { return err } - tr.Consumer = conn + tr.Consumer = cons tr.Write(&api.Message{Type: "webrtc/answer", Value: answer}) @@ -140,11 +148,6 @@ func asyncHandler(tr *api.Transport, msg *api.Message) error { return nil } -type SDP struct { - Type string `json:"type"` - Sdp string `json:"sdp"` -} - // syncHandler func syncHandler(w http.ResponseWriter, r *http.Request) { url := r.URL.Query().Get("src") @@ -153,21 +156,23 @@ func syncHandler(w http.ResponseWriter, r *http.Request) { return } - var offer string - ct := r.Header.Get("Content-Type") if ct != "" { ct, _, _ = mime.ParseMediaType(ct) } - if ct == "application/json" { - var v SDP - if err := json.NewDecoder(r.Body).Decode(&v); err != nil { + // apiV2 - json/object exchange, V1 - raw SDP exchange + apiV2 := ct == "application/json" + + var offer string + if apiV2 { + var sd pion.SessionDescription + if err := json.NewDecoder(r.Body).Decode(&sd); err != nil { log.Error().Err(err).Caller().Send() http.Error(w, err.Error(), http.StatusBadRequest) return } - offer = v.Sdp + offer = sd.SDP } else { body, err := io.ReadAll(r.Body) if err != nil { @@ -186,10 +191,12 @@ func syncHandler(w http.ResponseWriter, r *http.Request) { } // send SDP to client - if ct == "application/json" { + if apiV2 { w.Header().Set("Content-Type", ct) - v := SDP{Sdp: answer, Type: "answer"} + v := pion.SessionDescription{ + Type: pion.SDPTypeAnswer, SDP: answer, + } if err = json.NewEncoder(w).Encode(v); err != nil { log.Error().Err(err).Caller().Send() http.Error(w, err.Error(), http.StatusInternalServerError) @@ -201,17 +208,15 @@ func syncHandler(w http.ResponseWriter, r *http.Request) { } } -func ExchangeSDP( - stream *streams.Stream, offer string, userAgent string, -) (answer string, err error) { - // create new webrtc instance - conn := new(webrtc.Conn) - conn.Conn, err = NewPConn() +func ExchangeSDP(stream *streams.Stream, offer string, userAgent string) (answer string, err error) { + pc, err := newPeerConnection() if err != nil { log.Error().Err(err).Caller().Msg("NewPConn") return } + // create new webrtc instance + conn := webrtc.NewServer(pc) conn.UserAgent = userAgent conn.Listen(func(msg interface{}) { switch msg := msg.(type) { @@ -233,12 +238,10 @@ func ExchangeSDP( // 2. AddConsumer, so we get new tracks if err = stream.AddConsumer(conn); err != nil { log.Warn().Err(err).Caller().Msg("stream.AddConsumer") - _ = conn.Conn.Close() + _ = conn.Close() return } - conn.Init() - // exchange sdp without waiting all candidates //answer, err := conn.ExchangeSDP(offer, false) answer, err = conn.GetCompleteAnswer() diff --git a/pkg/webrtc/consumer.go b/pkg/webrtc/consumer.go index 785ef2f5..f013274d 100644 --- a/pkg/webrtc/consumer.go +++ b/pkg/webrtc/consumer.go @@ -9,13 +9,11 @@ import ( "github.com/pion/webrtc/v3" ) -// Consumer - -func (c *Conn) GetMedias() []*streamer.Media { +func (c *Server) GetMedias() []*streamer.Media { return c.medias } -func (c *Conn) AddTrack(media *streamer.Media, track *streamer.Track) *streamer.Track { +func (c *Server) AddTrack(media *streamer.Media, track *streamer.Track) *streamer.Track { switch track.Direction { // send our track to WebRTC consumer case streamer.DirectionSendonly: @@ -43,7 +41,7 @@ func (c *Conn) AddTrack(media *streamer.Media, track *streamer.Track) *streamer. return nil } - if _, err = c.Conn.AddTrack(trackLocal); err != nil { + if _, err = c.conn.AddTrack(trackLocal); err != nil { return nil } @@ -80,7 +78,7 @@ func (c *Conn) AddTrack(media *streamer.Media, track *streamer.Track) *streamer. // receive track from WebRTC consumer (microphone, backchannel, two way audio) case streamer.DirectionRecvonly: - for _, tr := range c.Conn.GetTransceivers() { + for _, tr := range c.conn.GetTransceivers() { if tr.Mid() != media.MID { continue } @@ -106,7 +104,7 @@ func (c *Conn) AddTrack(media *streamer.Media, track *streamer.Track) *streamer. panic("wrong direction") } -func (c *Conn) MarshalJSON() ([]byte, error) { +func (c *Server) MarshalJSON() ([]byte, error) { info := &streamer.Info{ Type: "WebRTC client", RemoteAddr: c.remote(), diff --git a/pkg/webrtc/conn.go b/pkg/webrtc/server.go similarity index 68% rename from pkg/webrtc/conn.go rename to pkg/webrtc/server.go index 313a3fb4..098c1f53 100644 --- a/pkg/webrtc/conn.go +++ b/pkg/webrtc/server.go @@ -6,12 +6,12 @@ import ( "github.com/pion/webrtc/v3" ) -type Conn struct { +type Server struct { streamer.Element UserAgent string - Conn *webrtc.PeerConnection + conn *webrtc.PeerConnection medias []*streamer.Media tracks []*streamer.Track @@ -20,12 +20,14 @@ type Conn struct { send int } -func (c *Conn) Init() { - c.Conn.OnICECandidate(func(candidate *webrtc.ICECandidate) { +func NewServer(conn *webrtc.PeerConnection) *Server { + c := &Server{conn: conn} + + conn.OnICECandidate(func(candidate *webrtc.ICECandidate) { c.Fire(candidate) }) - c.Conn.OnTrack(func(remote *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { + conn.OnTrack(func(remote *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { for _, track := range c.tracks { if track.Direction != streamer.DirectionRecvonly { continue @@ -50,7 +52,7 @@ func (c *Conn) Init() { //fmt.Printf("TODO: webrtc ontrack %+v\n", remote) }) - c.Conn.OnDataChannel(func(channel *webrtc.DataChannel) { + conn.OnDataChannel(func(channel *webrtc.DataChannel) { c.Fire(channel) }) @@ -63,7 +65,7 @@ func (c *Conn) Init() { // Fail connection: // 14:53:08 ICE connection state changed: checking // 14:53:39 peer connection state changed: failed - c.Conn.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { + conn.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { c.Fire(state) // TODO: remove @@ -74,25 +76,24 @@ func (c *Conn) Init() { c.Fire(streamer.StateNull) // TODO: remove // disconnect event comes earlier, than failed // but it comes only for success connections - _ = c.Conn.Close() - c.Conn = nil + _ = conn.Close() case webrtc.PeerConnectionStateFailed: - if c.Conn != nil { - _ = c.Conn.Close() - } + _ = conn.Close() } }) + + return c } -func (c *Conn) SetOffer(offer string) (err error) { - sdOffer := webrtc.SessionDescription{ +func (c *Server) SetOffer(offer string) (err error) { + desc := webrtc.SessionDescription{ Type: webrtc.SDPTypeOffer, SDP: offer, } - if err = c.Conn.SetRemoteDescription(sdOffer); err != nil { + if err = c.conn.SetRemoteDescription(desc); err != nil { return } - rawSDP := []byte(c.Conn.RemoteDescription().SDP) + rawSDP := []byte(c.conn.RemoteDescription().SDP) sd := &sdp.SessionDescription{} if err = sd.Unmarshal(rawSDP); err != nil { return @@ -116,8 +117,8 @@ func (c *Conn) SetOffer(offer string) (err error) { return } -func (c *Conn) GetAnswer() (answer string, err error) { - for _, tr := range c.Conn.GetTransceivers() { +func (c *Server) GetAnswer() (answer string, err error) { + for _, tr := range c.conn.GetTransceivers() { if tr.Direction() != webrtc.RTPTransceiverDirectionSendonly { continue } @@ -133,37 +134,42 @@ func (c *Conn) GetAnswer() (answer string, err error) { } var sdAnswer webrtc.SessionDescription - sdAnswer, err = c.Conn.CreateAnswer(nil) + sdAnswer, err = c.conn.CreateAnswer(nil) if err != nil { return } - if err = c.Conn.SetLocalDescription(sdAnswer); err != nil { + if err = c.conn.SetLocalDescription(sdAnswer); err != nil { return } return sdAnswer.SDP, nil } -func (c *Conn) GetCompleteAnswer() (answer string, err error) { +func (c *Server) GetCompleteAnswer() (answer string, err error) { if _, err = c.GetAnswer(); err != nil { return } - <-webrtc.GatheringCompletePromise(c.Conn) - return c.Conn.LocalDescription().SDP, nil + <-webrtc.GatheringCompletePromise(c.conn) + return c.conn.LocalDescription().SDP, nil } -func (c *Conn) AddCandidate(candidate string) { - _ = c.Conn.AddICECandidate(webrtc.ICECandidateInit{Candidate: candidate}) +func (c *Server) Close() error { + return c.conn.Close() } -func (c *Conn) remote() string { - if c.Conn == nil { +func (c *Server) AddCandidate(candidate string) { + // pion uses only candidate value from json/object candidate struct + _ = c.conn.AddICECandidate(webrtc.ICECandidateInit{Candidate: candidate}) +} + +func (c *Server) remote() string { + if c.conn == nil { return "" } - for _, trans := range c.Conn.GetTransceivers() { + for _, trans := range c.conn.GetTransceivers() { if trans == nil { continue }